diff --git a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java index e60aee08cba8..2818cf686e82 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java @@ -760,7 +760,7 @@ private static class ExchangeOutputSource { private final Set selectedTasks; private final QueryId queryId; - private final ListenableFuture exchangeSourceFuture; + private ListenableFuture exchangeSourceFuture; private ExchangeSource exchangeSource; private boolean finished; @@ -848,6 +848,7 @@ public void close() return; } finished = true; + exchangeSource = null; addCallback(exchangeSourceFuture, new FutureCallback<>() { @Override @@ -868,6 +869,7 @@ public void onFailure(Throwable ignored) // It a failure occurred it is expected to be propagated by the getNext method } }, directExecutor()); + exchangeSourceFuture = null; } } } diff --git a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java index f7df7bd0b1bb..00c8702447fd 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.trino.connector.CatalogName; import io.trino.exchange.ExchangeManagerRegistry; @@ -54,6 +55,8 @@ public class ExchangeOperator implements SourceOperator { + private static final Logger log = Logger.get(ExchangeOperator.class); + public static final CatalogName REMOTE_CONNECTOR_ID = new CatalogName("$remote"); public static class ExchangeOperatorFactory @@ -459,15 +462,21 @@ public void close() private static class SpoolingExchangeDataSource implements ExchangeDataSource { - private final ExchangeSource exchangeSource; + // This field is not final to allow releasing the memory retained by the ExchangeSource instance. + // It is modified (assigned to null) when the ExchangeOperator is closed. + // It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads. + // However since close can be called at any moment this variable has to be accessed in a safe way (avoiding "check-then-use"). + private ExchangeSource exchangeSource; private final List exchangeSourceHandles; private final LocalMemoryContext systemMemoryContext; + private volatile boolean closed; private SpoolingExchangeDataSource( ExchangeSource exchangeSource, List exchangeSourceHandles, LocalMemoryContext systemMemoryContext) { + // this assignment is expected to be followed by an assignment of a final field to ensure safe publication this.exchangeSource = requireNonNull(exchangeSource, "exchangeSource is null"); this.exchangeSourceHandles = ImmutableList.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null"); @@ -476,20 +485,39 @@ private SpoolingExchangeDataSource( @Override public Slice pollPage() { + ExchangeSource exchangeSource = this.exchangeSource; + if (exchangeSource == null) { + return null; + } + Slice data = exchangeSource.read(); systemMemoryContext.setBytes(exchangeSource.getMemoryUsage()); + + // If the data source has been closed in a meantime reset memory usage back to 0 + if (closed) { + systemMemoryContext.setBytes(0); + } + return data; } @Override public boolean isFinished() { + ExchangeSource exchangeSource = this.exchangeSource; + if (exchangeSource == null) { + return true; + } return exchangeSource.isFinished(); } @Override public ListenableFuture isBlocked() { + ExchangeSource exchangeSource = this.exchangeSource; + if (exchangeSource == null) { + return immediateVoidFuture(); + } return toListenableFuture(exchangeSource.isBlocked()); } @@ -522,9 +550,22 @@ public OperatorInfo getInfo() } @Override - public void close() + public synchronized void close() { - exchangeSource.close(); + if (closed) { + return; + } + closed = true; + try { + exchangeSource.close(); + } + catch (RuntimeException e) { + log.warn(e, "error closing exchange source"); + } + finally { + exchangeSource = null; + systemMemoryContext.setBytes(0); + } } } }