diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index ec789ab28f49b..7c5755d28c195 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -66,6 +66,9 @@ static class Config { // control whether to filter the s3 objects starting with this prefix static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix"; static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix"; + + // control whether to ignore the s3 objects starting with this prefix + static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix"; } public S3EventsHoodieIncrSource( @@ -110,6 +113,9 @@ public Pair>, String> fetchNextBatch(Option lastCkpt if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) { filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'"; } + if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX))) { + filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'"; + } String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase(); String s3Prefix = s3FS + "://"; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java index 7252494d989a8..33c67e4a8a5aa 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java @@ -66,8 +66,8 @@ public class CloudObjectsSelector { static final String S3_FILE_PATH = "filePath"; public final String queueUrl; public final int longPollWait; - public final int maxMessagesPerRequest; public final int maxMessagePerBatch; + public final int maxMessagesPerRequest; public final int visibilityTimeout; public final TypedProperties props; public final String fsName; @@ -84,8 +84,8 @@ public CloudObjectsSelector(TypedProperties props) { this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase(); this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20); this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5); + this.maxMessagesPerRequest = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST, 10); this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30); - this.maxMessagesPerRequest = 10; } /** @@ -264,6 +264,12 @@ public static class Config { public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch"; + /** + * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST} is max messages for each request. + */ + public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST = + HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.request"; + /** * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we * consume the message, queue will move the consumed messages to in-flight state, these messages