diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java index 3fa4d4c28fe..8320a82ec06 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java @@ -87,6 +87,11 @@ public class KafkaStreamingExtractor extends FlushingExtractor { public static final String DATASET_KEY = "dataset"; public static final String DATASET_PARTITION_KEY = "datasetPartition"; + public static final String MAX_KAFKA_BUFFER_SIZE_IN_BYTES = "kafka.streaming.max.kafka.buffer.size.in.bytes"; + public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = Long.valueOf(50 * 1024 * 1024); + // Max number of records to be pulled in single polling. + private static final String KAFKA_MAX_POLL_RECORDS_KEY = "kafka.consumer.maxPollRecords"; + private static final int DEFAULT_MAX_POLL_RECORDS = 100; private static final Long MAX_LOG_ERRORS = 100L; private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY = @@ -115,6 +120,7 @@ public class KafkaStreamingExtractor extends FlushingExtractor perPartitionLastSuccessfulRecord; private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); @@ -214,6 +220,19 @@ public LongWatermark getLwm() { public KafkaStreamingExtractor(WorkUnitState state) { super(state); + this.topicPartitions = getTopicPartitionsFromWorkUnit(state); + Map topicPartitionWatermarks = getTopicPartitionWatermarks(this.topicPartitions); + if (this.maxAvgRecordSize > 0 ) { + long maxPollRecords = + state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize; + maxPollRecords = Math.min(maxPollRecords, state.getPropAsInt(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS)); + state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, maxPollRecords); + log.info("set max.poll.records to be " + maxPollRecords); + } else { + // As there is no avg record size available, using lower number to make sure we don't hit OOM issue + state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS); + log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS); + } this.kafkaConsumerClientResolver = new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class); try { @@ -229,8 +248,7 @@ public KafkaStreamingExtractor(WorkUnitState state) { this._schemaRegistry = state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of( KafkaSchemaRegistry.get(state.getProperties())) : Optional.>absent(); - this.topicPartitions = getTopicPartitionsFromWorkUnit(state); - this.kafkaConsumerClient.assignAndSeek(topicPartitions, getTopicPartitionWatermarks(this.topicPartitions)); + this.kafkaConsumerClient.assignAndSeek(topicPartitions, topicPartitionWatermarks); this.messageIterator = this.kafkaConsumerClient.consume(); this.partitions = KafkaUtils.getPartitions(state); @@ -292,6 +310,7 @@ private Map getTopicPartitionWatermarks(List