Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

public final class SystemSessionProperties
implements SystemSessionPropertiesProvider
Expand Down Expand Up @@ -207,6 +208,8 @@ public final class SystemSessionProperties
public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size";
public static final String IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD = "idle_writer_min_data_size_threshold";
public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1058,6 +1061,14 @@ public SystemSessionProperties(
integerProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE,
"Maximum number of free buffers in the per task partitioned page buffer pool. Setting this to zero effectively disables the pool",
taskManagerConfig.getPagePartitioningBufferPoolSize(),
true),
dataSizeProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD,
"Minimum amount of data written by a writer operator on average before it tries to close the idle writers",
DataSize.of(256, MEGABYTE),
true),
durationProperty(CLOSE_IDLE_WRITERS_TRIGGER_DURATION,
"The duration after which the writer operator tries to close the idle writers",
new Duration(5, SECONDS),
true));
}

Expand Down Expand Up @@ -1896,4 +1907,14 @@ public static int getPagePartitioningBufferPoolSize(Session session)
{
return session.getSystemProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE, Integer.class);
}

/**
 * Reads the {@code idle_writer_min_data_size_threshold} system session property.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a {@link DataSize}
 */
public static DataSize getIdleWriterMinDataSizeThreshold(Session session)
{
    DataSize minDataSizeThreshold = session.getSystemProperty(IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD, DataSize.class);
    return minDataSizeThreshold;
}

/**
 * Reads the {@code close_idle_writers_trigger_duration} system session property.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a {@link Duration}
 */
public static Duration getCloseIdleWritersTriggerDuration(Session session)
{
    Duration triggerDuration = session.getSystemProperty(CLOSE_IDLE_WRITERS_TRIGGER_DURATION, Duration.class);
    return triggerDuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class SqlTaskManager

private final ScheduledExecutorService taskManagementExecutor;
private final ScheduledExecutorService driverYieldExecutor;
private final ScheduledExecutorService driverTimeoutExecutor;

private final Duration infoCacheTime;
private final Duration clientTimeout;
Expand Down Expand Up @@ -216,6 +217,7 @@ public SqlTaskManager(

this.taskManagementExecutor = taskManagementExecutor.getExecutor();
this.driverYieldExecutor = newScheduledThreadPool(config.getTaskYieldThreads(), threadsNamed("task-yield-%s"));
this.driverTimeoutExecutor = newScheduledThreadPool(config.getDriverTimeoutThreads(), threadsNamed("task-driver-timeout-%s"));

SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, tracer, config);

Expand Down Expand Up @@ -269,6 +271,7 @@ private QueryContext createQueryContext(
gcMonitor,
taskNotificationExecutor,
driverYieldExecutor,
driverTimeoutExecutor,
maxQuerySpillPerNode,
localSpillManager.getSpillSpaceTracker());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class TaskManagerConfig

private int taskNotificationThreads = 5;
private int taskYieldThreads = 3;
private int driverTimeoutThreads = 5;

private BigDecimal levelTimeMultiplier = new BigDecimal(2.0);

Expand Down Expand Up @@ -569,6 +570,20 @@ public TaskManagerConfig setTaskYieldThreads(int taskYieldThreads)
return this;
}

/**
 * Returns the configured number of driver timeout threads.
 * The {@code @Min(1)} constraint declares that the value must be at least one.
 *
 * @return the configured thread count
 */
@Min(1)
public int getDriverTimeoutThreads()
{
    return this.driverTimeoutThreads;
}

/**
 * Sets the number of threads used for timing out blocked drivers.
 * Bound to the {@code task.driver-timeout-threads} configuration property.
 *
 * @param driverTimeoutThreads the thread count to store; the matching getter carries a {@code @Min(1)} constraint
 * @return this config instance
 */
@Config("task.driver-timeout-threads")
@ConfigDescription("Number of threads used for timing out blocked drivers if the timeout is set")
public TaskManagerConfig setDriverTimeoutThreads(int driverTimeoutThreads)
{
this.driverTimeoutThreads = driverTimeoutThreads;
// return the same instance so setter calls can be chained
return this;
}

public boolean isInterruptStuckSplitTasksEnabled()
{
return interruptStuckSplitTasksEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class QueryContext
private final GcMonitor gcMonitor;
private final Executor notificationExecutor;
private final ScheduledExecutorService yieldExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final long maxSpill;
private final SpillSpaceTracker spillSpaceTracker;
private final Map<TaskId, TaskContext> taskContexts = new ConcurrentHashMap<>();
Expand All @@ -86,6 +87,7 @@ public QueryContext(
GcMonitor gcMonitor,
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
ScheduledExecutorService timeoutExecutor,
DataSize maxSpill,
SpillSpaceTracker spillSpaceTracker)
{
Expand All @@ -97,6 +99,7 @@ public QueryContext(
gcMonitor,
notificationExecutor,
yieldExecutor,
timeoutExecutor,
maxSpill,
spillSpaceTracker);
}
Expand All @@ -109,6 +112,7 @@ public QueryContext(
GcMonitor gcMonitor,
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
ScheduledExecutorService timeoutExecutor,
DataSize maxSpill,
SpillSpaceTracker spillSpaceTracker)
{
Expand All @@ -118,6 +122,7 @@ public QueryContext(
this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null");
this.maxSpill = maxSpill.toBytes();
this.spillSpaceTracker = requireNonNull(spillSpaceTracker, "spillSpaceTracker is null");
this.guaranteedMemory = guaranteedMemory;
Expand Down Expand Up @@ -257,6 +262,7 @@ public TaskContext addTaskContext(
gcMonitor,
notificationExecutor,
yieldExecutor,
timeoutExecutor,
session,
taskMemoryContext,
notifyStatusChanged,
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static com.google.common.util.concurrent.Futures.withTimeout;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.trino.operator.Operator.NOT_BLOCKED;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.Boolean.TRUE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

//
Expand Down Expand Up @@ -455,6 +458,13 @@ private ListenableFuture<Void> processInternal(OperationTimer operationTimer)

// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
if (driverContext.getBlockedTimeout().isPresent()) {
blocked = withTimeout(
nonCancellationPropagating(blocked),
driverContext.getBlockedTimeout().get().toMillis(),
MILLISECONDS,
driverContext.getTimeoutExecutor());
}
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
Expand Down
20 changes: 20 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/DriverContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.joda.time.DateTime;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -54,6 +55,7 @@ public class DriverContext
private final PipelineContext pipelineContext;
private final Executor notificationExecutor;
private final ScheduledExecutorService yieldExecutor;
private final ScheduledExecutorService timeoutExecutor;

private final AtomicBoolean finished = new AtomicBoolean();

Expand All @@ -70,6 +72,7 @@ public class DriverContext

private final AtomicReference<DateTime> executionStartTime = new AtomicReference<>();
private final AtomicReference<DateTime> executionEndTime = new AtomicReference<>();
private final AtomicReference<Optional<Duration>> blockedTimeout = new AtomicReference<>(Optional.empty());

private final MemoryTrackingContext driverMemoryContext;

Expand All @@ -82,12 +85,14 @@ public DriverContext(
PipelineContext pipelineContext,
Executor notificationExecutor,
ScheduledExecutorService yieldExecutor,
ScheduledExecutorService timeoutExecutor,
MemoryTrackingContext driverMemoryContext,
long splitWeight)
{
this.pipelineContext = requireNonNull(pipelineContext, "pipelineContext is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null");
this.driverMemoryContext = requireNonNull(driverMemoryContext, "driverMemoryContext is null");
this.yieldSignal = new DriverYieldSignal();
this.splitWeight = splitWeight;
Expand Down Expand Up @@ -447,6 +452,21 @@ public ScheduledExecutorService getYieldExecutor()
return yieldExecutor;
}

/**
 * Returns the scheduled executor supplied to this context at construction as its timeout executor.
 *
 * @return the timeout executor, never null (checked in the constructor)
 */
public ScheduledExecutorService getTimeoutExecutor()
{
    return this.timeoutExecutor;
}

/**
 * Sets the blocked timeout for this driver context.
 * Until this is called, {@link #getBlockedTimeout()} returns an empty {@link Optional}.
 *
 * @param duration the timeout to store; must not be null
 * @throws NullPointerException if {@code duration} is null
 */
public void setBlockedTimeout(Duration duration)
{
    // Fail with a descriptive message, as the constructor does, rather than
    // with the message-less NullPointerException raised inside Optional.of
    requireNonNull(duration, "duration is null");
    this.blockedTimeout.set(Optional.of(duration));
}

/**
 * Returns the blocked timeout currently stored for this driver context.
 *
 * @return the stored timeout, or an empty {@link Optional} if none has been set
 */
public Optional<Duration> getBlockedTimeout()
{
    Optional<Duration> currentTimeout = blockedTimeout.get();
    return currentTimeout;
}

private static long nanosBetween(long start, long end)
{
return max(0, end - start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class PipelineContext
private final TaskContext taskContext;
private final Executor notificationExecutor;
private final ScheduledExecutorService yieldExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final int pipelineId;

private final boolean inputPipeline;
Expand Down Expand Up @@ -105,7 +106,7 @@ public class PipelineContext

private final MemoryTrackingContext pipelineMemoryContext;

public PipelineContext(int pipelineId, TaskContext taskContext, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, MemoryTrackingContext pipelineMemoryContext, boolean inputPipeline, boolean outputPipeline, boolean partitioned)
public PipelineContext(int pipelineId, TaskContext taskContext, Executor notificationExecutor, ScheduledExecutorService yieldExecutor, ScheduledExecutorService timeoutExecutor, MemoryTrackingContext pipelineMemoryContext, boolean inputPipeline, boolean outputPipeline, boolean partitioned)
{
this.pipelineId = pipelineId;
this.inputPipeline = inputPipeline;
Expand All @@ -114,6 +115,7 @@ public PipelineContext(int pipelineId, TaskContext taskContext, Executor notific
this.taskContext = requireNonNull(taskContext, "taskContext is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null");
this.pipelineMemoryContext = requireNonNull(pipelineMemoryContext, "pipelineMemoryContext is null");
// Initialize the local memory contexts with the ExchangeOperator tag as ExchangeOperator will do the local memory allocations
pipelineMemoryContext.initializeLocalMemoryContexts(ExchangeOperator.class.getSimpleName());
Expand Down Expand Up @@ -156,6 +158,7 @@ public DriverContext addDriverContext(long splitWeight)
this,
notificationExecutor,
yieldExecutor,
timeoutExecutor,
pipelineMemoryContext.newMemoryTrackingContext(),
splitWeight);
drivers.add(driverContext);
Expand Down
Loading