From b144b9439a722bad5aabc2367ee893d709c4675b Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 30 Jun 2023 18:31:50 -0400 Subject: [PATCH 1/5] Add tags to dagmanager metrics for extensibility --- .../gobblin/metrics/MetricTagNames.java | 22 +++++++++++++++++++ .../modules/orchestration/DagManager.java | 2 +- .../orchestration/DagManagerMetrics.java | 12 ++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java new file mode 100644 index 00000000000..bce085d2b5e --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics; + +public class MetricTagNames { + public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation"; +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 80da8a9e99d..acbf9f71da8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -230,7 +230,7 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean } else { this.eventSubmitter = Optional.absent(); } - this.dagManagerMetrics = new DagManagerMetrics(metricContext); + this.dagManagerMetrics = new DagManagerMetrics(); TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT)); this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME)); this.jobStatusRetriever = jobStatusRetriever; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index e9e605b0661..13eebcca126 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -28,16 +28,21 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.MetricTagNames; import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.RequesterService; @@ -75,6 +80,13 @@ public DagManagerMetrics(MetricContext metricContext) { this.metricContext = metricContext; } + public DagManagerMetrics() { + // Create a new metric context for the DagManagerMetrics tagged appropriately + List> tags = new ArrayList<>(); + tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER)); + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags); + } + public void activate() { if (this.metricContext != null) { allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, From 67b2dab5c75fa6de010d3ab4f93d24c58b2f91c1 Mon Sep 17 00:00:00 2001 From: William Lo Date: Sun, 9 Jul 2023 13:24:42 -0400 Subject: [PATCH 2/5] Fix concurrency bug in test --- .../service/modules/orchestration/DagManagerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java index 0a572cf26f3..fe511d4f89d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -709,6 +709,9 @@ public void testResumeCancelledDag() throws URISyntaxException, IOException { @Test (dependsOnMethods = "testResumeCancelledDag") public void testJobStartSLAKilledDag() throws URISyntaxException, IOException { + String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER); + long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 0 : + metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(); long flowExecutionId = System.currentTimeMillis(); String flowGroupId = "0"; String flowGroup = "group" + flowGroupId; @@ -780,8 +783,7 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException { Assert.assertEquals(this.dagToJobs.size(), 1); Assert.assertTrue(this.dags.containsKey(dagId1)); - String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER); - Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1); + Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1); // Cleanup this._dagManagerThread.run(); From 6811c89db5eeec36de76c2f0568a87cc73b9baa9 Mon Sep 17 00:00:00 2001 From: William Lo Date: Sun, 9 Jul 2023 16:40:26 -0400 Subject: [PATCH 3/5] Add job level metrics in dagmanager test --- .../service/modules/orchestration/DagManagerTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java index fe511d4f89d..2babd068311 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -1164,10 +1164,13 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException { Config executorOneConfig = ConfigFactory.empty() .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne")) .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId)) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)); + .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)) + .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true)); Config executorTwoConfig = ConfigFactory.empty() .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo")) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)); + .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)) + .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true)); + List> dagList = buildDagList(2, "newUser", executorOneConfig); dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig)); @@ -1231,7 +1234,6 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException { String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER); String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS); - String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER); Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2); Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1); // Cleanup From 80508311834fbe940e6826ffac1439abcc0ca1ac Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 10 Jul 2023 12:34:20 -0400 Subject: [PATCH 4/5] Test not cleaning dm threads --- .../service/modules/orchestration/DagManagerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 13eebcca126..1e048925716 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -264,7 +264,7 @@ public void cleanup() { if(this.metricContext != null) { // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement - RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); +// RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); } } } From a550c323c592a1dc7949ceb73d7db84049b5974d Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 10 Jul 2023 13:02:12 -0400 Subject: [PATCH 5/5] Only cleanup metrics if threads started by the dagmanager --- .../service/modules/orchestration/DagManagerMetrics.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 1e048925716..a5f34cff7f2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; @@ -261,10 +262,10 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() { public void cleanup() { // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager - if(this.metricContext != null) { + if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) { // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement -// RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); + RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); } } }