Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";

// Dag Action Handling Related Metrics
public static final String DAG_ACTION_HANDLING_PREFIX = "dagActionHandling";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering... couldn't this be a dagManager. metric? dagManager.failedLaunchEventsOn...?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to be clear that this is a failure that occurs related to handling all dagAction related code changes and easily find them when they may originate from dagActionStoreMonitor, dagManager, or other locations. We also don't use a dagManager prefix for other dagManager metrics for some reason

public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_ACTION_HANDLING_PREFIX + ".dagManagerFailedLaunchEventsOnStartupCount";

//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
Thread.sleep(10000);
Comment thread
umustafi marked this conversation as resolved.
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
Expand All @@ -253,7 +252,7 @@ private void runRetentionOnArbitrationTable() {
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
Expand Down Expand Up @@ -308,7 +307,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
+ " is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
Comment thread
umustafi marked this conversation as resolved.
Outdated
eventTimeMillis, dbEventTimestamp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
public static final String DAG_ACTION_STORE_MONITOR_PREFIX = "dagActionStoreMonitor";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is no longer a prefix... but why anyway do you prefer to repeat so many times SMNames.GOBBLIN_SERVICE_PREFIX + "."?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good pt, updated to have the prefix contain SMNames.GOBBLIN_SERVICE_PREFIX + "." instead

public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";

public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,18 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
+ "stacktrace) due to exception {}", flowId, e.getMessage());
this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {} due to exception {}", flowId,
e);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's the possibility I may be misremembering... and pursuing this needlessly... but my expectation is that a stacktrace would only be written when calling this form:

Logger::warn(String, Throwable)

if you call the form:

Logger::warn(String, Object, Object) // aka. Logger::warn(String, Object...)

are you certain it will print the ST, when the last arg is Throwable and there's no corresponding {} remaining for it in the initial String arg?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two {} and the last one is for the throwable so it will print the getMessage. I updated all the methods to use Logger::warn(String, Throwable) thought to get full stack trace.

this.dagManagerMetrics.incrementFailedLaunchCount();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class DagManagerMetrics {
private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();

// Metrics for unexpected flow handling failures
private ContextAwareCounter failedLaunchEventsOnActivationCount;
MetricContext metricContext;

public DagManagerMetrics(MetricContext metricContext) {
Expand All @@ -100,6 +103,9 @@ public void activate() {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter(
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT));
}
}

Expand Down Expand Up @@ -199,6 +205,13 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node)
}
}

// Increment the count for num of failed launches during leader activation
public void incrementFailedLaunchCount() {
if (this.metricContext != null) {
this.failedLaunchEventsOnActivationCount.inc();
}
}

private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage);
Comment thread
umustafi marked this conversation as resolved.
Outdated
if (this.eventSubmitter.isPresent()) {
// pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
private ContextAwareMeter killsInvoked;
private ContextAwareMeter resumesInvoked;
private ContextAwareMeter flowsLaunched;
private ContextAwareMeter failedFlowLaunchSubmissions;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareMeter messageFilteredOutMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge

private volatile Long produceToConsumeDelayValue = -1L;
Expand Down Expand Up @@ -130,30 +132,34 @@ protected void processMessage(DecodeableKafkaRecord message) {
String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
produceTimestamp.toString())) {
this.messageFilteredOutMeter.mark();
return;
}

// Used to easily log information to identify the dag action
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
dagActionType);

// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow request");
log.info("Received insert dag action and about to send resume flow request for: {}", dagAction);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: too conversational. how about:

log.info("DagAction change ({}): {}", operation, dagAction)

(i.e. won't the resume/kill/launch be logged as dagAction.dagActionType)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about unifying all the logs like this as well, let me make the change. I will add a bit more context in message but use this format.

dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow request");
log.info("Received insert dag action and about to send kill flow request for: {}", dagAction);
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH)) {
// If multi-active scheduler is NOT turned on we should not receive these type of events
if (!this.isMultiActiveSchedulerEnabled) {
this.unexpectedErrors.mark();
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s",
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, dagActionType)));
+ "mode for flowAction: %s", dagAction));
}
log.info("Received insert dag action and about to forward launch request to DagManager");
log.info("Received insert dag action and about to forward launch request to DagManager for: {}", dagAction);
submitFlowToDagManagerHelper(flowGroup, flowName);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
Expand Down Expand Up @@ -191,19 +197,19 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) {
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
this.unexpectedErrors.mark();
this.failedFlowLaunchSubmissions.mark();
return;
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
this.unexpectedErrors.mark();
this.failedFlowLaunchSubmissions.mark();
return;
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} due to exception {}", flowId, e.getMessage());
this.unexpectedErrors.mark();
this.failedFlowLaunchSubmissions.mark();
return;
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
this.unexpectedErrors.mark();
this.failedFlowLaunchSubmissions.mark();
return;
}
// Only mark this if the dag was successfully added
Expand All @@ -216,8 +222,10 @@ protected void createMetrics() {
this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
this.failedFlowLaunchSubmissions = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
Expand Down