S3 上传:先把压缩包内的每个文件写入临时文件,再对临时文件进行分片上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
String username = SecurityUtils.getUsername();
String regex = ".*[\\u4e00-\\u9fa5]+.*";
// 创建线程池,根据实际环境调整线程数
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<?>> futures = new ArrayList<>();
InputStream inputStream = file.getInputStream();
try (ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(inputStream)) {
ZipArchiveEntry entry;
while ((entry = zipInputStream.getNextZipEntry()) != null) {
if (!entry.isDirectory()) {
String zipEntryName = entry.getName();
if (zipEntryName.matches(regex)) {
throw new Exception("压缩文件中不能包含中文名");
}
// 处理 key
String key = StringUtils.isNotEmpty(folderName)
? folderName + "/" + zipEntryName.replaceAll(" ", "_")
: zipEntryName.replaceAll(" ", "_");

// 创建临时文件存放当前条目的数据(确保不占用大量内存)
File tempFile = File.createTempFile("upload_", "_" + zipEntryName.replaceAll("[\\\\/]", "_"));
try (OutputStream out = new FileOutputStream(tempFile)) {
byte[] buffer = new byte[8192];
int len;
while ((len = zipInputStream.read(buffer)) != -1) {
out.write(buffer, 0, len);
}
}
// 提交上传任务,每个任务从磁盘读取临时文件进行上传
ZipArchiveEntry finalEntry = entry;
futures.add(executor.submit(() -> {
try (InputStream uploadStream = new FileInputStream(tempFile)) {
// 设置对象元数据
ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentType(getContentType(finalEntry.getName()));
objectMetadata.addUserMetadata("uploader", username);
objectMetadata.setContentLength(tempFile.length());
// 调用分片上传(建议内部采用 TransferManager 或 S3 异步 API)
initiateMultipartUpload2(key, uploadStream, objectMetadata);
} catch (Exception e) {
// log.error("上传文件 {} 失败,原因:{}", key, e.getMessage(), e);
throw new RuntimeException("上传文件 " + key + " 失败", e);
} finally {
// 上传完成后删除临时文件
if (!tempFile.delete()) {
// 如删除失败可记录日志
System.err.println("临时文件删除失败:" + tempFile.getAbsolutePath());
}
}
}));
}
}
} finally {
inputStream.close();
}

// 等待所有上传任务完成
for (Future<?> future : futures) {
future.get(); // 如果某个任务失败,这里会抛出异常
}
executor.shutdown();

分片上传实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Uploads the content of inputStream to bucketName/key as an S3 multipart upload:
// initiate, upload fixed-size parts sequentially, then complete. If anything fails
// after initiation, the multipart upload is aborted before the error is rethrown.
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key, objectMetadata);
InitiateMultipartUploadResult initResponse = amazonS3.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId();

// Part bookkeeping: ETags are collected in upload order for the completion request.
List<PartETag> partETags = new ArrayList<>();
int partSize = 5 * 1024 * 1024; // 5 MB per part
byte[] buffer = new byte[partSize];
int partNumber = 1;

try {
    while (true) {
        // Fill the buffer up to partSize; a single read() may return fewer bytes than requested.
        int bytesReadTotal = 0;
        while (bytesReadTotal < partSize) {
            int bytesRead = inputStream.read(buffer, bytesReadTotal, partSize - bytesReadTotal);
            if (bytesRead == -1) {
                break;
            }
            bytesReadTotal += bytesRead;
        }
        if (bytesReadTotal == 0) {
            break; // nothing left to send
        }
        // Wrap only the filled portion of the buffer as this part's payload.
        ByteArrayInputStream partStream = new ByteArrayInputStream(buffer, 0, bytesReadTotal);
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(partNumber)
                .withPartSize(bytesReadTotal)
                .withInputStream(partStream);
        UploadPartResult uploadResult = amazonS3.uploadPart(uploadRequest);
        partETags.add(uploadResult.getPartETag());
        partNumber++;
        // A short part means the stream hit end-of-data while filling the buffer.
        if (bytesReadTotal < partSize) {
            break;
        }
    }

    // Stitch the uploaded parts together into the final object.
    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
            bucketName, key, uploadId, partETags);
    amazonS3.completeMultipartUpload(completeRequest);
} catch (Exception e) {
    // Without an abort, the parts uploaded so far would stay stored under this uploadId.
    try {
        amazonS3.abortMultipartUpload(
                new com.amazonaws.services.s3.model.AbortMultipartUploadRequest(bucketName, key, uploadId));
    } catch (RuntimeException abortFailure) {
        // Keep the original failure as the primary error; attach the abort failure to it.
        e.addSuppressed(abortFailure);
    }
    throw e;
}

下载文件并打成压缩包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Streams every S3 object under folderPrefix + "/" to the HTTP response as a zip
// attachment named file.zip. Listing is paginated via the continuation token.
folderPrefix = folderPrefix.trim().replace(" ", "_");
response.setContentType("application/zip");
response.setHeader("Content-Disposition", "attachment; filename=file.zip");

try (ZipOutputStream zipOutputStream = new ZipOutputStream(response.getOutputStream())) {
    try {
        ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
                .withBucketName(bucketName)
                .withPrefix(folderPrefix + "/");

        String continuationToken = null;
        boolean moreFiles = true;
        while (moreFiles) {
            // A null token requests the first page; later pages use the token from the previous result.
            listObjectsRequest.setContinuationToken(continuationToken);
            ListObjectsV2Result result = amazonS3.listObjectsV2(listObjectsRequest);
            List<S3ObjectSummary> objectSummaries = result.getObjectSummaries();

            for (S3ObjectSummary summary : objectSummaries) {
                String key = summary.getKey();
                downloadFile(bucketName, key, zipOutputStream, folderPrefix);
            }

            continuationToken = result.getNextContinuationToken();
            moreFiles = continuationToken != null;
        }

        zipOutputStream.finish();
    } catch (Exception e) {
        // Handled INSIDE the try-with-resources on purpose: closing the zip stream also
        // closes the response output stream, so the status has to be set before that
        // close runs for it to have any chance of reaching the client.
        e.printStackTrace();
        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
} catch (Exception e) {
    // Failures while obtaining or closing the output stream itself.
    e.printStackTrace();
    response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}

下载方法(把单个 S3 对象写入压缩包中的一个条目)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copies one S3 object into zipOutputStream as a zip entry. The entry name is the
// object key with the leading folderPrefix and the following "/" stripped.
String entryName = key.substring(folderPrefix.length() + 1);
// A key equal to folderPrefix + "/" (e.g. a zero-byte "folder" placeholder object)
// leaves an empty name; skip it instead of writing a nameless zip entry.
if (!entryName.isEmpty()) {
    try (S3Object s3Object = amazonS3.getObject(bucketName, key);
         InputStream inputStream = s3Object.getObjectContent()) {

        // Open a new zip entry for this object.
        zipOutputStream.putNextEntry(new ZipEntry(entryName));

        byte[] buffer = new byte[4096];
        int length;
        // read() signals end-of-stream with -1.
        while ((length = inputStream.read(buffer)) != -1) {
            zipOutputStream.write(buffer, 0, length);
        }
        zipOutputStream.closeEntry(); // finish the current zip entry
    } catch (IOException e) {
        // Best effort as in the original: report the failure and let the caller
        // continue with the remaining objects.
        e.printStackTrace();
    }
}