Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ Task Properties
to become overloaded due to excessive resource utilization. This can also be specified on
a per-query basis using the ``task_writer_count`` session property.

``task.interrupt-runaway-splits-timeout``
^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``duration``
* **Default value:** ``10m``

Timeout for interrupting split threads blocked without yielding control.
Only threads blocked in specific locations are interrupted. Currently this is just threads
blocked in the Joni regular expression library.


Node Scheduler Properties
-------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig({
"experimental.big-query-max-task-memory",
"task.max-memory",
Expand Down Expand Up @@ -84,6 +86,8 @@ public class TaskManagerConfig
private boolean legacyLifespanCompletionCondition;
private TaskPriorityTracking taskPriorityTracking = TaskPriorityTracking.TASK_FAIR;

private Duration interruptRunawaySplitsTimeout = new Duration(600, SECONDS);

@MinDuration("1ms")
@MaxDuration("10s")
@NotNull
Expand Down Expand Up @@ -557,4 +561,18 @@ public enum TaskPriorityTracking
TASK_FAIR,
QUERY_FAIR,
}

@MinDuration("1s")
public Duration getInterruptRunawaySplitsTimeout()
{
return interruptRunawaySplitsTimeout;
}

@Config("task.interrupt-runaway-splits-timeout")
@ConfigDescription("Interrupt runaway split threads after this timeout if the task is stuck in certain allow listed places")
public TaskManagerConfig setInterruptRunawaySplitsTimeout(Duration interruptRunawaySplitsTimeout)
{
this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking;
import com.facebook.presto.operator.scalar.JoniRegexpFunctions;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
Expand Down Expand Up @@ -60,11 +61,11 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
Expand All @@ -74,19 +75,28 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.lang.System.lineSeparator;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;

@ThreadSafe
public class TaskExecutor
{
private static final Logger log = Logger.get(TaskExecutor.class);

// print out split call stack if it has been running for a certain amount of time
private static final Duration LONG_SPLIT_WARNING_THRESHOLD = new Duration(600, TimeUnit.SECONDS);
private static final Duration LONG_SPLIT_WARNING_THRESHOLD = new Duration(600, SECONDS);
// Interrupt a split if it is running longer than this AND it's blocked on something known
private static final Predicate<List<StackTraceElement>> DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE = elements ->
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a blacklist, shall we use TaskManager::getAllTaskInfo to decide if a task is still alive and tracked to decide if we want to preempt the current thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task is no longer alive then the threads have already been interrupted so there is maybe no point in interrupting them again. So to some extent just making the Joni regex functions interruptible is already a win because it will make query cancellation and timeout work better.

What this brings to the table is that in a specific allow listed cases we will interrupt the splits with a timeout that is different (shorter) from the full query timeout which may be in the many hours range. The idea being this a step towards more automatically and aggressively remediating runaway splits.

elements.stream()
.anyMatch(element -> element.getClassName().equals(JoniRegexpFunctions.class.getName()));
private static final Duration DEFAULT_INTERRUPT_SPLIT_INTERVAL = new Duration(60, SECONDS);

private static final AtomicLong NEXT_RUNNER_ID = new AtomicLong();

Expand All @@ -101,6 +111,10 @@ public class TaskExecutor

private final Ticker ticker;

private final Duration interruptRunawaySplitsTimeout;
private final Predicate<List<StackTraceElement>> interruptibleSplitPredicate;
private final Duration interruptSplitInterval;

private final ScheduledExecutorService splitMonitorExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("TaskExecutor"));
private final SortedSet<RunningSplitInfo> runningSplitInfos = new ConcurrentSkipListSet<>();

Expand Down Expand Up @@ -174,6 +188,9 @@ public TaskExecutor(TaskManagerConfig config, EmbedVersion embedVersion, Multile
config.getMinDriversPerTask(),
config.getMaxDriversPerTask(),
config.getTaskPriorityTracking(),
config.getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
embedVersion,
splitQueue,
Ticker.systemTicker());
Expand All @@ -194,6 +211,9 @@ public TaskExecutor(
guaranteedNumberOfDriversPerTask,
maximumNumberOfDriversPerTask,
taskPriorityTracking,
new TaskManagerConfig().getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
Comment on lines 215 to 216
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these defaults are not going to be changed, I guess we don't need to pass them in through parameters? Just directly use them in the body of the class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different ones are passed in for unit tests so the tests can complete quickly.

new EmbedVersion(new ServerConfig()),
new MultilevelSplitQueue(2), ticker);
}
Expand All @@ -214,6 +234,9 @@ public TaskExecutor(
guaranteedNumberOfDriversPerTask,
maximumNumberOfDriversPerTask,
taskPriorityTracking,
new TaskManagerConfig().getInterruptRunawaySplitsTimeout(),
DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE,
DEFAULT_INTERRUPT_SPLIT_INTERVAL,
new EmbedVersion(new ServerConfig()),
splitQueue,
ticker);
Expand All @@ -226,6 +249,9 @@ public TaskExecutor(
int guaranteedNumberOfDriversPerTask,
int maximumNumberOfDriversPerTask,
TaskPriorityTracking taskPriorityTracking,
Duration interruptRunawaySplitsTimeout,
Predicate<List<StackTraceElement>> interruptibleSplitPredicate,
Duration interruptSplitInterval,
EmbedVersion embedVersion,
MultilevelSplitQueue splitQueue,
Ticker ticker)
Expand All @@ -234,6 +260,8 @@ public TaskExecutor(
checkArgument(guaranteedNumberOfDriversPerTask > 0, "guaranteedNumberOfDriversPerTask must be at least 1");
checkArgument(maximumNumberOfDriversPerTask > 0, "maximumNumberOfDriversPerTask must be at least 1");
checkArgument(guaranteedNumberOfDriversPerTask <= maximumNumberOfDriversPerTask, "guaranteedNumberOfDriversPerTask cannot be greater than maximumNumberOfDriversPerTask");
checkArgument(interruptRunawaySplitsTimeout.getValue(SECONDS) >= 1.0, "interruptRunawaySplitsTimeout must be at least 1 second");
checkArgument(interruptSplitInterval.getValue(SECONDS) >= 1.0, "interruptSplitInterval must be at least 1 second");

// we manage thread pool size directly, so create an unlimited pool
this.executor = newCachedThreadPool(threadsNamed("task-processor-%s"));
Expand Down Expand Up @@ -263,6 +291,9 @@ public TaskExecutor(
}
this.taskPriorityTrackerFactory = taskPriorityTrackerFactory;
this.tasks = new LinkedList<>();
this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout;
this.interruptibleSplitPredicate = interruptibleSplitPredicate;
this.interruptSplitInterval = interruptSplitInterval;
}

@PostConstruct
Expand All @@ -272,6 +303,10 @@ public synchronized void start()
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
if (interruptRunawaySplitsTimeout != null) {
long interval = (long) interruptSplitInterval.getValue(SECONDS);
splitMonitorExecutor.scheduleAtFixedRate(this::interruptRunawaySplits, interval, interval, SECONDS);
}
}

@PreDestroy
Expand Down Expand Up @@ -514,6 +549,25 @@ private synchronized PrioritizedSplitRunner pollNextSplitWorker()
return null;
}

private void interruptRunawaySplits()
{
for (RunningSplitInfo splitInfo : runningSplitInfos) {
Duration duration = Duration.succinctNanos(ticker.read() - splitInfo.getStartTime());
if (duration.compareTo(interruptRunawaySplitsTimeout) < 0) {
continue;
}

List<StackTraceElement> stack = asList(splitInfo.getThread().getStackTrace());
if (interruptibleSplitPredicate.test(stack)) {
String stackString = stack.stream()
.map(Object::toString)
.collect(joining(lineSeparator()));
log.warn("Interrupting runaway split " + splitInfo.getSplitInfo() + lineSeparator() + stackString);
splitInfo.getThread().interrupt();
}
}
}

private class TaskRunner
implements Runnable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.QUERY_FAIR;
import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR;
import static io.airlift.units.DataSize.Unit;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TestTaskManagerConfig
{
Expand All @@ -37,9 +38,9 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(TaskManagerConfig.class)
.setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 2)
.setSplitConcurrencyAdjustmentInterval(new Duration(100, TimeUnit.MILLISECONDS))
.setStatusRefreshMaxWait(new Duration(1, TimeUnit.SECONDS))
.setInfoUpdateInterval(new Duration(3, TimeUnit.SECONDS))
.setInfoRefreshMaxWait(new Duration(0, TimeUnit.SECONDS))
.setStatusRefreshMaxWait(new Duration(1, SECONDS))
.setInfoUpdateInterval(new Duration(3, SECONDS))
.setInfoRefreshMaxWait(new Duration(0, SECONDS))
.setPerOperatorCpuTimerEnabled(true)
.setTaskCpuTimerEnabled(true)
.setPerOperatorAllocationTrackingEnabled(false)
Expand Down Expand Up @@ -68,7 +69,8 @@ public void testDefaults()
.setLevelTimeMultiplier(new BigDecimal("2"))
.setStatisticsCpuTimerEnabled(true)
.setLegacyLifespanCompletionCondition(false)
.setTaskPriorityTracking(TASK_FAIR));
.setTaskPriorityTracking(TASK_FAIR)
.setInterruptRunawaySplitsTimeout(new Duration(600, SECONDS)));
}

@Test
Expand Down Expand Up @@ -109,14 +111,15 @@ public void testExplicitPropertyMappings()
.put("task.statistics-cpu-timer-enabled", "false")
.put("task.legacy-lifespan-completion-condition", "true")
.put("task.task-priority-tracking", "QUERY_FAIR")
.put("task.interrupt-runaway-splits-timeout", "599s")
.build();

TaskManagerConfig expected = new TaskManagerConfig()
.setInitialSplitsPerNode(1)
.setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS))
.setStatusRefreshMaxWait(new Duration(2, TimeUnit.SECONDS))
.setInfoUpdateInterval(new Duration(2, TimeUnit.SECONDS))
.setInfoRefreshMaxWait(new Duration(3, TimeUnit.SECONDS))
.setSplitConcurrencyAdjustmentInterval(new Duration(1, SECONDS))
.setStatusRefreshMaxWait(new Duration(2, SECONDS))
.setInfoUpdateInterval(new Duration(2, SECONDS))
.setInfoRefreshMaxWait(new Duration(3, SECONDS))
.setPerOperatorCpuTimerEnabled(false)
.setTaskCpuTimerEnabled(false)
.setPerOperatorAllocationTrackingEnabled(true)
Expand All @@ -131,7 +134,7 @@ public void testExplicitPropertyMappings()
.setMaxDriversPerTask(13)
.setMaxTasksPerStage(999)
.setInfoMaxAge(new Duration(22, TimeUnit.MINUTES))
.setClientTimeout(new Duration(10, TimeUnit.SECONDS))
.setClientTimeout(new Duration(10, SECONDS))
.setSinkMaxBufferSize(new DataSize(42, Unit.MEGABYTE))
.setMaxPagePartitioningBufferSize(new DataSize(40, Unit.MEGABYTE))
.setWriterCount(4)
Expand All @@ -145,7 +148,8 @@ public void testExplicitPropertyMappings()
.setLevelTimeMultiplier(new BigDecimal("2.1"))
.setStatisticsCpuTimerEnabled(false)
.setLegacyLifespanCompletionCondition(true)
.setTaskPriorityTracking(QUERY_FAIR);
.setTaskPriorityTracking(QUERY_FAIR)
.setInterruptRunawaySplitsTimeout(new Duration(599, SECONDS));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import com.facebook.airlift.testing.TestingTicker;
import com.facebook.presto.execution.SplitRunner;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.version.EmbedVersion;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -28,6 +31,7 @@
import java.util.OptionalInt;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -451,6 +455,41 @@ public void testUserSpecifiedMaxDriversPerTask()
}
}

@Test
public void testTaskExecutorRunawaySplitInterrupt()
throws Exception
{
TaskExecutor taskExecutor = new TaskExecutor(
8,
16,
3,
4,
TASK_FAIR,
new Duration(1, SECONDS),
elements -> elements.stream()
.anyMatch(element -> element.getFileName().equals("TestTaskExecutor.java")),
new Duration(1, SECONDS),
new EmbedVersion(new ServerConfig()),
new MultilevelSplitQueue(2),
Ticker.systemTicker());
taskExecutor.start();

try {
TaskId taskId = new TaskId("foo", 0, 0, 0);
TaskHandle taskHandle = taskExecutor.addTask(
taskId, () -> 1.0,
1,
new Duration(1, TimeUnit.SECONDS),
OptionalInt.of(1));
MockSplitRunner mockSplitRunner = new MockSplitRunner();
taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(mockSplitRunner));
mockSplitRunner.interrupted.get(60, TimeUnit.SECONDS);
}
finally {
taskExecutor.stop();
}
}

private void assertSplitStates(int endIndex, TestingJob[] splits)
{
// assert that splits up to and including endIndex are all started
Expand Down Expand Up @@ -573,4 +612,42 @@ public Future<?> getCompletedFuture()
return completed;
}
}

private static class MockSplitRunner
implements SplitRunner
{
private SettableFuture<Boolean> interrupted = SettableFuture.create();

@Override
public boolean isFinished()
{
return interrupted.isDone();
}

@Override
public ListenableFuture<?> processFor(Duration duration)
{
while (true) {
try {
Thread.sleep(1);
}
catch (InterruptedException e) {
break;
}
}
interrupted.set(true);
return Futures.immediateFuture(null);
}

@Override
public String getInfo()
{
return "";
}

@Override
public void close()
{
}
}
}