Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class ServiceMetricNames {

// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount";
public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";

//Job status poll timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
Expand Down Expand Up @@ -66,7 +67,7 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
super(topic, config, mock(DagManager.class), numThreads, mock(FlowCatalog.class), mock(Orchestrator.class),
isMultiActiveSchedulerEnabled);
mock(DagActionStore.class), isMultiActiveSchedulerEnabled);
}

protected void processMessageForTest(DecodeableKafkaRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ public class RuntimeMetrics {
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES = DAG_ACTION_STORE_MONITOR_PREFIX + "heartbeatMessages";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES = DAG_ACTION_STORE_MONITOR_PREFIX + "nullDagActionTypeMessages";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_SUCCESSFUL_LAUNCH_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";

public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + "failedFlowLaunchSubmissions";
public static final String GOBBLIN_DAG_ACTION_STORE_SUCCESSFUL_LAUNCH_SUBMISSIONS_ON_STARTUP = DAG_ACTION_STORE_MONITOR_PREFIX + "successfulLaunchSubmissionsOnStartup";
public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_LAUNCH_SUBMISSIONS_ON_STARTUP = DAG_ACTION_STORE_MONITOR_PREFIX + "failedLaunchSubmissionsOnStartup";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
Expand All @@ -34,9 +33,7 @@
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -69,7 +66,6 @@
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
Expand Down Expand Up @@ -469,24 +465,6 @@ public synchronized void setActive(boolean active) {
log.error("failed to sync dag state store due to ", e);
}}, delay, TimeUnit.MINUTES);
}
if (dagActionStore.isPresent()) {
Collection<DagActionStore.DagAction> dagActions = dagActionStore.get().getDagActions();
for (DagActionStore.DagAction action : dagActions) {
switch (action.getFlowActionType()) {
case KILL:
this.handleKillFlowEvent(new KillFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
break;
case RESUME:
this.handleResumeFlowEvent(new ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(), Long.parseLong(action.getFlowExecutionId())));
break;
case LAUNCH:
this.handleLaunchFlowEvent(action);
break;
default:
log.warn("Unsupported dagAction: " + action.getFlowActionType().toString());
}
}
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
Expand All @@ -504,44 +482,6 @@ public synchronized void setActive(boolean active) {
}
}

/**
 * Launches a new flow execution for a LAUNCH {@link DagActionStore.DagAction} that was loaded from the
 * DagActionStore when this DagManager instance was set to active. The action may refer to a completely new
 * DAG that is not yet present in the dagStateStore, so the flow spec is compiled into a job execution plan
 * before {@code addDag()} is invoked. Every failure path is logged and counted via
 * {@code dagManagerMetrics.incrementFailedLaunchCount()} rather than propagated to the caller.
 */
public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
  // Only LAUNCH actions are valid here; KILL/RESUME are handled by their own code paths.
  Preconditions.checkArgument(launchAction.getFlowActionType() == DagActionStore.FlowActionType.LAUNCH);
  log.info("Handle launch flow event for action {}", launchAction);
  FlowId flowId = launchAction.getFlowId();
  try {
    URI specUri = FlowSpec.Utils.createFlowSpecUri(flowId);
    FlowSpec flowSpec = (FlowSpec) flowCatalog.getSpecs(specUri);
    Optional<Dag<JobExecutionPlan>> compiledDag =
        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, Optional.absent());
    if (!compiledDag.isPresent()) {
      // Compilation/validation rejected the spec; skip this launch and record the failure.
      log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId);
      this.dagManagerMetrics.incrementFailedLaunchCount();
      return;
    }
    addDag(compiledDag.get(), true, true);
  } catch (URISyntaxException e) {
    log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e);
    this.dagManagerMetrics.incrementFailedLaunchCount();
  } catch (SpecNotFoundException e) {
    log.warn(String.format("Spec not found for flowId %s due to exception", flowId), e);
    this.dagManagerMetrics.incrementFailedLaunchCount();
  } catch (IOException e) {
    log.warn(String.format("Failed to add Job Execution Plan for flowId %s OR delete dag action from dagActionStore "
        + "(check stacktrace) due to exception", flowId), e);
    this.dagManagerMetrics.incrementFailedLaunchCount();
  } catch (InterruptedException e) {
    log.warn(String.format("SpecCompiler failed to reach healthy state before compilation of flowId %s due to "
        + "exception", flowId), e);
    this.dagManagerMetrics.incrementFailedLaunchCount();
  }
}

private void loadDagFromDagStateStore() throws IOException {
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ public class DagManagerMetrics {
private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();

// Metrics for unexpected flow handling failures
private ContextAwareCounter failedLaunchEventsOnActivationCount;
MetricContext metricContext;

public DagManagerMetrics(MetricContext metricContext) {
Expand All @@ -103,9 +100,6 @@ public void activate() {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
}
}

Expand Down Expand Up @@ -205,13 +199,6 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node)
}
}

/**
 * Increments the counter of flow-launch events that failed during leader activation.
 * No-op when metrics are not active (i.e. {@code metricContext} is null).
 */
public void incrementFailedLaunchCount() {
  if (this.metricContext == null) {
    return;
  }
  this.failedLaunchEventsOnActivationCount.inc();
}

private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
Expand Down
Loading