diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 79e4abbcf9000..ec789ab28f49b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; @@ -49,7 +50,6 @@ import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT; -import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX; /** * This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}. 
@@ -62,6 +62,10 @@ static class Config { // control whether we do existence check for files before consuming them static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists"; static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false; + + // control whether to filter the s3 objects starting with this prefix + static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix"; + static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix"; } public S3EventsHoodieIncrSource( @@ -101,9 +105,18 @@ public Pair>, String> fetchNextBatch(Option lastCkpt .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft()) .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight()); Dataset source = metaReader.load(srcPath); + + String filter = "s3.object.size > 0"; + if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) { + filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'"; + } + + String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase(); + String s3Prefix = s3FS + "://"; + // Extract distinct file keys from s3 meta hoodie table final List cloudMetaDf = source - .filter("s3.object.size > 0") + .filter(filter) .select("s3.bucket.name", "s3.object.key") .distinct() .collectAsList(); @@ -113,9 +126,9 @@ public Pair>, String> fetchNextBatch(Option lastCkpt for (Row row : cloudMetaDf) { // construct file path, row index 0 refers to bucket and 1 refers to key String bucket = row.getString(0); - String filePath = S3_PREFIX + bucket + "/" + row.getString(1); + String filePath = s3Prefix + bucket + "/" + row.getString(1); if (checkExists) { - FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration()); + FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration()); try { if (fs.exists(new Path(filePath))) { 
cloudFiles.add(filePath);