diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index dea684bbd3d8..1fa2496422eb 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -599,8 +599,8 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional 0) { + // we are past delay period and task completed successfully; reset delay previousDelaySchedulingFuture = delaySchedulingFuture; delayStopwatch.reset(); delaySchedulingDuration = Optional.empty(); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java index fb28acf254cd..a54a9cc4f2d7 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java @@ -718,8 +718,19 @@ public void testRetryDelay() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); - // T+9.0 task 0.3 completes successfully - should reset delay for stage + // T+9.0 task 0.3 completes successfully - we should not reset delay; backoff still in progress remoteTaskFactory.getTasks().get(getTaskId(0, 3)).finish(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(6); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+9.7 more than 3s passed since task 1.0 was killed; should restart now + moveTime(700, MILLISECONDS); assertUnblocked(blocked); scheduler.schedule(); blocked = scheduler.isBlocked(); @@ -733,7 +744,22 @@ public void testRetryDelay() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.RUNNING); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); - // T+9.0 kill task 1.1; delay should count from 1s again as there was a success + // T+9.7 task 2.0 completes successfully - delay should be reset (we are not in backoff now) + remoteTaskFactory.getTasks().get(getTaskId(2, 0)).finish(); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(7); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED); + + // T+9.7 kill task 1.1; delay should be 1s now scheduler.reportTaskFailure(getTaskId(1, 1), new RuntimeException("some other failure")); assertUnblocked(blocked); scheduler.schedule(); @@ -746,10 +772,22 @@ public void testRetryDelay() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); - assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED); - // T+10.1 task 1.2 should be started - moveTime(1100, MILLISECONDS); + // T+10.6 task 1.2 should not start yet + moveTime(900, MILLISECONDS); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(7); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED); + + // T+10.8 more than 1s passed; task 1.2 should start now + moveTime(200, MILLISECONDS); assertUnblocked(blocked); scheduler.schedule(); blocked = scheduler.isBlocked(); @@ -762,10 +800,10 @@ public void testRetryDelay() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING); - assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED); - // T+10.1 if we kill task with out of memory error next try should be started right away - scheduler.reportTaskFailure(getTaskId(2, 0), new TrinoException(StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom")); + // T+10.8 if we kill task with out of memory error next try should be started right away + scheduler.reportTaskFailure(getTaskId(1, 2), new TrinoException(StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom")); assertUnblocked(blocked); scheduler.schedule(); blocked = scheduler.isBlocked(); @@ -777,9 +815,9 @@ public void testRetryDelay() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); - assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING); - assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FAILED); - assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 1)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 3)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED); } }