Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.util.TaskEventMetadataUtils;

Expand All @@ -56,6 +57,7 @@ public class KafkaExtractorStatsTracker {

private static final String EMPTY_STRING = "";
private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
private static final String KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME = "KafkaExtractorContainerTransitionEvent";
private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
private static final String LOW_WATERMARK = "lowWatermark";
private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
Expand Down Expand Up @@ -497,6 +499,21 @@ public void emitTrackingEvents(MetricContext context, Map<KafkaPartition, Map<St
}
}

/**
 * Emits a {@code KafkaExtractorContainerTransitionEvent} tracking event for each topic partition
 * assigned to this extractor, so a monitoring application can observe container transitions and the
 * partitions affected by them.
 * @param context the current {@link MetricContext} used to submit the events
 */
public void submitEventToIndicateContainerTransition(MetricContext context) {
  // Enhanced-for over the assigned partitions; the loop index in the previous version was only used for get().
  for (KafkaPartition partitionKey : this.partitions) {
    GobblinEventBuilder gobblinEventBuilder =
        new GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME, GOBBLIN_KAFKA_NAMESPACE);
    gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
    gobblinEventBuilder.addMetadata(PARTITION, Integer.toString(partitionKey.getId()));
    // Enrich the event with task-level metadata (e.g. container/task identifiers) for downstream consumers.
    gobblinEventBuilder.addAdditionalMetadata(
        this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME));
    EventSubmitter.submit(context, gobblinEventBuilder);
  }
}

/**
* A helper function to merge tags for KafkaPartition. Separate into a package-private method for ease of testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ public KafkaStreamingExtractor(WorkUnitState state) {
state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 * 1000;
resetExtractorStatsAndWatermarks(true);
//Even though we haven't started ingesting yet, emit an event to indicate the container transition.
submitEventToIndicateContainerTransition();

//Schedule a thread for reporting Kafka consumer metrics
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
Expand All @@ -271,6 +273,12 @@ public KafkaStreamingExtractor(WorkUnitState state) {
this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
}

/** Submits container-transition tracking events, but only when instrumentation is enabled. */
private void submitEventToIndicateContainerTransition() {
  if (!this.isInstrumentationEnabled()) {
    return;
  }
  this.statsTracker.submitEventToIndicateContainerTransition(getMetricContext());
}

private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
List<String> topicPartitionStrings =
topicPartitions.stream().map(topicPartition -> topicPartition.toString()).collect(Collectors.toList());
Expand Down