diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplitManager.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplitManager.java index b3607001f49b..a5c510cde4fe 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplitManager.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplitManager.java @@ -28,18 +28,24 @@ import java.util.List; import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import static io.trino.spi.connector.DynamicFilter.NOT_BLOCKED; +import static java.util.Objects.requireNonNull; public final class MemorySplitManager implements ConnectorSplitManager { private final int splitsPerNode; private final MemoryMetadata metadata; + private final boolean enableLazyDynamicFiltering; @Inject public MemorySplitManager(MemoryConfig config, MemoryMetadata metadata) { this.splitsPerNode = config.getSplitsPerNode(); this.metadata = metadata; + this.enableLazyDynamicFiltering = config.isEnableLazyDynamicFiltering(); } @Override @@ -72,6 +78,57 @@ public ConnectorSplitSource getSplits( splits.add(new MemorySplit(table.getId(), i, splitsPerNode, dataFragment.getHostAddress(), rows, OptionalLong.empty())); } } - return new FixedSplitSource(splits.build()); + + ConnectorSplitSource splitSource = new FixedSplitSource(splits.build()); + if (enableLazyDynamicFiltering) { + // Needed to avoid scheduling a stage that is waiting for dynamic filters to become available. + // It makes no difference for pipelined execution where the stages are scheduled eagerly and there's no limit on the number of tasks running in parallel. + // However in fault tolerant execution if the stage waiting for dynamic filters is scheduled first it may occupy all available slots leaving no resources + // for the stage that collects dynamic filters to be scheduled effectively creating a deadlock. + splitSource = new DelayedSplitSource(whenCompleted(dynamicFilter), splitSource); + } + return splitSource; + } + + private static CompletableFuture whenCompleted(DynamicFilter dynamicFilter) + { + if (dynamicFilter.isAwaitable()) { + return dynamicFilter.isBlocked().thenCompose(ignored -> whenCompleted(dynamicFilter)); + } + return NOT_BLOCKED; + } + + private static class DelayedSplitSource + implements ConnectorSplitSource + { + private final CompletableFuture delay; + private final ConnectorSplitSource delegate; + + public DelayedSplitSource(CompletableFuture delay, ConnectorSplitSource delegate) + { + this.delay = requireNonNull(delay, "delay is null"); + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public CompletableFuture getNextBatch(int maxSize) + { + return delay.thenCompose(ignored -> delegate.getNextBatch(maxSize)); + } + + @Override + public void close() + { + delegate.close(); + } + + @Override + public boolean isFinished() + { + if (delay.isDone()) { + return delegate.isFinished(); + } + return false; + } } }