diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
index 273897412b1..28ce9bd46b5 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
@@ -299,7 +299,7 @@ public void registerAll(MetricSet metrics) throws IllegalArgumentException {
@Override
public synchronized boolean remove(String name) {
MetricContext metricContext = this.metricContext.get();
- if (metricContext != null) {
+ if (metricContext != null && this.contextAwareMetrics.get(name) != null) {
metricContext.removeFromMetrics(this.contextAwareMetrics.get(name).getContextAwareMetric());
}
return this.contextAwareMetrics.remove(name) != null && removeChildrenMetrics(name);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0dd31d274f1..6f26dc3e527 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -135,7 +135,8 @@ public class DagManager extends AbstractIdleService {
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
-
+ private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+ private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
/**
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
*
@@ -188,6 +189,9 @@ public String toString() {
private final boolean instrumentationEnabled;
private DagStateStore dagStateStore;
private Map topologySpecMap;
+ private int houseKeepingThreadInitialDelay = INITIAL_HOUSEKEEPING_THREAD_DELAY;
+ @Getter
+ private ScheduledExecutorService houseKeepingThreadPool;
@Getter
private final Integer numThreads;
@@ -388,7 +392,7 @@ public synchronized void setActive(boolean active) {
topologySpecMap);
Set failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());
- this.dagManagerMetrics.activate();
+ this.dagManagerMetrics.activate();
UserQuotaManager quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
@@ -405,10 +409,15 @@ public synchronized void setActive(boolean active) {
}
FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
- List> dags = dagStateStore.getDags();
- log.info("Loading " + dags.size() + " dags from dag state store");
- for (Dag dag : dags) {
- addDag(dag, false, false);
+ loadDagFromDagStateStore();
+ this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor();
+ for (int delay = houseKeepingThreadInitialDelay; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+ this.houseKeepingThreadPool.schedule(() -> {
+ try {
+ loadDagFromDagStateStore();
+ } catch (Exception e ) {
+ log.error("failed to sync dag state store due to ", e);
+ }}, delay, TimeUnit.MINUTES);
}
if (dagActionStore.isPresent()) {
Collection dagActions = dagActionStore.get().getDagActions();
@@ -429,6 +438,7 @@ public synchronized void setActive(boolean active) {
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
this.dagManagerMetrics.cleanup();
+ this.houseKeepingThreadPool.shutdown();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -441,6 +451,16 @@ public synchronized void setActive(boolean active) {
}
}
+ private void loadDagFromDagStateStore() throws IOException {
+ List> dags = dagStateStore.getDags();
+ log.info("Loading " + dags.size() + " dags from dag state store");
+ for (Dag dag : dags) {
+ if (this.isActive) {
+ addDag(dag, false, false);
+ }
+ }
+ }
+
/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
*
@@ -789,10 +809,10 @@ private void pollAndAdvanceDag() throws IOException, ExecutionException, Interru
submitJob(node);
}
} catch (Exception e) {
- // Error occurred while processing dag, continue processing other dags assigned to this thread
- log.error(String.format("Exception caught in DagManager while processing dag %s due to ",
- DagManagerUtils.getFullyQualifiedDagName(node)), e);
- }
+ // Error occurred while processing dag, continue processing other dags assigned to this thread
+ log.error(String.format("Exception caught in DagManager while processing dag %s due to ",
+ DagManagerUtils.getFullyQualifiedDagName(node)), e);
+ }
}
for (Map.Entry>> entry: nextSubmitted.entrySet()) {
@@ -1170,11 +1190,11 @@ private synchronized void cleanUpDag(String dagId) {
log.info("Cleaning up dagId {}", dagId);
// clears flow event after cancelled job to allow resume event status to be set
this.dags.get(dagId).setFlowEvent(null);
- try {
- this.dagStateStore.cleanUp(dags.get(dagId));
- } catch (IOException ioe) {
- log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
- }
+ try {
+ this.dagStateStore.cleanUp(dags.get(dagId));
+ } catch (IOException ioe) {
+ log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
+ }
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
@@ -1222,7 +1242,7 @@ public void run() {
}
}
- log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
+ log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index acf7c031014..a81bf39b126 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -244,8 +244,11 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
}
public void cleanup() {
- // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
- RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
+ if(this.metricContext != null) {
+ // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
+ // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
+ RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ }
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 59aeda41f71..9366b7a8a8c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -33,6 +33,7 @@
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -97,6 +98,12 @@ public void setUp() throws Exception {
Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
}
+ @AfterClass
+ public void cleanUp() throws Exception {
+ dagManager.setActive(false);
+ Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(), true);
+ }
+
@Test
void testAddDeleteSpec() throws Exception {
long flowExecutionId1 = System.currentTimeMillis();