diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index 377d4887f6b..b0f123dcc79 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -137,6 +137,7 @@ private String getHealthCheckReport() {
   public void execute() {
     this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
     this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
+    log.info("ConsumptionRate " + this.statsTracker.getConsumptionRateMBps());
     if (ingestionLatencies.size() < this.slidingWindowSize) {
       log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize);
       return;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
index ab88eecabef..aa64c2bdf6f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
@@ -200,6 +200,9 @@ private TopicPartitionStats getNewTopicPartitionStats(Long previousMaxOffset, Lo
       //must equal the peak consumption rate.
       consumeRate = consumeRate >= 0 ?
           updateMovingAverage(getPenultimateElement(consumptionRateMBps), consumeRate) : this.statsTracker.getConsumptionRateMBps();
+    } else if (consumeRate < this.statsTracker.getConsumptionRateMBps()) {
+      // The last backlogged measurement may have come from a slow container, so update it to the current consumption rate.
+      consumeRate = this.statsTracker.getConsumptionRateMBps();
     }
     return new TopicPartitionStats(historicProduceRates, consumeRate, newAvgRecordSize, currentProduceRate);
   }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 3fa4d4c28fe..2242c57315d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -97,7 +97,7 @@ public class KafkaStreamingExtractor extends FlushingExtractor
   private final Map<String, AtomicDouble> consumerMetricsGauges = new ConcurrentHashMap<>();
-  private final KafkaExtractorStatsTracker statsTracker;
+  protected final KafkaExtractorStatsTracker statsTracker;
   private final KafkaProduceRateTracker produceRateTracker;
   private final List<KafkaPartition> partitions;
   private final long extractorStatsReportingTimeIntervalMillis;
@@ -107,8 +107,8 @@ public class KafkaStreamingExtractor extends FlushingExtractor
-  private Iterator<KafkaConsumerRecord> messageIterator = null;
-  private long readStartTime;
+  protected Iterator<KafkaConsumerRecord> messageIterator = null;
+  protected long readStartTime;
   private long lastExtractorStatsReportingTime;
   private Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
@@ -116,7 +116,7 @@ public class KafkaStreamingExtractor extends FlushingExtractor
   private final Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
-  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+  protected final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

   @Override
   public void shutdown() {
@@ -352,6 +352,24 @@ public List<Tag<String>> generateTags(State state) {
     return tags;
   }

+  protected RecordEnvelope decodeRecord(KafkaConsumerRecord kafkaConsumerRecord, int partitionIndex) {
+    // track time for converting KafkaConsumerRecord to a RecordEnvelope
+    long decodeStartTime = System.nanoTime();
+    KafkaPartition topicPartition =
+        new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
+    RecordEnvelope recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
+        new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
+    recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
+    recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
+    recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
+    this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
+        kafkaConsumerRecord.getValueSizeInBytes(),
+        kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
+        (this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
+            this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
+    return recordEnvelope;
+  }
+
   /**
    * Return the next record. Return null if we're shutdown.
    */
@@ -363,6 +381,7 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
     }
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
+    boolean consumeNewBuffer = false;
     try {
       DecodeableKafkaRecord kafkaConsumerRecord;
       while(true) {
@@ -375,6 +394,7 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
           try {
             fetchStartTime = System.nanoTime();
             this.messageIterator = this.kafkaConsumerClient.consume();
+            consumeNewBuffer = true;
           } catch (Exception e) {
             log.error("Failed to consume from Kafka", e);
           }
@@ -393,22 +413,11 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE

         }
         int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
-        this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
-
-        // track time for converting KafkaConsumerRecord to a RecordEnvelope
-        long decodeStartTime = System.nanoTime();
-        KafkaPartition topicPartition =
-            new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
-        RecordEnvelope recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
-            new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
-        recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
-        recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
-        recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
-        this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
-            kafkaConsumerRecord.getValueSizeInBytes(),
-            kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
-            (this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
-                this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
+        if (consumeNewBuffer) {
+          this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
+        }
+
+        RecordEnvelope recordEnvelope = decodeRecord(kafkaConsumerRecord, partitionIndex);
         this.perPartitionLastSuccessfulRecord.put(partitionIndex, kafkaConsumerRecord);
         this.nextWatermark.set(partitionIndex, kafkaConsumerRecord.getNextOffset());
         return recordEnvelope;
@@ -421,17 +430,19 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
     }
   }

-  private boolean shouldLogError() {
+  protected boolean shouldLogError() {
     return (this.statsTracker.getUndecodableMessageCount() + this.statsTracker.getNullRecordCount()) <= MAX_LOG_ERRORS;
   }

   @Override
   protected void onFlushAck() throws IOException {
-    try {
-      //Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
-      this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
-    } catch (KafkaOffsetRetrievalFailureException e) {
-      log.error("Unable to retrieve latest offsets due to {}", e);
+    synchronized (this.kafkaConsumerClient) {
+      try {
+        //Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
+        this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
+      } catch (KafkaOffsetRetrievalFailureException e) {
+        log.error("Unable to retrieve latest offsets due to {}", e);
+      }
     }
     long currentTime = System.currentTimeMillis();
     //Update the watermarks to include the current topic partition produce rates
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 62cb1844795..4dfea750317 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -141,6 +141,7 @@ public enum ContainerCapacityComputationStrategy {
   private final ContainerCapacityComputationStrategy containerCapacityComputationStrategy;
   private final Map lastCommittedWatermarks = Maps.newHashMap();
   private final Map<String, List<Double>> capacitiesByTopic = Maps.newHashMap();
+  private final Map<String, Long> maxAvgRecordSizeByTopic = Maps.newHashMap();
   private final EventSubmitter eventSubmitter;
   private SourceState state;

@@ -274,6 +275,7 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t
     for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
       String topic = entry.getKey();
       List<WorkUnit> workUnits = entry.getValue();
+      long maxAvgSize = -1;
       for (WorkUnit workUnit : workUnits) {
         int partitionId = Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
         String topicPartition = new KafkaPartition.Builder().withTopicName(topic).withId(partitionId).build().toString();
@@ -285,9 +287,13 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t
         // avgRecordSize is unknown when bootstrapping, so skip setting this
         // and the ORC writer will use the default setting for the tuning feature.
         if (watermark != null && watermark.getAvgRecordSize() > 0) {
+          maxAvgSize = Math.max(maxAvgSize, watermark.getAvgRecordSize());
           workUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE, watermark.getAvgRecordSize());
         }
       }
+      if (maxAvgSize > 0) {
+        maxAvgRecordSizeByTopic.put(topic, maxAvgSize);
+      }
     }
   }

@@ -334,7 +340,10 @@ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnit
     // Select a sample WU.
     WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
     List<KafkaPartition> topicPartitions = getPartitionsFromMultiWorkUnit(mwu);
-
+    String topic = topicPartitions.get(0).getTopicName();
+    if (maxAvgRecordSizeByTopic.containsKey(topic)) {
+      indexedWorkUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE, maxAvgRecordSizeByTopic.get(topic));
+    }
     // Indexing all topics/partitions into this WU.
     populateMultiPartitionWorkUnit(topicPartitions, indexedWorkUnit);