diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index 6f1069e151f..b9a3521a8e0 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -429,6 +429,62 @@ public void testProcessMessageForCancelledAndKilledEvent() throws IOException, R jobStatusMonitor.shutDown(); } + @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors") + public void testProcessMessageForFlowPendingResume() throws IOException, ReflectiveOperationException { + KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4"); + + //Submit GobblinTrackingEvents to Kafka + ImmutableList.of( + createFlowCompiledEvent(), + createJobOrchestratedEvent(1, 2), + createJobCancelledEvent(), + createFlowPendingResumeEvent(), + createJobOrchestratedEvent(2, 2), + createJobStartEvent(), + createJobSucceededEvent() + ).forEach(event -> { + context.submitEvent(event); + kafkaReporter.report(); + }); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer()); + jobStatusMonitor.buildMetricsContextAndMetrics(); + Iterator recordIterator = Iterators.transform( + this.kafkaTestHelper.getIteratorForTopic(TOPIC), + this::convertMessageAndMetadataToDecodableKafkaRecord); + + State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA"); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name()); + + state = 
getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name()); + + state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.CANCELLED.name()); + + // Job for flow pending resume status after it was cancelled or failed + state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA"); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RESUME.name()); + + state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + //Job orchestrated for retrying + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name()); + + state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name()); + + state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name()); + + jobStatusMonitor.shutDown(); + } + @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent") public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOException, ReflectiveOperationException { KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic5"); @@ -619,6 +675,14 @@ private GobblinTrackingEvent createJobStartSLAKilledEvent() { return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED, Maps.newHashMap()); } + private GobblinTrackingEvent createFlowPendingResumeEvent() { + GobblinTrackingEvent event = createGTE(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, Maps.newHashMap()); + 
event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD); + event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD); + return event; + + } + private GobblinTrackingEvent createGTE(String eventName, Map customMetadata) { String namespace = "org.apache.gobblin.metrics"; Long timestamp = System.currentTimeMillis(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java index 7d1654af3cd..87e1c3b5a14 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java @@ -251,13 +251,17 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration); int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1); int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts); + // Verify if the current job status is flow status. If yes, we check for its current execution status to be PENDING_RESUME (limiting to just resume flow statuses) + // When the above two conditions are satisfied, we NEED NOT check for the out-of-order events since GaaS would manage the lifecycle of these events + // Hence, we update the merge state accordingly so that the flow can proceed with its execution to the next state in the DAG + boolean isFlowStatusAndPendingResume = isFlowStatusAndPendingResume(jobName, jobGroup, currentStatus); // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals). 
// The generation is monotonically increasing, while the attempts may re-initialize back to 0. this two-part form prevents the composite value from ever repeating. // And job status reflect the execution status in one attempt - if (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || ( + if (!isFlowStatusAndPendingResume && (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || ( previousGeneration == currentGeneration && previousAttempts > currentAttempts) || (previousGeneration == currentGeneration && previousAttempts == currentAttempts && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) - < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) { + < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus)))))) { log.warn(String.format( "Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)", currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, @@ -280,6 +284,11 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS } } + private static boolean isFlowStatusAndPendingResume(String jobName, String jobGroup, String currentStatus) { + return jobName != null && jobGroup != null && jobName.equals(JobStatusRetriever.NA_KEY) && jobGroup.equals(JobStatusRetriever.NA_KEY) + && ExecutionStatus.PENDING_RESUME.name().equals(currentStatus); + } + private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) { int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1); int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);