Skip to content

Commit 6233476

Browse files
committed
further fixup
1 parent 2c3cfcc commit 6233476

File tree

3 files changed

+3
-4
lines changed

3 files changed

+3
-4
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public void handleFlowKillTriggerEvent(Properties jobProps, DagActionStore.Lease
117117

118118
private void handleFlowTriggerEvent(Properties jobProps, DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
119119
throws IOException {
120+
log.info("Handling trigger {} (adoptConsensusTimestamp: {})", leaseParams, adoptConsensusFlowExecutionId);
120121
long origEventTimeMillis = leaseParams.getEventTimeMillis();
121122
LeaseAttemptStatus leaseAttempt = this.multiActiveLeaseArbiter.tryAcquireLease(leaseParams, adoptConsensusFlowExecutionId);
122123
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ private LeaseAttemptStatus doTryAcquireLease(DagActionStore.LeaseParams leasePar
302302

303303
// TODO: check whether reminder event before replacing flowExecutionId
304304
if (adoptConsensusFlowExecutionId) {
305-
log.info("NOTE: Multi-active arbiter will use current DB epoch millis ({}) to launder {}", dbCurrentTimestamp.getTime(),
306-
contextualizeLeasing(leaseParams));
305+
log.info("Multi-active will use DB time ({}) to launder {}", dbCurrentTimestamp.getTime(), contextualizeLeasing(leaseParams));
307306
}
308307
/* Note that we use `adoptConsensusFlowExecutionId` parameter's value to determine whether we should use the db
309308
laundered event timestamp as the flowExecutionId or maintain the original one

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
190190
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(launchDagAction, isReminderEvent, triggerTimestampMillis);
191191
// `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
192192
flowLaunchHandler.handleFlowLaunchTriggerEvent(jobProps, leaseParams, flowSpec.isScheduled());
193-
_log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
194-
launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
193+
_log.info("Multi-active scheduler finished handling {}", leaseParams);
195194
} else {
196195
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
197196
throw new RuntimeException("Spec not of type FlowSpec, cannot orchestrate: " + spec);

0 commit comments

Comments
 (0)