diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java index eb26acd161a..4f3442597fb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java @@ -49,10 +49,9 @@ public FlowId getFlowId() { /** * Replace flow execution id with agreed upon event time to easily track the flow */ - public static DagActionStore.DagAction updateFlowExecutionId(DagActionStore.DagAction flowAction, - long eventTimeMillis) { - return new DagActionStore.DagAction(flowAction.getFlowGroup(), flowAction.getFlowName(), - String.valueOf(eventTimeMillis), flowAction.getFlowActionType()); + public DagAction updateFlowExecutionId(long eventTimeMillis) { + return new DagAction(this.getFlowGroup(), this.getFlowName(), + String.valueOf(eventTimeMillis), this.getFlowActionType()); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index c6161d93695..338e908a2e0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -321,14 +321,14 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l // Lease is valid if (leaseValidityStatus == 1) { if (isWithinEpsilon) { - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbEventTimestamp.getTime()); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbEventTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db timestamp for reminder return new LeasedToAnotherStatus(updatedFlowAction, dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime()); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()); log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", updatedFlowAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db lease acquisition timestamp for wait time @@ -518,7 +518,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) { return new NoLongerLeasingStatus(); } - DagActionStore.DagAction updatedFlowAction = updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis); + DagActionStore.DagAction updatedFlowAction = flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis); if (numRowsUpdated == 1) { log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedFlowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java index 5e8daaa26a3..7b494201eca 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java @@ -95,7 +95,7 @@ public abstract class HighLevelConsumer extends AbstractIdleService { */ @Getter private MetricContext metricContext; - private Counter messagesRead; + protected Counter messagesRead; @Getter private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient; private final ScheduledExecutorService consumerExecutor; @@ -329,6 +329,8 @@ public void run() { } } } catch (InterruptedException e) { + log.warn("Encountered exception while processing queue ", e); + // TODO: evaluate whether we should interrupt the thread or continue processing Thread.currentThread().interrupt(); } } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index 7bafc78ff34..08630ab3661 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -217,7 +217,7 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); - DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); @@ -299,7 +299,7 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); - DagActionStore.DagAction updatedResumeDagAction = DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction, + DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get())); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 8abaa209c25..c5a5bb8e0ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -112,8 +112,12 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo if (multiActiveLeaseArbiter.isPresent()) { MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease( flowAction, eventTimeMillis, isReminderEvent); + // The flow action contained in the`LeaseAttemptStatus` from the lease arbiter contains an updated flow execution + // id. From this point onwards, always use the newer version of the flow action to easily track the action through + // orchestration and execution. if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { - MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; + MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) + leaseAttemptStatus; this.leaseObtainedCount.inc(); if (persistFlowAction(leaseObtainedStatus)) { log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getFlowAction(), @@ -122,11 +126,9 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo } // If persisting the flow action failed, then we set another trigger for this event to occur immediately to // re-attempt handling the event - DagActionStore.DagAction updatedFlowAction = DagActionStore.DagAction.updateFlowExecutionId(flowAction, - leaseObtainedStatus.getEventTimeMillis()); scheduleReminderForEvent(jobProps, - new MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L), - eventTimeMillis); + new MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getFlowAction(), + 0L), eventTimeMillis); return; } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) { this.leasedToAnotherStatusCount.inc(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java index 33934ef0644..a2d68fbc0d1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java @@ -35,7 +35,7 @@ public static boolean shouldProcessMessage(String changeIdentifier, LoadingCache String operation, String timestamp) { // If we've already processed a message with this timestamp and key before then skip duplicate message if (cache.getIfPresent(changeIdentifier) != null) { - log.debug("Duplicate change event with identifier {}", changeIdentifier); + log.info("Duplicate change event with identifier {}", changeIdentifier); return false; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 1435e076ae1..e5a2d090d3a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -34,6 +34,7 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecNotFoundException; @@ -216,7 +217,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) { @Override protected void createMetrics() { - super.createMetrics(); + super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ); this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED); this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);