From 85a6ddcb0d893e0976b6dc2738fd811c76f7e1c5 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:22:56 -0700 Subject: [PATCH 1/5] address comments --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 6cdc8b7741a..73fcd05a119 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,6 +149,8 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { this.closer.register((Closeable)httpClient); + } else { + log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 0043a9fd11dc14687afe1cc1d2426f9e3168c882 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:53:41 -0700 Subject: [PATCH 2/5] use connectionmanager when httpclient is not cloesable --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 73fcd05a119..6cdc8b7741a 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,8 +149,6 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { this.closer.register((Closeable)httpClient); - } else { - log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 9e80276a70a02be78d3176daaab4b63a1d1cab1d Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Thu, 2 Feb 2023 14:57:11 -0800 Subject: [PATCH 3/5] [GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync in memory state with mysql table --- .../gobblin/metrics/InnerMetricContext.java | 2 +- .../modules/orchestration/DagManager.java | 50 +++++++++++++------ .../orchestration/DagManagerFlowTest.java | 7 +++ 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java index 273897412b1..28ce9bd46b5 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java @@ -299,7 +299,7 @@ public void registerAll(MetricSet metrics) throws IllegalArgumentException { @Override public synchronized boolean remove(String name) { MetricContext metricContext = this.metricContext.get(); - if (metricContext != null) { + if (metricContext != null && this.contextAwareMetrics.get(name) != null) { metricContext.removeFromMetrics(this.contextAwareMetrics.get(name).getContextAwareMetric()); } return this.contextAwareMetrics.remove(name) != null && removeChildrenMetrics(name); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 0dd31d274f1..3bd6ed54c7d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -188,6 +188,9 @@ public String toString() { private final boolean instrumentationEnabled; private DagStateStore dagStateStore; private Map topologySpecMap; + private int houseKeepingThreadInitialDelay = 2; + @Getter + private ScheduledExecutorService houseKeepingThreadPool; @Getter private final Integer numThreads; @@ -272,6 +275,9 @@ protected void startUp() { * @param setStatus if true, set all jobs in the dag to pending */ synchronized void addDag(Dag dag, boolean persist, boolean setStatus) throws IOException { + if (!this.isActive) { + return; + } if (persist) { //Persist the dag this.dagStateStore.writeCheckpoint(dag); @@ -388,7 +394,7 @@ public synchronized void setActive(boolean active) { topologySpecMap); Set failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds()); - this.dagManagerMetrics.activate(); + this.dagManagerMetrics.activate(); UserQuotaManager quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config); @@ -405,10 +411,15 @@ public synchronized void setActive(boolean active) { } FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime); this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES); - List> dags = dagStateStore.getDags(); - log.info("Loading " + dags.size() + " dags from dag state store"); - for (Dag dag : dags) { - addDag(dag, false, false); + loadingDagsFromDagStateStore(); + this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor(); + for (int delay = houseKeepingThreadInitialDelay; delay < 180; delay *= 2) { + this.houseKeepingThreadPool.schedule(() -> { + try { + loadingDagsFromDagStateStore(); + } catch (Exception e ) { + log.error("failed to sync dag state store due to ", e); + }}, delay, TimeUnit.MINUTES); } if (dagActionStore.isPresent()) { Collection dagActions = dagActionStore.get().getDagActions(); @@ -429,6 +440,7 @@ public synchronized void setActive(boolean active) { log.info("Inactivating the DagManager. Shutting down all DagManager threads"); this.scheduledExecutorPool.shutdown(); this.dagManagerMetrics.cleanup(); + this.houseKeepingThreadPool.shutdown(); try { this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -441,6 +453,14 @@ public synchronized void setActive(boolean active) { } } + private void loadingDagsFromDagStateStore() throws IOException { + List> dags = dagStateStore.getDags(); + log.info("Loading " + dags.size() + " dags from dag state store"); + for (Dag dag : dags) { + addDag(dag, false, false); + } + } + /** * Each {@link DagManagerThread} performs 2 actions when scheduled: *
    @@ -789,10 +809,10 @@ private void pollAndAdvanceDag() throws IOException, ExecutionException, Interru submitJob(node); } } catch (Exception e) { - // Error occurred while processing dag, continue processing other dags assigned to this thread - log.error(String.format("Exception caught in DagManager while processing dag %s due to ", - DagManagerUtils.getFullyQualifiedDagName(node)), e); - } + // Error occurred while processing dag, continue processing other dags assigned to this thread + log.error(String.format("Exception caught in DagManager while processing dag %s due to ", + DagManagerUtils.getFullyQualifiedDagName(node)), e); + } } for (Map.Entry>> entry: nextSubmitted.entrySet()) { @@ -1170,11 +1190,11 @@ private synchronized void cleanUpDag(String dagId) { log.info("Cleaning up dagId {}", dagId); // clears flow event after cancelled job to allow resume event status to be set this.dags.get(dagId).setFlowEvent(null); - try { - this.dagStateStore.cleanUp(dags.get(dagId)); - } catch (IOException ioe) { - log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe); - } + try { + this.dagStateStore.cleanUp(dags.get(dagId)); + } catch (IOException ioe) { + log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe); + } this.dags.remove(dagId); this.dagToJobs.remove(dagId); } @@ -1222,7 +1242,7 @@ public void run() { } } - log.info("Cleaned " + numCleaned + " dags from the failed dag state store"); + log.info("Cleaned " + numCleaned + " dags from the failed dag state store"); } catch (Exception e) { log.error("Failed to run retention on failed dag state store", e); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java index 59aeda41f71..9366b7a8a8c 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java @@ -33,6 +33,7 @@ import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore; import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -97,6 +98,12 @@ public void setUp() throws Exception { Assert.assertEquals(dagActionStore.getDagActions().size(), 0); } + @AfterClass + public void cleanUp() throws Exception { + dagManager.setActive(false); + Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(), true); + } + @Test void testAddDeleteSpec() throws Exception { long flowExecutionId1 = System.currentTimeMillis(); From f635acda72a317074076517ef0c82ee6f47ce8a6 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Thu, 2 Feb 2023 16:24:31 -0800 Subject: [PATCH 4/5] fix unit test --- .../service/modules/orchestration/DagManagerMetrics.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index acf7c031014..a81bf39b126 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -244,8 +244,11 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() { } public void cleanup() { - // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. - // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement - RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); + // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager + if(this.metricContext != null) { + // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. + // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement + RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); + } } } From 4e08c19dcde0eed567581628b8a66055e6841635 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 3 Feb 2023 13:56:38 -0800 Subject: [PATCH 5/5] address comments --- .../modules/orchestration/DagManager.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 3bd6ed54c7d..6f26dc3e527 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -135,7 +135,8 @@ public class DagManager extends AbstractIdleService { // Default job start SLA time if configured, measured in minutes. Default is 10 minutes private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME; private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT; - + private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180; + private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2; /** * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes: *
      @@ -188,7 +189,7 @@ public String toString() { private final boolean instrumentationEnabled; private DagStateStore dagStateStore; private Map topologySpecMap; - private int houseKeepingThreadInitialDelay = 2; + private int houseKeepingThreadInitialDelay = INITIAL_HOUSEKEEPING_THREAD_DELAY; @Getter private ScheduledExecutorService houseKeepingThreadPool; @@ -275,9 +276,6 @@ protected void startUp() { * @param setStatus if true, set all jobs in the dag to pending */ synchronized void addDag(Dag dag, boolean persist, boolean setStatus) throws IOException { - if (!this.isActive) { - return; - } if (persist) { //Persist the dag this.dagStateStore.writeCheckpoint(dag); @@ -411,12 +409,12 @@ public synchronized void setActive(boolean active) { } FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime); this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES); - loadingDagsFromDagStateStore(); + loadDagFromDagStateStore(); this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor(); - for (int delay = houseKeepingThreadInitialDelay; delay < 180; delay *= 2) { + for (int delay = houseKeepingThreadInitialDelay; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) { this.houseKeepingThreadPool.schedule(() -> { try { - loadingDagsFromDagStateStore(); + loadDagFromDagStateStore(); } catch (Exception e ) { log.error("failed to sync dag state store due to ", e); }}, delay, TimeUnit.MINUTES); @@ -453,11 +451,13 @@ public synchronized void setActive(boolean active) { } } - private void loadingDagsFromDagStateStore() throws IOException { + private void loadDagFromDagStateStore() throws IOException { List> dags = dagStateStore.getDags(); log.info("Loading " + dags.size() + " dags from dag state store"); for (Dag dag : dags) { - addDag(dag, false, false); + if (this.isActive) { + addDag(dag, false, false); + } } }