Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;

import static java.util.Objects.requireNonNull;

public class TableFunctionHandle
{
private final CatalogHandle catalogHandle;
private final SchemaFunctionName schemaFunctionName;
private final ConnectorTableFunctionHandle functionHandle;
private final ConnectorTransactionHandle transactionHandle;

/**
 * Creates a handle identifying a resolved table function invocation.
 * Instantiated by Jackson during deserialization via the
 * {@code @JsonCreator}/{@code @JsonProperty} bindings below.
 *
 * @param catalogHandle handle of the catalog providing the function
 * @param schemaFunctionName schema-qualified name of the function
 * @param functionHandle connector-specific handle for the function
 * @param transactionHandle connector transaction the handle was resolved in
 * @throws NullPointerException if any argument is null
 */
@JsonCreator
public TableFunctionHandle(
        @JsonProperty("catalogHandle") CatalogHandle catalogHandle,
        @JsonProperty("schemaFunctionName") SchemaFunctionName schemaFunctionName,
        @JsonProperty("functionHandle") ConnectorTableFunctionHandle functionHandle,
        @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle)
{
    // All components are mandatory: fail fast on null rather than at first use.
    this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
    this.schemaFunctionName = requireNonNull(schemaFunctionName, "schemaFunctionName is null");
    this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
    this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
}
Expand All @@ -44,6 +48,12 @@ public CatalogHandle getCatalogHandle()
return catalogHandle;
}

/**
 * Returns the schema-qualified name of the table function.
 * Exposed to Jackson for serialization via {@code @JsonProperty}.
 */
@JsonProperty
public SchemaFunctionName getSchemaFunctionName()
{
    return schemaFunctionName;
}

@JsonProperty
public ConnectorTableFunctionHandle getFunctionHandle()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,7 @@ public TableArgumentAnalysis build()
public static class TableFunctionInvocationAnalysis
{
private final CatalogHandle catalogHandle;
private final String schemaName;
private final String functionName;
private final Map<String, Argument> arguments;
private final List<TableArgumentAnalysis> tableArgumentAnalyses;
Expand All @@ -2241,6 +2242,7 @@ public static class TableFunctionInvocationAnalysis

public TableFunctionInvocationAnalysis(
CatalogHandle catalogHandle,
String schemaName,
String functionName,
Map<String, Argument> arguments,
List<TableArgumentAnalysis> tableArgumentAnalyses,
Expand All @@ -2251,6 +2253,7 @@ public TableFunctionInvocationAnalysis(
ConnectorTransactionHandle transactionHandle)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.functionName = requireNonNull(functionName, "functionName is null");
this.arguments = ImmutableMap.copyOf(arguments);
this.tableArgumentAnalyses = ImmutableList.copyOf(tableArgumentAnalyses);
Expand All @@ -2267,6 +2270,11 @@ public CatalogHandle getCatalogHandle()
return catalogHandle;
}

// Name of the schema containing the invoked table function.
// Set in the constructor via requireNonNull, so never null.
public String getSchemaName()
{
    return schemaName;
}

public String getFunctionName()
{
return functionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,7 @@ else if (argument.getPartitionBy().isPresent()) {

analysis.setTableFunctionAnalysis(node, new TableFunctionInvocationAnalysis(
catalogHandle,
function.getSchema(),
function.getName(),
argumentsAnalysis.getPassedArguments(),
orderedTableArguments.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
import io.trino.sql.planner.plan.TableExecuteNode;
import io.trino.sql.planner.plan.TableFinishNode;
import io.trino.sql.planner.plan.TableFunctionNode;
import io.trino.sql.planner.plan.TableFunctionProcessorNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.planner.plan.TableWriterNode.MergeTarget;
Expand Down Expand Up @@ -1658,6 +1659,12 @@ public PhysicalOperation visitTableFunction(TableFunctionNode node, LocalExecuti
throw new UnsupportedOperationException("execution by operator is not yet implemented for table function " + node.getName());
}

/**
 * Local execution planning for {@link TableFunctionProcessorNode} is not supported yet:
 * table functions cannot currently be executed by a dedicated operator, so planning
 * such a node always fails with {@link UnsupportedOperationException}.
 */
@Override
public PhysicalOperation visitTableFunctionProcessor(TableFunctionProcessorNode node, LocalExecutionPlanContext context)
{
    String message = "execution by operator is not yet implemented for table function " + node.getName();
    throw new UnsupportedOperationException(message);
}

@Override
public PhysicalOperation visitTopN(TopNNode node, LocalExecutionPlanContext context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.trino.sql.planner.iterative.rule.ImplementIntersectDistinctAsUnion;
import io.trino.sql.planner.iterative.rule.ImplementLimitWithTies;
import io.trino.sql.planner.iterative.rule.ImplementOffset;
import io.trino.sql.planner.iterative.rule.ImplementTableFunctionSource;
import io.trino.sql.planner.iterative.rule.InlineProjectIntoFilter;
import io.trino.sql.planner.iterative.rule.InlineProjections;
import io.trino.sql.planner.iterative.rule.MergeExcept;
Expand Down Expand Up @@ -629,7 +630,11 @@ public PlanOptimizers(
costCalculator,
// Temporary hack: separate optimizer step to avoid the sample node being replaced by filter before pushing
// it to table scan node
ImmutableSet.of(new ImplementBernoulliSampleAsFilter(metadata))),
ImmutableSet.of(
new ImplementBernoulliSampleAsFilter(metadata),
// Must run after RewriteTableFunctionToTableScan because that rule applies to TableFunctionNode.
// Once the node has been rewritten to TableFunctionProcessorNode, the function can no longer be pushed down to the connector.
new ImplementTableFunctionSource(metadata))),
columnPruningOptimizer,
new IterativeOptimizer(
plannerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.metadata.TableFunctionHandle;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.sql.ExpressionUtils;
Expand All @@ -47,6 +48,8 @@
import io.trino.sql.planner.plan.ProjectNode;
import io.trino.sql.planner.plan.SampleNode;
import io.trino.sql.planner.plan.TableFunctionNode;
import io.trino.sql.planner.plan.TableFunctionNode.PassThroughColumn;
import io.trino.sql.planner.plan.TableFunctionNode.PassThroughSpecification;
import io.trino.sql.planner.plan.TableFunctionNode.TableArgumentProperties;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.UnionNode;
Expand Down Expand Up @@ -428,27 +431,38 @@ protected RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node
specification = Optional.of(new DataOrganizationSpecification(partitionBy, orderBy));
}

sources.add(sourcePlanBuilder.getRoot());
sourceProperties.add(new TableArgumentProperties(
tableArgument.getArgumentName(),
tableArgument.isRowSemantics(),
tableArgument.isPruneWhenEmpty(),
tableArgument.isPassThroughColumns(),
requiredColumns,
specification));

// add output symbols passed from the table argument
ImmutableList.Builder<PassThroughColumn> passThroughColumns = ImmutableList.builder();
if (tableArgument.isPassThroughColumns()) {
// the original output symbols from the source node, not coerced
// note: hidden columns are included. They are present in sourcePlan.fieldMappings
outputSymbols.addAll(sourcePlan.getFieldMappings());
Set<Symbol> partitionBy = specification
.map(DataOrganizationSpecification::getPartitionBy)
.map(ImmutableSet::copyOf)
.orElse(ImmutableSet.of());
sourcePlan.getFieldMappings().stream()
.map(symbol -> new PassThroughColumn(symbol, partitionBy.contains(symbol)))
.forEach(passThroughColumns::add);
}
else if (tableArgument.getPartitionBy().isPresent()) {
tableArgument.getPartitionBy().get().stream()
// the original symbols for partitioning columns, not coerced
.map(sourcePlanBuilder::translate)
.forEach(outputSymbols::add);
.forEach(symbol -> {
outputSymbols.add(symbol);
passThroughColumns.add(new PassThroughColumn(symbol, true));
});
}

sources.add(sourcePlanBuilder.getRoot());
sourceProperties.add(new TableArgumentProperties(
tableArgument.getArgumentName(),
tableArgument.isRowSemantics(),
tableArgument.isPruneWhenEmpty(),
new PassThroughSpecification(tableArgument.isPassThroughColumns(), passThroughColumns.build()),
requiredColumns,
specification));
}

PlanNode root = new TableFunctionNode(
Expand All @@ -459,7 +473,11 @@ else if (tableArgument.getPartitionBy().isPresent()) {
sources.build(),
sourceProperties.build(),
functionAnalysis.getCopartitioningLists(),
new TableFunctionHandle(functionAnalysis.getCatalogHandle(), functionAnalysis.getConnectorTableFunctionHandle(), functionAnalysis.getTransactionHandle()));
new TableFunctionHandle(
functionAnalysis.getCatalogHandle(),
new SchemaFunctionName(functionAnalysis.getSchemaName(), functionAnalysis.getFunctionName()),
functionAnalysis.getConnectorTableFunctionHandle(),
functionAnalysis.getTransactionHandle()));

return new RelationPlan(root, analysis.getScope(node), outputSymbols.build(), outerContext);
}
Expand Down
Loading