Skip to content

Commit

Permalink
[improve][plugin][s3reader] Improve performance by refactoring the code
Browse files Browse the repository at this point in the history
1. Use the S3 `ListObjectsV2` API
2. Pass a key `prefix` when listing objects, instead of listing the whole bucket with `listObjects`, to reduce the number of requests
3. Optimize code
  • Loading branch information
wgzhao committed Sep 20, 2024
1 parent 96c7e82 commit 1a90bf4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 104 deletions.
51 changes: 28 additions & 23 deletions docs/assets/jobs/s3reader.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,42 @@
"job": {
"setting": {
"speed": {
"channel": 6,
"channel": 1,
"bytes": -1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "s3reader",
"parameter": {
"endpoint": "https://s3.amazonaws.com",
"accessId": "xxxxxxxxxxxx",
"accessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
"bucket": "test",
"object": ["1.csv", "aa.csv"],
"column": ["*"],
"region":"ap-northeast-1",
"fileFormat": "csv",
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
"content": {
"reader": {
"name": "s3reader",
"parameter": {
"endpoint": "https://s3.amazonaws.com",
"accessId": "xxxxxxxxxxxx",
"accessKey": "xxxxxxxxxxxxxxxxxxxxxxx",
"bucket": "test",
"object": [
"1.csv",
"aa.csv",
"upload_*.csv",
"bb_??.csv"
],
"column": [
"*"
],
"region": "ap-northeast-1",
"fileFormat": "csv",
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Object;

Expand All @@ -24,6 +26,8 @@
import java.util.List;
import java.util.regex.Pattern;

import static com.wgzhao.addax.plugin.reader.s3reader.S3ReaderErrorCode.REQUIRED_VALUE;

public class S3Reader
extends Reader
{
Expand All @@ -34,6 +38,9 @@ public static class Job

private Configuration readerOriginConfig = null;

private String bucket;
private S3Client client = null;

@Override
public void init()
{
Expand All @@ -45,32 +52,13 @@ public void init()

private void validate()
{
String region = this.readerOriginConfig.getString(S3Key.REGION);
if (StringUtils.isBlank(region)) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.REQUIRED_VALUE, "The item region is required");
}

String accessId = this.readerOriginConfig.getString(S3Key.ACCESS_ID);
if (StringUtils.isBlank(accessId)) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.REQUIRED_VALUE, "The item accessId is required");
}

String accessKey = this.readerOriginConfig.getString(S3Key.ACCESS_KEY);
if (StringUtils.isBlank(accessKey)) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.REQUIRED_VALUE, "The item accesskey is required");
}

String bucket = this.readerOriginConfig.getString(S3Key.BUCKET);
if (StringUtils.isBlank(bucket)) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.REQUIRED_VALUE, "The item bucket is required");
}
readerOriginConfig.getNecessaryValue(S3Key.REGION, REQUIRED_VALUE);
readerOriginConfig.getNecessaryValue(S3Key.ACCESS_ID, REQUIRED_VALUE);
readerOriginConfig.getNecessaryValue(S3Key.ACCESS_KEY, REQUIRED_VALUE);
this.bucket = readerOriginConfig.getNecessaryValue(S3Key.BUCKET, REQUIRED_VALUE);
readerOriginConfig.getNecessaryValue(S3Key.OBJECT, REQUIRED_VALUE);

String object = this.readerOriginConfig.getString(S3Key.OBJECT);
if (StringUtils.isBlank(object)) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.REQUIRED_VALUE, "The item object is required");
}

String encoding = this.readerOriginConfig.getString(S3Key.ENCODING, Constant.DEFAULT_ENCODING);
String encoding = readerOriginConfig.getString(S3Key.ENCODING, Constant.DEFAULT_ENCODING);
try {
Charsets.toCharset(encoding);
}
Expand All @@ -84,24 +72,23 @@ private void validate()
}

// 检测是column 是否为 ["*"] 若是则填为空
List<Configuration> column = this.readerOriginConfig.getListConfiguration(S3Key.COLUMN);
List<Configuration> column = readerOriginConfig.getListConfiguration(S3Key.COLUMN);
if (null != column && 1 == column.size() && ("\"*\"".equals(column.get(0).toString())
|| "'*'".equals(column.get(0).toString()))) {
readerOriginConfig.set(S3Key.COLUMN, new ArrayList<String>());
}
else {
// column: 1. index type 2.value type 3.when type is Data, maybe with format string
List<Configuration> columns = this.readerOriginConfig
.getListConfiguration(S3Key.COLUMN);
List<Configuration> columns = readerOriginConfig.getListConfiguration(S3Key.COLUMN);

if (null == columns || columns.size() == 0) {
if (null == columns || columns.isEmpty()) {
throw AddaxException.asAddaxException(
S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
"The item column is required");
}

for (Configuration eachColumnConf : columns) {
eachColumnConf.getNecessaryValue(S3Key.TYPE, S3ReaderErrorCode.REQUIRED_VALUE);
eachColumnConf.getNecessaryValue(S3Key.TYPE, REQUIRED_VALUE);
Integer columnIndex = eachColumnConf.getInt(S3Key.INDEX);
String columnValue = eachColumnConf.getString(S3Key.VALUE);

Expand All @@ -118,24 +105,15 @@ private void validate()
}
}
}
}

@Override
public void prepare()
{

}

@Override
public void post()
{

this.client = S3Util.initS3Client(readerOriginConfig);
}

/**
 * Closes the S3 client held by this job, if one has been created.
 */
@Override
public void destroy()
{
    // Nothing to release when no client was ever assigned.
    if (this.client == null) {
        return;
    }
    this.client.close();
}

@Override
Expand All @@ -146,7 +124,7 @@ public List<Configuration> split(int adviceNumber)

// 将每个单独的 object 作为一个 slice
List<String> objects = parseOriginObjects(readerOriginConfig.getList(S3Key.OBJECT, String.class));
if (0 == objects.size()) {
if (objects.isEmpty()) {
throw AddaxException.asAddaxException(
S3ReaderErrorCode.EMPTY_BUCKET_EXCEPTION,
String.format(
Expand All @@ -156,10 +134,10 @@ public List<Configuration> split(int adviceNumber)
}

for (String object : objects) {
Configuration splitedConfig = this.readerOriginConfig.clone();
splitedConfig.set(S3Key.OBJECT, object);
readerSplitConfigs.add(splitedConfig);
LOG.info(String.format("S3 object to be read:%s", object));
Configuration splitConfig = this.readerOriginConfig.clone();
splitConfig.set(S3Key.OBJECT, object);
readerSplitConfigs.add(splitConfig);
LOG.info("S3 object to be read {}", object);
}
LOG.debug("split() ok and end...");
return readerSplitConfigs;
Expand All @@ -168,22 +146,10 @@ public List<Configuration> split(int adviceNumber)
private List<String> parseOriginObjects(List<String> originObjects)
{
List<String> parsedObjects = new ArrayList<>();

for (String object : originObjects) {
int firstMetaChar = Math.max(object.indexOf('*'), object.indexOf('?'));

if (firstMetaChar != -1) {
int lastDirSeparator = object.lastIndexOf(IOUtils.DIR_SEPARATOR, firstMetaChar);
String parentDir = object.substring(0, lastDirSeparator + 1);
List<String> remoteObjects = getRemoteObjects(parentDir);
Pattern pattern = Pattern.compile(object.replace("*", ".*")
.replace("?", ".?"));

for (String remoteObject : remoteObjects) {
if (pattern.matcher(remoteObject).matches()) {
parsedObjects.add(remoteObject);
}
}
if (object.indexOf('*') > -1 || object.indexOf('?') > -1) {
List<String> remoteObjects = listObjectsWithPattern(object);
parsedObjects.addAll(remoteObjects);
}
else {
parsedObjects.add(object);
Expand All @@ -192,21 +158,39 @@ private List<String> parseOriginObjects(List<String> originObjects)
return parsedObjects;
}

private List<String> getRemoteObjects(String parentDir)
private List<String> listObjectsWithPattern(String pattern)
{
LOG.debug("find objects in {}", parentDir);
// Extract the prefix from the pattern up to the first wildcard character
int firstWildcardIndex = Math.min(
pattern.indexOf('*') == -1 ? pattern.length() : pattern.indexOf('*'),
pattern.indexOf('?') == -1 ? pattern.length() : pattern.indexOf('?')
);
String prefix = pattern.substring(0, firstWildcardIndex);
// Convert the pattern to a regex
String regex = pattern.replace("?", ".{1}").replace("*", ".*");
Pattern compiledPattern = Pattern.compile(regex);

ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(prefix)
.build();

ListObjectsV2Response listObjectsV2Response;
List<String> remoteObjects = new ArrayList<>();
S3Client client = S3Util.initS3Client(readerOriginConfig);
do {
listObjectsV2Response = client.listObjectsV2(listObjectsV2Request);

ListObjectsRequest listObjects = ListObjectsRequest
.builder()
.bucket(readerOriginConfig.getString(S3Key.BUCKET))
.build();
ListObjectsResponse res = client.listObjects(listObjects);
List<S3Object> objects = res.contents();
for (S3Object myValue : objects) {
remoteObjects.add(myValue.key());
for (S3Object s3Object : listObjectsV2Response.contents()) {
if (compiledPattern.matcher(s3Object.key()).matches()) {
remoteObjects.add(s3Object.key());
}
}

listObjectsV2Request = listObjectsV2Request.toBuilder()
.continuationToken(listObjectsV2Response.nextContinuationToken())
.build();
}
while (listObjectsV2Response.isTruncated());

return remoteObjects;
}
Expand All @@ -215,33 +199,30 @@ private List<String> getRemoteObjects(String parentDir)
public static class Task
extends Reader.Task
{
private static final Logger LOG = LoggerFactory.getLogger(Reader.Task.class);
private static final Logger LOG = LoggerFactory.getLogger(Task.class);

private Configuration readerSliceConfig;

@Override
public void startRead(RecordSender recordSender)
{
LOG.debug("read start");
LOG.debug("Begin to start reading");
String object = readerSliceConfig.getString(S3Key.OBJECT);
S3Client client = S3Util.initS3Client(readerSliceConfig);

GetObjectRequest s3Object = GetObjectRequest.builder()
.bucket(readerSliceConfig.getString(S3Key.BUCKET))
.key(object)
.build();
try {
try (S3Client client = S3Util.initS3Client(readerSliceConfig)) {
InputStream objectStream = client.getObject(s3Object);
StorageReaderUtil.readFromStream(objectStream, object,
this.readerSliceConfig, recordSender,
this.getTaskPluginCollector());
recordSender.flush();
}
catch (NoSuchKeyException e) {
throw AddaxException.asAddaxException(S3ReaderErrorCode.OBJECT_NOT_EXIST,
"The object " + object + " does not exists");
LOG.warn("The object {} does not exists", object);
}

}

@Override
Expand Down

0 comments on commit 1a90bf4

Please sign in to comment.