Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
Expand Down Expand Up @@ -101,6 +102,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
private final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
private ContextAwareGauge[] queueSizeGauges;
private final AtomicInteger recordsProcessed;
private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
private final boolean enableAutoCommit;
Expand All @@ -127,7 +129,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
for(int i=0; i<queues.length; i++) {
for(int i = 0; i < queues.length; i++) {
this.queues[i] = new LinkedBlockingQueue();
}
this.recordsProcessed = new AtomicInteger(0);
Expand Down Expand Up @@ -196,7 +198,24 @@ protected void shutdownMetrics() throws IOException {
* this method to instantiate their own metrics.
*/
protected void createMetrics() {
this.messagesRead = this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
String prefix = getMetricsPrefix();
this.messagesRead = this.metricContext.counter(prefix +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
this.queueSizeGauges = new ContextAwareGauge[numThreads];
for (int i = 0; i < numThreads; i++) {
// An 'effectively' final variable is needed inside the lambda expression below
int finalI = i;
Comment thread
umustafi marked this conversation as resolved.
this.queueSizeGauges[i] = this.metricContext.newContextAwareGauge(prefix +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX + "-" + i,
() -> queues[finalI].size());
}
}

/**
* Used by child classes to distinguish prefixes from one another
*/
protected String getMetricsPrefix() {
return "";
}

/**
Expand Down Expand Up @@ -237,6 +256,7 @@ protected void startUp() {
private void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
// TODO: we may be committing too early and only want to commit after process messages
if(!enableAutoCommit) {
commitOffsets();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class RuntimeMetrics {
// Metric names
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ =
"gobblin.kafka.highLevelConsumer.messagesRead";
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX = "gobblin.kafka.highLevelConsumer.queueSize";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS = "gobblin.jobMonitor.kafka.updatedSpecs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
Expand Down Expand Up @@ -217,7 +216,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) {

@Override
protected void createMetrics() {
super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
super.createMetrics();
// Dag Action specific metrics
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
Expand All @@ -228,4 +228,9 @@ protected void createMetrics() {
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}

@Override
protected String getMetricsPrefix() {
return RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + ".";
}
}