From 198ad7f36ebc40753d43953233452336aae353fb Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 24 Oct 2023 14:57:10 -0700 Subject: [PATCH 1/2] Skip over null dag actions from malformed messages --- .../service/monitoring/DagActionStoreChangeMonitor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 6855ca66910..9b0642ed297 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -123,6 +123,12 @@ protected void processMessage(DecodeableKafkaRecord message) { String flowName = value.getFlowName(); String flowExecutionId = value.getFlowExecutionId(); + if (value.getDagAction() == null) { + log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " + + "{}", flowGroup, flowName, flowExecutionId, tid, operation); + this.messageFilteredOutMeter.mark(); + return; + } DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); produceToConsumeDelayValue = calcMillisSince(produceTimestamp); From 0ab052a97dbc45163989fc3b5e8e1cffd636918a Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 24 Oct 2023 15:32:30 -0700 Subject: [PATCH 2/2] Add new metric for skipped messages --- .../org/apache/gobblin/runtime/metrics/RuntimeMetrics.java | 1 + .../service/monitoring/DagActionStoreChangeMonitor.java | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index dc4e26e9185..86adb467127 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -48,6 +48,7 @@ public class RuntimeMetrics { public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 9b0642ed297..b05e7310521 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter unexpectedErrors; private ContextAwareMeter messageProcessedMeter; private ContextAwareMeter messageFilteredOutMeter; + private ContextAwareMeter malformedMessagesSkippedMeter; private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge private volatile Long produceToConsumeDelayValue = -1L; @@ -126,7 +127,7 @@ protected void processMessage(DecodeableKafkaRecord message) { if (value.getDagAction() == null) { log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " + "{}", flowGroup, flowName, flowExecutionId, tid, operation); - this.messageFilteredOutMeter.mark(); + this.malformedMessagesSkippedMeter.mark(); return; } DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); @@ -231,6 +232,7 @@ protected void createMetrics() { this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT); + this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED); this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); }