Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,62 @@ public void testProcessMessageForCancelledAndKilledEvent() throws IOException, R
jobStatusMonitor.shutDown();
}

@Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
public void testProcessMessageForFlowPendingResume() throws IOException, ReflectiveOperationException {
  KafkaEventReporter reporter = builder.build("localhost:0000", "topic4");

  // Emit a flow lifecycle where the first job attempt is cancelled, the flow is marked
  // PENDING_RESUME, and the second (resumed) attempt runs to success.
  ImmutableList<GobblinTrackingEvent> events = ImmutableList.of(
      createFlowCompiledEvent(),
      createJobOrchestratedEvent(1, 2),
      createJobCancelledEvent(),
      createFlowPendingResumeEvent(),
      createJobOrchestratedEvent(2, 2),
      createJobStartEvent(),
      createJobSucceededEvent());
  for (GobblinTrackingEvent event : events) {
    context.submitEvent(event);
    reporter.report();
  }

  // Allow the reported events to become consumable from the Kafka test broker
  try {
    Thread.sleep(1000);
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
  }

  MockKafkaAvroJobStatusMonitor monitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
  monitor.buildMetricsContextAndMetrics();
  Iterator<DecodeableKafkaRecord> records = Iterators.transform(
      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
      this::convertMessageAndMetadataToDecodableKafkaRecord);

  // Flow-level events carry the NA sentinel for job name/group
  State status = getNextJobStatusState(monitor, records, "NA", "NA");
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());

  status = getNextJobStatusState(monitor, records, this.jobGroup, this.jobName);
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());

  status = getNextJobStatusState(monitor, records, this.jobGroup, this.jobName);
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name());

  // Flow transitions to PENDING_RESUME after the cancellation
  status = getNextJobStatusState(monitor, records, "NA", "NA");
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RESUME.name());

  // The job is re-orchestrated for the resumed attempt
  status = getNextJobStatusState(monitor, records, this.jobGroup, this.jobName);
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());

  status = getNextJobStatusState(monitor, records, this.jobGroup, this.jobName);
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());

  status = getNextJobStatusState(monitor, records, this.jobGroup, this.jobName);
  Assert.assertEquals(status.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());

  monitor.shutDown();
}

@Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic5");
Expand Down Expand Up @@ -619,6 +675,14 @@ private GobblinTrackingEvent createJobStartSLAKilledEvent() {
return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED, Maps.newHashMap());
}

/**
 * Builds a flow-level PENDING_RESUME tracking event. The job name/group metadata entries are
 * removed so the event is attributed to the flow (NA sentinel) rather than to any job.
 */
private GobblinTrackingEvent createFlowPendingResumeEvent() {
  GobblinTrackingEvent pendingResumeEvent = createGTE(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, Maps.newHashMap());
  pendingResumeEvent.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
  pendingResumeEvent.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
  return pendingResumeEvent;
}

private GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata) {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,17 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
// Verify if the current job status is flow status. If yes, we check for its current execution status to be PENDING_RESUME (limiting to just resume flow statuses)
// When the above two conditions satisfy, we NEED NOT check for the out-of-order events since GaaS would manage the lifecycle of these events
// Hence, we update the merge state accordingly so that the flow can proceed with its execution to the next state in the DAG
boolean isFlowStatusAndPendingResume = isFlowStatusAndPendingResume(jobName, jobGroup, currentStatus);
// We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may re-initialize back to 0. This two-part form prevents the composite value from ever repeating.
// The job status reflects the execution status within a single attempt.
if (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || (
if (!isFlowStatusAndPendingResume && (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || (
previousGeneration == currentGeneration && previousAttempts > currentAttempts) || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
&& ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
< ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
< ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus)))))) {
log.warn(String.format(
"Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts,
Expand All @@ -280,6 +284,11 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
}
}

/**
 * Returns true when this status record is a flow-level status (job name and group are the
 * {@link JobStatusRetriever#NA_KEY} sentinel) whose current execution status is PENDING_RESUME.
 *
 * <p>Uses constant-first {@code equals} so that a null {@code jobName}, {@code jobGroup}, or
 * {@code currentStatus} simply yields false: this helper is evaluated before the caller's
 * {@code currentStatus != null} guard, so a null-safe comparison avoids a potential NPE.
 */
private static boolean isFlowStatusAndPendingResume(String jobName, String jobGroup, String currentStatus) {
  return JobStatusRetriever.NA_KEY.equals(jobName) && JobStatusRetriever.NA_KEY.equals(jobGroup)
      && ExecutionStatus.PENDING_RESUME.name().equals(currentStatus);
}

private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
Expand Down