Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void registerAll(MetricSet metrics) throws IllegalArgumentException {
@Override
public synchronized boolean remove(String name) {
MetricContext metricContext = this.metricContext.get();
if (metricContext != null) {
if (metricContext != null && this.contextAwareMetrics.get(name) != null) {
metricContext.removeFromMetrics(this.contextAwareMetrics.get(name).getContextAwareMetric());
}
return this.contextAwareMetrics.remove(name) != null && removeChildrenMetrics(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public class DagManager extends AbstractIdleService {
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;

private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
/**
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
* <ul>
Expand Down Expand Up @@ -188,6 +189,9 @@ public String toString() {
private final boolean instrumentationEnabled;
private DagStateStore dagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
private int houseKeepingThreadInitialDelay = INITIAL_HOUSEKEEPING_THREAD_DELAY;
@Getter
private ScheduledExecutorService houseKeepingThreadPool;

@Getter
private final Integer numThreads;
Expand Down Expand Up @@ -388,7 +392,7 @@ public synchronized void setActive(boolean active) {
topologySpecMap);
Set<String> failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());

this.dagManagerMetrics.activate();
this.dagManagerMetrics.activate();

UserQuotaManager quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
Expand All @@ -405,10 +409,15 @@ public synchronized void setActive(boolean active) {
}
FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
addDag(dag, false, false);
loadDagFromDagStateStore();
this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor();
for (int delay = houseKeepingThreadInitialDelay; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
this.houseKeepingThreadPool.schedule(() -> {
try {
loadDagFromDagStateStore();
} catch (Exception e ) {
log.error("failed to sync dag state store due to ", e);
}}, delay, TimeUnit.MINUTES);
}
if (dagActionStore.isPresent()) {
Collection<DagActionStore.DagAction> dagActions = dagActionStore.get().getDagActions();
Expand All @@ -429,6 +438,7 @@ public synchronized void setActive(boolean active) {
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
this.dagManagerMetrics.cleanup();
this.houseKeepingThreadPool.shutdown();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand All @@ -441,6 +451,16 @@ public synchronized void setActive(boolean active) {
}
}

private void loadDagFromDagStateStore() throws IOException {
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
if (this.isActive) {
addDag(dag, false, false);
}
}
}

/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
Expand Down Expand Up @@ -789,10 +809,10 @@ private void pollAndAdvanceDag() throws IOException, ExecutionException, Interru
submitJob(node);
}
} catch (Exception e) {
// Error occurred while processing dag, continue processing other dags assigned to this thread
log.error(String.format("Exception caught in DagManager while processing dag %s due to ",
DagManagerUtils.getFullyQualifiedDagName(node)), e);
}
// Error occurred while processing dag, continue processing other dags assigned to this thread
log.error(String.format("Exception caught in DagManager while processing dag %s due to ",
DagManagerUtils.getFullyQualifiedDagName(node)), e);
}
}

for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
Expand Down Expand Up @@ -1170,11 +1190,11 @@ private synchronized void cleanUpDag(String dagId) {
log.info("Cleaning up dagId {}", dagId);
// clears flow event after cancelled job to allow resume event status to be set
this.dags.get(dagId).setFlowEvent(null);
try {
this.dagStateStore.cleanUp(dags.get(dagId));
} catch (IOException ioe) {
log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
}
try {
this.dagStateStore.cleanUp(dags.get(dagId));
} catch (IOException ioe) {
log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
}
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
Expand Down Expand Up @@ -1222,7 +1242,7 @@ public void run() {
}
}

log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
log.info("Cleaned " + numCleaned + " dags from the failed dag state store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,11 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
}

public void cleanup() {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
// Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
if(this.metricContext != null) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -97,6 +98,12 @@ public void setUp() throws Exception {
Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
}

@AfterClass
public void cleanUp() throws Exception {
dagManager.setActive(false);
Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(), true);
}

@Test
void testAddDeleteSpec() throws Exception {
long flowExecutionId1 = System.currentTimeMillis();
Expand Down