Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
public static final String DATASET_KEY = "dataset";
public static final String DATASET_PARTITION_KEY = "datasetPartition";
public static final String MAX_KAFKA_BUFFER_SIZE_IN_BYTES = "kafka.streaming.max.kafka.buffer.size.in.bytes";
public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = Long.valueOf(50 * 1024 * 1024);
// Max number of records to be pulled in a single poll.
private static final String KAFKA_MAX_POLL_RECORDS_KEY = "kafka.consumer.maxPollRecords";
private static final int DEFAULT_MAX_POLL_RECORDS = 100;
private static final Long MAX_LOG_ERRORS = 100L;

private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
Expand Down Expand Up @@ -115,6 +120,7 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
protected MultiLongWatermark lowWatermark;
protected MultiLongWatermark highWatermark;
protected MultiLongWatermark nextWatermark;
protected long maxAvgRecordSize = -1;
protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

Expand Down Expand Up @@ -214,6 +220,19 @@ public LongWatermark getLwm() {

public KafkaStreamingExtractor(WorkUnitState state) {
super(state);
this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = getTopicPartitionWatermarks(this.topicPartitions);
if (this.maxAvgRecordSize > 0 ) {
long maxPollRecords =
state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intuition behind a 50 MB default max buffer size?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have to do with

source.kafka.consumerConfig.receive.buffer.bytes=655360

in gobblin-kafka-jobs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no relationship between those two configs. The value should not be too small, as we need to guarantee performance, and not too large, as we want to avoid OOM issues. So I chose 50MB as the default here. This should only take effect when record sizes are large; for normal topics, it should fall back to using KAFKA_MAX_POLL_RECORDS_KEY.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I think 50MB seems like a good default.

Here's an example of how the throughput would look for some higher-volume topics. All topics with records under 5KB would be able to do 1000 records per second (PageViewEvent is only ~800 bytes and URE is ~3MB).

maxPollRecords = Math.min(maxPollRecords, state.getPropAsInt(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS));
state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, maxPollRecords);
log.info("set max.poll.records to be " + maxPollRecords);
} else {
// As there is no avg record size available, using lower number to make sure we don't hit OOM issue
state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS);
log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS);
}
this.kafkaConsumerClientResolver =
new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
try {
Expand All @@ -229,8 +248,7 @@ public KafkaStreamingExtractor(WorkUnitState state) {
this._schemaRegistry = state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
KafkaSchemaRegistry.<String, S>get(state.getProperties())) : Optional.<KafkaSchemaRegistry<String, S>>absent();

this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
this.kafkaConsumerClient.assignAndSeek(topicPartitions, getTopicPartitionWatermarks(this.topicPartitions));
this.kafkaConsumerClient.assignAndSeek(topicPartitions, topicPartitionWatermarks);
this.messageIterator = this.kafkaConsumerClient.consume();

this.partitions = KafkaUtils.getPartitions(state);
Expand Down Expand Up @@ -292,6 +310,7 @@ private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<Kafk
if (kafkaWatermarkMap.containsKey(topicPartitionString)) {
LongWatermark longWatermark = ((KafkaWatermark) kafkaWatermarkMap.get(topicPartitionString)).getLwm();
longWatermarkMap.put(topicPartition, longWatermark);
maxAvgRecordSize = Math.max(maxAvgRecordSize, ((KafkaWatermark) kafkaWatermarkMap.get(topicPartitionString)).getAvgRecordSize());
} else {
longWatermarkMap.put(topicPartition, new LongWatermark(0L));
}
Expand Down