From 20037c9d64ecf68f984ad712a5c9c7df843c3a48 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 17 Aug 2022 17:37:48 -0400 Subject: [PATCH] Ensure operators are always closed Due to a race condition in Driver#tryWithLock there was a chance an operator might have end up not being properly closed upon completion. - Driver#process executes an operator under the Driver#exclusiveLock - An operator throws an exception and triggers a task failure - A TaskStateMachine listener closes all drivers calling Driver#close - Driver#close is not able to acquire the Driver#exclusiveLock and assumes the driver will be terminated by the lock owner - The lock owner throws an exception and never runs post execution code in Driver#tryWithLock that was expected to close the operators --- .../main/java/io/trino/operator/Driver.java | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/Driver.java b/core/trino-main/src/main/java/io/trino/operator/Driver.java index e1f968d6c4ec..9841fc5d7c73 100644 --- a/core/trino-main/src/main/java/io/trino/operator/Driver.java +++ b/core/trino-main/src/main/java/io/trino/operator/Driver.java @@ -48,6 +48,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.common.base.Verify.verify; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.operator.Operator.NOT_BLOCKED; @@ -698,22 +699,21 @@ private Optional tryWithLock(long timeout, TimeUnit unit, boolean interru return Optional.empty(); } - Optional result; + T result = null; + Throwable failure = null; + try { - result = Optional.of(task.get()); + result = task.get(); + + // opportunistic check to avoid unnecessary lock reacquisition + processNewSources(); + destroyIfNecessary(); + } + catch (Throwable t) { + failure = t; } finally { - try { - try { - processNewSources(); - } - finally { - destroyIfNecessary(); - } - } - finally { - exclusiveLock.unlock(); - } + exclusiveLock.unlock(); } // If there are more assignment updates available, attempt to reacquire the lock and process them. @@ -727,16 +727,40 @@ private Optional tryWithLock(long timeout, TimeUnit unit, boolean interru try { processNewSources(); } - finally { + catch (Throwable t) { + if (failure == null) { + failure = t; + } + else if (failure != t) { + failure.addSuppressed(t); + } + } + + try { destroyIfNecessary(); } + catch (Throwable t) { + if (failure == null) { + failure = t; + } + else if (failure != t) { + failure.addSuppressed(t); + } + } } finally { exclusiveLock.unlock(); } } - return result; + if (failure != null) { + throwIfUnchecked(failure); + // should never happen + throw new AssertionError(failure); + } + + verify(result != null, "result is null"); + return Optional.of(result); } private static class DriverLock