Skip to content

Commit

Permalink
[improve][plugin][s3writer] Improve performance by refactoring code
Browse files Browse the repository at this point in the history
1. Use Java V2 API
2. Utilize `prefix` to get objects instead of `listObjects` to reduce the number of requests
3. Add a random string before the file suffix to preserve the filename; for example, change the filename 'upload.csv' to 'upload_<random uuid>.csv', the previous file name will be `uplaod.csv_<random uuid>`
4. Optimize code
  • Loading branch information
wgzhao committed Sep 20, 2024
1 parent 1a90bf4 commit c7bc7ef
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 96 deletions.
86 changes: 42 additions & 44 deletions docs/assets/jobs/s3writer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,49 @@
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 11:22:33",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "s3writer",
"parameter": {
"endpoint": "https://s3.amazonaws.com",
"accessId": "xxxxxxxxxxxx",
"accessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
"bucket": "test",
"object": "upload.csv",
"region": "ap-northeast-1",
"encoding": "",
"fieldDelimiter": ",",
"writeMode": "truncate"
}
"content": {
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 11:22:33",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "s3writer",
"parameter": {
"endpoint": "https://s3.amazonaws.com",
"accessId": "xxxxxxxxxxxx",
"accessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
"bucket": "test",
"object": "upload.csv",
"region": "ap-northeast-1",
"encoding": "",
"fieldDelimiter": ",",
"writeMode": "truncate"
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
Expand Down Expand Up @@ -58,6 +60,14 @@ public void init()
this.s3Client = S3Util.initS3Client(this.writerSliceConfig);
}

@Override
public void destroy()
{
if (this.s3Client != null) {
this.s3Client.close();
}
}

private void validateParameter()
{
this.writerSliceConfig.getNecessaryValue(S3Key.REGION, S3WriterErrorCode.REQUIRED_VALUE);
Expand All @@ -83,7 +93,7 @@ public void prepare()
deleteBucketObjects(bucket, object);
}
else if ("nonConflict".equals(writeMode)) {
LOG.info("Begin check exists objects which starts with [%s] in bucket [{}] or not Bucket [{}] ", bucket, object);
LOG.info("Begin to check for existing objects that starts with [{}] in bucket [{}]", object, bucket);
List<S3Object> objs = listObjects(bucket, object);
if (!objs.isEmpty()) {
LOG.error("There have {} objects starts with {} in bucket {} ", objs.size(), object, bucket);
Expand All @@ -92,49 +102,41 @@ else if ("nonConflict".equals(writeMode)) {
}
}

@Override
public void post()
{

}

@Override
public void destroy()
{

}

@Override
public List<Configuration> split(int mandatoryNumber)
{
LOG.info("begin do split...");
List<Configuration> writerSplitConfigs = new ArrayList<>();
String object = this.writerSliceConfig.getString(S3Key.OBJECT);
String bucket = this.writerSliceConfig.getString(S3Key.BUCKET);

String objectName = object;
String objectSuffix = null;
// if the object has suffix, it should separate the object name
if (object.contains(".")) {
objectName = object.split("\\.", -1)[0];
objectSuffix = "." + object.split("\\.", -1)[1];
}
Set<String> allObjects = new HashSet<>();
for (S3Object obj : listObjects(bucket, object)) {
allObjects.add(obj.key());
}


String objectSuffix;
String fullObjectName;
for (int i = 0; i < mandatoryNumber; i++) {
// handle same object name
Configuration splitedTaskConfig = this.writerSliceConfig.clone();

String fullObjectName;
objectSuffix = StringUtils.replace(UUID.randomUUID().toString(), "-", "");
fullObjectName = String.format("%s_%s", object, objectSuffix);
while (allObjects.contains(fullObjectName)) {
objectSuffix = StringUtils.replace(UUID.randomUUID().toString(), "-", "");
fullObjectName = String.format("%s_%s", object, objectSuffix);
Configuration splitTaskConfig = this.writerSliceConfig.clone();
do {
fullObjectName = String.format("%s_%s%s", objectName,
StringUtils.replace(UUID.randomUUID().toString(), "-", ""),
objectSuffix
);
}
while (allObjects.contains(fullObjectName));
allObjects.add(fullObjectName);
splitedTaskConfig.set(S3Key.OBJECT, fullObjectName);
LOG.info(String.format("split write object name:[%s]", fullObjectName));
splitTaskConfig.set(S3Key.OBJECT, fullObjectName);
LOG.info("split write object name:[{}]", fullObjectName);

writerSplitConfigs.add(splitedTaskConfig);
writerSplitConfigs.add(splitTaskConfig);
}
LOG.info("end do split.");
return writerSplitConfigs;
Expand All @@ -149,17 +151,26 @@ public List<Configuration> split(int mandatoryNumber)
*/
private List<S3Object> listObjects(String bucket, String objectName)
{
ListObjectsRequest listObjects = ListObjectsRequest
String suffix = null;
if (objectName.contains(".")) {
suffix = "." + objectName.split("\\.", -1)[1];
objectName = objectName.split("\\.", -1)[0];
}
ListObjectsV2Request listObjects = ListObjectsV2Request
.builder()
.bucket(bucket)
.prefix(objectName)
.build();

ListObjectsResponse res = s3Client.listObjects(listObjects);
ListObjectsV2Response res = s3Client.listObjectsV2(listObjects);

List<S3Object> objects = res.contents();
List<S3Object> result = new ArrayList<>();
for (S3Object obj : objects) {
if (obj.key().startsWith(objectName)) {
if (suffix == null) {
result.add(obj);
}
else if (obj.key().endsWith(suffix)) {
result.add(obj);
}
}
Expand All @@ -176,7 +187,7 @@ private void deleteBucketObjects(String bucket, String objectName)
{
List<S3Object> objects = listObjects(bucket, objectName);
ArrayList<ObjectIdentifier> toDelete = new ArrayList<>();
if ( !objects.isEmpty()) {
if (!objects.isEmpty()) {
for (S3Object obj : objects) {
toDelete.add(ObjectIdentifier.builder().key(obj.key()).build());
}
Expand Down Expand Up @@ -232,12 +243,12 @@ public void startWrite(RecordReceiver lineReceiver)
{
// 设置每块字符串长度
final int partSize = 1024 * 1024 * 10;
long numberCacual = (this.maxFileSize * 1024 * 1024L) / partSize;
final long maxPartNumber = numberCacual >= 1 ? numberCacual : 1;
long numberCalc = (this.maxFileSize * 1024 * 1024L) / partSize;
final long maxPartNumber = numberCalc >= 1 ? numberCalc : 1;
//warn: may be StringBuffer->StringBuilder
Record record;

LOG.info(String.format("begin do write, each object maxFileSize: [%s]MB...", maxPartNumber * 10));
LOG.info("Begin do write, each object's max file size is {}MB...", maxPartNumber * 10);
// First create a multipart upload and get the upload id
CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
Expand All @@ -254,12 +265,12 @@ public void startWrite(RecordReceiver lineReceiver)
.key(object)
.uploadId(uploadId)
.partNumber(currPart).build();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( );
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Charset charset = Charset.forName(encoding);
boolean needInit = true;
while ((record = lineReceiver.getFromReader()) != null) {
try {
if (needInit && ! header.isEmpty()) {
if (needInit && !header.isEmpty()) {
// write header
outputStream.write(String.join(String.valueOf(fieldDelimiter), header).getBytes(charset));
outputStream.write("\n".getBytes(charset));
Expand Down Expand Up @@ -312,10 +323,11 @@ public void startWrite(RecordReceiver lineReceiver)
LOG.info("end do write");
}

private String record2String(Record record) {
private String record2String(Record record)
{
StringJoiner sj = new StringJoiner(this.fieldDelimiter + "");
int columnNum = record.getColumnNumber();
for (int i=0; i < columnNum; i++) {
for (int i = 0; i < columnNum; i++) {
Column column = record.getColumn(i);
if (column == null || column.asString() == null) {
sj.add(this.nullFormat);
Expand All @@ -325,29 +337,20 @@ private String record2String(Record record) {
if (type == Column.Type.DATE) {
SimpleDateFormat sdf = new SimpleDateFormat(this.dateFormat);
sj.add(sdf.format(column.asDate()));
} else {
}
else {
sj.add(column.asString());
}
}
return sj.toString();
}

@Override
public void prepare()
{

}

@Override
public void post()
{

}

@Override
public void destroy()
{

if (this.s3Client != null) {
this.s3Client.close();
}
}
}
}

0 comments on commit c7bc7ef

Please sign in to comment.