[GOBBLIN-1867] Refactor to add observability and make extractor extendable [WIP] #3730

Open · wants to merge 3 commits into base: master
@@ -137,6 +137,7 @@ private String getHealthCheckReport() {
public void execute() {
this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
log.info("ConsumptionRate {}", this.statsTracker.getConsumptionRateMBps());
if (ingestionLatencies.size() < this.slidingWindowSize) {
log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize);
return;
@@ -200,6 +200,9 @@ private TopicPartitionStats getNewTopicPartitionStats(Long previousMaxOffset, Lo
//must equal the peak consumption rate.
consumeRate = consumeRate >= 0 ? updateMovingAverage(getPenultimateElement(consumptionRateMBps), consumeRate)
: this.statsTracker.getConsumptionRateMBps();
} else if (consumeRate < this.statsTracker.getConsumptionRateMBps()) {
// Likely hit a slow container while last backlogged, so update it to the current consumption rate
consumeRate = this.statsTracker.getConsumptionRateMBps();
}
return new TopicPartitionStats(historicProduceRates, consumeRate, newAvgRecordSize, currentProduceRate);
}
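The new else-if branch keeps a stale consume rate from dragging the estimate down: a rate captured while the container ran slow is replaced by whatever the stats tracker currently observes. A minimal sketch of that rule for the non-backlogged case, using a hypothetical helper name that is not part of this PR:

// Hypothetical illustration of the else-if above: never let a consume rate recorded while a
// container was running slow stay below the rate currently observed by the stats tracker.
static double reconcileConsumeRate(double lastRecordedRateMBps, double currentObservedRateMBps) {
  return Math.max(lastRecordedRateMBps, currentObservedRateMBps);
}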
@@ -97,7 +97,7 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
kafkaConsumerClientResolver;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private final Map<String, AtomicDouble> consumerMetricsGauges = new ConcurrentHashMap<>();
private final KafkaExtractorStatsTracker statsTracker;
protected final KafkaExtractorStatsTracker statsTracker;
private final KafkaProduceRateTracker produceRateTracker;
private final List<KafkaPartition> partitions;
private final long extractorStatsReportingTimeIntervalMillis;
@@ -107,16 +107,16 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
private final String recordCreationTimestampFieldName;
private final TimeUnit recordCreationTimestampUnit;

private Iterator<KafkaConsumerRecord> messageIterator = null;
private long readStartTime;
protected Iterator<KafkaConsumerRecord> messageIterator = null;
protected long readStartTime;
private long lastExtractorStatsReportingTime;
private Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();

protected MultiLongWatermark lowWatermark;
protected MultiLongWatermark highWatermark;
protected MultiLongWatermark nextWatermark;
protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
protected final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

@Override
public void shutdown() {
@@ -352,6 +352,24 @@ public List<Tag<?>> generateTags(State state) {
return tags;
}

protected RecordEnvelope<DecodeableKafkaRecord> decodeRecord(KafkaConsumerRecord kafkaConsumerRecord, int partitionIndex) {
// track time for converting KafkaConsumerRecord to a RecordEnvelope
long decodeStartTime = System.nanoTime();
KafkaPartition topicPartition =
new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
kafkaConsumerRecord.getValueSizeInBytes(),
kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
(this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
return recordEnvelope;
}
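With decodeRecord() extracted as a protected hook (and statsTracker, messageIterator, and readStartTime now protected as well), a subclass can decorate records without re-implementing the fetch loop in readRecordEnvelopeImpl(). A minimal sketch of such an extension; the subclass name and the metadata key are hypothetical, and the single-argument WorkUnitState constructor plus the Gobblin package layout are assumptions, not part of this PR:

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
import org.apache.gobblin.stream.RecordEnvelope;

// Hypothetical subclass: reuses the protected decodeRecord() hook to attach an extra piece of
// per-record metadata while keeping the base class's stats tracking intact.
public class AuditingKafkaStreamingExtractor<S> extends KafkaStreamingExtractor<S> {

  public AuditingKafkaStreamingExtractor(WorkUnitState state) {
    super(state);
  }

  @Override
  protected RecordEnvelope<DecodeableKafkaRecord> decodeRecord(KafkaConsumerRecord kafkaConsumerRecord,
      int partitionIndex) {
    RecordEnvelope<DecodeableKafkaRecord> envelope = super.decodeRecord(kafkaConsumerRecord, partitionIndex);
    // Record when this extractor saw the message, e.g. for end-to-end latency auditing downstream.
    envelope.setRecordMetadata("extractorReceiveTimeMillis", System.currentTimeMillis());
    return envelope;
  }
}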

/**
* Return the next record. Return null if we're shutdown.
*/
@@ -363,6 +381,7 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
}
this.readStartTime = System.nanoTime();
long fetchStartTime = System.nanoTime();
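// Tracks whether consume() refilled the message iterator during this call, so that fetch time is
// only reported to the stats tracker when a new buffer was actually pulled from the consumer client.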
boolean consumeNewBuffer = false;
try {
DecodeableKafkaRecord kafkaConsumerRecord;
while(true) {
@@ -375,6 +394,7 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
try {
fetchStartTime = System.nanoTime();
this.messageIterator = this.kafkaConsumerClient.consume();
consumeNewBuffer = true;
} catch (Exception e) {
log.error("Failed to consume from Kafka", e);
}
@@ -393,22 +413,11 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
}

int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);

// track time for converting KafkaConsumerRecord to a RecordEnvelope
long decodeStartTime = System.nanoTime();
KafkaPartition topicPartition =
new KafkaPartition.Builder().withTopicName(kafkaConsumerRecord.getTopic()).withId(kafkaConsumerRecord.getPartition()).build();
RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = new RecordEnvelope(kafkaConsumerRecord,
new KafkaWatermark(topicPartition, new LongWatermark(kafkaConsumerRecord.getOffset())));
recordEnvelope.setRecordMetadata("topicPartition", topicPartition);
recordEnvelope.setRecordMetadata(DATASET_KEY, topicPartition.getTopicName());
recordEnvelope.setRecordMetadata(DATASET_PARTITION_KEY, "" + topicPartition.getId());
this.statsTracker.onDecodeableRecord(partitionIndex, readStartTime, decodeStartTime,
kafkaConsumerRecord.getValueSizeInBytes(),
kafkaConsumerRecord.isTimestampLogAppend() ? kafkaConsumerRecord.getTimestamp() : 0L,
(this.recordCreationTimestampFieldName != null) ? kafkaConsumerRecord.getRecordCreationTimestamp(
this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
if (consumeNewBuffer) {
this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
}

RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = decodeRecord(kafkaConsumerRecord, partitionIndex);
this.perPartitionLastSuccessfulRecord.put(partitionIndex, kafkaConsumerRecord);
this.nextWatermark.set(partitionIndex, kafkaConsumerRecord.getNextOffset());
return recordEnvelope;
@@ -421,17 +430,19 @@ public RecordEnvelope<DecodeableKafkaRecord> readRecordEnvelopeImpl() throws IOE
}
}

private boolean shouldLogError() {
protected boolean shouldLogError() {
return (this.statsTracker.getUndecodableMessageCount() + this.statsTracker.getNullRecordCount()) <= MAX_LOG_ERRORS;
}

@Override
protected void onFlushAck() throws IOException {
try {
//Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
} catch (KafkaOffsetRetrievalFailureException e) {
log.error("Unable to retrieve latest offsets due to {}", e);
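// Hold the shared consumer client's monitor while refreshing offsets, so the refresh cannot
// interleave with other users of the same kafkaConsumerClient instance.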
synchronized (this.kafkaConsumerClient) {
try {
//Refresh the latest offsets of TopicPartitions processed by the KafkaExtractor.
this.latestOffsetMap = this.kafkaConsumerClient.getLatestOffsets(this.partitions);
} catch (KafkaOffsetRetrievalFailureException e) {
log.error("Unable to retrieve latest offsets due to {}", e);
}
}
long currentTime = System.currentTimeMillis();
//Update the watermarks to include the current topic partition produce rates
@@ -141,6 +141,7 @@ public enum ContainerCapacityComputationStrategy {
private final ContainerCapacityComputationStrategy containerCapacityComputationStrategy;
private final Map<String, KafkaStreamingExtractor.KafkaWatermark> lastCommittedWatermarks = Maps.newHashMap();
private final Map<String, List<Double>> capacitiesByTopic = Maps.newHashMap();
private final Map<String, Long> maxAvgRecordSizeByTopic = Maps.newHashMap();
private final EventSubmitter eventSubmitter;
private SourceState state;

@@ -274,6 +275,7 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t
for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
String topic = entry.getKey();
List<WorkUnit> workUnits = entry.getValue();
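// Track the largest average record size observed across this topic's partitions; it is stamped
// onto the indexed work unit in squeezeMultiWorkUnits() below.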
long maxAvgSize = -1;
for (WorkUnit workUnit : workUnits) {
int partitionId = Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
String topicPartition = new KafkaPartition.Builder().withTopicName(topic).withId(partitionId).build().toString();
@@ -285,9 +287,13 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t
// avgRecordSize is unknown when bootstrapping, so skip setting it;
// the ORC writer will use its default setting for the tuning feature.
if (watermark != null && watermark.getAvgRecordSize() > 0) {
maxAvgSize = Math.max(maxAvgSize, watermark.getAvgRecordSize());
workUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE, watermark.getAvgRecordSize());
}
}
if (maxAvgSize > 0) {
maxAvgRecordSizeByTopic.put(topic, maxAvgSize);
}
}
}

@@ -334,7 +340,10 @@ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnit
// Select a sample WU.
WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
List<KafkaPartition> topicPartitions = getPartitionsFromMultiWorkUnit(mwu);

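// Carry the per-topic max average record size onto the indexed work unit, so the writer still
// sees a representative record size after the topic's partitions are squeezed into one work unit.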
String topic = topicPartitions.get(0).getTopicName();
if (maxAvgRecordSizeByTopic.containsKey(topic)) {
indexedWorkUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE, maxAvgRecordSizeByTopic.get(topic));
}
// Indexing all topics/partitions into this WU.
populateMultiPartitionWorkUnit(topicPartitions, indexedWorkUnit);
