Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.metrics;

/**
 * Holder for the names (keys) of tags that can be attached to metric contexts.
 *
 * <p>This class only exposes constants; it is not meant to be instantiated or extended.
 */
public final class MetricTagNames {
  /**
   * Key of the tag describing how a metric is represented by the metrics backend.
   * NOTE(review): callers appear to pair this key with a {@code GobblinMetrics.MetricType} value — confirm
   * whether other value types are expected.
   */
  public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation";

  private MetricTagNames() {
    // Constants holder: prevent instantiation.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean
} else {
this.eventSubmitter = Optional.absent();
}
this.dagManagerMetrics = new DagManagerMetrics(metricContext);
this.dagManagerMetrics = new DagManagerMetrics();
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.MetricTagNames;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
Expand Down Expand Up @@ -75,6 +81,13 @@ public DagManagerMetrics(MetricContext metricContext) {
this.metricContext = metricContext;
}

public DagManagerMetrics() {
// Create a new metric context for the DagManagerMetrics tagged appropriately
List<Tag<?>> tags = new ArrayList<>();
tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags);
}

public void activate() {
if (this.metricContext != null) {
allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
Expand Down Expand Up @@ -249,7 +262,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {

public void cleanup() {
// Null check so that unit tests do not affect each other when we deactivate a non-instrumented DagManager
if(this.metricContext != null) {
if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,9 @@ public void testResumeCancelledDag() throws URISyntaxException, IOException {

@Test (dependsOnMethods = "testResumeCancelledDag")
public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 0 :
metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount();
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
String flowGroup = "group" + flowGroupId;
Expand Down Expand Up @@ -780,8 +783,7 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));

String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1);

// Cleanup
this._dagManagerThread.run();
Expand Down Expand Up @@ -1162,10 +1164,13 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
Config executorOneConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
.withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
Config executorTwoConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
.withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));

List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));

Expand Down Expand Up @@ -1229,7 +1234,6 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS);

String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
// Cleanup
Expand Down