From 50440b9ff95c1942127116fa63c1e280b11ababd Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Mon, 25 Nov 2024 19:42:57 +0800 Subject: [PATCH] [feature][plugin][s3writer] add pathStyleAccessEnabled config (#1186) Fix error 400 when setting pathStyleAccessEnabled to true for MinIO --- .../addax/plugin/writer/s3writer/S3Key.java | 2 ++ .../addax/plugin/writer/s3writer/S3Util.java | 11 +++++-- .../plugin/writer/s3writer/S3Writer.java | 32 +++++++++++-------- pom.xml | 2 +- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Key.java b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Key.java index 25c66e012..110944c95 100644 --- a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Key.java +++ b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Key.java @@ -20,4 +20,6 @@ public class S3Key extends Key public static final String MAX_FILE_SIZE = "maxFileSize"; public static final String DEFAULT_SUFFIX = "defaultSuffix"; + + public static final String PATH_STYLE_ACCESS_ENABLED = "pathStyleAccessEnabled"; } diff --git a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Util.java b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Util.java index 6e7f5db88..03aeb8785 100644 --- a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Util.java +++ b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Util.java @@ -18,18 +18,23 @@ public static S3Client initS3Client(Configuration conf) { Region region = Region.of(regionStr); String accessId = conf.getString(S3Key.ACCESS_ID); String accessKey = conf.getString(S3Key.ACCESS_KEY); + String pathStyleAccessEnabled =conf.getString(S3Key.PATH_STYLE_ACCESS_ENABLED,""); - return initS3Client(conf.getString(S3Key.ENDPOINT), region, accessId, accessKey); + return initS3Client(conf.getString(S3Key.ENDPOINT), region, accessId, accessKey ,pathStyleAccessEnabled); } - public static S3Client initS3Client(String endpoint, Region region, String accessId, String accessKey) { + public static S3Client initS3Client(String endpoint, Region region, String accessId, String accessKey ,String pathStyleAccessEnabled) { if (null == region) { region = Region.of("ap-northeast-1"); } try { AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessId, accessKey); - return S3Client.builder() + return S3Client.builder().serviceConfiguration(e -> { + if("true".equals(pathStyleAccessEnabled)) { + e.pathStyleAccessEnabled(true); + } + }) .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) .region(region) .endpointOverride(URI.create(endpoint)) diff --git a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java index c707228c6..6de34697d 100644 --- a/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java +++ b/plugin/writer/s3writer/src/main/java/com/wgzhao/addax/plugin/writer/s3writer/S3Writer.java @@ -307,22 +307,26 @@ public void startWrite(RecordReceiver lineReceiver) completedParts.add(completedPart); outputStream.reset(); } - // Finally, call completeMultipartUpload operation to tell S3 to merge all uploaded - // parts and finish the multipart operation. - CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder() - .parts(completedParts) - .build(); + if(!completedParts.isEmpty()) { + // Finally, call completeMultipartUpload operation to tell S3 to merge all uploaded + // parts and finish the multipart operation. + CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder() + .parts(completedParts) + .build(); - CompleteMultipartUploadRequest completeMultipartUploadRequest = - CompleteMultipartUploadRequest.builder() - .bucket(bucket) - .key(object) - .uploadId(uploadId) - .multipartUpload(completedMultipartUpload) - .build(); + CompleteMultipartUploadRequest completeMultipartUploadRequest = + CompleteMultipartUploadRequest.builder() + .bucket(bucket) + .key(object) + .uploadId(uploadId) + .multipartUpload(completedMultipartUpload) + .build(); - s3Client.completeMultipartUpload(completeMultipartUploadRequest); - LOG.info("end do write"); + s3Client.completeMultipartUpload(completeMultipartUploadRequest); + LOG.info("end do write"); + } else { + LOG.info("no content do write"); + } } private String record2String(Record record) diff --git a/pom.xml b/pom.xml index 8f02122af..98d64beaf 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ 0.25 - 2.20.5 + 2.29.20 1.11.3 1.9.4 1.10