From 1a65363bc9298cabbe0adde58c86fd5939ea780c Mon Sep 17 00:00:00 2001 From: Yingjie Luan <1275963@gmail.com> Date: Mon, 1 Aug 2022 13:47:44 -0700 Subject: [PATCH] Fix flaky testFailStuckSplitTasks unit test --- .../trino/execution/TestSqlTaskManager.java | 68 +++++++++++++++++-- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java index e122340a9ce1..339a330d735b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java @@ -49,11 +49,15 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.Test; +import javax.annotation.concurrent.GuardedBy; + import java.net.URI; import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; @@ -249,6 +253,7 @@ public void testRemoveOldTasks() @Test public void testFailStuckSplitTasks() + throws InterruptedException, ExecutionException, TimeoutException { TestingTicker ticker = new TestingTicker(); @@ -265,6 +270,9 @@ public void testFailStuckSplitTasks() taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(mockSplitRunner)); taskExecutor.start(); + // wait for the task executor to start processing the split + mockSplitRunner.waitForStart(); + TaskManagerConfig taskManagerConfig = new TaskManagerConfig() .setInterruptStuckSplitTasksEnabled(true) .setInterruptStuckSplitTasksDetectionInterval(new Duration(10, SECONDS)) @@ -272,12 +280,22 @@ public void testFailStuckSplitTasks() .setInterruptStuckSplitTasksTimeout(new Duration(10, SECONDS)); try (SqlTaskManager sqlTaskManager = createSqlTaskManager(taskManagerConfig, new NodeMemoryConfig(), taskExecutor, stackTraceElements -> true)) { + sqlTaskManager.addStateChangeListener(TASK_ID, (state) -> { + if (state.isDone()) { + taskExecutor.removeTask(taskHandle); + } + }); + ticker.increment(30, SECONDS); sqlTaskManager.failStuckSplitTasks(); + mockSplitRunner.waitForFinish(); assertEquals(sqlTaskManager.getAllTaskInfo().size(), 1); assertEquals(sqlTaskManager.getAllTaskInfo().get(0).getTaskStatus().getState(), TaskState.FAILED); } + finally { + taskExecutor.stop(); + } } @Test @@ -444,17 +462,45 @@ public URI createMemoryInfoLocation(InternalNode node) private static class MockSplitRunner implements SplitRunner { - private SettableFuture interrupted = SettableFuture.create(); + private final SettableFuture startedFuture = SettableFuture.create(); + private final SettableFuture finishedFuture = SettableFuture.create(); + + @GuardedBy("this") + private Thread runnerThread; + @GuardedBy("this") + private boolean closed; + + public void waitForStart() + throws ExecutionException, InterruptedException, TimeoutException + { + startedFuture.get(10, SECONDS); + } + + public void waitForFinish() + throws ExecutionException, InterruptedException, TimeoutException + { + finishedFuture.get(10, SECONDS); + } @Override - public boolean isFinished() + public synchronized boolean isFinished() { - return interrupted.isDone(); + return closed; } @Override public ListenableFuture processFor(Duration duration) { + startedFuture.set(null); + synchronized (this) { + runnerThread = Thread.currentThread(); + + if (closed) { + finishedFuture.set(null); + return immediateVoidFuture(); + } + } + while (true) { try { Thread.sleep(100000); @@ -463,19 +509,29 @@ public ListenableFuture processFor(Duration duration) break; } } - interrupted.set(true); + + synchronized (this) { + closed = true; + } + finishedFuture.set(null); + return immediateVoidFuture(); } @Override public String getInfo() { - return ""; + return "MockSplitRunner"; } @Override - public void close() + public synchronized void close() { + closed = true; + + if (runnerThread != null) { + runnerThread.interrupt(); + } } } }