Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -411,7 +412,7 @@ public static class ScanFilterAndProjectOperatorFactory
private final int operatorId;
private final PlanNodeId planNodeId;
private final Supplier<CursorProcessor> cursorProcessor;
private final Supplier<PageProcessor> pageProcessor;
private final Function<DynamicFilter, PageProcessor> pageProcessor;
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final TableHandle table;
Expand All @@ -428,7 +429,7 @@ public ScanFilterAndProjectOperatorFactory(
PlanNodeId sourceId,
PageSourceProviderFactory pageSourceProvider,
Supplier<CursorProcessor> cursorProcessor,
Supplier<PageProcessor> pageProcessor,
Function<DynamicFilter, PageProcessor> pageProcessor,
TableHandle table,
Iterable<ColumnHandle> columns,
DynamicFilter dynamicFilter,
Expand Down Expand Up @@ -496,7 +497,7 @@ public WorkProcessorSourceOperator create(
split,
pageSourceProvider,
cursorProcessor.get(),
pageProcessor.get(),
pageProcessor.apply(dynamicFilter),
table,
columns,
dynamicFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.operator.project.PageProjection;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.DynamicFilter;
import io.trino.sql.gen.columnar.ColumnarFilterCompiler;
import io.trino.sql.gen.columnar.DynamicPageFilter;
import io.trino.sql.gen.columnar.FilterEvaluator;
Expand All @@ -40,6 +41,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -106,7 +108,7 @@ public Supplier<CursorProcessor> compileCursorProcessor(Optional<RowExpression>
};
}

public Supplier<PageProcessor> compilePageProcessor(
public Function<DynamicFilter, PageProcessor> compilePageProcessor(
boolean columnarFilterEvaluationEnabled,
Optional<RowExpression> filter,
Optional<DynamicPageFilter> dynamicPageFilter,
Expand All @@ -125,7 +127,7 @@ public Supplier<PageProcessor> compilePageProcessor(
.collect(toImmutableList());

Optional<Supplier<PageFilter>> finalFilterFunctionSupplier = filterFunctionSupplier;
return () -> {
return (dynamicFilter) -> {
Optional<FilterEvaluator> filterEvaluator = columnarFilterEvaluatorSupplier.map(Supplier::get);
if (filterEvaluator.isEmpty()) {
filterEvaluator = finalFilterFunctionSupplier
Expand All @@ -136,7 +138,7 @@ public Supplier<PageProcessor> compilePageProcessor(
.map(Supplier::get)
.collect(toImmutableList());
Optional<FilterEvaluator> dynamicFilterEvaluator = dynamicPageFilter
.map(dynamicFilter -> dynamicFilter.createDynamicPageFilterEvaluator(columnarFilterCompiler))
.map(pageFilter -> pageFilter.createDynamicPageFilterEvaluator(columnarFilterCompiler, dynamicFilter))
.map(Supplier::get);
return new PageProcessor(filterEvaluator, dynamicFilterEvaluator, pageProjections, initialBatchSize);
};
Expand All @@ -145,13 +147,15 @@ public Supplier<PageProcessor> compilePageProcessor(
@VisibleForTesting
public Supplier<PageProcessor> compilePageProcessor(Optional<RowExpression> filter, List<? extends RowExpression> projections)
{
return compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.empty());
return () -> compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.empty())
.apply(DynamicFilter.EMPTY);
}

@VisibleForTesting
public Supplier<PageProcessor> compilePageProcessor(Optional<RowExpression> filter, List<? extends RowExpression> projections, int initialBatchSize)
{
return compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.of(initialBatchSize));
return () -> compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.of(initialBatchSize))
.apply(DynamicFilter.EMPTY);
}

private <T> Class<? extends T> compile(Optional<RowExpression> filter, List<RowExpression> projections, BodyCompiler bodyCompiler, Class<? extends T> superType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public final class DynamicPageFilter
private final Session session;
private final IrExpressionOptimizer irExpressionOptimizer;
private final DomainTranslator domainTranslator;
private final DynamicFilter dynamicFilter;
private final Map<ColumnHandle, Symbol> columnHandles;
private final Map<Symbol, Integer> sourceLayout;
private final double selectivityThreshold;
Expand All @@ -64,11 +63,13 @@ public final class DynamicPageFilter
@Nullable
@GuardedBy("this")
private CompletableFuture<?> isBlocked;
@Nullable
@GuardedBy("this")
private DynamicFilter currentDynamicFilter;

public DynamicPageFilter(
PlannerContext plannerContext,
Session session,
DynamicFilter dynamicFilter,
Map<Symbol, ColumnHandle> columnHandles,
Map<Symbol, Integer> sourceLayout,
double selectivityThreshold)
Expand All @@ -78,20 +79,25 @@ public DynamicPageFilter(
this.session = requireNonNull(session, "session is null");
this.irExpressionOptimizer = newOptimizer(plannerContext);
this.domainTranslator = new DomainTranslator(plannerContext.getMetadata());
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.columnHandles = columnHandles.entrySet()
.stream()
.collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
this.sourceLayout = ImmutableMap.copyOf(sourceLayout);
this.selectivityThreshold = selectivityThreshold;
this.isBlocked = dynamicFilter.isBlocked();
}

// Compiled dynamic filter is fixed per-split and generated duration page source creation.
// Page source implementations may subsequently implement blocking on completion of dynamic filters, but since
// that occurs after page source creation, we cannot be guaranteed a completed dynamic filter here for initial splits
public synchronized Supplier<FilterEvaluator> createDynamicPageFilterEvaluator(ColumnarFilterCompiler compiler)
public synchronized Supplier<FilterEvaluator> createDynamicPageFilterEvaluator(ColumnarFilterCompiler compiler, DynamicFilter dynamicFilter)
{
requireNonNull(dynamicFilter, "dynamicFilter is null");
// Sub-query cache may provide different instance of DynamicFilter per-split.
if (!dynamicFilter.equals(currentDynamicFilter)) {
compiledDynamicFilter = null;
currentDynamicFilter = dynamicFilter;
isBlocked = dynamicFilter.isBlocked();
}
if (isBlocked == null) {
return compiledDynamicFilter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1992,12 +1992,11 @@ else if (sourceNode instanceof SampleNode sampleNode) {
dynamicPageFilterFactory = Optional.of(new DynamicPageFilter(
plannerContext,
session,
dynamicFilter,
((TableScanNode) sourceNode).getAssignments(),
sourceLayout,
getDynamicRowFilterSelectivityThreshold(session)));
}
Supplier<PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(
Function<DynamicFilter, PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(
columnarFilterEvaluationEnabled,
translatedFilter,
dynamicPageFilterFactory,
Expand Down Expand Up @@ -2028,7 +2027,7 @@ else if (sourceNode instanceof SampleNode sampleNode) {
OperatorFactory operatorFactory = FilterAndProjectOperator.createOperatorFactory(
context.getNextOperatorId(),
planNodeId,
pageProcessor,
() -> pageProcessor.apply(dynamicFilter),
getTypes(projections),
getFilterAndProjectMinOutputPageSize(session),
getFilterAndProjectMinOutputPageRowCount(session));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void createScanFilterAndProjectOperatorFactories(List<Page> inputPages,
new PlanNodeId("test_source"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(inputPages),
() -> cursorProcessor,
() -> pageProcessor,
(_) -> pageProcessor,
TEST_TABLE_HANDLE,
columnHandles,
DynamicFilter.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testPageSource()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testPageSourceMergeOutput()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(input),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testPageSourceLazyLoad()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new SinglePagePageSource(input),
cursorProcessor,
() -> pageProcessor,
(_) -> pageProcessor,
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testRecordCursorSource()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(VARCHAR), input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testPageYield()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -377,7 +377,7 @@ public void testRecordCursorYield()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(BIGINT), input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.DynamicFilter;
import io.trino.sql.relational.CallExpression;
import io.trino.sql.relational.InputReferenceExpression;
import io.trino.sql.relational.RowExpression;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void setup()
projections,
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.block.IntArrayBlock;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.ShortArrayBlock;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.OperatorType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -165,7 +166,7 @@ public void setup()
ImmutableList.of(field(0, type)),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.OperatorType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -196,7 +197,7 @@ public void setup()
ImmutableList.of(project),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.VariableWidthBlockBuilder;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.LiteralParameters;
import io.trino.spi.function.ScalarFunction;
import io.trino.spi.function.SqlNullable;
Expand Down Expand Up @@ -630,7 +631,7 @@ private static List<Page> processFilter(List<Page> inputPages, boolean columnarE
ImmutableList.of(field(ROW_NUM_CHANNEL, BIGINT)),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
LocalMemoryContext context = newSimpleAggregatedMemoryContext().newLocalMemoryContext(PageProcessor.class.getSimpleName());
ImmutableList.Builder<Page> outputPagesBuilder = ImmutableList.builder();
for (Page inputPage : inputPages) {
Expand Down
Loading