Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareMeter messageFilteredOutMeter;
private ContextAwareMeter malformedMessagesSkippedMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge

private volatile Long produceToConsumeDelayValue = -1L;
Expand Down Expand Up @@ -123,6 +124,12 @@ protected void processMessage(DecodeableKafkaRecord message) {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();

if (value.getDagAction() == null) {
log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: "
+ "{}", flowGroup, flowName, flowExecutionId, tid, operation);
this.malformedMessagesSkippedMeter.mark();
return;
}
DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());

produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
Expand Down Expand Up @@ -225,6 +232,7 @@ protected void createMetrics() {
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
Expand Down