diff --git a/presto-common/src/main/java/com/facebook/presto/common/Page.java b/presto-common/src/main/java/com/facebook/presto/common/Page.java index 2e941461dca9f..d4a7fd9ff12ca 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/Page.java +++ b/presto-common/src/main/java/com/facebook/presto/common/Page.java @@ -474,6 +474,18 @@ public Page replaceColumn(int channelIndex, Block column) return Page.wrapBlocksWithoutCopy(newBlocks.length, newBlocks); } + // TODO: Is this valid in presto as well? + public Page getColumns(int... columns) + { + requireNonNull(columns, "columns is null"); + + Block[] blocks = new Block[columns.length]; + for (int i = 0; i < columns.length; i++) { + blocks[i] = this.blocks[columns[i]]; + } + return wrapBlocksWithoutCopy(positionCount, blocks); + } + private static class DictionaryBlockIndexes { private final List blocks = new ArrayList<>(); diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java index 456bdef5dba80..c39e48be16e88 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java @@ -52,6 +52,7 @@ import com.facebook.presto.spi.function.SqlFunctionId; import com.facebook.presto.spi.function.SqlFunctionSupplier; import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider; import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.analyzer.FunctionAndTypeResolver; import com.facebook.presto.sql.analyzer.FunctionsConfig; @@ -878,4 +879,34 @@ public String toString() .toString(); } } + + // This come from Trino FunctionManager. 
+ // The TableFunctionProcessorProvider contains all of the logic for executing a table function.
+ // It is created during analysis and contains both a Data Processor and a Split Processor.
+ // Data Processor - Used to generate and process data. Ex. ExcludeColumns(getDataProcessor).
+ // Used by TableFunctionOperator.
+ // Split Processor - Used to process splits. Ex. SequenceFunction(SequenceFunctionProcessor/TableFunctionSplitProcessor)
+ // Used by LeafTableFunctionOperator.
+
+ // getTableFunctionProcessorProvider - Is in both the FunctionManager and the GlobalFunctionCatalog?
+ // The connector one just returns the provider based on the connector functionHandle, e.g. getExcludeColumnsFunctionProcessorProvider.
+ // public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)
+
+
+ // We don't have Catalog Handle/Name anymore.
+ // We don't have a FunctionProvider in spi.
+ public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFunctionHandle tableFunctionHandle)
+ {
+ CatalogHandle catalogHandle = tableFunctionHandle.getCatalogHandle();
+ FunctionProvider provider;
+ if (catalogHandle.equals(GlobalSystemConnector.CATALOG_HANDLE)) {
+ provider = globalFunctionCatalog;
+ }
+ else {
+ provider = functionProviders.getService(catalogHandle);
+ checkArgument(provider != null, "No function provider for catalog: '%s'", catalogHandle);
+ }
+
+ return provider.getTableFunctionProcessorProvider(tableFunctionHandle.getFunctionHandle());
+ }
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
index 0c07b99aaab4e..dfc48db0793de 100644
--- a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
@@ -14,10 +14,12 @@ package com.facebook.presto.metadata;
 import 
com.facebook.presto.index.IndexHandleJacksonModule; +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandleJacksonModule; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; + import static com.facebook.airlift.json.JsonBinder.jsonBinder; public class HandleJsonModule @@ -37,7 +39,7 @@ public void configure(Binder binder) jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class); - + jsonBinder(binder).addModuleBinding().to(ConnectorTableFunctionHandleJacksonModule.class); binder.bind(HandleResolver.class).in(Scopes.SINGLETON); } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/TableFunctionHandle.java b/presto-main/src/main/java/com/facebook/presto/metadata/TableFunctionHandle.java new file mode 100644 index 0000000000000..4b6c31d3be070 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/metadata/TableFunctionHandle.java @@ -0,0 +1,58 @@ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.metadata; + +/* +import io.trino.spi.connector.CatalogHandle; +*/ +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import static java.util.Objects.requireNonNull; + +// TODO: This class should be implemented via table function by connector. +// catalogHandle - came from catalogName at the split which doesn't seem to exist anymore. +// ConnectorTableFunctionHandle - Should have been added by table function implementation +// transaction handle - already exists. Though in different location to trino. +// It was a record but switched to public final class due to java 8 restrictions. +// Contains all metadata information for executing a table function. +public final class TableFunctionHandle +{ + //CatalogHandle catalogHandle, + ConnectorTableFunctionHandle functionHandle; + ConnectorTransactionHandle transactionHandle; + + public TableFunctionHandle(ConnectorTableFunctionHandle functionHandle, + ConnectorTransactionHandle transactionHandle) + { + //requireNonNull(catalogHandle, "catalogHandle is null"); + this.functionHandle = requireNonNull(functionHandle, "functionHandle is null"); + this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null"); + } + + /* + public CatalogHandle getCatalogHandle() { + return catalogHandle; + }*/ + + public ConnectorTableFunctionHandle getFunctionHandle() { + return functionHandle; + } + + public ConnectorTransactionHandle getTransactionHandle() { + return transactionHandle; + } + +} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PageBuffer.java b/presto-main/src/main/java/com/facebook/presto/operator/PageBuffer.java new file mode 100644 index 0000000000000..6bb5902079947 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/PageBuffer.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * 
you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator; + +import com.facebook.presto.common.Page; +import javax.annotation.Nullable; + +import static com.google.common.base.Preconditions.checkState; +import static com.facebook.presto.operator.WorkProcessor.ProcessState.finished; +import static com.facebook.presto.operator.WorkProcessor.ProcessState.ofResult; +import static com.facebook.presto.operator.WorkProcessor.ProcessState.yield; +import static java.util.Objects.requireNonNull; + +/** + * Provides a bridge between classic {@link Operator} push input model + * and {@link WorkProcessorOperator} pull model. 
+ */ +public class PageBuffer +{ + @Nullable + private Page page; + private boolean finished; + + public WorkProcessor pages() + { + return WorkProcessor.create(() -> { + if (isFinished() && isEmpty()) { + return finished(); + } + + if (!isEmpty()) { + Page result = page; + page = null; + return ofResult(result); + } + + return yield(); + }); + } + + public boolean isEmpty() + { + return page == null; + } + + public boolean isFinished() + { + return finished; + } + + public void add(Page page) + { + checkState(isEmpty(), "page buffer is not empty"); + checkState(!isFinished(), "page buffer is finished"); + requireNonNull(page, "page is null"); + + if (page.getPositionCount() == 0) { + return; + } + + this.page = page; + } + + public void finish() + { + finished = true; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java index bd2f5ffbc817a..b1d9c15bea5a9 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java @@ -271,9 +271,9 @@ public void swap(int a, int b) valueAddresses.swap(a, b); } - public int buildPage(int position, int[] outputChannels, PageBuilder pageBuilder) + public int buildPage(int position, int endPosition, int[] outputChannels, PageBuilder pageBuilder) { - while (!pageBuilder.isFull() && position < positionCount) { + while (!pageBuilder.isFull() && position < positionCount && position < endPosition) { long pageAddress = valueAddresses.get(position); int blockIndex = decodeSliceIndex(pageAddress); int blockPosition = decodePosition(pageAddress); @@ -563,10 +563,29 @@ protected Page computeNext() } public Iterator getSortedPages() + { + return getSortedPagesFromRange(0, positionCount); + } + + /** + * Get sorted pages from the specified section of the PagesIndex. 
+ * + * @param start start position of the section, inclusive + * @param end end position of the section, exclusive + * @return iterator of pages + */ + public Iterator getSortedPages(int start, int end) + { + checkArgument(start >= 0 && end <= positionCount, "position range out of bounds"); + checkArgument(start <= end, "invalid position range"); + return getSortedPagesFromRange(start, end); + } + + public Iterator getSortedPagesFromRange(int start, int end) { return new AbstractIterator() { - private int currentPosition; + private int currentPosition = start; private final PageBuilder pageBuilder = new PageBuilder(types); private final int[] outputChannels = new int[types.size()]; @@ -577,7 +596,7 @@ public Iterator getSortedPages() @Override public Page computeNext() { - currentPosition = buildPage(currentPosition, outputChannels, pageBuilder); + currentPosition = buildPage(currentPosition, end, outputChannels, pageBuilder); if (pageBuilder.isEmpty()) { return endOfData(); } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/function/EmptyTableFunctionPartition.java b/presto-main/src/main/java/com/facebook/presto/operator/function/EmptyTableFunctionPartition.java new file mode 100644 index 0000000000000..11da4033d4713 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/function/EmptyTableFunctionPartition.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.operator.function; + +import com.facebook.presto.operator.WorkProcessor; +import com.facebook.presto.common.Page; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.RunLengthEncodedBlock; +import com.facebook.presto.spi.function.table.TableFunctionDataProcessor; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Blocked; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Processed; +import com.facebook.presto.common.type.Type; + +import java.util.List; + +import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture; +import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR; +import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +/** + * This is a class representing empty input to a table function. An EmptyTableFunctionPartition is created + * when the table function has KEEP WHEN EMPTY property, which means that the function should be executed + * even if the input is empty, and all the table arguments are empty relations. + *

+ * An EmptyTableFunctionPartition is created and processed once per node. To avoid duplicated execution, + * a table function having KEEP WHEN EMPTY property must have single distribution. + */ +public class EmptyTableFunctionPartition + implements TableFunctionPartition +{ + private final TableFunctionDataProcessor tableFunction; + private final int properChannelsCount; + private final int passThroughSourcesCount; + private final Type[] passThroughTypes; + + public EmptyTableFunctionPartition(TableFunctionDataProcessor tableFunction, int properChannelsCount, int passThroughSourcesCount, List passThroughTypes) + { + this.tableFunction = requireNonNull(tableFunction, "tableFunction is null"); + this.properChannelsCount = properChannelsCount; + this.passThroughSourcesCount = passThroughSourcesCount; + this.passThroughTypes = passThroughTypes.toArray(new Type[] {}); + } + + @Override + public WorkProcessor toOutputPages() + { + return WorkProcessor.create(() -> { + TableFunctionProcessorState state = tableFunction.process(null); + if (state == FINISHED) { + return WorkProcessor.ProcessState.finished(); + } + if (state instanceof Blocked) { + Blocked blocked = (Blocked) state; + return WorkProcessor.ProcessState.blocked(toListenableFuture(blocked.getFuture())); + } + Processed processed = (Processed) state; + if (processed.getResult() != null) { + return WorkProcessor.ProcessState.ofResult(appendNullsForPassThroughColumns(processed.getResult())); + } + throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "When function got no input, it should either produce output or return Blocked state"); + }); + } + + private Page appendNullsForPassThroughColumns(Page page) + { + if (page.getChannelCount() != properChannelsCount + passThroughSourcesCount) { + throw new PrestoException( + FUNCTION_IMPLEMENTATION_ERROR, + format( + "Table function returned a page containing %s channels. 
Expected channel number: %s (%s proper columns, %s pass-through index columns)", + page.getChannelCount(), + properChannelsCount + passThroughSourcesCount, + properChannelsCount, + passThroughSourcesCount)); + } + + Block[] resultBlocks = new Block[properChannelsCount + passThroughTypes.length]; + + // proper outputs first + for (int channel = 0; channel < properChannelsCount; channel++) { + resultBlocks[channel] = page.getBlock(channel); + } + + // pass-through columns next + // because no input was processed, all pass-through indexes in the result page must be null (there are no input rows they could refer to). + // for performance reasons this is not checked. All pass-through columns are filled with nulls. + int channel = properChannelsCount; + for (Type type : passThroughTypes) { + resultBlocks[channel] = RunLengthEncodedBlock.create(type, null, page.getPositionCount()); + channel++; + } + + // pass the position count so that the Page can be successfully created in the case when there are no output channels (resultBlocks is empty) + return new Page(page.getPositionCount(), resultBlocks); + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/function/LeafTableFunctionOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/function/LeafTableFunctionOperator.java new file mode 100644 index 0000000000000..220a81381948a --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/function/LeafTableFunctionOperator.java @@ -0,0 +1,234 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator.function; + +import com.facebook.presto.execution.ScheduledSplit; +import com.facebook.presto.operator.DriverContext; +import com.facebook.presto.operator.OperatorContext; +import com.facebook.presto.operator.SourceOperator; +import com.facebook.presto.operator.SourceOperatorFactory; +import com.facebook.presto.spi.UpdatablePageSource; +import com.google.common.util.concurrent.ListenableFuture; +import com.facebook.presto.metadata.Split; +import com.facebook.presto.common.Page; +import com.facebook.presto.spi.ConnectorSplit; + +//TODO: This ConnectorTableFunctionHandle is not set up correctly. +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; +import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Blocked; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Processed; +import com.facebook.presto.spi.function.table.TableFunctionSplitProcessor; +import com.facebook.presto.spi.plan.PlanNodeId; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import static com.google.common.base.Preconditions.checkState; +import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture; +import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; +import static java.util.Objects.requireNonNull; + +public class LeafTableFunctionOperator + implements SourceOperator +{ + public static class LeafTableFunctionOperatorFactory + implements SourceOperatorFactory + { + private final int operatorId; + private final PlanNodeId sourceId; + private final TableFunctionProcessorProvider tableFunctionProvider; 
+ private final ConnectorTableFunctionHandle functionHandle; + private boolean closed; + + public LeafTableFunctionOperatorFactory(int operatorId, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle) + { + this.operatorId = operatorId; + this.sourceId = requireNonNull(sourceId, "sourceId is null"); + this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null"); + this.functionHandle = requireNonNull(functionHandle, "functionHandle is null"); + } + + @Override + public PlanNodeId getSourceId() + { + return sourceId; + } + + @Override + public SourceOperator createOperator(DriverContext driverContext) + { + checkState(!closed, "Factory is already closed"); + OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, sourceId, LeafTableFunctionOperator.class.getSimpleName()); + return new LeafTableFunctionOperator(operatorContext, sourceId, tableFunctionProvider, functionHandle); + } + + @Override + public void noMoreOperators() + { + closed = true; + } + } + + private final OperatorContext operatorContext; + private final PlanNodeId sourceId; + private final TableFunctionProcessorProvider tableFunctionProvider; + private final ConnectorTableFunctionHandle functionHandle; + + private ConnectorSplit currentSplit; + private final List pendingSplits = new ArrayList<>(); + private boolean noMoreSplits; + + private TableFunctionSplitProcessor processor; + private boolean processorUsedData; + private boolean processorFinishedSplit = true; + private ListenableFuture processorBlocked = NOT_BLOCKED; + + public LeafTableFunctionOperator(OperatorContext operatorContext, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle) + { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + this.sourceId = requireNonNull(sourceId, "sourceId is null"); + 
this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null"); + this.functionHandle = requireNonNull(functionHandle, "functionHandle is null"); + } + + private void resetProcessor() + { + this.processor = tableFunctionProvider.getSplitProcessor(functionHandle); + this.processorUsedData = false; + this.processorFinishedSplit = false; + this.processorBlocked = NOT_BLOCKED; + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public PlanNodeId getSourceId() + { + return sourceId; + } + + @Override + public boolean needsInput() + { + return false; + } + + @Override + public void addInput(Page page) + { + throw new UnsupportedOperationException(getClass().getName() + " does not take input"); + } + + @Override + public Supplier> addSplit(ScheduledSplit split) { + checkState(!noMoreSplits, "no more splits expected"); + pendingSplits.add(split.getSplit().getConnectorSplit()); + + // Todo: This is different to Trino. Do we want to return something here? + // Probably nothing as its mainly used alter the sources of the connector. + // EX, Hive/Iceberg tables have an updatable page source. So for + // Table scan operator we have this to addSplit. 
+ /* + if (source instanceof UpdatablePageSource) { + return Optional.of((UpdatablePageSource) source); + } + */ + return Optional::empty; + } + + /* + @Override + public void addSplit(Split split) + { + checkState(!noMoreSplits, "no more splits expected"); + pendingSplits.add(split.getConnectorSplit()); + } + */ + + @Override + public void noMoreSplits() + { + noMoreSplits = true; + } + + @Override + public Page getOutput() + { + if (processorFinishedSplit) { + // start processing a new split + if (pendingSplits.isEmpty()) { + // no more splits to process at the moment + return null; + } + currentSplit = pendingSplits.remove(0); + resetProcessor(); + } + else { + // a split is being processed + requireNonNull(currentSplit, "currentSplit is null"); + } + + TableFunctionProcessorState state = processor.process(processorUsedData ? null : currentSplit); + if (state == FINISHED) { + processorFinishedSplit = true; + } + if (state instanceof Blocked) { + Blocked blocked = (Blocked) state; + processorBlocked = toListenableFuture(blocked.getFuture()); + } + if (state instanceof Processed) { + Processed processed = (Processed) state; + if (processed.isUsedInput()) { + processorUsedData = true; + } + if (processed.getResult() != null) { + return processed.getResult(); + } + } + return null; + } + + @Override + public ListenableFuture isBlocked() + { + return processorBlocked; + } + + @Override + public void finish() + { + // this method is redundant. the operator takes no input at all. noMoreSplits() should be called instead. 
+ } + + @Override + public boolean isFinished() + { + return processorFinishedSplit && pendingSplits.isEmpty() && noMoreSplits; + } + + @Override + public void close() + throws Exception + { + // TODO + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/function/RegularTableFunctionPartition.java b/presto-main/src/main/java/com/facebook/presto/operator/function/RegularTableFunctionPartition.java new file mode 100644 index 0000000000000..95fbb4960c0c9 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/function/RegularTableFunctionPartition.java @@ -0,0 +1,433 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.operator.function; + +import com.facebook.presto.operator.WorkProcessor; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.Ints; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.block.RunLengthEncodedBlock; +import com.facebook.presto.operator.PagesIndex; +import com.facebook.presto.spi.function.table.TableFunctionDataProcessor; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Blocked; +import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Processed; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.common.type.Type; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture; +import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR; +import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static java.lang.Math.min; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class 
RegularTableFunctionPartition + implements TableFunctionPartition +{ + private final PagesIndex pagesIndex; + private final int partitionStart; + private final int partitionEnd; + private final Iterator sortedPages; + + private final TableFunctionDataProcessor tableFunction; + private final int properChannelsCount; + private final int passThroughSourcesCount; + + // channels required by the table function, listed by source in order of argument declarations + private final int[][] requiredChannels; + + // for each input channel, the end position of actual data in that channel (exclusive) relative to partition. The remaining rows are "filler" rows, and should not be passed to table function or passed-through + private final int[] endOfData; + + // a builder for each pass-through column, in order of argument declarations + private final PassThroughColumnProvider[] passThroughProviders; + + // number of processed input positions from partition start. all sources have been processed up to this position, except the sources whose partitions ended earlier. 
+ private int processedPositions; + + public RegularTableFunctionPartition( + PagesIndex pagesIndex, + int partitionStart, + int partitionEnd, + TableFunctionDataProcessor tableFunction, + int properChannelsCount, + int passThroughSourcesCount, + List> requiredChannels, + Optional> markerChannels, + List passThroughSpecifications) + + { + checkArgument(pagesIndex.getPositionCount() != 0, "PagesIndex is empty for regular table function partition"); + this.pagesIndex = pagesIndex; + this.partitionStart = partitionStart; + this.partitionEnd = partitionEnd; + this.sortedPages = pagesIndex.getSortedPages(partitionStart, partitionEnd); + this.tableFunction = requireNonNull(tableFunction, "tableFunction is null"); + this.properChannelsCount = properChannelsCount; + this.passThroughSourcesCount = passThroughSourcesCount; + this.requiredChannels = requiredChannels.stream() + .map(Ints::toArray) + .toArray(int[][]::new); + this.endOfData = findEndOfData(markerChannels, requiredChannels, passThroughSpecifications); + for (List channels : requiredChannels) { + checkState( + channels.stream() + .mapToInt(channel -> endOfData[channel]) + .distinct() + .count() <= 1, + "end-of-data position is inconsistent within a table function source"); + } + this.passThroughProviders = new PassThroughColumnProvider[passThroughSpecifications.size()]; + for (int i = 0; i < passThroughSpecifications.size(); i++) { + passThroughProviders[i] = createColumnProvider(passThroughSpecifications.get(i)); + } + } + + @Override + public WorkProcessor toOutputPages() + { + // Changed from <> -> + return WorkProcessor.create(new WorkProcessor.Process() + { + List> inputPages = prepareInputPages(); + + @Override + public WorkProcessor.ProcessState process() + { + TableFunctionProcessorState state = tableFunction.process(inputPages); + boolean functionGotNoData = inputPages == null; + if (state == FINISHED) { + return WorkProcessor.ProcessState.finished(); + } + if (state instanceof Blocked) { + Blocked 
blocked = (Blocked) state; + return WorkProcessor.ProcessState.blocked(toListenableFuture(blocked.getFuture())); + } + Processed processed = (Processed) state; + if (processed.isUsedInput()) { + inputPages = prepareInputPages(); + } + if (processed.getResult() != null) { + return WorkProcessor.ProcessState.ofResult(appendPassThroughColumns(processed.getResult())); + } + if (functionGotNoData) { + throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "When function got no input, it should either produce output or return Blocked state"); + } + return WorkProcessor.ProcessState.blocked(immediateFuture(null)); + } + }); + } + + /** + * Iterate over the partition by page and extract pages for each table function source from the input page. + * For each source, project the columns required by the table function. + * If for some source all data in the partition has been consumed, Optional.empty() is returned for that source. + * It happens when the partition of this source is shorter than the partition of some other source. + * The overall length of the table function partition is equal to the length of the longest source partition. + * When all sources are fully consumed, this method returns null. + *

+ * NOTE: There are two types of table function's source semantics: set and row. The two types of sources should be handled + * by the TableFunctionProcessor in different ways. For a source with set semantics, the whole partition can be used for computations, + * while for a source with row semantics, each row should be processed independently from all other rows. + * To enforce that behavior, we could pass to the TableFunctionProcessor only one row from a table with row semantics. + * However, for performance reasons, we handle sources with row and set semantics in the same way: the TableFunctionProcessor + * gets a page of data from each source. The TableFunctionProcessor is responsible for using the provided data accordingly + * to the declared source semantics (set or rows). + * + * @return A List containing: + * - Optional Page for every source that is not fully consumed + * - Optional.empty() for every source that is fully consumed + * or null if all sources are fully consumed. + */ + private List> prepareInputPages() + { + if (!sortedPages.hasNext()) { + return null; + } + + Page inputPage = sortedPages.next(); + ImmutableList.Builder> sourcePages = ImmutableList.builder(); + + for (int[] channelsForSource : requiredChannels) { + if (channelsForSource.length == 0) { + sourcePages.add(Optional.of(new Page(inputPage.getPositionCount()))); + } + else { + int endOfDataForSource = endOfData[channelsForSource[0]]; // end-of-data position is validated to be consistent for all channels from source + if (endOfDataForSource <= processedPositions) { + // all data for this source was already processed + sourcePages.add(Optional.empty()); + } + else { + Block[] sourceBlocks = new Block[channelsForSource.length]; + if (endOfDataForSource < processedPositions + inputPage.getPositionCount()) { + // data for this source ends within the current page + for (int i = 0; i < channelsForSource.length; i++) { + int inputChannel = channelsForSource[i]; + sourceBlocks[i] = 
inputPage.getBlock(inputChannel).getRegion(0, endOfDataForSource - processedPositions); + } + } + else { + // data for this source does not end within the current page + for (int i = 0; i < channelsForSource.length; i++) { + int inputChannel = channelsForSource[i]; + sourceBlocks[i] = inputPage.getBlock(inputChannel); + } + } + sourcePages.add(Optional.of(new Page(sourceBlocks))); + } + } + } + + processedPositions += inputPage.getPositionCount(); + + return sourcePages.build(); + } + + /** + * There are two types of table function's source semantics: set and row. + *

+ * For a source with set semantics, the table function result depends on the whole partition, + * so it is not always possible to associate an output row with a specific input row. + * The TableFunctionProcessor can return null as the pass-through index to indicate that + * the output row is not associated with any row from the given source. + *

+ * For a source with row semantics, the output is determined on a row-by-row basis, so every + * output row is associated with a specific input row. In such a case, the pass-through index + * should never be null. + *

+ * In our implementation, we handle sources with row and set semantics in the same way. + * For performance reasons, we do not validate the null pass-through indexes. + * The TableFunctionProcessor is responsible for using the pass-through capability + * accordingly to the declared source semantics (set or rows). + */ + private Page appendPassThroughColumns(Page page) + { + if (page.getChannelCount() != properChannelsCount + passThroughSourcesCount) { + throw new PrestoException( + FUNCTION_IMPLEMENTATION_ERROR, + format( + "Table function returned a page containing %s channels. Expected channel number: %s (%s proper columns, %s pass-through index columns)", + page.getChannelCount(), + properChannelsCount + passThroughSourcesCount, + properChannelsCount, + passThroughSourcesCount)); + } + // TODO is it possible to verify types of columns returned by TF? + + Block[] resultBlocks = new Block[properChannelsCount + passThroughProviders.length]; + + // proper outputs first + for (int channel = 0; channel < properChannelsCount; channel++) { + resultBlocks[channel] = page.getBlock(channel); + } + + // pass-through columns next + int channel = properChannelsCount; + for (PassThroughColumnProvider provider : passThroughProviders) { + resultBlocks[channel] = provider.getPassThroughColumn(page); + channel++; + } + + // pass the position count so that the Page can be successfully created in the case when there are no output channels (resultBlocks is empty) + return new Page(page.getPositionCount(), resultBlocks); + } + + private int[] findEndOfData(Optional> markerChannels, List> requiredChannels, List passThroughSpecifications) + { + Set referencedChannels = ImmutableSet.builder() + .addAll(requiredChannels.stream() + .flatMap(Collection::stream) + .collect(toImmutableList())) + .addAll(passThroughSpecifications.stream() + .map(PassThroughColumnSpecification::getInputChannel) + .collect(toImmutableList())) + .build(); + + if (referencedChannels.isEmpty()) { + // no required 
or pass-through channels + return null; + } + + // Changed to throw PrestoException. + int maxInputChannel = referencedChannels.stream() + .mapToInt(Integer::intValue) + .max() + .orElseThrow(() -> new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "No maximum value found")); + + int[] result = new int[maxInputChannel + 1]; + Arrays.fill(result, -1); + + // if table function had one source, adding a marker channel was not necessary. + // end-of-data position is equal to partition end for each input channel + if (!markerChannels.isPresent() || markerChannels.get().isEmpty()) { + referencedChannels.stream() + .forEach(channel -> result[channel] = partitionEnd - partitionStart); + return result; + } + + // Modified orElseThrow to throw PrestoException. + // if table function had more than one source, the markers map shall be present, and it shall contain mapping for each input channel + ImmutableMap.Builder endOfDataPerMarkerBuilder = ImmutableMap.builder(); + for (int markerChannel : ImmutableSet.copyOf(markerChannels.orElseThrow( + () -> new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "Channel not found")).values())) { + endOfDataPerMarkerBuilder.put(markerChannel, findFirstNullPosition(markerChannel)); + } + Map endOfDataPerMarker = endOfDataPerMarkerBuilder.buildOrThrow(); + referencedChannels.stream() + .forEach(channel -> result[channel] = endOfDataPerMarker.get(markerChannels.orElseThrow( + () -> new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "Channel not found")).get(channel)) - partitionStart); + return result; + } + + private int findFirstNullPosition(int markerChannel) + { + if (pagesIndex.isNull(markerChannel, partitionStart)) { + return partitionStart; + } + if (!pagesIndex.isNull(markerChannel, partitionEnd - 1)) { + return partitionEnd; + } + + int start = partitionStart; + int end = partitionEnd; + // value at start is not null, value at end is null + while (end - start > 1) { + int mid = start + end >>> 1; + if (pagesIndex.isNull(markerChannel, 
mid)) { + end = mid; + } + else { + start = mid; + } + } + return end; + } + + // Modified to remove record. Getters added. + public class PassThroughColumnSpecification { + private final boolean isPartitioningColumn; + private final int inputChannel; + private final int indexChannel; + + public PassThroughColumnSpecification(boolean isPartitioningColumn, int inputChannel, int indexChannel) { + this.isPartitioningColumn = isPartitioningColumn; + this.inputChannel = inputChannel; + this.indexChannel = indexChannel; + } + + public boolean isPartitioningColumn() { + return isPartitioningColumn; + } + + public int getInputChannel() { + return inputChannel; + } + + public int getIndexChannel() { + return indexChannel; + } + } + + private PassThroughColumnProvider createColumnProvider(PassThroughColumnSpecification specification) { + if (specification.isPartitioningColumn()) { + return new PartitioningColumnProvider(pagesIndex.getSingleValueBlock(specification.getInputChannel(), partitionStart)); + } + return new NonPartitioningColumnProvider(specification.getInputChannel(), specification.getIndexChannel()); + } + + private interface PassThroughColumnProvider { + Block getPassThroughColumn(Page page); + } + + private class PartitioningColumnProvider implements PassThroughColumnProvider { + private final Block partitioningValue; + + public PartitioningColumnProvider(Block partitioningValue) { + this.partitioningValue = Objects.requireNonNull(partitioningValue, "partitioningValue is null"); + } + + @Override + public Block getPassThroughColumn(Page page) { + return new RunLengthEncodedBlock(partitioningValue, page.getPositionCount()); + } + } + + private final class NonPartitioningColumnProvider + implements PassThroughColumnProvider + { + private final int inputChannel; + private final Type type; + private final int indexChannel; + + public NonPartitioningColumnProvider(int inputChannel, int indexChannel) + { + this.inputChannel = inputChannel; + this.type = 
pagesIndex.getType(inputChannel); + this.indexChannel = indexChannel; + } + + @Override + public Block getPassThroughColumn(Page page) + { + Block indexes = page.getBlock(indexChannel); + BlockBuilder builder = type.createBlockBuilder(null, page.getPositionCount()); + for (int position = 0; position < page.getPositionCount(); position++) { + if (indexes.isNull(position)) { + builder.appendNull(); + } + else { + // table function returns index from partition start + long index = BIGINT.getLong(indexes, position); + // validate index + if (index < 0 || index >= endOfData[inputChannel] || index >= processedPositions) { + int end = min(endOfData[inputChannel], processedPositions) - 1; + if (end >= 0) { + throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, format("Index of a pass-through row: %s out of processed portion of partition [0, %s]", index, end)); + + } + else { + throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "Index of a pass-through row must be null when no input data from the partition was processed. Actual: " + index); + } + } + // index in PagesIndex + long absoluteIndex = partitionStart + index; + pagesIndex.appendTo(inputChannel, toIntExact(absoluteIndex), builder); + } + } + + return builder.build(); + } + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionOperator.java new file mode 100644 index 0000000000000..1f9c64256d01e --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionOperator.java @@ -0,0 +1,622 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator.function; + +import com.facebook.presto.operator.Operator; +import com.facebook.presto.operator.OperatorContext; +import com.facebook.presto.operator.OperatorFactory; +import com.facebook.presto.operator.PagesIndex; +import com.facebook.presto.operator.PagesHashStrategy; +import com.facebook.presto.operator.DriverContext; +import com.facebook.presto.operator.WorkProcessor; +import com.facebook.presto.operator.PageBuffer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import com.facebook.presto.memory.context.LocalMemoryContext; +import com.facebook.presto.operator.function.RegularTableFunctionPartition.PassThroughColumnSpecification; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.SortOrder; +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; +import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.spi.plan.PlanNodeId; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.google.common.base.Preconditions.checkArgument; +import static 
com.google.common.base.Preconditions.checkPositionIndex; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.concat; +import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_LAST; +import static java.util.Collections.nCopies; +import static java.util.Objects.requireNonNull; + +public class TableFunctionOperator + implements Operator +{ + public static class TableFunctionOperatorFactory + implements OperatorFactory + { + private final int operatorId; + private final PlanNodeId planNodeId; + + // a provider of table function processor to be called once per partition + private final TableFunctionProcessorProvider tableFunctionProvider; + + // all information necessary to execute the table function collected during analysis + private final ConnectorTableFunctionHandle functionHandle; + + // number of proper columns produced by the table function + private final int properChannelsCount; + + // number of input tables declared as pass-through + private final int passThroughSourcesCount; + + // columns required by the table function, in order of input tables + private final List> requiredChannels; + + // map from input channel to marker channel + // for each input table, there is a channel that marks which rows contain original data, and which are "filler" rows. + // the "filler" rows are part of the algorithm, and they should not be processed by the table function, or passed-through. + // In this map, every original column from the input table is associated with the appropriate marker. 
+ private final Optional> markerChannels; + + // necessary information to build a pass-through column, for all pass-through columns, ordered as expected on the output + // it includes columns from sources declared as pass-through as well as partitioning columns from other sources + private final List passThroughSpecifications; + + // specifies whether the function should be pruned or executed when the input is empty + // pruneWhenEmpty is false if and only if all original input tables are KEEP WHEN EMPTY + private final boolean pruneWhenEmpty; + + // partitioning channels from all sources + private final List partitionChannels; + + // subset of partition channels that are already grouped + private final List prePartitionedChannels; + + // channels necessary to sort all sources: + // - for a single source, these are the source's sort channels + // - for multiple sources, this is a single synthesized row number channel + private final List sortChannels; + private final List sortOrders; + + // number of leading sort channels that are already sorted + private final int preSortedPrefix; + + private final List sourceTypes; + private final int expectedPositions; + private final PagesIndex.Factory pagesIndexFactory; + + private boolean closed; + + public TableFunctionOperatorFactory( + int operatorId, + PlanNodeId planNodeId, + TableFunctionProcessorProvider tableFunctionProvider, + ConnectorTableFunctionHandle functionHandle, + int properChannelsCount, + int passThroughSourcesCount, + List> requiredChannels, + Optional> markerChannels, + List passThroughSpecifications, + boolean pruneWhenEmpty, + List partitionChannels, + List prePartitionedChannels, + List sortChannels, + List sortOrders, + int preSortedPrefix, + List sourceTypes, + int expectedPositions, + PagesIndex.Factory pagesIndexFactory) + { + requireNonNull(planNodeId, "planNodeId is null"); + requireNonNull(tableFunctionProvider, "tableFunctionProvider is null"); + requireNonNull(functionHandle, "functionHandle 
is null"); + requireNonNull(requiredChannels, "requiredChannels is null"); + requireNonNull(markerChannels, "markerChannels is null"); + requireNonNull(passThroughSpecifications, "passThroughSpecifications is null"); + requireNonNull(partitionChannels, "partitionChannels is null"); + requireNonNull(prePartitionedChannels, "prePartitionedChannels is null"); + checkArgument(partitionChannels.containsAll(prePartitionedChannels), "prePartitionedChannels must be a subset of partitionChannels"); + requireNonNull(sortChannels, "sortChannels is null"); + requireNonNull(sortOrders, "sortOrders is null"); + checkArgument(sortChannels.size() == sortOrders.size(), "The number of sort channels must be equal to the number of sort orders"); + checkArgument(preSortedPrefix <= sortChannels.size(), "The number of pre-sorted channels must be lower or equal to the number of sort channels"); + checkArgument(preSortedPrefix == 0 || ImmutableSet.copyOf(prePartitionedChannels).equals(ImmutableSet.copyOf(partitionChannels)), "preSortedPrefix can only be greater than zero if all partition channels are pre-grouped"); + requireNonNull(sourceTypes, "sourceTypes is null"); + requireNonNull(pagesIndexFactory, "pagesIndexFactory is null"); + + this.operatorId = operatorId; + this.planNodeId = planNodeId; + this.tableFunctionProvider = tableFunctionProvider; + this.functionHandle = functionHandle; + this.properChannelsCount = properChannelsCount; + this.passThroughSourcesCount = passThroughSourcesCount; + this.requiredChannels = requiredChannels.stream() + .map(ImmutableList::copyOf) + .collect(toImmutableList()); + this.markerChannels = markerChannels.map(ImmutableMap::copyOf); + this.passThroughSpecifications = ImmutableList.copyOf(passThroughSpecifications); + this.pruneWhenEmpty = pruneWhenEmpty; + this.partitionChannels = ImmutableList.copyOf(partitionChannels); + this.prePartitionedChannels = ImmutableList.copyOf(prePartitionedChannels); + this.sortChannels = 
ImmutableList.copyOf(sortChannels); + this.sortOrders = ImmutableList.copyOf(sortOrders); + this.preSortedPrefix = preSortedPrefix; + this.sourceTypes = ImmutableList.copyOf(sourceTypes); + this.expectedPositions = expectedPositions; + this.pagesIndexFactory = pagesIndexFactory; + } + + @Override + public Operator createOperator(DriverContext driverContext) + { + checkState(!closed, "Factory is already closed"); + + OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, TableFunctionOperator.class.getSimpleName()); + return new TableFunctionOperator( + operatorContext, + tableFunctionProvider, + functionHandle, + properChannelsCount, + passThroughSourcesCount, + requiredChannels, + markerChannels, + passThroughSpecifications, + pruneWhenEmpty, + partitionChannels, + prePartitionedChannels, + sortChannels, + sortOrders, + preSortedPrefix, + sourceTypes, + expectedPositions, + pagesIndexFactory); + } + + @Override + public void noMoreOperators() + { + closed = true; + } + + @Override + public OperatorFactory duplicate() + { + return new TableFunctionOperatorFactory( + operatorId, + planNodeId, + tableFunctionProvider, + functionHandle, + properChannelsCount, + passThroughSourcesCount, + requiredChannels, + markerChannels, + passThroughSpecifications, + pruneWhenEmpty, + partitionChannels, + prePartitionedChannels, + sortChannels, + sortOrders, + preSortedPrefix, + sourceTypes, + expectedPositions, + pagesIndexFactory); + } + } + + private final OperatorContext operatorContext; + + // TODO: Set PageBuffer max to 100 based on other usages. Is this correct? 
+ private final PageBuffer pageBuffer = new PageBuffer(); + private final WorkProcessor outputPages; + private final boolean processEmptyInput; + + public TableFunctionOperator( + OperatorContext operatorContext, + TableFunctionProcessorProvider tableFunctionProvider, + ConnectorTableFunctionHandle functionHandle, + int properChannelsCount, + int passThroughSourcesCount, + List> requiredChannels, + Optional> markerChannels, + List passThroughSpecifications, + boolean pruneWhenEmpty, + List partitionChannels, + List prePartitionedChannels, + List sortChannels, + List sortOrders, + int preSortedPrefix, + List sourceTypes, + int expectedPositions, + PagesIndex.Factory pagesIndexFactory) + { + requireNonNull(operatorContext, "operatorContext is null"); + requireNonNull(tableFunctionProvider, "tableFunctionProvider is null"); + requireNonNull(functionHandle, "functionHandle is null"); + requireNonNull(requiredChannels, "requiredChannels is null"); + requireNonNull(markerChannels, "markerChannels is null"); + requireNonNull(passThroughSpecifications, "passThroughSpecifications is null"); + requireNonNull(partitionChannels, "partitionChannels is null"); + requireNonNull(prePartitionedChannels, "prePartitionedChannels is null"); + checkArgument(partitionChannels.containsAll(prePartitionedChannels), "prePartitionedChannels must be a subset of partitionChannels"); + requireNonNull(sortChannels, "sortChannels is null"); + requireNonNull(sortOrders, "sortOrders is null"); + checkArgument(sortChannels.size() == sortOrders.size(), "The number of sort channels must be equal to the number of sort orders"); + checkArgument(preSortedPrefix <= sortChannels.size(), "The number of pre-sorted channels must be lower or equal to the number of sort channels"); + checkArgument(preSortedPrefix == 0 || ImmutableSet.copyOf(prePartitionedChannels).equals(ImmutableSet.copyOf(partitionChannels)), "preSortedPrefix can only be greater than zero if all partition channels are pre-grouped"); + 
requireNonNull(sourceTypes, "sourceTypes is null"); + requireNonNull(pagesIndexFactory, "pagesIndexFactory is null"); + + this.operatorContext = operatorContext; + + this.processEmptyInput = !pruneWhenEmpty; + + PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions); + HashStrategies hashStrategies = new HashStrategies(pagesIndex, partitionChannels, prePartitionedChannels, sortChannels, sortOrders, preSortedPrefix); + + this.outputPages = pageBuffer.pages() + .transform(new PartitionAndSort(pagesIndex, hashStrategies, processEmptyInput)) + .flatMap(groupPagesIndex -> pagesIndexToTableFunctionPartitions( + groupPagesIndex, + hashStrategies, + tableFunctionProvider, + functionHandle, + properChannelsCount, + passThroughSourcesCount, + requiredChannels, + markerChannels, + passThroughSpecifications, + processEmptyInput)) + .flatMap(TableFunctionPartition::toOutputPages); + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public void finish() + { + pageBuffer.finish(); + } + + @Override + public boolean isFinished() + { + return outputPages.isFinished(); + } + + @Override + public ListenableFuture isBlocked() + { + if (outputPages.isBlocked()) { + return outputPages.getBlockedFuture(); + } + + return NOT_BLOCKED; + } + + @Override + public boolean needsInput() + { + return pageBuffer.isEmpty() && !pageBuffer.isFinished(); + } + + @Override + public void addInput(Page page) + { + pageBuffer.add(page); + } + + @Override + public Page getOutput() + { + if (!outputPages.process()) { + return null; + } + + if (outputPages.isFinished()) { + return null; + } + + return outputPages.getResult(); + } + + private static class HashStrategies + { + final PagesHashStrategy prePartitionedStrategy; + final PagesHashStrategy remainingPartitionStrategy; + final PagesHashStrategy preSortedStrategy; + final List remainingPartitionAndSortChannels; + final List remainingSortOrders; + final int[] 
prePartitionedChannelsArray; + + public HashStrategies( + PagesIndex pagesIndex, + List partitionChannels, + List prePartitionedChannels, + List sortChannels, + List sortOrders, + int preSortedPrefix) + { + this.prePartitionedStrategy = pagesIndex.createPagesHashStrategy(prePartitionedChannels, OptionalInt.empty()); + + List remainingPartitionChannels = partitionChannels.stream() + .filter(channel -> !prePartitionedChannels.contains(channel)) + .collect(toImmutableList()); + this.remainingPartitionStrategy = pagesIndex.createPagesHashStrategy(remainingPartitionChannels, OptionalInt.empty()); + + List preSortedChannels = sortChannels.stream() + .limit(preSortedPrefix) + .collect(toImmutableList()); + this.preSortedStrategy = pagesIndex.createPagesHashStrategy(preSortedChannels, OptionalInt.empty()); + + if (preSortedPrefix > 0) { + // preSortedPrefix > 0 implies that all partition channels are already pre-partitioned (enforced by check in the constructor), so we only need to do the remaining sort + this.remainingPartitionAndSortChannels = ImmutableList.copyOf(Iterables.skip(sortChannels, preSortedPrefix)); + this.remainingSortOrders = ImmutableList.copyOf(Iterables.skip(sortOrders, preSortedPrefix)); + } + else { + // we need to sort by the remaining partition channels so that the input is fully partitioned, + // and then need to we sort by all the sort channels so that the input is fully sorted + this.remainingPartitionAndSortChannels = ImmutableList.copyOf(concat(remainingPartitionChannels, sortChannels)); + this.remainingSortOrders = ImmutableList.copyOf(concat(nCopies(remainingPartitionChannels.size(), ASC_NULLS_LAST), sortOrders)); + } + + this.prePartitionedChannelsArray = Ints.toArray(prePartitionedChannels); + } + } + + private class PartitionAndSort + implements WorkProcessor.Transformation + { + private final PagesIndex pagesIndex; + private final HashStrategies hashStrategies; + private final LocalMemoryContext memoryContext; + + private boolean 
resetPagesIndex; + private int inputPosition; + private boolean processEmptyInput; + + public PartitionAndSort(PagesIndex pagesIndex, HashStrategies hashStrategies, boolean processEmptyInput) + { + this.pagesIndex = pagesIndex; + this.hashStrategies = hashStrategies; + this.memoryContext = operatorContext.aggregateUserMemoryContext().newLocalMemoryContext(PartitionAndSort.class.getSimpleName()); + this.processEmptyInput = processEmptyInput; + } + + void updateMemoryUsage() + { + memoryContext.setBytes(pagesIndex.getEstimatedSize().toBytes()); + } + + @Override + public WorkProcessor.TransformationState process(Optional input) + { + Page resolvedInput = input.orElse(null); + if (resetPagesIndex) { + pagesIndex.clear(); + updateMemoryUsage(); + resetPagesIndex = false; + } + + if ( resolvedInput == null && pagesIndex.getPositionCount() == 0) { + if (processEmptyInput) { + // it can only happen at the first call to process(), which implies that there is no input. Empty PagesIndex can be passed on only once. + processEmptyInput = false; + return WorkProcessor.TransformationState.ofResult(pagesIndex, false); + } + else { + memoryContext.close(); + return WorkProcessor.TransformationState.finished(); + } + } + + // there is input, so we are not interested in processing empty input + processEmptyInput = false; + + if (resolvedInput != null) { + // append rows from input which belong to the current group wrt pre-partitioned columns + // it might be one or more partitions + inputPosition = appendCurrentGroup(pagesIndex, hashStrategies, resolvedInput, inputPosition); + updateMemoryUsage(); + + if (inputPosition >= resolvedInput.getPositionCount()) { + inputPosition = 0; + return WorkProcessor.TransformationState.needsMoreData(); + } + } + + // we have unused input or the input is finished. 
we have buffered a full group + // the group contains one or more partitions, as it was determined by the pre-partitioned columns + // sorting serves two purposes: + // - sort by the remaining partition channels so that the input is fully partitioned, + // - sort by all the sort channels so that the input is fully sorted + sortCurrentGroup(pagesIndex, hashStrategies); + resetPagesIndex = true; + return WorkProcessor.TransformationState.ofResult(pagesIndex, false); + } + } + + private static int appendCurrentGroup(PagesIndex pagesIndex, HashStrategies hashStrategies, Page page, int startPosition) + { + checkArgument(page.getPositionCount() > startPosition); + + PagesHashStrategy prePartitionedStrategy = hashStrategies.prePartitionedStrategy; + Page prePartitionedPage = page.getColumns(hashStrategies.prePartitionedChannelsArray); + + if (pagesIndex.getPositionCount() == 0 || pagesIndex.positionEqualsRow(prePartitionedStrategy, 0, startPosition, prePartitionedPage)) { + // we are within the current group. 
find the position where the pre-grouped columns change + int groupEnd = findGroupEnd(prePartitionedPage, prePartitionedStrategy, startPosition); + + // add the section of the page that contains values for the current group + pagesIndex.addPage(page.getRegion(startPosition, groupEnd - startPosition)); + + if (page.getPositionCount() - groupEnd > 0) { + // the remaining prt of the page contains the next group + return groupEnd; + } + // page fully consumed: it contains the current group only + return page.getPositionCount(); + } + + // we had previous results buffered, but the remaining page starts with new group values + return startPosition; + } + + private static void sortCurrentGroup(PagesIndex pagesIndex, HashStrategies hashStrategies) + { + PagesHashStrategy preSortedStrategy = hashStrategies.preSortedStrategy; + List remainingPartitionAndSortChannels = hashStrategies.remainingPartitionAndSortChannels; + List remainingSortOrders = hashStrategies.remainingSortOrders; + + if (pagesIndex.getPositionCount() > 1 && !remainingPartitionAndSortChannels.isEmpty()) { + int startPosition = 0; + while (startPosition < pagesIndex.getPositionCount()) { + int endPosition = findGroupEnd(pagesIndex, preSortedStrategy, startPosition); + pagesIndex.sort(remainingPartitionAndSortChannels, remainingSortOrders, startPosition, endPosition); + startPosition = endPosition; + } + } + } + + // TODO: Changed strategies to match latest trino + // Assumes input grouped on relevant pagesHashStrategy columns + private static int findGroupEnd(Page page, PagesHashStrategy pagesHashStrategy, int startPosition) + { + checkArgument(page.getPositionCount() > 0, "Must have at least one position"); + checkPositionIndex(startPosition, page.getPositionCount(), "startPosition out of bounds"); + + return findEndPosition(startPosition, page.getPositionCount(), (firstPosition, secondPosition) -> pagesHashStrategy.rowEqualsRow(firstPosition, page, secondPosition, page)); + } + + // Assumes input grouped on 
relevant pagesHashStrategy columns + private static int findGroupEnd(PagesIndex pagesIndex, PagesHashStrategy pagesHashStrategy, int startPosition) + { + checkArgument(pagesIndex.getPositionCount() > 0, "Must have at least one position"); + checkPositionIndex(startPosition, pagesIndex.getPositionCount(), "startPosition out of bounds"); + + return findEndPosition(startPosition, pagesIndex.getPositionCount(), (firstPosition, secondPosition) -> pagesIndex.positionEqualsPosition(pagesHashStrategy, firstPosition, secondPosition)); + } + + /** + * @param startPosition - inclusive + * @param endPosition - exclusive + * @param comparator - returns true if positions given as parameters are equal + * @return the end of the group position exclusive (the position the very next group starts) + */ + @VisibleForTesting + static int findEndPosition(int startPosition, int endPosition, PositionComparator comparator) + { + checkArgument(startPosition >= 0, "startPosition must be greater or equal than zero: %s", startPosition); + checkArgument(startPosition < endPosition, "startPosition (%s) must be less than endPosition (%s)", startPosition, endPosition); + + int left = startPosition; + int right = endPosition; + + while (right - left > 1) { + int middle = (left + right) >>> 1; + + if (comparator.test(startPosition, middle)) { + left = middle; + } + else { + right = middle; + } + } + + return right; + } + + private interface PositionComparator + { + boolean test(int first, int second); + } + + private WorkProcessor pagesIndexToTableFunctionPartitions( + PagesIndex pagesIndex, + HashStrategies hashStrategies, + TableFunctionProcessorProvider tableFunctionProvider, + ConnectorTableFunctionHandle functionHandle, + int properChannelsCount, + int passThroughSourcesCount, + List> requiredChannels, + Optional> markerChannels, + List passThroughSpecifications, + boolean processEmptyInput) + { + // pagesIndex contains the full grouped and sorted data for one or more partitions + + 
PagesHashStrategy remainingPartitionStrategy = hashStrategies.remainingPartitionStrategy; + + return WorkProcessor.create(new WorkProcessor.Process() + { + private int partitionStart; + private boolean processEmpty = processEmptyInput; + + @Override + public WorkProcessor.ProcessState process() + { + if (partitionStart == pagesIndex.getPositionCount()) { + if (processEmpty && pagesIndex.getPositionCount() == 0) { + // empty PagesIndex can only be passed once as the result of PartitionAndSort. Neither this nor any future instance of Process will ever get an empty PagesIndex again. + processEmpty = false; + return WorkProcessor.ProcessState.ofResult(new EmptyTableFunctionPartition( + tableFunctionProvider.getDataProcessor(functionHandle), + properChannelsCount, + passThroughSourcesCount, + passThroughSpecifications.stream() + .map(PassThroughColumnSpecification::getInputChannel) + .map(pagesIndex::getType) + .collect(toImmutableList()))); + } + return WorkProcessor.ProcessState.finished(); + } + + // there is input, so we are not interested in processing empty input + processEmpty = false; + + int partitionEnd = findGroupEnd(pagesIndex, remainingPartitionStrategy, partitionStart); + + RegularTableFunctionPartition partition = new RegularTableFunctionPartition( + pagesIndex, + partitionStart, + partitionEnd, + tableFunctionProvider.getDataProcessor(functionHandle), + properChannelsCount, + passThroughSourcesCount, + requiredChannels, + markerChannels, + passThroughSpecifications); + + partitionStart = partitionEnd; + return WorkProcessor.ProcessState.ofResult(partition); + } + }); + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionPartition.java b/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionPartition.java new file mode 100644 index 0000000000000..ca57bd7fddc67 --- /dev/null +++ 
b/presto-main/src/main/java/com/facebook/presto/operator/function/TableFunctionPartition.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.operator.function; + +import com.facebook.presto.common.Page; +import com.facebook.presto.operator.WorkProcessor; + +public interface TableFunctionPartition +{ + WorkProcessor toOutputPages(); +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/split/SplitManager.java b/presto-main/src/main/java/com/facebook/presto/split/SplitManager.java index adb189379ed36..a126ed7c51547 100644 --- a/presto-main/src/main/java/com/facebook/presto/split/SplitManager.java +++ b/presto-main/src/main/java/com/facebook/presto/split/SplitManager.java @@ -19,6 +19,7 @@ import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.metadata.TableLayoutResult; +import com.facebook.presto.metadata.TableFunctionHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplitSource; @@ -99,6 +100,21 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling return splitSource; } + // TODO: We do not have a CatalogHandle/getSchemaFunctionName. + // Required to get splits that are connector, catalog, function, schema, and transaction aware. 
+ public SplitSource getSplits(Session session, TableFunctionHandle function) + { + CatalogHandle catalogHandle = function.getCatalogHandle(); + ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle); + + ConnectorSplitSource source = splitManager.getSplits( + function.getTransactionHandle(), + session.toConnectorSession(catalogHandle), + function.getFunctionHandle()); + + return new ConnectorAwareSplitSource(catalogHandle, source); + } + private ConnectorSplitManager getConnectorSplitManager(ConnectorId connectorId) { ConnectorSplitManager result = splitManagers.get(connectorId); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index 348e031539142..d79b947812445 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -270,6 +270,24 @@ public PlanNode visitValues(ValuesNode node, RewriteContext return context.defaultRewrite(node, context.get()); } + // Brought over from Trino PlanFragmenter + // TODO: Missing the node implementations. + @Override + public PlanNode visitTableFunction(TableFunctionNode node, RewriteContext context) + { + throw new IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + @Override + public PlanNode visitTableFunctionProcessor(TableFunctionProcessorNode node, RewriteContext context) + { + if (node.getSource().isEmpty()) { + // context is mutable. The leaf node should set the PartitioningHandle. 
+ context.get().addSourceDistribution(node.getId(), SOURCE_DISTRIBUTION, metadata, session); + } + return context.defaultRewrite(node, context.get()); + } + @Override public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index b6034d0c95b93..9248b0e3264ed 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -13,6 +13,20 @@ */ package com.facebook.presto.sql.planner; +import com.facebook.presto.operator.function.LeafTableFunctionOperator.LeafTableFunctionOperatorFactory; +import com.facebook.presto.operator.function.RegularTableFunctionPartition.PassThroughColumnSpecification; +import com.facebook.presto.operator.function.TableFunctionOperator.TableFunctionOperatorFactory; +import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider; + +// TODO: Planner differences +import io.trino.sql.planner.plan.DataOrganizationSpecification; +import io.trino.sql.planner.plan.TableFunctionNode.PassThroughColumn; +import io.trino.sql.planner.plan.TableFunctionNode.PassThroughSpecification; + +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; + import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.Session; import com.facebook.presto.SystemSessionProperties; @@ -1213,6 +1227,111 @@ public PhysicalOperation visitWindow(WindowNode node, LocalExecutionPlanContext return new PhysicalOperation(operatorFactory, outputMappings.build(), context, source); } + // TODO: We require a TableFunctionNode implementation to check. 
+ @Override + public PhysicalOperation visitTableFunction(TableFunctionNode node, LocalExecutionPlanContext context) + { + throw new IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + // TODO: We need TableFunctionProcessorNode implementation. Missing Pass through specification from this as well. + // Michael: This is where we create the physical operation for the Table Function Operators out of the + // operator classes. + @Override + public PhysicalOperation visitTableFunctionProcessor(TableFunctionProcessorNode node, LocalExecutionPlanContext context) + { + // Michael: Get the processors Split/Data Processors + TableFunctionProcessorProvider processorProvider = plannerContext.getFunctionManager().getTableFunctionProcessorProvider(node.getHandle()); + + // Michael: No input arguments to this table function. Using a LeafTableFunctionOperator. No + // operations on anything coming in so just return. + if (node.getSource().isEmpty()) { + OperatorFactory operatorFactory = new LeafTableFunctionOperatorFactory( + context.getNextOperatorId(), + node.getId(), + node.getHandle().catalogHandle(), + processorProvider, + node.getHandle().functionHandle()); + return new PhysicalOperation(operatorFactory, makeLayout(node)); + } + + PhysicalOperation source = node.getSource().orElseThrow().accept(this, context); + + int properChannelsCount = node.getProperOutputs().size(); + + long passThroughSourcesCount = node.getPassThroughSpecifications().stream() + .filter(PassThroughSpecification::declaredAsPassThrough) + .count(); + + List> requiredChannels = node.getRequiredSymbols().stream() + .map(list -> getChannelsForSymbols(list, source.getLayout())) + .collect(toImmutableList()); + + Optional> markerChannels = node.getMarkerSymbols() + .map(map -> map.entrySet().stream() + .collect(toImmutableMap(entry -> source.getLayout().get(entry.getKey()), entry -> source.getLayout().get(entry.getValue())))); + + int channel = 
properChannelsCount; + ImmutableList.Builder passThroughColumnSpecifications = ImmutableList.builder(); + for (PassThroughSpecification specification : node.getPassThroughSpecifications()) { + // the table function produces one index channel for each source declared as pass-through. They are laid out after the proper channels. + int indexChannel = specification.declaredAsPassThrough() ? channel++ : -1; + for (PassThroughColumn column : specification.columns()) { + passThroughColumnSpecifications.add(new PassThroughColumnSpecification(column.isPartitioningColumn(), source.getLayout().get(column.symbol()), indexChannel)); + } + } + + List partitionChannels = node.getSpecification() + .map(DataOrganizationSpecification::partitionBy) + .map(list -> getChannelsForSymbols(list, source.getLayout())) + .orElse(ImmutableList.of()); + + List sortChannels = ImmutableList.of(); + List sortOrders = ImmutableList.of(); + if (node.getSpecification().flatMap(DataOrganizationSpecification::orderingScheme).isPresent()) { + OrderingScheme orderingScheme = node.getSpecification().flatMap(DataOrganizationSpecification::orderingScheme).orElseThrow(); + sortChannels = getChannelsForSymbols(orderingScheme.orderBy(), source.getLayout()); + sortOrders = orderingScheme.orderingList(); + } + + OperatorFactory operator = new TableFunctionOperatorFactory( + context.getNextOperatorId(), + node.getId(), + processorProvider, + node.getHandle().catalogHandle(), + node.getHandle().functionHandle(), + properChannelsCount, + toIntExact(passThroughSourcesCount), + requiredChannels, + markerChannels, + passThroughColumnSpecifications.build(), + node.isPruneWhenEmpty(), + partitionChannels, + getChannelsForSymbols(ImmutableList.copyOf(node.getPrePartitioned()), source.getLayout()), + sortChannels, + sortOrders, + node.getPreSorted(), + source.getTypes(), + 10_000, + pagesIndexFactory); + + ImmutableMap.Builder outputMappings = ImmutableMap.builder(); + for (int i = 0; i < node.getProperOutputs().size(); 
i++) { + outputMappings.put(node.getProperOutputs().get(i), i); + } + List passThroughSymbols = node.getPassThroughSpecifications().stream() + .map(PassThroughSpecification::columns) + .flatMap(Collection::stream) + .map(PassThroughColumn::symbol) + .collect(toImmutableList()); + int outputChannel = properChannelsCount; + for (Symbol passThroughSymbol : passThroughSymbols) { + outputMappings.put(passThroughSymbol, outputChannel++); + } + + return new PhysicalOperation(operator, outputMappings.buildOrThrow(), source); + } + @Override public PhysicalOperation visitTopN(TopNNode node, LocalExecutionPlanContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java index c4707b487744e..c2eb98209d18e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java @@ -419,6 +419,21 @@ public Map visitPlan(PlanNode node, Context context) { throw new UnsupportedOperationException("not yet implemented: " + node.getClass().getName()); } + + // TODO: Missing Node implementation + @Override + public Map visitTableFunctionProcessor(TableFunctionProcessorNode node, Void context) + { + if (node.getSource().isEmpty()) { + // this is a source node, so produce splits + SplitSource splitSource = splitManager.getSplits(session, node.getHandle()); + splitSources.add(splitSource); + + return ImmutableMap.of(node.getId(), splitSource); + } + + return node.getSource().orElseThrow().accept(this, context); + } } private static class Context diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 460c5272fdc89..9452b03837dc0 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -1503,6 +1503,60 @@ public PlanWithProperties visitLateralJoin(LateralJoinNode node, PreferredProper throw new IllegalStateException("Unexpected node: " + node.getClass().getName()); } + // TODO: Missing Node implementations + @Override + public PlanWithProperties visitTableFunction(TableFunctionNode node, PreferredProperties preferredProperties) + { + throw new IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + @Override + public PlanWithProperties visitTableFunctionProcessor(TableFunctionProcessorNode node, PreferredProperties preferredProperties) + { + if (node.getSource().isEmpty()) { + return new PlanWithProperties(node, deriveProperties(node, ImmutableList.of())); + } + + if (node.getSpecification().isEmpty()) { + // node.getSpecification.isEmpty() indicates that there were no sources or a single source with row semantics. + // The case of no sources was addressed above. + // The case of a single source with row semantics is addressed here. A single source with row semantics can be distributed arbitrarily. 
+ PlanWithProperties child = planChild(node, PreferredProperties.any()); + return rebaseAndDeriveProperties(node, child); + } + + List partitionBy = node.getSpecification().orElseThrow().getPartitionBy(); + List> desiredProperties = new ArrayList<>(); + if (!partitionBy.isEmpty()) { + desiredProperties.add(new GroupingProperty<>(partitionBy)); + } + node.getSpecification().orElseThrow().getOrderingScheme().ifPresent(orderingScheme -> desiredProperties.addAll(orderingScheme.toLocalProperties())); + + PlanWithProperties child = planChild(node, partitionedWithLocal(ImmutableSet.copyOf(partitionBy), desiredProperties)); + + // TODO do not gather if already gathered + if (!node.isPruneWhenEmpty()) { + child = withDerivedProperties( + gatheringExchange(idAllocator.getNextId(), REMOTE, child.getNode()), + child.getProperties()); + } + else if (!isStreamPartitionedOn(child.getProperties(), partitionBy) && + !isNodePartitionedOn(child.getProperties(), partitionBy)) { + if (partitionBy.isEmpty()) { + child = withDerivedProperties( + gatheringExchange(idAllocator.getNextId(), REMOTE, child.getNode()), + child.getProperties()); + } + else { + child = withDerivedProperties( + partitionedExchange(idAllocator.getNextId(), REMOTE, child.getNode(), partitionBy, node.getHashSymbol()), + child.getProperties()); + } + } + + return rebaseAndDeriveProperties(node, child); + } + private PlanWithProperties planChild(PlanNode node, PreferredProperties preferredProperties) { return accept(getOnlyElement(node.getSources()), preferredProperties); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 7158a92de6592..4eed35a3785a9 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ 
-894,6 +894,83 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProp return rebaseAndDeriveProperties(node, ImmutableList.of(probe, index)); } + // TODO: Missing node implementations + @Override + public PlanWithProperties visitTableFunction(TableFunctionNode node, StreamPreferredProperties parentPreferences) + { + throw new IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + @Override + public PlanWithProperties visitTableFunctionProcessor(TableFunctionProcessorNode node, StreamPreferredProperties parentPreferences) + { + if (node.getSource().isEmpty()) { + return deriveProperties(node, ImmutableList.of()); + } + + if (node.getSpecification().isEmpty()) { + // node.getSpecification.isEmpty() indicates that there were no sources or a single source with row semantics. + // The case of no sources was addressed above. + // The case of a single source with row semantics is addressed here. Source's properties do not hold after the TableFunctionProcessorNode + PlanWithProperties child = planAndEnforce(node.getSource().orElseThrow(), StreamPreferredProperties.any(), StreamPreferredProperties.any()); + return rebaseAndDeriveProperties(node, ImmutableList.of(child)); + } + + List partitionBy = node.getSpecification().orElseThrow().getPartitionBy(); + StreamPreferredProperties childRequirements; + if (!node.isPruneWhenEmpty()) { + childRequirements = singleStream(); + } + else { + childRequirements = parentPreferences + .constrainTo(node.getSource().orElseThrow().getOutputSymbols()) + .withDefaultParallelism(session) + .withPartitioning(partitionBy); + } + + PlanWithProperties child = planAndEnforce(node.getSource().orElseThrow(), childRequirements, childRequirements); + + List> desiredProperties = new ArrayList<>(); + if (!partitionBy.isEmpty()) { + desiredProperties.add(new GroupingProperty<>(partitionBy)); + } + 
node.getSpecification().flatMap(DataOrganizationSpecification::getOrderingScheme).ifPresent(orderingScheme -> desiredProperties.addAll(orderingScheme.toLocalProperties())); + Iterator>> matchIterator = LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).iterator(); + + Set prePartitionedInputs = ImmutableSet.of(); + if (!partitionBy.isEmpty()) { + Optional> groupingRequirement = matchIterator.next(); + Set unPartitionedInputs = groupingRequirement.map(LocalProperty::getColumns).orElse(ImmutableSet.of()); + prePartitionedInputs = partitionBy.stream() + .filter(symbol -> !unPartitionedInputs.contains(symbol)) + .collect(toImmutableSet()); + } + + int preSortedOrderPrefix = 0; + if (prePartitionedInputs.equals(ImmutableSet.copyOf(partitionBy))) { + while (matchIterator.hasNext() && matchIterator.next().isEmpty()) { + preSortedOrderPrefix++; + } + } + + TableFunctionProcessorNode result = new TableFunctionProcessorNode( + node.getId(), + node.getName(), + node.getProperOutputs(), + Optional.of(child.getNode()), + node.isPruneWhenEmpty(), + node.getPassThroughSpecifications(), + node.getRequiredSymbols(), + node.getMarkerSymbols(), + node.getSpecification(), + prePartitionedInputs, + preSortedOrderPrefix, + node.getHashSymbol(), + node.getHandle()); + + return deriveProperties(result, child.getProperties()); + } + // // Helpers // diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index 71e64a09370a5..0d37800a88a4f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -826,6 +826,51 @@ public ActualProperties visitRemoteSource(RemoteSourceNode node, List inputProperties) + { + throw new 
IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + @Override + public ActualProperties visitTableFunctionProcessor(TableFunctionProcessorNode node, List inputProperties) + { + ImmutableList.Builder> localProperties = ImmutableList.builder(); + + if (node.getSource().isPresent()) { + ActualProperties properties = Iterables.getOnlyElement(inputProperties); + + // Only the partitioning properties of the source are passed-through, because the pass-through mechanism preserves the partitioning values. + // Sorting properties might be broken because input rows can be shuffled or nulls can be inserted as the result of pass-through. + // Constant properties might be broken because nulls can be inserted as the result of pass-through. + if (!node.getPrePartitioned().isEmpty()) { + GroupingProperty prePartitionedProperty = new GroupingProperty<>(node.getPrePartitioned()); + for (LocalProperty localProperty : properties.getLocalProperties()) { + if (!prePartitionedProperty.isSimplifiedBy(localProperty)) { + break; + } + localProperties.add(localProperty); + } + } + } + + List partitionBy = node.getSpecification() + .map(DataOrganizationSpecification::getPartitionBy) + .orElse(ImmutableList.of()); + if (!partitionBy.isEmpty()) { + localProperties.add(new GroupingProperty<>(partitionBy)); + } + + // TODO add global single stream property when there's Specification present with no partitioning columns + + return ActualProperties.builder() + .local(localProperties.build()) + .build() + // Crop properties to output columns. + .translate(symbol -> node.getOutputSymbols().contains(symbol) ? 
Optional.of(symbol) : Optional.empty()); + } + private Global deriveGlobalProperties(TableLayout layout, Map assignments, Map constants) { Optional> streamPartitioning = layout.getStreamPartitioningColumns() diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index 3dea1cefdc7e5..6e8d8594cdf17 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -643,6 +643,33 @@ public StreamProperties visitRemoteSource(RemoteSourceNode node, List inputProperties) + { + throw new IllegalStateException(format("Unexpected node: TableFunctionNode (%s)", node.getName())); + } + + @Override + public StreamProperties visitTableFunctionProcessor(TableFunctionProcessorNode node, List inputProperties) + { + if (node.getSource().isEmpty()) { + return StreamProperties.singleStream(); // TODO allow multiple; return partitioning properties + } + + StreamProperties properties = Iterables.getOnlyElement(inputProperties); + + Set passThroughInputs = Sets.intersection(ImmutableSet.copyOf(node.getSource().orElseThrow().getOutputSymbols()), ImmutableSet.copyOf(node.getOutputSymbols())); + StreamProperties translatedProperties = properties.translate(column -> { + if (passThroughInputs.contains(column)) { + return Optional.of(column); + } + return Optional.empty(); + }); + + return translatedProperties.unordered(true); + } } public static final class StreamProperties diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 9805efad17939..24cd7092fdf84 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -62,6 +62,11 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; +// TODO: Some additions here for bug fixes and comments on table function changes +// TODO: Missing implementation for public TableFunctionProcessorNode map(TableFunctionProcessorNode node, PlanNode source) +// This PR creates a new newPassThroughSpecifications without duplicate columns. +// Based on multiple sources. + public class SymbolMapper { private final Map mapping; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java index e1935487c6abb..9b3442a435bb3 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java @@ -13,12 +13,16 @@ */ package com.facebook.presto.sql.planner.plan; +import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanVisitor; +import com.facebook.presto.sql.planner.BasePlanFragmenter; import com.facebook.presto.sql.planner.CanonicalJoinNode; import com.facebook.presto.sql.planner.CanonicalTableScanNode; import com.facebook.presto.sql.planner.StatsEquivalentPlanNodeWithLimit; import com.facebook.presto.sql.planner.iterative.GroupReference; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; + public abstract class InternalPlanVisitor extends PlanVisitor { @@ -141,4 +145,15 @@ public R visitSequence(SequenceNode node, C context) { return visitPlan(node, context); } + + // TODO: Include and add TableFunction/ProcessorNode Implementations. Implement the class. There are a lot. 
+ public R visitTableFunction(TableFunctionNode node, C context) + { + return visitPlan(node, context); + } + + public R visitTableFunctionProcessor(TableFunctionProcessorNode node, C context) + { + return visitPlan(node, context); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorSplitManager.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorSplitManager.java index 69ac79c9f7522..d1555a4652cca 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorSplitManager.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorSplitManager.java @@ -17,6 +17,8 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.api.Experimental; +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; import static java.util.Objects.requireNonNull; @@ -28,6 +30,14 @@ ConnectorSplitSource getSplits( ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext); + @Experimental + default ConnectorSplitSource getSplits( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorTableFunctionHandle function) { + throw new UnsupportedOperationException(); + } + enum SplitSchedulingStrategy { UNGROUPED_SCHEDULING, diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java index 4efb85e07c088..b74f52936adda 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java @@ -19,6 +19,7 @@ import 
com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle; import static java.util.Objects.requireNonNull; @@ -41,4 +42,15 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand return delegate.getSplits(transactionHandle, session, layout, splitSchedulingContext); } } + + @Override + public ConnectorSplitSource getSplits( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorTableFunctionHandle function) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getSplits(transaction, session, function); + } + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandle.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandle.java new file mode 100644 index 0000000000000..929feb29a76d7 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.function.table; + +import com.facebook.presto.spi.api.Experimental; + +// TODO: Should be added by connector table function implementation. 
+/** + * An area to store all information necessary to execute the table function, gathered at analysis time + */ +@Experimental +public interface ConnectorTableFunctionHandle +{ +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandleJacksonModule.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandleJacksonModule.java new file mode 100644 index 0000000000000..777784342505a --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/ConnectorTableFunctionHandleJacksonModule.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//TODO: We need this Jackson module, however they are all defined in presto-main module. +// We can't add dependency due to it being circular. Should we move the Handle out of spi +// to presto-main? +// NOTE: Trino moved HandleJsonModule to com.facebook.presto out of metadata. 
+package com.facebook.presto.spi.function.table; + +import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; + +import javax.inject.Inject; + +public class ConnectorTableFunctionHandleJacksonModule + extends AbstractTypedJacksonModule +{ + @Inject + public ConnectorTableFunctionHandleJacksonModule(HandleResolver handleResolver) + { + super(ConnectorMetadataUpdateHandle.class, handleResolver::getId, handleResolver::getMetadataUpdateHandleClass); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionDataProcessor.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionDataProcessor.java new file mode 100644 index 0000000000000..b726885eb746c --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionDataProcessor.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.spi.function.table; + +import com.facebook.presto.common.Page; + +import java.util.List; +import java.util.Optional; + +public interface TableFunctionDataProcessor +{ + /** + * This method processes a portion of data. It is called multiple times until the partition is fully processed. + * + * @param input a tuple of {@link Page} including one page for each table function's input table. + * Pages list is ordered according to the corresponding argument specifications in {@link ConnectorTableFunction}. 
+ * A page for an argument consists of columns requested during analysis (see {@link TableFunctionAnalysis#getRequiredColumns()}}. + * If any of the sources is fully processed, {@code Optional.empty)()} is returned for that source. + * If all sources are fully processed, the argument is {@code null}. + * @return {@link TableFunctionProcessorState} including the processor's state and optionally a portion of result. + * After the returned state is {@code FINISHED}, the method will not be called again. + */ + TableFunctionProcessorState process(List> input); +} \ No newline at end of file diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorProvider.java new file mode 100644 index 0000000000000..a652f01c253bc --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.function.table; + +import com.facebook.presto.spi.api.Experimental; + +@Experimental +public interface TableFunctionProcessorProvider +{ + /** + * This method returns a {@code TableFunctionDataProcessor}. All the necessary information collected during analysis is available + * in the form of {@link ConnectorTableFunctionHandle}. 
It is called once per each partition processed by the table function.
+     *
+     * @param handle all information necessary to execute the table function, gathered at analysis time
+     * @throws UnsupportedOperationException by default, for table functions that do not process input data
+     */
+    default TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle handle)
+    {
+        throw new UnsupportedOperationException("this table function does not process input data");
+    }
+
+    /**
+     * This method returns a {@code TableFunctionSplitProcessor}. All the necessary information collected during analysis is available
+     * in the form of {@link ConnectorTableFunctionHandle}. It is called once per each split processed by the table function.
+     *
+     * @param handle all information necessary to execute the table function, gathered at analysis time
+     * @throws UnsupportedOperationException by default, for table functions that do not process splits
+     */
+    default TableFunctionSplitProcessor getSplitProcessor(ConnectorTableFunctionHandle handle)
+    {
+        throw new UnsupportedOperationException("this table function does not process splits");
+    }
+}
\ No newline at end of file
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorState.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorState.java
new file mode 100644
index 0000000000000..6c648c2eda90a
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionProcessorState.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package com.facebook.presto.spi.function.table; + +import com.facebook.presto.spi.api.Experimental; +import com.facebook.presto.common.Page; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +/** + * The result of processing input by {@link TableFunctionDataProcessor} or {@link TableFunctionSplitProcessor}. + * It can optionally include a portion of output data in the form of {@link Page} + * The returned {@link Page} should consist of: + * - proper columns produced by the table function + * - one column of type {@code BIGINT} for each table function's input table having the pass-through property (see {@link TableArgumentSpecification#isPassThroughColumns}), + * in order of the corresponding argument specifications. Entries in these columns are the indexes of input rows (from partition start) to be attached to output, + * or null to indicate that a row of nulls should be attached instead of an input row. The indexes are validated to be within the portion of the partition + * provided to the function so far. + * Note: when the input is empty, the only valid index value is null, because there are no input rows that could be attached to output. In such case, for performance + * reasons, the validation of indexes is skipped, and all pass-through columns are filled with nulls. 
+ */ +@Experimental +public interface TableFunctionProcessorState +{ + final class Blocked + implements TableFunctionProcessorState + { + private final CompletableFuture future; + + private Blocked(CompletableFuture future) + { + this.future = requireNonNull(future, "future is null"); + } + + public static Blocked blocked(CompletableFuture future) + { + return new Blocked(future); + } + + public CompletableFuture getFuture() + { + return future; + } + } + + final class Finished + implements TableFunctionProcessorState + { + public static final Finished FINISHED = new Finished(); + + private Finished() {} + } + + final class Processed + implements TableFunctionProcessorState + { + private final boolean usedInput; + private final Page result; + + private Processed(boolean usedInput, @Nullable Page result) + { + this.usedInput = usedInput; + this.result = result; + } + + public static Processed usedInput() + { + return new Processed(true, null); + } + + public static Processed produced(Page result) + { + requireNonNull(result, "result is null"); + return new Processed(false, result); + } + + public static Processed usedInputAndProduced(Page result) + { + requireNonNull(result, "result is null"); + return new Processed(true, result); + } + + public boolean isUsedInput() + { + return usedInput; + } + + public Page getResult() + { + return result; + } + } +} \ No newline at end of file diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionSplitProcessor.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionSplitProcessor.java new file mode 100644 index 0000000000000..9f4655e8df537 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/table/TableFunctionSplitProcessor.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.function.table; + +import com.facebook.presto.spi.ConnectorSplit; + +public interface TableFunctionSplitProcessor +{ + /** + * This method processes a split. It is called multiple times until the whole output for the split is produced. + * + * @param split a {@link ConnectorSplit} representing a subtask. + * @return {@link TableFunctionProcessorState} including the processor's state and optionally a portion of result. + * After the returned state is {@code FINISHED}, the method will not be called again. + */ + TableFunctionProcessorState process(ConnectorSplit split); +} \ No newline at end of file