Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public final class SystemSessionProperties
public static final String MAX_TASKS_WAITING_FOR_NODE_PER_STAGE = "max_tasks_waiting_for_node_per_stage";
public static final String RETRY_INITIAL_DELAY = "retry_initial_delay";
public static final String RETRY_MAX_DELAY = "retry_max_delay";
public static final String RETRY_DELAY_SCALE_FACTOR = "retry_delay_scale_factor";
public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size";
public static final String FAULT_TOLERANT_EXECUTION_MIN_TASK_SPLIT_COUNT = "fault_tolerant_execution_min_task_split_count";
Expand Down Expand Up @@ -748,6 +749,18 @@ public SystemSessionProperties(
"Maximum delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt starting from 'retry_initial_delay'",
queryManagerConfig.getRetryMaxDelay(),
false),
doubleProperty(
        RETRY_DELAY_SCALE_FACTOR,
        // Fixed: description was copy-pasted from RETRY_MAX_DELAY; align with the
        // @ConfigDescription on QueryManagerConfig.setRetryDelayScaleFactor
        "Factor by which retry delay is scaled on subsequent failures",
        queryManagerConfig.getRetryDelayScaleFactor(),
        value -> {
            // A factor below 1.0 would shrink the delay on every retry, defeating backoff
            if (value < 1.0) {
                throw new TrinoException(
                        INVALID_SESSION_PROPERTY,
                        // Fixed: message referenced RETRY_MAX_DELAY (copy-paste bug)
                        format("%s must be greater or equal to 1.0", RETRY_DELAY_SCALE_FACTOR));
            }
        },
        false),
booleanProperty(
HIDE_INACCESSIBLE_COLUMNS,
"When enabled non-accessible columns are silently filtered from results from SELECT * statements",
Expand Down Expand Up @@ -1397,6 +1410,11 @@ public static Duration getRetryMaxDelay(Session session)
return session.getSystemProperty(RETRY_MAX_DELAY, Duration.class);
}

/**
 * Returns the multiplier applied to the retry delay after each
 * subsequent failure, read from the session's system properties.
 */
public static double getRetryDelayScaleFactor(Session session)
{
    Double scaleFactor = session.getSystemProperty(RETRY_DELAY_SCALE_FACTOR, Double.class);
    return scaleFactor;
}

public static boolean isHideInaccessibleColumns(Session session)
{
return session.getSystemProperty(HIDE_INACCESSIBLE_COLUMNS, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class QueryManagerConfig
private int taskRetryAttemptsOverall = Integer.MAX_VALUE;
private Duration retryInitialDelay = new Duration(10, SECONDS);
private Duration retryMaxDelay = new Duration(1, MINUTES);
private double retryDelayScaleFactor = 2.0;

private int maxTasksWaitingForNodePerStage = 5;

Expand Down Expand Up @@ -485,6 +487,21 @@ public QueryManagerConfig setRetryMaxDelay(Duration retryMaxDelay)
return this;
}

// Factor by which the retry delay grows after each failure (>= 1.0, default 2.0).
// Note: removed @NotNull — the annotation is meaningless on a primitive double,
// which can never be null.
public double getRetryDelayScaleFactor()
{
    return retryDelayScaleFactor;
}

@Config("retry-delay-scale-factor")
@ConfigDescription("Factor by which retry delay is scaled on subsequent failures")
public QueryManagerConfig setRetryDelayScaleFactor(double retryDelayScaleFactor)
{
    // A factor below 1 would make each retry fire sooner than the last; reject it.
    if (retryDelayScaleFactor < 1.0) {
        throw new IllegalArgumentException("retry-delay-scale-factor must be greater or equal to 1");
    }
    this.retryDelayScaleFactor = retryDelayScaleFactor;
    return this;
}

@Min(1)
public int getMaxTasksWaitingForNodePerStage()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
*/
package io.trino.execution.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.base.VerifyException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
Expand Down Expand Up @@ -52,6 +55,7 @@

import javax.annotation.concurrent.GuardedBy;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -66,6 +70,7 @@
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagateIfPossible;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -78,12 +83,17 @@
import static io.airlift.concurrent.MoreFutures.asVoid;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor;
import static io.trino.SystemSessionProperties.getRetryInitialDelay;
import static io.trino.SystemSessionProperties.getRetryMaxDelay;
import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED;
import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.trino.execution.buffer.OutputBuffers.createSpoolingExchangeOutputBuffers;
import static io.trino.execution.scheduler.ErrorCodes.isOutOfMemoryError;
import static io.trino.failuredetector.FailureDetector.State.GONE;
import static io.trino.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static io.trino.spi.ErrorType.EXTERNAL;
import static io.trino.spi.ErrorType.INTERNAL_ERROR;
import static io.trino.spi.ErrorType.USER_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
Expand Down Expand Up @@ -113,12 +123,25 @@ public class FaultTolerantStageScheduler
private final Optional<int[]> sourceBucketToPartitionMap;
private final Optional<BucketNodeMap> sourceBucketNodeMap;

private final DelayedFutureCompletor futureCompletor;

@GuardedBy("this")
private ListenableFuture<Void> blocked = immediateVoidFuture();

@GuardedBy("this")
private SettableFuture<Void> taskFinishedFuture;

private final Duration minRetryDelay;
private final Duration maxRetryDelay;
private final double retryDelayScaleFactor;

@GuardedBy("this")
private Optional<Duration> delaySchedulingDuration = Optional.empty();
@GuardedBy("this")
private final Stopwatch delayStopwatch;
@GuardedBy("this")
private SettableFuture<Void> delaySchedulingFuture;

@GuardedBy("this")
private TaskSource taskSource;
@GuardedBy("this")
Expand Down Expand Up @@ -158,6 +181,8 @@ public FaultTolerantStageScheduler(
TaskDescriptorStorage taskDescriptorStorage,
PartitionMemoryEstimator partitionMemoryEstimator,
TaskLifecycleListener taskLifecycleListener,
DelayedFutureCompletor futureCompletor,
Ticker ticker,
Optional<Exchange> sinkExchange,
Optional<int[]> sinkBucketToPartitionMap,
Map<PlanFragmentId, Exchange> sourceExchanges,
Expand All @@ -177,6 +202,7 @@ public FaultTolerantStageScheduler(
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.taskLifecycleListener = requireNonNull(taskLifecycleListener, "taskLifecycleListener is null");
this.futureCompletor = requireNonNull(futureCompletor, "futureCompletor is null");
this.sinkExchange = requireNonNull(sinkExchange, "sinkExchange is null");
this.sinkBucketToPartitionMap = requireNonNull(sinkBucketToPartitionMap, "sinkBucketToPartitionMap is null");
this.sourceExchanges = ImmutableMap.copyOf(requireNonNull(sourceExchanges, "sourceExchanges is null"));
Expand All @@ -185,6 +211,10 @@ public FaultTolerantStageScheduler(
this.remainingRetryAttemptsOverall = requireNonNull(remainingRetryAttemptsOverall, "remainingRetryAttemptsOverall is null");
this.maxRetryAttemptsPerTask = taskRetryAttemptsPerTask;
this.maxTasksWaitingForNodePerStage = maxTasksWaitingForNodePerStage;
this.minRetryDelay = Duration.ofMillis(getRetryInitialDelay(session).toMillis());
this.maxRetryDelay = Duration.ofMillis(getRetryMaxDelay(session).toMillis());
this.retryDelayScaleFactor = getRetryDelayScaleFactor(session);
this.delayStopwatch = Stopwatch.createUnstarted(ticker);
}

public StageId getStageId()
Expand Down Expand Up @@ -217,6 +247,12 @@ public synchronized void schedule()
return;
}

if (delaySchedulingFuture != null && !delaySchedulingFuture.isDone()) {
// let's wait a bit more
blocked = delaySchedulingFuture;
return;
}

if (taskSource == null) {
Map<PlanFragmentId, ListenableFuture<List<ExchangeSourceHandle>>> sourceHandles = sourceExchanges.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> toListenableFuture(entry.getValue().getSourceHandles())));
Expand Down Expand Up @@ -531,12 +567,13 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan

try {
RuntimeException failure = null;
SettableFuture<Void> future;
SettableFuture<Void> previousTaskFinishedFuture;
SettableFuture<Void> previousDelaySchedulingFuture = null;
synchronized (this) {
TaskId taskId = taskStatus.getTaskId();

runningTasks.remove(taskId);
future = taskFinishedFuture;
previousTaskFinishedFuture = taskFinishedFuture;
if (!runningTasks.isEmpty()) {
taskFinishedFuture = SettableFuture.create();
}
Expand All @@ -561,6 +598,15 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
}
partitionToRemoteTaskMap.get(partitionId).forEach(RemoteTask::abort);
partitionMemoryEstimator.registerPartitionFinished(session, memoryLimits, taskStatus.getPeakMemoryReservation(), true, Optional.empty());

if (delayStopwatch.isRunning()) {
// task completed successfully; reset delay
previousDelaySchedulingFuture = delaySchedulingFuture;
delayStopwatch.reset();
delaySchedulingDuration = Optional.empty();
delaySchedulingFuture = null;
}

break;
case CANCELED:
log.debug("Task cancelled: %s", taskId);
Expand Down Expand Up @@ -600,6 +646,35 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
// reschedule
queuedPartitions.add(partitionId);
log.debug("Retrying partition %s for stage %s", partitionId, stage.getStageId());

if (errorCode != null && shouldDelayScheduling(errorCode)) {
if (delayStopwatch.isRunning()) {
// we are currently delaying tasks scheduling
checkState(delaySchedulingDuration.isPresent());

if (delayStopwatch.elapsed().compareTo(delaySchedulingDuration.get()) > 0) {
// we are past previous delay period and still getting failures; let's make it longer
delayStopwatch.reset().start();
delaySchedulingDuration = delaySchedulingDuration.map(duration ->
Ordering.natural().min(
Duration.ofMillis((long) (duration.toMillis() * retryDelayScaleFactor)),
maxRetryDelay));

// create new future
previousDelaySchedulingFuture = delaySchedulingFuture;
SettableFuture<Void> newDelaySchedulingFuture = SettableFuture.create();
delaySchedulingFuture = newDelaySchedulingFuture;
futureCompletor.completeFuture(newDelaySchedulingFuture, delaySchedulingDuration.get());
}
}
else {
// initialize delaying of tasks scheduling
delayStopwatch.start();
delaySchedulingDuration = Optional.of(minRetryDelay);
delaySchedulingFuture = SettableFuture.create();
futureCompletor.completeFuture(delaySchedulingFuture, delaySchedulingDuration.get());
}
}
}
else {
failure = failureInfo.toException();
Expand All @@ -614,15 +689,23 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
// must be called outside the lock
fail(failure);
}
if (future != null && !future.isDone()) {
future.set(null);
if (previousTaskFinishedFuture != null && !previousTaskFinishedFuture.isDone()) {
previousTaskFinishedFuture.set(null);
}
if (previousDelaySchedulingFuture != null && !previousDelaySchedulingFuture.isDone()) {
previousDelaySchedulingFuture.set(null);
}
}
catch (Throwable t) {
fail(t);
}
}

/**
 * Decides whether a failed task with the given error should trigger
 * retry back-off: only infrastructure-related failures (internal or
 * external errors) delay scheduling; user errors retry immediately.
 */
private boolean shouldDelayScheduling(ErrorCode errorCode)
{
    var errorType = errorCode.getType();
    return errorType == INTERNAL_ERROR || errorType == EXTERNAL;
}

private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo)
{
if (executionFailureInfo.getRemoteHost() == null || failureDetector.getState(executionFailureInfo.getRemoteHost()) != GONE) {
Expand Down Expand Up @@ -661,4 +744,9 @@ public NodeAllocator.NodeLease getNodeLease()
return nodeLease;
}
}

/**
 * Completes the given future after the specified delay. Abstracted so the
 * scheduler can be driven by a deterministic clock in tests; in production
 * it is backed by a scheduled executor (see SqlQueryScheduler).
 */
@FunctionalInterface
public interface DelayedFutureCompletor
{
    void completeFuture(SettableFuture<Void> future, Duration delay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Ticker.systemTicker;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -125,6 +126,7 @@
import static io.trino.SystemSessionProperties.getHashPartitionCount;
import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage;
import static io.trino.SystemSessionProperties.getQueryRetryAttempts;
import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor;
import static io.trino.SystemSessionProperties.getRetryInitialDelay;
import static io.trino.SystemSessionProperties.getRetryMaxDelay;
import static io.trino.SystemSessionProperties.getRetryPolicy;
Expand Down Expand Up @@ -203,6 +205,7 @@ public class SqlQueryScheduler
private final AtomicInteger currentAttempt = new AtomicInteger();
private final Duration retryInitialDelay;
private final Duration retryMaxDelay;
private final double retryDelayScaleFactor;

@GuardedBy("this")
private boolean started;
Expand Down Expand Up @@ -282,6 +285,7 @@ public SqlQueryScheduler(
maxTasksWaitingForNodePerStage = getMaxTasksWaitingForNodePerStage(queryStateMachine.getSession());
retryInitialDelay = getRetryInitialDelay(queryStateMachine.getSession());
retryMaxDelay = getRetryMaxDelay(queryStateMachine.getSession());
retryDelayScaleFactor = getRetryDelayScaleFactor(queryStateMachine.getSession());
}

public synchronized void start()
Expand Down Expand Up @@ -411,7 +415,7 @@ else if (state == DistributedStagesSchedulerState.CANCELED) {
.orElseGet(() -> new StageFailureInfo(toFailure(new VerifyException("distributedStagesScheduler failed but failure cause is not present")), Optional.empty()));
ErrorCode errorCode = stageFailureInfo.getFailureInfo().getErrorCode();
if (shouldRetry(errorCode)) {
long delayInMillis = min(retryInitialDelay.toMillis() * ((long) pow(2, currentAttempt.get())), retryMaxDelay.toMillis());
long delayInMillis = min(retryInitialDelay.toMillis() * ((long) pow(retryDelayScaleFactor, currentAttempt.get())), retryMaxDelay.toMillis());
currentAttempt.incrementAndGet();
scheduleRetryWithDelay(delayInMillis);
}
Expand Down Expand Up @@ -1828,6 +1832,8 @@ public static FaultTolerantDistributedStagesScheduler create(
taskDescriptorStorage,
partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(),
taskLifecycleListener,
(future, delay) -> scheduledExecutorService.schedule(() -> future.set(null), delay.toMillis(), MILLISECONDS),
systemTicker(),
exchange,
bucketToPartitionCache.apply(fragment.getPartitioningScheme().getPartitioning().getHandle()).getBucketToPartitionMap(),
sourceExchanges.buildOrThrow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void testDefaults()
.setTaskRetryAttemptsPerTask(4)
.setRetryInitialDelay(new Duration(10, SECONDS))
.setRetryMaxDelay(new Duration(1, MINUTES))
.setRetryDelayScaleFactor(2.0)
.setMaxTasksWaitingForNodePerStage(5)
.setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(1, GIGABYTE))
.setFaultTolerantExecutionMinTaskSplitCount(16)
Expand Down Expand Up @@ -109,6 +110,7 @@ public void testExplicitPropertyMappings()
.put("task-retry-attempts-per-task", "9")
.put("retry-initial-delay", "1m")
.put("retry-max-delay", "1h")
.put("retry-delay-scale-factor", "2.3")
.put("max-tasks-waiting-for-node-per-stage", "3")
.put("fault-tolerant-execution-target-task-input-size", "222MB")
.put("fault-tolerant-execution-min-task-split-count", "2")
Expand Down Expand Up @@ -147,6 +149,7 @@ public void testExplicitPropertyMappings()
.setTaskRetryAttemptsPerTask(9)
.setRetryInitialDelay(new Duration(1, MINUTES))
.setRetryMaxDelay(new Duration(1, HOURS))
.setRetryDelayScaleFactor(2.3)
.setMaxTasksWaitingForNodePerStage(3)
.setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(222, MEGABYTE))
.setFaultTolerantExecutionMinTaskSplitCount(2)
Expand Down
Loading