diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index d8c16fd3693fe..71f04def46516 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -408,6 +408,16 @@ Task Properties to become overloaded due to excessive resource utilization. This can also be specified on a per-query basis using the ``task_writer_count`` session property. +``task.interrupt-runaway-splits-timeout`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``duration`` + * **Default value:** ``10m`` + + Timeout for interrupting split threads blocked without yielding control. + Only threads blocked in specific locations are interrupted. Currently, this applies only to + threads blocked in the Joni regular expression library. + Node Scheduler Properties ------------------------- diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java index 03d5d9c7e842a..bebb265caefe0 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java @@ -30,6 +30,8 @@ import java.math.BigDecimal; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.SECONDS; + @DefunctConfig({ "experimental.big-query-max-task-memory", "task.max-memory", @@ -84,6 +86,8 @@ public class TaskManagerConfig private boolean legacyLifespanCompletionCondition; private TaskPriorityTracking taskPriorityTracking = TaskPriorityTracking.TASK_FAIR; + private Duration interruptRunawaySplitsTimeout = new Duration(600, SECONDS); + @MinDuration("1ms") @MaxDuration("10s") @NotNull @@ -557,4 +561,18 @@ public enum TaskPriorityTracking TASK_FAIR, QUERY_FAIR, } + + @MinDuration("1s") + public Duration getInterruptRunawaySplitsTimeout() + { + return interruptRunawaySplitsTimeout; + } + + 
@Config("task.interrupt-runaway-splits-timeout") + @ConfigDescription("Interrupt runaway split threads after this timeout if the task is stuck in certain allow listed places") + public TaskManagerConfig setInterruptRunawaySplitsTimeout(Duration interruptRunawaySplitsTimeout) + { + this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java index 8bf2a986b195d..e93113e846b26 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.TaskManagerConfig; import com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking; +import com.facebook.presto.operator.scalar.JoniRegexpFunctions; import com.facebook.presto.server.ServerConfig; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; @@ -60,11 +61,11 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.function.DoubleSupplier; import java.util.function.Function; +import java.util.function.Predicate; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.concurrent.Threads.threadsNamed; @@ -74,11 +75,15 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Sets.newConcurrentHashSet; +import static 
java.lang.System.lineSeparator; +import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.joining; @ThreadSafe public class TaskExecutor @@ -86,7 +91,12 @@ public class TaskExecutor private static final Logger log = Logger.get(TaskExecutor.class); // print out split call stack if it has been running for a certain amount of time - private static final Duration LONG_SPLIT_WARNING_THRESHOLD = new Duration(600, TimeUnit.SECONDS); + private static final Duration LONG_SPLIT_WARNING_THRESHOLD = new Duration(600, SECONDS); + // Interrupt a split if it is running longer than this AND it's blocked on something known + private static final Predicate> DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE = elements -> + elements.stream() + .anyMatch(element -> element.getClassName().equals(JoniRegexpFunctions.class.getName())); + private static final Duration DEFAULT_INTERRUPT_SPLIT_INTERVAL = new Duration(60, SECONDS); private static final AtomicLong NEXT_RUNNER_ID = new AtomicLong(); @@ -101,6 +111,10 @@ public class TaskExecutor private final Ticker ticker; + private final Duration interruptRunawaySplitsTimeout; + private final Predicate> interruptibleSplitPredicate; + private final Duration interruptSplitInterval; + private final ScheduledExecutorService splitMonitorExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("TaskExecutor")); private final SortedSet runningSplitInfos = new ConcurrentSkipListSet<>(); @@ -174,6 +188,9 @@ public TaskExecutor(TaskManagerConfig config, EmbedVersion embedVersion, Multile config.getMinDriversPerTask(), config.getMaxDriversPerTask(), 
config.getTaskPriorityTracking(), + config.getInterruptRunawaySplitsTimeout(), + DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE, + DEFAULT_INTERRUPT_SPLIT_INTERVAL, embedVersion, splitQueue, Ticker.systemTicker()); @@ -194,6 +211,9 @@ public TaskExecutor( guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, taskPriorityTracking, + new TaskManagerConfig().getInterruptRunawaySplitsTimeout(), + DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE, + DEFAULT_INTERRUPT_SPLIT_INTERVAL, new EmbedVersion(new ServerConfig()), new MultilevelSplitQueue(2), ticker); } @@ -214,6 +234,9 @@ public TaskExecutor( guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, taskPriorityTracking, + new TaskManagerConfig().getInterruptRunawaySplitsTimeout(), + DEFAULT_INTERRUPTIBLE_SPLIT_PREDICATE, + DEFAULT_INTERRUPT_SPLIT_INTERVAL, new EmbedVersion(new ServerConfig()), splitQueue, ticker); @@ -226,6 +249,9 @@ public TaskExecutor( int guaranteedNumberOfDriversPerTask, int maximumNumberOfDriversPerTask, TaskPriorityTracking taskPriorityTracking, + Duration interruptRunawaySplitsTimeout, + Predicate> interruptibleSplitPredicate, + Duration interruptSplitInterval, EmbedVersion embedVersion, MultilevelSplitQueue splitQueue, Ticker ticker) @@ -234,6 +260,8 @@ public TaskExecutor( checkArgument(guaranteedNumberOfDriversPerTask > 0, "guaranteedNumberOfDriversPerTask must be at least 1"); checkArgument(maximumNumberOfDriversPerTask > 0, "maximumNumberOfDriversPerTask must be at least 1"); checkArgument(guaranteedNumberOfDriversPerTask <= maximumNumberOfDriversPerTask, "guaranteedNumberOfDriversPerTask cannot be greater than maximumNumberOfDriversPerTask"); + checkArgument(interruptRunawaySplitsTimeout.getValue(SECONDS) >= 1.0, "interruptRunawaySplitsTimeout must be at least 1 second"); + checkArgument(interruptSplitInterval.getValue(SECONDS) >= 1.0, "interruptSplitInterval must be at least 1 second"); // we manage thread pool size directly, so create an unlimited pool this.executor = 
newCachedThreadPool(threadsNamed("task-processor-%s")); @@ -263,6 +291,9 @@ public TaskExecutor( } this.taskPriorityTrackerFactory = taskPriorityTrackerFactory; this.tasks = new LinkedList<>(); + this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout; + this.interruptibleSplitPredicate = interruptibleSplitPredicate; + this.interruptSplitInterval = interruptSplitInterval; } @PostConstruct @@ -272,6 +303,10 @@ public synchronized void start() for (int i = 0; i < runnerThreads; i++) { addRunnerThread(); } + if (interruptRunawaySplitsTimeout != null) { + long interval = (long) interruptSplitInterval.getValue(SECONDS); + splitMonitorExecutor.scheduleAtFixedRate(this::interruptRunawaySplits, interval, interval, SECONDS); + } } @PreDestroy @@ -514,6 +549,25 @@ private synchronized PrioritizedSplitRunner pollNextSplitWorker() return null; } + private void interruptRunawaySplits() + { + for (RunningSplitInfo splitInfo : runningSplitInfos) { + Duration duration = Duration.succinctNanos(ticker.read() - splitInfo.getStartTime()); + if (duration.compareTo(interruptRunawaySplitsTimeout) < 0) { + continue; + } + + List stack = asList(splitInfo.getThread().getStackTrace()); + if (interruptibleSplitPredicate.test(stack)) { + String stackString = stack.stream() + .map(Object::toString) + .collect(joining(lineSeparator())); + log.warn("Interrupting runaway split " + splitInfo.getSplitInfo() + lineSeparator() + stackString); + splitInfo.getThread().interrupt(); + } + } + } + private class TaskRunner implements Runnable { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java index 6cabca1c5a717..6b5dccb9bc737 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java @@ -28,6 +28,7 @@ import static 
com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.QUERY_FAIR; import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR; import static io.airlift.units.DataSize.Unit; +import static java.util.concurrent.TimeUnit.SECONDS; public class TestTaskManagerConfig { @@ -37,9 +38,9 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(TaskManagerConfig.class) .setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 2) .setSplitConcurrencyAdjustmentInterval(new Duration(100, TimeUnit.MILLISECONDS)) - .setStatusRefreshMaxWait(new Duration(1, TimeUnit.SECONDS)) - .setInfoUpdateInterval(new Duration(3, TimeUnit.SECONDS)) - .setInfoRefreshMaxWait(new Duration(0, TimeUnit.SECONDS)) + .setStatusRefreshMaxWait(new Duration(1, SECONDS)) + .setInfoUpdateInterval(new Duration(3, SECONDS)) + .setInfoRefreshMaxWait(new Duration(0, SECONDS)) .setPerOperatorCpuTimerEnabled(true) .setTaskCpuTimerEnabled(true) .setPerOperatorAllocationTrackingEnabled(false) @@ -68,7 +69,8 @@ public void testDefaults() .setLevelTimeMultiplier(new BigDecimal("2")) .setStatisticsCpuTimerEnabled(true) .setLegacyLifespanCompletionCondition(false) - .setTaskPriorityTracking(TASK_FAIR)); + .setTaskPriorityTracking(TASK_FAIR) + .setInterruptRunawaySplitsTimeout(new Duration(600, SECONDS))); } @Test @@ -109,14 +111,15 @@ public void testExplicitPropertyMappings() .put("task.statistics-cpu-timer-enabled", "false") .put("task.legacy-lifespan-completion-condition", "true") .put("task.task-priority-tracking", "QUERY_FAIR") + .put("task.interrupt-runaway-splits-timeout", "599s") .build(); TaskManagerConfig expected = new TaskManagerConfig() .setInitialSplitsPerNode(1) - .setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS)) - .setStatusRefreshMaxWait(new Duration(2, TimeUnit.SECONDS)) - .setInfoUpdateInterval(new Duration(2, TimeUnit.SECONDS)) - .setInfoRefreshMaxWait(new Duration(3, TimeUnit.SECONDS)) + 
.setSplitConcurrencyAdjustmentInterval(new Duration(1, SECONDS)) + .setStatusRefreshMaxWait(new Duration(2, SECONDS)) + .setInfoUpdateInterval(new Duration(2, SECONDS)) + .setInfoRefreshMaxWait(new Duration(3, SECONDS)) .setPerOperatorCpuTimerEnabled(false) .setTaskCpuTimerEnabled(false) .setPerOperatorAllocationTrackingEnabled(true) @@ -131,7 +134,7 @@ public void testExplicitPropertyMappings() .setMaxDriversPerTask(13) .setMaxTasksPerStage(999) .setInfoMaxAge(new Duration(22, TimeUnit.MINUTES)) - .setClientTimeout(new Duration(10, TimeUnit.SECONDS)) + .setClientTimeout(new Duration(10, SECONDS)) .setSinkMaxBufferSize(new DataSize(42, Unit.MEGABYTE)) .setMaxPagePartitioningBufferSize(new DataSize(40, Unit.MEGABYTE)) .setWriterCount(4) @@ -145,7 +148,8 @@ public void testExplicitPropertyMappings() .setLevelTimeMultiplier(new BigDecimal("2.1")) .setStatisticsCpuTimerEnabled(false) .setLegacyLifespanCompletionCondition(true) - .setTaskPriorityTracking(QUERY_FAIR); + .setTaskPriorityTracking(QUERY_FAIR) + .setInterruptRunawaySplitsTimeout(new Duration(599, SECONDS)); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java b/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java index c50971e9ecc90..ff5f107fbf5aa 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/executor/TestTaskExecutor.java @@ -16,6 +16,9 @@ import com.facebook.airlift.testing.TestingTicker; import com.facebook.presto.execution.SplitRunner; import com.facebook.presto.execution.TaskId; +import com.facebook.presto.server.ServerConfig; +import com.facebook.presto.version.EmbedVersion; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; 
@@ -28,6 +31,7 @@ import java.util.OptionalInt; import java.util.concurrent.Future; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -451,6 +455,41 @@ public void testUserSpecifiedMaxDriversPerTask() } } + @Test + public void testTaskExecutorRunawaySplitInterrupt() + throws Exception + { + TaskExecutor taskExecutor = new TaskExecutor( + 8, + 16, + 3, + 4, + TASK_FAIR, + new Duration(1, SECONDS), + elements -> elements.stream() + .anyMatch(element -> element.getFileName().equals("TestTaskExecutor.java")), + new Duration(1, SECONDS), + new EmbedVersion(new ServerConfig()), + new MultilevelSplitQueue(2), + Ticker.systemTicker()); + taskExecutor.start(); + + try { + TaskId taskId = new TaskId("foo", 0, 0, 0); + TaskHandle taskHandle = taskExecutor.addTask( + taskId, () -> 1.0, + 1, + new Duration(1, TimeUnit.SECONDS), + OptionalInt.of(1)); + MockSplitRunner mockSplitRunner = new MockSplitRunner(); + taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(mockSplitRunner)); + mockSplitRunner.interrupted.get(60, TimeUnit.SECONDS); + } + finally { + taskExecutor.stop(); + } + } + private void assertSplitStates(int endIndex, TestingJob[] splits) { // assert that splits up to and including endIndex are all started @@ -573,4 +612,42 @@ public Future getCompletedFuture() return completed; } } + + private static class MockSplitRunner + implements SplitRunner + { + private SettableFuture interrupted = SettableFuture.create(); + + @Override + public boolean isFinished() + { + return interrupted.isDone(); + } + + @Override + public ListenableFuture processFor(Duration duration) + { + while (true) { + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + break; + } + } + interrupted.set(true); + return Futures.immediateFuture(null); + } + + @Override + public String getInfo() + { + return ""; + } + + @Override + public void close() + { 
+ } + } }