您现在的位置是:网站首页> 编程开发> java 编程开发

阿里云oss追加上传,解决文件导出过大问题

2023-03-02java 678人已围观

简介背景:公司在做文件导出,虽然已经使用微服务方式,但还是有些数据量过大,导致文件在循环时导出时系统 OOM的产生。目的:使用阿里OSS功能 分批的上传到服务oss服务器中。具体的实现思路:1)各业务使用MQ方式发送需要导出的sql文件2)消费者实现导出具体的sql,但传过来的数据 不要一次性把所有数据拉出来,可以使用分页(每页100行数据)。3)数据组装 ,此时数据100行 & 组装好其他微

阿里云oss追加上传,解决文件导出过大问题

最后更新:2023-03-02 17:42:55

推荐指数


背景:公司在做文件导出,虽然已经使用微服务方式,但还是有些数据量过大,导致文件在循环时导出时系统 OOM的产生。

目的:使用阿里OSS功能 分批的上传到服务oss服务器中。

具体的实现思路:

1)各业务使用MQ方式发送需要导出的sql文件

2)消费者实现导出具体的sql,但传过来的数据 不要一次性把所有数据拉出来,可以使用分页(每页100行数据)。

3)数据组装 ,此时数据100行 & 组装好其他微服务的数据集。

4)每批只上传当前页的数据集到阿里OSS。

具体伪代码如下:

<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>3.16.1</version>
</dependency>
import com.aliyun.oss.model.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.Map;

@RestController
@RequestMapping("/test")
public class TestController {
    @GetMapping("/test")
    public R test() {
        // Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
        String endpoint = "https://oss-me-east-1.aliyuncs.com";
        // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
        String accessKeyId = "LTAI5tR1w3tnV94x5UttbmW5";
        String accessKeySecret = "JDzVz4lB8leH1Vz5gkZofqCKr5O15z";
        // 填写Bucket名称,例如examplebucket。
        String bucketName = "mints-overseas-test";
        // 填写Object完整路径,完整路径中不能包含Bucket名称,例如exampledir/exampleobject.txt。
        String objectName = "csv/20230302.csv";
        String title = "id,name\n";
        String content1 = "1,张三\n";
        // 创建OSSClient实例。
        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);

        try {
            ObjectMetadata meta = new ObjectMetadata();
            meta.setContentType("text/csv\n");
            AppendObjectRequest appendObjectRequest = null;
            // 创建OSSClient实例。
// 创建 headObject 请求
            // 判断文件是否存在
            boolean exists = ossClient.doesObjectExist(bucketName, objectName);
            long position = 0L;
            if (!exists) {
                // 指定上传的内容类型。
//                //如果不存在则创建csv AppendObjectRequest
                appendObjectRequest = new AppendObjectRequest(bucketName, objectName, new ByteArrayInputStream(title.getBytes()), meta);
                AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest);
                position = appendObjectResult.getNextPosition();
                System.out.println(appendObjectResult.getObjectCRC());
            } else {
                //如果存在则获取存在文件的position
                HeadObjectRequest request = new HeadObjectRequest(bucketName, objectName);
                ObjectMetadata objectMetadata = ossClient.headObject(request);
// 通过反射获取
                Field metadataField = ObjectMetadata.class.getDeclaredField("metadata");
                metadataField.setAccessible(true);
                Map<String, Object> metadata = (Map<String, Object>) metadataField.get(objectMetadata);
                String positionStr = (String) metadata.get("x-oss-next-append-position");
                position = Long.parseLong(positionStr);
                appendObjectRequest = new AppendObjectRequest(bucketName, objectName, new ByteArrayInputStream(content1.getBytes()), meta);
            }
            // 文件的64位CRC值。此值根据ECMA-182标准计算得出。
            // 第二次追加。
            // nextPosition表示下一次请求中应当提供的Position,即文件当前的长度。

            appendObjectRequest.setPosition(position);
            appendObjectRequest.setInputStream(new ByteArrayInputStream(content1.getBytes()));
            ossClient.appendObject(appendObjectRequest);
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return R.success(true);
    }
}


很赞哦! (0)

文章评论

来说两句吧...

验证码: