Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ static class Config {
// control whether to filter the s3 objects starting with this prefix
static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";

// control whether to ignore the s3 objects starting with this prefix
static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
}

public S3EventsHoodieIncrSource(
Expand Down Expand Up @@ -110,6 +113,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX))) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apparently S3 incremental source does not have good tests. I have filed a tracking jira here. Will take it as a follow up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add the tests once #4334 gets merged.

filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
}

String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public class CloudObjectsSelector {
static final String S3_FILE_PATH = "filePath";
public final String queueUrl;
public final int longPollWait;
public final int maxMessagesPerRequest;
public final int maxMessagePerBatch;
public final int maxMessagesPerRequest;
public final int visibilityTimeout;
public final TypedProperties props;
public final String fsName;
Expand All @@ -84,8 +84,8 @@ public CloudObjectsSelector(TypedProperties props) {
this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase();
this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
this.maxMessagesPerRequest = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST, 10);
this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
this.maxMessagesPerRequest = 10;
}

/**
Expand Down Expand Up @@ -264,6 +264,12 @@ public static class Config {
public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";

/**
* {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST} is max messages for each request.
*/
public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.request";

/**
* {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we
* consume the message, queue will move the consumed messages to in-flight state, these messages
Expand Down