Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,24 @@

import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

import static io.trino.spi.connector.DynamicFilter.NOT_BLOCKED;
import static java.util.Objects.requireNonNull;

public final class MemorySplitManager
implements ConnectorSplitManager
{
private final int splitsPerNode;
private final MemoryMetadata metadata;
private final boolean enableLazyDynamicFiltering;

@Inject
public MemorySplitManager(MemoryConfig config, MemoryMetadata metadata)
{
this.splitsPerNode = config.getSplitsPerNode();
this.metadata = metadata;
this.enableLazyDynamicFiltering = config.isEnableLazyDynamicFiltering();
}

@Override
Expand Down Expand Up @@ -72,6 +78,57 @@ public ConnectorSplitSource getSplits(
splits.add(new MemorySplit(table.getId(), i, splitsPerNode, dataFragment.getHostAddress(), rows, OptionalLong.empty()));
}
}
return new FixedSplitSource(splits.build());

ConnectorSplitSource splitSource = new FixedSplitSource(splits.build());
if (enableLazyDynamicFiltering) {
// Needed to avoid scheduling a stage that is waiting for dynamic filters to become available.
// It makes no difference for pipelined execution where the stages are scheduled eagerly and there's no limit on the number of tasks running in parallel.
// However in fault tolerant execution if the stage waiting for dynamic filters is scheduled first it may occupy all available slots leaving no resources
// for the stage that collects dynamic filters to be scheduled effectively creating a deadlock.
splitSource = new DelayedSplitSource(whenCompleted(dynamicFilter), splitSource);
}
return splitSource;
}

private static CompletableFuture<?> whenCompleted(DynamicFilter dynamicFilter)
{
if (dynamicFilter.isAwaitable()) {
return dynamicFilter.isBlocked().thenCompose(ignored -> whenCompleted(dynamicFilter));
}
return NOT_BLOCKED;
}

private static class DelayedSplitSource
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
implements ConnectorSplitSource
{
private final CompletableFuture<?> delay;
private final ConnectorSplitSource delegate;

public DelayedSplitSource(CompletableFuture<?> delay, ConnectorSplitSource delegate)
{
this.delay = requireNonNull(delay, "delay is null");
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
{
return delay.thenCompose(ignored -> delegate.getNextBatch(maxSize));
}

@Override
public void close()
{
delegate.close();
}

@Override
public boolean isFinished()
{
if (delay.isDone()) {
return delegate.isFinished();
}
return false;
}
}
}