Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
partitionToRemoteTaskMap.get(partitionId).forEach(RemoteTask::abort);
partitionMemoryEstimator.registerPartitionFinished(session, memoryLimits, taskStatus.getPeakMemoryReservation(), true, Optional.empty());

if (delayStopwatch.isRunning()) {
// task completed successfully; reset delay
if (delayStopwatch.isRunning() && delayStopwatch.elapsed().compareTo(delaySchedulingDuration.get()) > 0) {
// we are past delay period and task completed successfully; reset delay
previousDelaySchedulingFuture = delaySchedulingFuture;
delayStopwatch.reset();
delaySchedulingDuration = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,19 @@ public void testRetryDelay()
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING);

// T+9.0 task 0.3 completes successfully - should reset delay for stage
// T+9.0 task 0.3 completes successfully - we should not reset delay; backoff still in progress
remoteTaskFactory.getTasks().get(getTaskId(0, 3)).finish();
assertBlocked(blocked);
assertThat(remoteTaskFactory.getTasks()).hasSize(6);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING);

// T+9.7 more than 3s passed since task 1.0 was killed; should restart now
moveTime(700, MILLISECONDS);
assertUnblocked(blocked);
scheduler.schedule();
blocked = scheduler.isBlocked();
Expand All @@ -733,7 +744,22 @@ public void testRetryDelay()
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING);

// T+9.0 kill task 1.1; delay should count from 1s again as there was a success
// T+9.7 task 2.0 completes successfully - delay should be reset (we are not in backoff now)
remoteTaskFactory.getTasks().get(getTaskId(2, 0)).finish();
assertUnblocked(blocked);
scheduler.schedule();
blocked = scheduler.isBlocked();
assertBlocked(blocked);
assertThat(remoteTaskFactory.getTasks()).hasSize(7);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED);

// T+9.7 kill task 1.1; delay should be 1s now
scheduler.reportTaskFailure(getTaskId(1, 1), new RuntimeException("some other failure"));
assertUnblocked(blocked);
scheduler.schedule();
Expand All @@ -746,10 +772,22 @@ public void testRetryDelay()
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED);

// T+10.1 task 1.2 should be started
moveTime(1100, MILLISECONDS);
// T+10.6 task 1.2 should not start yet
moveTime(900, MILLISECONDS);
assertBlocked(blocked);
assertThat(remoteTaskFactory.getTasks()).hasSize(7);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED);

// T+10.8 more than 1s passed; task 1.2 should start now
moveTime(200, MILLISECONDS);
assertUnblocked(blocked);
scheduler.schedule();
blocked = scheduler.isBlocked();
Expand All @@ -762,10 +800,10 @@ public void testRetryDelay()
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED);

// T+10.1 if we kill task with out of memory error next try should be started right away
scheduler.reportTaskFailure(getTaskId(2, 0), new TrinoException(StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom"));
// T+10.8 if we kill task with out of memory error next try should be started right away
scheduler.reportTaskFailure(getTaskId(1, 2), new TrinoException(StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom"));
assertUnblocked(blocked);
scheduler.schedule();
blocked = scheduler.isBlocked();
Expand All @@ -777,9 +815,9 @@ public void testRetryDelay()
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 1)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.FAILED);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 3)).getTaskStatus().getState(), TaskState.RUNNING);
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FINISHED);
}
}

Expand Down