diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 5fe8f001af1..f838fc606a5 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -117,10 +117,10 @@ public class ConfigurationKeys { public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1"; public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent"; public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis"; - public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000; + public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 2000; // Note: linger should be on the order of seconds even though we measure in millis public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis"; - public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000; + public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 90000; public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis"; public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000; diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 5550513c436..866644f50f0 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -36,14 +36,16 @@ public class ServiceMetricNames { public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay"; // Flow Trigger Handler - public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler"; - public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted"; - public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained"; - public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother"; - public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing"; - public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; - public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; - public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler."; + public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + "numFlowsSubmitted"; + public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leaseObtained"; + public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasedToAnother"; + public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "noLongerLeasing"; + public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "jobDoesNotExistInScheduler"; + public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToSetReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "leasesObtainedDueToReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "failedToRecordLeaseSuccessCount"; + public static final String FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + "recordedLeaseSuccessCount"; // DagManager Related Metrics public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 05449767cf1..cc945d06b0e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -243,10 +243,9 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l // Query lease arbiter table about this flow action Optional getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis); - // TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue try { if (!getResult.isPresent()) { - log.info("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action," + log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this flow action," + " then go ahead and insert", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis); int numRowsUpdated = attemptLeaseIfNewRow(flowAction); return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.empty(), isReminderEvent); @@ -265,7 +264,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l // because db laundering tells us that the currently worked on db event is newer and will have its own reminders if (isReminderEvent) { if (eventTimeMillis < dbEventTimestamp.getTime()) { - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger " + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger " + "is being worked on, so this older reminder will be dropped.", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp); return new NoLongerLeasingStatus(); @@ -278,8 +277,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime()); } if (eventTimeMillis == dbEventTimestamp.getTime()) { - // TODO: change this to a debug after fixing issue - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time " + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time " + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp); } @@ -293,21 +291,21 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l if (leaseValidityStatus == 1) { if (isWithinEpsilon) { DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime()); - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db timestamp for reminder return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()); - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db lease acquisition timestamp for wait time return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } // Lease is invalid else if (leaseValidityStatus == 2) { - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of " + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of " + "whether same or distinct event)", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); if (isWithinEpsilon && !isReminderEvent) { @@ -321,11 +319,11 @@ else if (leaseValidityStatus == 2) { return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp), isReminderEvent); } // No longer leasing this event if (isWithinEpsilon) { - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event" + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event" + " in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); return new NoLongerLeasingStatus(); } - log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing " + log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing " + "event in db", flowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction, diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index af65390ec50..a1b141d951d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -80,6 +80,8 @@ public class FlowTriggerHandler { private ContextAwareCounter jobDoesNotExistInSchedulerCount; private ContextAwareCounter failedToSetEventReminderCount; private ContextAwareMeter leasesObtainedDueToReminderCount; + private ContextAwareMeter failedToRecordLeaseSuccessCount; + private ContextAwareMeter recordedLeaseSuccessCount; @Inject public FlowTriggerHandler(Config config, Optional leaseDeterminationStore, @@ -98,6 +100,8 @@ public FlowTriggerHandler(Config config, Optional lease this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT); this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT); this.leasesObtainedDueToReminderCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT); + this.failedToRecordLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_RECORD_LEASE_SUCCESS_COUNT); + this.recordedLeaseSuccessCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_RECORDED_LEASE_SUCCESS_COUNT); } /** @@ -127,8 +131,10 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo if (persistFlowAction(leaseObtainedStatus)) { log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(), leaseObtainedStatus.getEventTimeMillis()); + this.recordedLeaseSuccessCount.mark(); return; } + this.failedToRecordLeaseSuccessCount.mark(); // If persisting the flow action failed, then we set another trigger for this event to occur immediately to // re-attempt handling the event scheduleReminderForEvent(jobProps,