diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index d05da728e0f5..e2587ffada40 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -156,6 +156,7 @@ public final class SystemSessionProperties public static final String MAX_TASKS_WAITING_FOR_NODE_PER_STAGE = "max_tasks_waiting_for_node_per_stage"; public static final String RETRY_INITIAL_DELAY = "retry_initial_delay"; public static final String RETRY_MAX_DELAY = "retry_max_delay"; + public static final String RETRY_DELAY_SCALE_FACTOR = "retry_delay_scale_factor"; public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns"; public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size"; public static final String FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT = "fault_tolerant_execution_min_task_split_count"; @@ -748,6 +749,18 @@ public SystemSessionProperties( "Maximum delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt starting from 'retry_initial_delay'", queryManagerConfig.getRetryMaxDelay(), false), + doubleProperty( + RETRY_DELAY_SCALE_FACTOR, + "Factor by which retry delay is scaled on subsequent failures, 
starting from 'retry_initial_delay' up to 'retry_max_delay'", + queryManagerConfig.getRetryDelayScaleFactor(), + value -> { + if (value < 1.0) { + throw new TrinoException( + INVALID_SESSION_PROPERTY, + format("%s must be greater or equal to 1.0", RETRY_DELAY_SCALE_FACTOR)); + } + }, + false), booleanProperty( HIDE_INACCESSIBLE_COLUMNS, "When enabled non-accessible columns are silently filtered from results from SELECT * statements", @@ -1397,6 +1410,11 @@ public static Duration getRetryMaxDelay(Session session) return session.getSystemProperty(RETRY_MAX_DELAY, Duration.class); } + public static double getRetryDelayScaleFactor(Session session) + { + return session.getSystemProperty(RETRY_DELAY_SCALE_FACTOR, Double.class); + } + public static boolean isHideInaccessibleColumns(Session session) { return session.getSystemProperty(HIDE_INACCESSIBLE_COLUMNS, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 504c99b0a678..ffae98b9ff6a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -81,6 +82,7 @@ public class QueryManagerConfig private int taskRetryAttemptsOverall = Integer.MAX_VALUE; private Duration retryInitialDelay = new Duration(10, SECONDS); private Duration retryMaxDelay = new Duration(1, MINUTES); + private double retryDelayScaleFactor = 2.0; private int maxTasksWaitingForNodePerStage = 5; @@ -485,6 +487,21 @@ public QueryManagerConfig setRetryMaxDelay(Duration retryMaxDelay) return 
this; } + @NotNull + public double getRetryDelayScaleFactor() + { + return retryDelayScaleFactor; + } + + @Config("retry-delay-scale-factor") + @ConfigDescription("Factor by which retry delay is scaled on subsequent failures") + public QueryManagerConfig setRetryDelayScaleFactor(double retryDelayScaleFactor) + { + checkArgument(retryDelayScaleFactor >= 1.0, "retry-delay-scale-factor must be greater or equal to 1"); + this.retryDelayScaleFactor = retryDelayScaleFactor; + return this; + } + @Min(1) public int getMaxTasksWaitingForNodePerStage() { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index 0b6401155a99..dea684bbd3d8 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -13,6 +13,8 @@ */ package io.trino.execution.scheduler; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.VerifyException; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -20,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.concurrent.MoreFutures; @@ -52,6 +55,7 @@ import javax.annotation.concurrent.GuardedBy; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -66,6 +70,7 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import 
static com.google.common.base.Throwables.propagateIfPossible; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -78,12 +83,17 @@ import static io.airlift.concurrent.MoreFutures.asVoid; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.toListenableFuture; +import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor; +import static io.trino.SystemSessionProperties.getRetryInitialDelay; +import static io.trino.SystemSessionProperties.getRetryMaxDelay; import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static io.trino.execution.buffer.OutputBuffers.createSpoolingExchangeOutputBuffers; import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError; import static io.trino.failuredetector.FailureDetector.State.GONE; import static io.trino.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; +import static io.trino.spi.ErrorType.EXTERNAL; +import static io.trino.spi.ErrorType.INTERNAL_ERROR; import static io.trino.spi.ErrorType.USER_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; @@ -113,12 +123,25 @@ public class FaultTolerantStageScheduler private final Optional sourceBucketToPartitionMap; private final Optional sourceBucketNodeMap; + private final DelayedFutureCompletor futureCompletor; + @GuardedBy("this") private ListenableFuture blocked = immediateVoidFuture(); @GuardedBy("this") private SettableFuture taskFinishedFuture; + private final Duration minRetryDelay; + private final Duration maxRetryDelay; + private final double retryDelayScaleFactor; + + @GuardedBy("this") + private Optional delaySchedulingDuration = Optional.empty(); + @GuardedBy("this") + private final Stopwatch delayStopwatch; + 
@GuardedBy("this") + private SettableFuture delaySchedulingFuture; + @GuardedBy("this") private TaskSource taskSource; @GuardedBy("this") @@ -158,6 +181,8 @@ public FaultTolerantStageScheduler( TaskDescriptorStorage taskDescriptorStorage, PartitionMemoryEstimator partitionMemoryEstimator, TaskLifecycleListener taskLifecycleListener, + DelayedFutureCompletor futureCompletor, + Ticker ticker, Optional sinkExchange, Optional sinkBucketToPartitionMap, Map sourceExchanges, @@ -177,6 +202,7 @@ public FaultTolerantStageScheduler( this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null"); this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); this.taskLifecycleListener = requireNonNull(taskLifecycleListener, "taskLifecycleListener is null"); + this.futureCompletor = requireNonNull(futureCompletor, "futureCompletor is null"); this.sinkExchange = requireNonNull(sinkExchange, "sinkExchange is null"); this.sinkBucketToPartitionMap = requireNonNull(sinkBucketToPartitionMap, "sinkBucketToPartitionMap is null"); this.sourceExchanges = ImmutableMap.copyOf(requireNonNull(sourceExchanges, "sourceExchanges is null")); @@ -185,6 +211,10 @@ public FaultTolerantStageScheduler( this.remainingRetryAttemptsOverall = requireNonNull(remainingRetryAttemptsOverall, "remainingRetryAttemptsOverall is null"); this.maxRetryAttemptsPerTask = taskRetryAttemptsPerTask; this.maxTasksWaitingForNodePerStage = maxTasksWaitingForNodePerStage; + this.minRetryDelay = Duration.ofMillis(getRetryInitialDelay(session).toMillis()); + this.maxRetryDelay = Duration.ofMillis(getRetryMaxDelay(session).toMillis()); + this.retryDelayScaleFactor = getRetryDelayScaleFactor(session); + this.delayStopwatch = Stopwatch.createUnstarted(ticker); } public StageId getStageId() @@ -217,6 +247,12 @@ public synchronized void schedule() return; } + if (delaySchedulingFuture != null && !delaySchedulingFuture.isDone()) { + // let's wait 
a bit more + blocked = delaySchedulingFuture; + return; + } + if (taskSource == null) { Map>> sourceHandles = sourceExchanges.entrySet().stream() .collect(toImmutableMap(Map.Entry::getKey, entry -> toListenableFuture(entry.getValue().getSourceHandles()))); @@ -531,12 +567,13 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional future; + SettableFuture previousTaskFinishedFuture; + SettableFuture previousDelaySchedulingFuture = null; synchronized (this) { TaskId taskId = taskStatus.getTaskId(); runningTasks.remove(taskId); - future = taskFinishedFuture; + previousTaskFinishedFuture = taskFinishedFuture; if (!runningTasks.isEmpty()) { taskFinishedFuture = SettableFuture.create(); } @@ -561,6 +598,15 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional 0) { + // we are past previous delay period and still getting failures; let's make it longer + delayStopwatch.reset().start(); + delaySchedulingDuration = delaySchedulingDuration.map(duration -> + Ordering.natural().min( + Duration.ofMillis((long) (duration.toMillis() * retryDelayScaleFactor)), + maxRetryDelay)); + + // create new future + previousDelaySchedulingFuture = delaySchedulingFuture; + SettableFuture newDelaySchedulingFuture = SettableFuture.create(); + delaySchedulingFuture = newDelaySchedulingFuture; + futureCompletor.completeFuture(newDelaySchedulingFuture, delaySchedulingDuration.get()); + } + } + else { + // initialize delaying of tasks scheduling + delayStopwatch.start(); + delaySchedulingDuration = Optional.of(minRetryDelay); + delaySchedulingFuture = SettableFuture.create(); + futureCompletor.completeFuture(delaySchedulingFuture, delaySchedulingDuration.get()); + } + } } else { failure = failureInfo.toException(); @@ -614,8 +689,11 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional future, Duration delay); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java 
b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 52d0417706a2..a216a4cd6a03 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -108,6 +108,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Ticker.systemTicker; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -125,6 +126,7 @@ import static io.trino.SystemSessionProperties.getHashPartitionCount; import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage; import static io.trino.SystemSessionProperties.getQueryRetryAttempts; +import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor; import static io.trino.SystemSessionProperties.getRetryInitialDelay; import static io.trino.SystemSessionProperties.getRetryMaxDelay; import static io.trino.SystemSessionProperties.getRetryPolicy; @@ -203,6 +205,7 @@ public class SqlQueryScheduler private final AtomicInteger currentAttempt = new AtomicInteger(); private final Duration retryInitialDelay; private final Duration retryMaxDelay; + private final double retryDelayScaleFactor; @GuardedBy("this") private boolean started; @@ -282,6 +285,7 @@ public SqlQueryScheduler( maxTasksWaitingForNodePerStage = getMaxTasksWaitingForNodePerStage(queryStateMachine.getSession()); retryInitialDelay = getRetryInitialDelay(queryStateMachine.getSession()); retryMaxDelay = getRetryMaxDelay(queryStateMachine.getSession()); + retryDelayScaleFactor = getRetryDelayScaleFactor(queryStateMachine.getSession()); } public synchronized void start() @@ -411,7 +415,7 @@ else if (state == DistributedStagesSchedulerState.CANCELED) { 
.orElseGet(() -> new StageFailureInfo(toFailure(new VerifyException("distributedStagesScheduler failed but failure cause is not present")), Optional.empty())); ErrorCode errorCode = stageFailureInfo.getFailureInfo().getErrorCode(); if (shouldRetry(errorCode)) { - long delayInMillis = min(retryInitialDelay.toMillis() * ((long) pow(2, currentAttempt.get())), retryMaxDelay.toMillis()); + long delayInMillis = min(retryInitialDelay.toMillis() * ((long) pow(retryDelayScaleFactor, currentAttempt.get())), retryMaxDelay.toMillis()); currentAttempt.incrementAndGet(); scheduleRetryWithDelay(delayInMillis); } @@ -1828,6 +1832,8 @@ public static FaultTolerantDistributedStagesScheduler create( taskDescriptorStorage, partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(), taskLifecycleListener, + (future, delay) -> scheduledExecutorService.schedule(() -> future.set(null), delay.toMillis(), MILLISECONDS), + systemTicker(), exchange, bucketToPartitionCache.apply(fragment.getPartitioningScheme().getPartitioning().getHandle()).getBucketToPartitionMap(), sourceExchanges.buildOrThrow(), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index 8ccbfcd568fb..ae94a9501f36 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -68,6 +68,7 @@ public void testDefaults() .setTaskRetryAttemptsPerTask(4) .setRetryInitialDelay(new Duration(10, SECONDS)) .setRetryMaxDelay(new Duration(1, MINUTES)) + .setRetryDelayScaleFactor(2.0) .setMaxTasksWaitingForNodePerStage(5) .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(1, GIGABYTE)) .setFaultTolerantExecutionMinTaskSplitCount(16) @@ -109,6 +110,7 @@ public void testExplicitPropertyMappings() .put("task-retry-attempts-per-task", "9") .put("retry-initial-delay", "1m") .put("retry-max-delay", "1h") + 
.put("retry-delay-scale-factor", "2.3") .put("max-tasks-waiting-for-node-per-stage", "3") .put("fault-tolerant-execution-target-task-input-size", "222MB") .put("fault-tolerant-execution-min-task-split-count", "2") @@ -147,6 +149,7 @@ public void testExplicitPropertyMappings() .setTaskRetryAttemptsPerTask(9) .setRetryInitialDelay(new Duration(1, MINUTES)) .setRetryMaxDelay(new Duration(1, HOURS)) + .setRetryDelayScaleFactor(2.3) .setMaxTasksWaitingForNodePerStage(3) .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(222, MEGABYTE)) .setFaultTolerantExecutionMinTaskSplitCount(2) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java index d516538a7d49..fb28acf254cd 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFaultTolerantStageScheduler.java @@ -13,10 +13,15 @@ */ package io.trino.execution.scheduler; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.airlift.testing.TestingTicker; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.client.NodeVersion; @@ -38,6 +43,8 @@ import io.trino.metadata.InternalNode; import io.trino.metadata.Split; import io.trino.spi.QueryId; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.TrinoException; import io.trino.spi.exchange.Exchange; import io.trino.spi.predicate.TupleDomain; import io.trino.sql.planner.Partitioning; @@ -58,9 +65,13 @@ import org.testng.annotations.Test; 
import java.net.URI; +import java.time.Duration; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.Iterables.cycle; @@ -79,6 +90,8 @@ import static io.trino.testing.TestingHandles.TEST_TABLE_HANDLE; import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.TestingSplit.createRemoteSplit; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; @@ -109,12 +122,17 @@ public class TestFaultTolerantStageScheduler private NodeTaskMap nodeTaskMap; private FixedCountNodeAllocatorService nodeAllocatorService; + private TestingTicker ticker; + private TestFutureCompletor futureCompletor; + @BeforeClass public void beforeClass() { finalizerService = new FinalizerService(); finalizerService.start(); nodeTaskMap = new NodeTaskMap(finalizerService); + ticker = new TestingTicker(); + futureCompletor = new TestFutureCompletor(ticker); } @AfterClass(alwaysRun = true) @@ -213,6 +231,7 @@ public void testHappyPath() assertUnblocked(scheduler.isBlocked()); // schedule more tasks + moveTime(10, SECONDS); // skip retry delay scheduler.schedule(); tasks = remoteTaskFactory.getTasks(); @@ -412,6 +431,7 @@ public void testTaskLifecycleListener() remoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some exception")); assertUnblocked(scheduler.isBlocked()); + moveTime(10, SECONDS); // skip retry delay scheduler.schedule(); assertBlocked(scheduler.isBlocked()); @@ -510,6 +530,7 @@ public void testReportTaskFailure() assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), 
TaskState.FAILED); assertUnblocked(blocked); + moveTime(10, SECONDS); // skip retry delay scheduler.schedule(); assertThat(remoteTaskFactory.getTasks()).containsKey(getTaskId(0, 1)); @@ -522,6 +543,246 @@ public void testReportTaskFailure() } } + @Test + public void testRetryDelay() + throws Exception + { + TestingRemoteTaskFactory remoteTaskFactory = new TestingRemoteTaskFactory(); + TestingTaskSourceFactory taskSourceFactory = createTaskSourceFactory(3, 1); + TestingNodeSupplier nodeSupplier = TestingNodeSupplier.create(ImmutableMap.of( + NODE_1, ImmutableList.of(CATALOG), + NODE_2, ImmutableList.of(CATALOG), + NODE_3, ImmutableList.of(CATALOG))); + setupNodeAllocatorService(nodeSupplier); + + TestingExchange sourceExchange1 = new TestingExchange(false); + TestingExchange sourceExchange2 = new TestingExchange(false); + + Session session = testSessionBuilder() + .setQueryId(QUERY_ID) + .setSystemProperty("retry_initial_delay", "1s") + .setSystemProperty("retry_max_delay", "3s") + .setSystemProperty("retry_delay_scale_factor", "2.0") + .build(); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(session, 1)) { + FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler( + session, + remoteTaskFactory, + taskSourceFactory, + nodeAllocator, + TaskLifecycleListener.NO_OP, + Optional.empty(), + ImmutableMap.of(SOURCE_FRAGMENT_ID_1, sourceExchange1, SOURCE_FRAGMENT_ID_2, sourceExchange2), + 6, + 1); + + sourceExchange1.setSourceHandles(ImmutableList.of(new TestingExchangeSourceHandle(0, 1))); + sourceExchange2.setSourceHandles(ImmutableList.of(new TestingExchangeSourceHandle(0, 1))); + assertUnblocked(scheduler.isBlocked()); + scheduler.schedule(); + + ListenableFuture blocked = scheduler.isBlocked(); + + // T+0.0 all tasks are running + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(3); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.RUNNING); + 
assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+0.0 fail task 0.0 + scheduler.reportTaskFailure(getTaskId(0, 0), new RuntimeException("some failure")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(3); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+0.9 retry should not trigger yet + moveTime(900, MILLISECONDS); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(3); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+1.4s past retry delay for task 0 + moveTime(500, MILLISECONDS); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(4); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 
0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+1.4 fail task 0.1 + scheduler.reportTaskFailure(getTaskId(0, 1), new RuntimeException("some other failure")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(4); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+3.3 another retry should not happen yet (delay is 2s on second failure) + moveTime(1900, MILLISECONDS); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(4); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+3.5s past retry delay for task 0.1 + moveTime(200, MILLISECONDS); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(5); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), 
TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+3.5 fail task 0.2 + scheduler.reportTaskFailure(getTaskId(0, 2), new RuntimeException("some other failure")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(5); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+6.4 another retry should not happen yet (delay is 3s on third failure; we reached the limit) + moveTime(2900, MILLISECONDS); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(5); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+6.6s past retry delay for task 0.2 + moveTime(200, MILLISECONDS); + assertUnblocked(blocked); + scheduler.schedule(); + 
blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(6); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+6.6 task 1 failure + scheduler.reportTaskFailure(getTaskId(1, 0), new RuntimeException("some other failure")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(6); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+9.0 task1 still not retried (delay is 3s) + moveTime(2400, MILLISECONDS); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(6); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 
0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+9.0 task 0.3 completes successfully - should reset delay for stage + remoteTaskFactory.getTasks().get(getTaskId(0, 3)).finish(); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(7); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+9.0 kill task 1.1; delay should count from 1s again as there was a success + scheduler.reportTaskFailure(getTaskId(1, 1), new RuntimeException("some other failure")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + 
assertThat(remoteTaskFactory.getTasks()).hasSize(7); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+10.1 task 1.2 should be started + moveTime(1100, MILLISECONDS); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(8); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.RUNNING); + + // T+10.1 if 
we kill a task with an out-of-memory error, the next retry should start right away + scheduler.reportTaskFailure(getTaskId(2, 0), new TrinoException(StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "oom")); + assertUnblocked(blocked); + scheduler.schedule(); + blocked = scheduler.isBlocked(); + assertBlocked(blocked); + assertThat(remoteTaskFactory.getTasks()).hasSize(9); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 2)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(0, 3)).getTaskStatus().getState(), TaskState.FINISHED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 1)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(1, 2)).getTaskStatus().getState(), TaskState.RUNNING); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 0)).getTaskStatus().getState(), TaskState.FAILED); + assertEquals(remoteTaskFactory.getTasks().get(getTaskId(2, 1)).getTaskStatus().getState(), TaskState.RUNNING); + } + } + @Test public void testCancellation() throws Exception @@ -594,11 +855,34 @@ private FaultTolerantStageScheduler createFaultTolerantTaskScheduler( Map sourceExchanges, int retryAttempts, int maxTasksWaitingForNodePerStage) + { + return createFaultTolerantTaskScheduler( + SESSION, + remoteTaskFactory, + taskSourceFactory, + nodeAllocator, + taskLifecycleListener, + sinkExchange, + sourceExchanges, + retryAttempts, + maxTasksWaitingForNodePerStage); + } + + private FaultTolerantStageScheduler createFaultTolerantTaskScheduler( + Session session, + RemoteTaskFactory remoteTaskFactory, + TaskSourceFactory 
taskSourceFactory, + NodeAllocator nodeAllocator, + TaskLifecycleListener taskLifecycleListener, + Optional sinkExchange, + Map sourceExchanges, + int retryAttempts, + int maxTasksWaitingForNodePerStage) { TaskDescriptorStorage taskDescriptorStorage = new TaskDescriptorStorage(DataSize.of(10, MEGABYTE)); taskDescriptorStorage.initialize(SESSION.getQueryId()); return new FaultTolerantStageScheduler( - SESSION, + session, createSqlStage(remoteTaskFactory), new NoOpFailureDetector(), taskSourceFactory, @@ -606,6 +890,8 @@ private FaultTolerantStageScheduler createFaultTolerantTaskScheduler( taskDescriptorStorage, new ConstantPartitionMemoryEstimator(), taskLifecycleListener, + futureCompletor, + ticker, sinkExchange, Optional.empty(), sourceExchanges, @@ -702,4 +988,53 @@ private static void assertUnblocked(ListenableFuture blocked) { assertTrue(blocked.isDone()); } + + private void moveTime(int delta, TimeUnit unit) + { + ticker.increment(delta, unit); + futureCompletor.trigger(); + } + + private static class TestFutureCompletor + implements FaultTolerantStageScheduler.DelayedFutureCompletor + { + private final Stopwatch stopwatch; + private final Set entries = Sets.newConcurrentHashSet(); + + private TestFutureCompletor(Ticker ticker) + { + this.stopwatch = Stopwatch.createStarted(ticker); + } + + @Override + public void completeFuture(SettableFuture future, Duration delay) + { + entries.add(new Entry(future, stopwatch.elapsed().plus(delay))); + } + + public void trigger() + { + Duration now = stopwatch.elapsed(); + Iterator iterator = entries.iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + if (entry.completionTime.compareTo(now) <= 0) { + entry.future.set(null); + iterator.remove(); + } + } + } + + private static class Entry + { + private final SettableFuture future; + private final Duration completionTime; + + public Entry(SettableFuture future, Duration completionTime) + { + this.future = future; + this.completionTime = 
completionTime; + } + } + } } diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index d55a2db3f368..6f3b25100c3f 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -135,18 +135,24 @@ queries/tasks are no longer retried in the event of repeated failures: - ``4`` - Only ``TASK`` * - ``retry-initial-delay`` - - Minimum time that a failed query must wait before it is retried. May be + - Minimum time that a failed query or task must wait before it is retried. May be overridden with the ``retry_initial_delay`` :ref:`session property `. - ``10s`` - - Only ``QUERY`` + - ``QUERY`` and ``TASK`` * - ``retry-max-delay`` - - Maximum time that a failed query must wait before it is retried. - Wait time is increased on each subsequent query failure. May be + - Maximum time that a failed query or task must wait before it is retried. + Wait time is increased on each subsequent failure. May be overridden with the ``retry_max_delay`` :ref:`session property `. - ``1m`` - - Only ``QUERY`` + - ``QUERY`` and ``TASK`` + * - ``retry-delay-scale-factor`` + - Factor by which retry delay is increased on each query or task failure. May be + overridden with the ``retry_delay_scale_factor`` :ref:`session property + `. + - ``2.0`` + - ``QUERY`` and ``TASK`` Task sizing ^^^^^^^^^^^