Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public static class TableScanOperatorFactory
private final Optional<ConnectorTableCredentials> tableCredentials;
private final List<ColumnHandle> columns;
private final List<Type> columnTypes;
private final DynamicFilter dynamicFilter;
private boolean closed;

public TableScanOperatorFactory(
Expand All @@ -71,8 +70,7 @@ public TableScanOperatorFactory(
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
List<Type> columnTypes,
DynamicFilter dynamicFilter)
List<Type> columnTypes)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -81,7 +79,6 @@ public TableScanOperatorFactory(
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.pageSourceProvider = pageSourceProvider.createPageSourceProvider(table.catalogHandle());
}

Expand All @@ -103,8 +100,7 @@ public SourceOperator createOperator(DriverContext driverContext)
pageSourceProvider,
table,
tableCredentials,
columns,
dynamicFilter);
columns);

if (isSourcePagesValidationEnabled(operatorContext.getSession())) {
return new OutputValidatingSourceOperator(
Expand All @@ -128,7 +124,6 @@ public void noMoreOperators()
private final TableHandle table;
private final Optional<ConnectorTableCredentials> tableCredentials;
private final List<ColumnHandle> columns;
private final DynamicFilter dynamicFilter;
private final LocalMemoryContext memoryContext;
private final SettableFuture<Void> blocked = SettableFuture.create();

Expand All @@ -149,16 +144,14 @@ public TableScanOperator(
PageSourceProvider pageSourceProvider,
TableHandle table,
Optional<ConnectorTableCredentials> tableCredentials,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
List<ColumnHandle> columns)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.sourceId = requireNonNull(sourceId, "planNodeId is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.table = requireNonNull(table, "table is null");
this.tableCredentials = requireNonNull(tableCredentials, "tableCredentials is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.memoryContext = operatorContext.newLocalUserMemoryContext(TableScanOperator.class.getSimpleName());
}

Expand Down Expand Up @@ -275,10 +268,7 @@ public Page getOutput()
return null;
}
if (source == null) {
if (!dynamicFilter.getCurrentPredicate().isAll()) {
operatorContext.recordDynamicFilterSplitProcessed(1L);
}
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, tableCredentials, columns, dynamicFilter);
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, tableCredentials, columns, DynamicFilter.EMPTY);
}

SourcePage sourcePage = source.getNextSourcePage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2013,20 +2013,8 @@ public PhysicalOperation visitSample(SampleNode node, LocalExecutionPlanContext
@Override
public PhysicalOperation visitFilter(FilterNode node, LocalExecutionPlanContext context)
{
PlanNode sourceNode = node.getSource();
Expression filterExpression = node.getPredicate();

if (node.getSource() instanceof TableScanNode tableScanNode) {
DynamicFilters.ExtractResult extractDynamicFilterResult = extractDynamicFilters(filterExpression);
Expression staticFilter = combineConjuncts(extractDynamicFilterResult.getStaticConjuncts());
if (staticFilter.equals(TRUE) && extractDynamicFilterResult.getDynamicConjuncts().isEmpty()) {
// filter node contains only empty dynamic filter, fallback to normal table scan
return visitTableScan(node.getId(), tableScanNode, filterExpression, context);
}
}

List<Symbol> outputSymbols = node.getOutputSymbols();
return visitScanFilterAndProject(context, node.getId(), sourceNode, Optional.of(filterExpression), Assignments.identity(outputSymbols), outputSymbols);
return visitScanFilterAndProject(context, node.getId(), node.getSource(), Optional.of(node.getPredicate()), Assignments.identity(outputSymbols), outputSymbols);
}

@Override
Expand Down Expand Up @@ -2187,21 +2175,16 @@ private RowExpression toRowExpression(Expression expression, Map<Symbol, Integer
@Override
public PhysicalOperation visitTableScan(TableScanNode node, LocalExecutionPlanContext context)
{
return visitTableScan(node.getId(), node, TRUE, context);
}

private PhysicalOperation visitTableScan(PlanNodeId planNodeId, TableScanNode node, Expression filterExpression, LocalExecutionPlanContext context)
{
PlanNodeId planNodeId = node.getId();
ImmutableList.Builder<ColumnHandle> columns = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
for (Symbol symbol : node.getOutputSymbols()) {
columns.add(node.getAssignments().get(symbol));
columnTypes.add(symbol.type());
}

DynamicFilter dynamicFilter = getDynamicFilter(node, filterExpression, context);
Optional<ConnectorTableCredentials> tableCredentials = context.getTaskContext().getTableCredentials(node.getId());
OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), planNodeId, node.getId(), pageSourceManager, node.getTable(), tableCredentials, columns.build(), columnTypes.build(), dynamicFilter);
OperatorFactory operatorFactory = new TableScanOperatorFactory(context.getNextOperatorId(), planNodeId, node.getId(), pageSourceManager, node.getTable(), tableCredentials, columns.build(), columnTypes.build());
return new PhysicalOperation(operatorFactory, makeLayout(node));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.trino.operator.TaskContext;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -109,8 +108,7 @@ public void testTableScanMemoryBlocking()
.build()),
TEST_TABLE_HANDLE,
Optional.empty(),
ImmutableList.of(),
DynamicFilter.EMPTY);
ImmutableList.of());
PageConsumerOperator sink = createSinkOperator(types);
Driver driver = Driver.createDriver(driverContext, source, sink);
assertThat(driver.getDriverContext()).isSameAs(driverContext);
Expand Down
10 changes: 4 additions & 6 deletions core/trino-main/src/test/java/io/trino/operator/TestDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.type.Type;
import io.trino.split.PageSourceProvider;
Expand Down Expand Up @@ -176,8 +175,7 @@ public void testAddSourceFinish()
.build()),
TEST_TABLE_HANDLE,
Optional.empty(),
ImmutableList.of(),
DynamicFilter.EMPTY);
ImmutableList.of());

PageConsumerOperator sink = createSinkOperator(types);
Driver driver = Driver.createDriver(driverContext, source, sink);
Expand Down Expand Up @@ -583,7 +581,7 @@ public AlwaysBlockedTableScanOperator(
TableHandle table,
List<ColumnHandle> columns)
{
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns, DynamicFilter.EMPTY);
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns);
}

@Override
Expand All @@ -603,7 +601,7 @@ public AlwaysBlockedMemoryRevokingTableScanOperator(
TableHandle table,
List<ColumnHandle> columns)
{
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns, DynamicFilter.EMPTY);
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns);
}

@Override
Expand All @@ -628,7 +626,7 @@ public NotBlockedTableScanOperator(
TableHandle table,
List<ColumnHandle> columns)
{
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns, DynamicFilter.EMPTY);
super(operatorContext, planNodeId, pageSourceProvider, table, Optional.empty(), columns);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,7 @@ public SourceOperator newTableScanOperator(DriverContext driverContext)
TEST_TABLE_HANDLE,
Optional.empty(),
columns.stream().map(ColumnHandle.class::cast).collect(toImmutableList()),
types,
DynamicFilter.EMPTY);
types);
SourceOperator operator = sourceOperatorFactory.createOperator(driverContext);
operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()));
return operator;
Expand Down