Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.metadata.Split;
import io.trino.spi.Page;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.ptf.TableFunctionProcessorProvider;
Expand All @@ -41,14 +43,21 @@ public static class LeafTableFunctionOperatorFactory
{
private final int operatorId;
private final PlanNodeId sourceId;
private final CatalogHandle functionCatalog;
private final TableFunctionProcessorProvider tableFunctionProvider;
private final ConnectorTableFunctionHandle functionHandle;
private boolean closed;

public LeafTableFunctionOperatorFactory(int operatorId, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle)
public LeafTableFunctionOperatorFactory(
int operatorId,
PlanNodeId sourceId,
CatalogHandle functionCatalog,
TableFunctionProcessorProvider tableFunctionProvider,
ConnectorTableFunctionHandle functionHandle)
{
this.operatorId = operatorId;
this.sourceId = requireNonNull(sourceId, "sourceId is null");
this.functionCatalog = requireNonNull(functionCatalog, "functionCatalog is null");
this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null");
this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
}
Expand All @@ -64,7 +73,7 @@ public SourceOperator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, sourceId, LeafTableFunctionOperator.class.getSimpleName());
return new LeafTableFunctionOperator(operatorContext, sourceId, tableFunctionProvider, functionHandle);
return new LeafTableFunctionOperator(operatorContext, sourceId, functionCatalog, tableFunctionProvider, functionHandle);
}

@Override
Expand All @@ -78,6 +87,7 @@ public void noMoreOperators()
private final PlanNodeId sourceId;
private final TableFunctionProcessorProvider tableFunctionProvider;
private final ConnectorTableFunctionHandle functionHandle;
private final ConnectorSession session;

private ConnectorSplit currentSplit;
private final List<ConnectorSplit> pendingSplits = new ArrayList<>();
Expand All @@ -88,17 +98,23 @@ public void noMoreOperators()
private boolean processorFinishedSplit = true;
private ListenableFuture<Void> processorBlocked = NOT_BLOCKED;

public LeafTableFunctionOperator(OperatorContext operatorContext, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle)
public LeafTableFunctionOperator(
OperatorContext operatorContext,
PlanNodeId sourceId,
CatalogHandle functionCatalog,
TableFunctionProcessorProvider tableFunctionProvider,
ConnectorTableFunctionHandle functionHandle)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.sourceId = requireNonNull(sourceId, "sourceId is null");
this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null");
this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
this.session = operatorContext.getSession().toConnectorSession(functionCatalog);
}

private void resetProcessor()
{
this.processor = tableFunctionProvider.getSplitProcessor(functionHandle);
this.processor = tableFunctionProvider.getSplitProcessor(session, functionHandle);
this.processorUsedData = false;
this.processorFinishedSplit = false;
this.processorBlocked = NOT_BLOCKED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,12 @@ public PhysicalOperation visitTableFunctionProcessor(TableFunctionProcessorNode
TableFunctionProcessorProvider processorProvider = plannerContext.getFunctionManager().getTableFunctionProcessorProvider(node.getHandle());

if (node.getSource().isEmpty()) {
OperatorFactory operatorFactory = new LeafTableFunctionOperatorFactory(context.getNextOperatorId(), node.getId(), processorProvider, node.getHandle().getFunctionHandle());
OperatorFactory operatorFactory = new LeafTableFunctionOperatorFactory(
context.getNextOperatorId(),
node.getId(),
node.getFunctionCatalog(),
processorProvider,
node.getHandle().getFunctionHandle());
return new PhysicalOperation(operatorFactory, makeLayout(node), context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ else if (tableArgument.getPartitionBy().isPresent()) {
PlanNode root = new TableFunctionNode(
idAllocator.getNextId(),
functionAnalysis.getFunctionName(),
functionAnalysis.getCatalogHandle(),
functionAnalysis.getArguments(),
properOutputs,
sources.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
return Result.ofPlanNode(new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
node.getProperOutputs(),
Optional.empty(),
false,
Expand All @@ -183,6 +184,7 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
return Result.ofPlanNode(new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
node.getProperOutputs(),
Optional.of(getOnlyElement(node.getSources())),
sourceProperties.isPruneWhenEmpty(),
Expand Down Expand Up @@ -278,6 +280,7 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
return Result.ofPlanNode(new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
node.getProperOutputs(),
Optional.of(marked.node()),
pruneWhenEmpty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ public PlanWithProperties visitTableFunctionProcessor(TableFunctionProcessorNode
TableFunctionProcessorNode result = new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
node.getProperOutputs(),
Optional.of(child.getNode()),
node.isPruneWhenEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ public TableFunctionProcessorNode map(TableFunctionProcessorNode node, PlanNode
return new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
map(node.getProperOutputs()),
Optional.of(source),
node.isPruneWhenEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public PlanAndMappings visitTableFunction(TableFunctionNode node, UnaliasContext
new TableFunctionNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
node.getArguments(),
newProperOutputs,
newSources.build(),
Expand All @@ -378,6 +379,7 @@ public PlanAndMappings visitTableFunctionProcessor(TableFunctionProcessorNode no
new TableFunctionProcessorNode(
node.getId(),
node.getName(),
node.getFunctionCatalog(),
mapper.map(node.getProperOutputs()),
Optional.empty(),
node.isPruneWhenEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.metadata.TableFunctionHandle;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.ptf.Argument;
import io.trino.sql.planner.Symbol;

Expand All @@ -37,6 +38,7 @@ public class TableFunctionNode
extends PlanNode
{
private final String name;
private final CatalogHandle functionCatalog;
private final Map<String, Argument> arguments;
private final List<Symbol> properOutputs;
private final List<PlanNode> sources;
Expand All @@ -48,6 +50,7 @@ public class TableFunctionNode
public TableFunctionNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("name") String name,
@JsonProperty("functionCatalog") CatalogHandle functionCatalog,
@JsonProperty("arguments") Map<String, Argument> arguments,
@JsonProperty("properOutputs") List<Symbol> properOutputs,
@JsonProperty("sources") List<PlanNode> sources,
Expand All @@ -57,6 +60,7 @@ public TableFunctionNode(
{
super(id);
this.name = requireNonNull(name, "name is null");
this.functionCatalog = requireNonNull(functionCatalog, "functionCatalog is null");
this.arguments = ImmutableMap.copyOf(arguments);
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.sources = ImmutableList.copyOf(sources);
Expand All @@ -73,6 +77,12 @@ public String getName()
return name;
}

@JsonProperty
public CatalogHandle getFunctionCatalog()
{
return functionCatalog;
}

@JsonProperty
public Map<String, Argument> getArguments()
{
Expand Down Expand Up @@ -137,7 +147,16 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
public PlanNode replaceChildren(List<PlanNode> newSources)
{
checkArgument(sources.size() == newSources.size(), "wrong number of new children");
return new TableFunctionNode(getId(), name, arguments, properOutputs, newSources, tableArgumentProperties, copartitioningLists, handle);
return new TableFunctionNode(
getId(),
name,
functionCatalog,
arguments,
properOutputs,
newSources,
tableArgumentProperties,
copartitioningLists,
handle);
}

public static class TableArgumentProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.metadata.TableFunctionHandle;
import io.trino.spi.connector.CatalogHandle;
import io.trino.sql.planner.OrderingScheme;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.plan.TableFunctionNode.PassThroughSpecification;
Expand All @@ -39,6 +40,8 @@ public class TableFunctionProcessorNode
{
private final String name;

private final CatalogHandle functionCatalog;

// symbols produced by the function
private final List<Symbol> properOutputs;

Expand Down Expand Up @@ -74,6 +77,7 @@ public class TableFunctionProcessorNode
public TableFunctionProcessorNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("name") String name,
@JsonProperty("functionCatalog") CatalogHandle functionCatalog,
@JsonProperty("properOutputs") List<Symbol> properOutputs,
@JsonProperty("source") Optional<PlanNode> source,
@JsonProperty("pruneWhenEmpty") boolean pruneWhenEmpty,
Expand All @@ -88,6 +92,7 @@ public TableFunctionProcessorNode(
{
super(id);
this.name = requireNonNull(name, "name is null");
this.functionCatalog = requireNonNull(functionCatalog, "functionCatalog is null");
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.source = requireNonNull(source, "source is null");
this.pruneWhenEmpty = pruneWhenEmpty;
Expand Down Expand Up @@ -122,6 +127,12 @@ public String getName()
return name;
}

@JsonProperty
public CatalogHandle getFunctionCatalog()
{
return functionCatalog;
}

@JsonProperty
public List<Symbol> getProperOutputs()
{
Expand Down Expand Up @@ -221,6 +232,20 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
public PlanNode replaceChildren(List<PlanNode> newSources)
{
Optional<PlanNode> newSource = newSources.isEmpty() ? Optional.empty() : Optional.of(getOnlyElement(newSources));
return new TableFunctionProcessorNode(getId(), name, properOutputs, newSource, pruneWhenEmpty, passThroughSpecifications, requiredSymbols, markerSymbols, specification, prePartitioned, preSorted, hashSymbol, handle);
return new TableFunctionProcessorNode(
getId(),
name,
functionCatalog,
properOutputs,
newSource,
pruneWhenEmpty,
passThroughSpecifications,
requiredSymbols,
markerSymbols,
specification,
prePartitioned,
preSorted,
hashSymbol,
handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ public static class ConstantFunctionProcessorProvider
implements TableFunctionProcessorProvider
{
@Override
public TableFunctionSplitProcessor getSplitProcessor(ConnectorTableFunctionHandle handle)
public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
return new ConstantFunctionProcessor(((ConstantFunctionHandle) handle).getValue());
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ public static class EmptySourceFunctionProcessorProvider
implements TableFunctionProcessorProvider
{
@Override
public TableFunctionSplitProcessor getSplitProcessor(ConnectorTableFunctionHandle handle)
public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
return new EmptySourceFunctionProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ public TableFunctionNode tableFunction(
return new TableFunctionNode(
idAllocator.getNextId(),
name,
TEST_CATALOG_HANDLE,
ImmutableMap.of(),
properOutputs,
sources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.spi.ptf;

import io.trino.spi.Experimental;
import io.trino.spi.connector.ConnectorSession;

@Experimental(eta = "2023-03-31")
public interface TableFunctionProcessorProvider
Expand All @@ -31,7 +32,7 @@ default TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle
* This method returns a {@code TableFunctionSplitProcessor}. All the necessary information collected during analysis is available
* in the form of {@link ConnectorTableFunctionHandle}. It is called once per each split processed by the table function.
*/
default TableFunctionSplitProcessor getSplitProcessor(ConnectorTableFunctionHandle handle)
default TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
throw new UnsupportedOperationException("this table function does not process splits");
}
Expand Down