Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private static class ExchangeOutputSource
{
private final Set<TaskId> selectedTasks;
private final QueryId queryId;
private final ListenableFuture<ExchangeSource> exchangeSourceFuture;
private ListenableFuture<ExchangeSource> exchangeSourceFuture;

private ExchangeSource exchangeSource;
private boolean finished;
Expand Down Expand Up @@ -848,6 +848,7 @@ public void close()
return;
}
finished = true;
exchangeSource = null;
addCallback(exchangeSourceFuture, new FutureCallback<>()
{
@Override
Expand All @@ -868,6 +869,7 @@ public void onFailure(Throwable ignored)
// It a failure occurred it is expected to be propagated by the getNext method
}
}, directExecutor());
exchangeSourceFuture = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.connector.CatalogName;
import io.trino.exchange.ExchangeManagerRegistry;
Expand Down Expand Up @@ -54,6 +55,8 @@
public class ExchangeOperator
implements SourceOperator
{
private static final Logger log = Logger.get(ExchangeOperator.class);

public static final CatalogName REMOTE_CONNECTOR_ID = new CatalogName("$remote");

public static class ExchangeOperatorFactory
Expand Down Expand Up @@ -459,15 +462,21 @@ public void close()
private static class SpoolingExchangeDataSource
implements ExchangeDataSource
{
private final ExchangeSource exchangeSource;
// This field is not final to allow releasing the memory retained by the ExchangeSource instance.
// It is modified (assigned to null) when the ExchangeOperator is closed.
// It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads.
// However since close can be called at any moment this variable has to be accessed in a safe way (avoiding "check-then-use").
private ExchangeSource exchangeSource;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There two separate issues here. One improving GC and another is ensuring memory pool accounting is fine. It would be nice to have separated commits for them.

private final List<ExchangeSourceHandle> exchangeSourceHandles;
private final LocalMemoryContext systemMemoryContext;
private volatile boolean closed;

private SpoolingExchangeDataSource(
ExchangeSource exchangeSource,
List<ExchangeSourceHandle> exchangeSourceHandles,
LocalMemoryContext systemMemoryContext)
{
// this assignment is expected to be followed by an assignment of a final field to ensure safe publication
this.exchangeSource = requireNonNull(exchangeSource, "exchangeSource is null");
this.exchangeSourceHandles = ImmutableList.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
Expand All @@ -476,20 +485,39 @@ private SpoolingExchangeDataSource(
@Override
public Slice pollPage()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return null;
}

Slice data = exchangeSource.read();
systemMemoryContext.setBytes(exchangeSource.getMemoryUsage());

// If the data source has been closed in a meantime reset memory usage back to 0
if (closed) {
systemMemoryContext.setBytes(0);
}
Comment on lines 496 to 499
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this check needed? systemMemoryContext.setBytes(0); will always be executed in the finally block of close(), right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but pollPage and close can be called concurrently. It is possible that close will set the memory reservation to 0 while the pollPage will reset it back to a non zero value


return data;
}

@Override
public boolean isFinished()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return true;
}
return exchangeSource.isFinished();
}

@Override
public ListenableFuture<Void> isBlocked()
{
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return immediateVoidFuture();
}
return toListenableFuture(exchangeSource.isBlocked());
}

Expand Down Expand Up @@ -522,9 +550,22 @@ public OperatorInfo getInfo()
}

@Override
public void close()
public synchronized void close()
{
exchangeSource.close();
if (closed) {
return;
}
closed = true;
try {
exchangeSource.close();
}
catch (RuntimeException e) {
log.warn(e, "error closing exchange source");
}
finally {
exchangeSource = null;
systemMemoryContext.setBytes(0);
}
}
}
}