Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,18 @@ public Page replaceColumn(int channelIndex, Block column)
return Page.wrapBlocksWithoutCopy(newBlocks.length, newBlocks);
}

// TODO: Is this valid in presto as well?
public Page getColumns(int... columns)
{
requireNonNull(columns, "columns is null");

Block[] blocks = new Block[columns.length];
for (int i = 0; i < columns.length; i++) {
blocks[i] = this.blocks[columns[i]];
}
return wrapBlocksWithoutCopy(positionCount, blocks);
}

private static class DictionaryBlockIndexes
{
private final List<DictionaryBlock> blocks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlFunctionSupplier;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FunctionAndTypeResolver;
import com.facebook.presto.sql.analyzer.FunctionsConfig;
Expand Down Expand Up @@ -878,4 +879,34 @@ public String toString()
.toString();
}
}

// This come from Trino FunctionManager.
// The TableFunctionProcessProvider contains all of the logic for executing a table Function
// It is created during analysis and contains both a Data Processor and a Split Processor.
// Data Processor - Used to generate and process data. Ex. ExcludeColumns(getDataProcessor).
// Used by TableFunctionOperator.
// Split Processor - Used to process splits. Ex. SequenceFunction(SequenceFunctionProcessor/TableFunctionSplitProcessor)
// Used by LeafTableFunctionOperator.

// getTableFunctionProcessorProvider - Is in both the FunctionManager and the GlobalFunctionCatalog?
// Connector one just returns the provider based on Connector functionHandle. EX getExcludeColumnsFunctionProcessorProvider
// public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)


// We don't have Catalog Handle/Name anymore.
// We don't have a FunctionProvider in spi.
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFunctionHandle tableFunctionHandle)
{
CatalogHandle catalogHandle = tableFunctionHandle.getCatalogHandle();
FunctionProvider provider;
if (catalogHandle.equals(GlobalSystemConnector.CATALOG_HANDLE)) {
provider = globalFunctionCatalog;
}
else {
provider = functionProviders.getService(catalogHandle);
checkArgument(provider != null, "No function provider for catalog: '%s'", catalogHandle);
}

return provider.getTableFunctionProcessorProvider(tableFunctionHandle.getFunctionHandle());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package com.facebook.presto.metadata;

import com.facebook.presto.index.IndexHandleJacksonModule;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandleJacksonModule;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;


import static com.facebook.airlift.json.JsonBinder.jsonBinder;

public class HandleJsonModule
Expand All @@ -37,7 +39,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class);

jsonBinder(binder).addModuleBinding().to(ConnectorTableFunctionHandleJacksonModule.class);
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

/*
import io.trino.spi.connector.CatalogHandle;
*/
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;

import static java.util.Objects.requireNonNull;

// TODO: This class should be implemented via table function by connector.
// catalogHandle - came from catalogName at the split which doesn't seem to exist anymore.
// ConnectorTableFunctionHandle - Should have been added by table function implementation
// transaction handle - already exists. Though in different location to trino.
// It was a record but switched to public final class due to java 8 restrictions.
// Contains all metadata information for executing a table function.
public final class TableFunctionHandle
{
//CatalogHandle catalogHandle,
ConnectorTableFunctionHandle functionHandle;
ConnectorTransactionHandle transactionHandle;

public TableFunctionHandle(ConnectorTableFunctionHandle functionHandle,
ConnectorTransactionHandle transactionHandle)
{
//requireNonNull(catalogHandle, "catalogHandle is null");
this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
}

/*
public CatalogHandle getCatalogHandle() {
return catalogHandle;
}*/

public ConnectorTableFunctionHandle getFunctionHandle() {
return functionHandle;
}

public ConnectorTransactionHandle getTransactionHandle() {
return transactionHandle;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.common.Page;
import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkState;
import static com.facebook.presto.operator.WorkProcessor.ProcessState.finished;
import static com.facebook.presto.operator.WorkProcessor.ProcessState.ofResult;
import static com.facebook.presto.operator.WorkProcessor.ProcessState.yield;
import static java.util.Objects.requireNonNull;

/**
* Provides a bridge between classic {@link Operator} push input model
* and {@link WorkProcessorOperator} pull model.
*/
public class PageBuffer
{
@Nullable
private Page page;
private boolean finished;

public WorkProcessor<Page> pages()
{
return WorkProcessor.create(() -> {
if (isFinished() && isEmpty()) {
return finished();
}

if (!isEmpty()) {
Page result = page;
page = null;
return ofResult(result);
}

return yield();
});
}

public boolean isEmpty()
{
return page == null;
}

public boolean isFinished()
{
return finished;
}

public void add(Page page)
{
checkState(isEmpty(), "page buffer is not empty");
checkState(!isFinished(), "page buffer is finished");
requireNonNull(page, "page is null");

if (page.getPositionCount() == 0) {
return;
}

this.page = page;
}

public void finish()
{
finished = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ public void swap(int a, int b)
valueAddresses.swap(a, b);
}

public int buildPage(int position, int[] outputChannels, PageBuilder pageBuilder)
public int buildPage(int position, int endPosition, int[] outputChannels, PageBuilder pageBuilder)
{
while (!pageBuilder.isFull() && position < positionCount) {
while (!pageBuilder.isFull() && position < positionCount && position < endPosition) {
long pageAddress = valueAddresses.get(position);
int blockIndex = decodeSliceIndex(pageAddress);
int blockPosition = decodePosition(pageAddress);
Expand Down Expand Up @@ -563,10 +563,29 @@ protected Page computeNext()
}

public Iterator<Page> getSortedPages()
{
return getSortedPagesFromRange(0, positionCount);
}

/**
* Get sorted pages from the specified section of the PagesIndex.
*
* @param start start position of the section, inclusive
* @param end end position of the section, exclusive
* @return iterator of pages
*/
public Iterator<Page> getSortedPages(int start, int end)
{
checkArgument(start >= 0 && end <= positionCount, "position range out of bounds");
checkArgument(start <= end, "invalid position range");
return getSortedPagesFromRange(start, end);
}

public Iterator<Page> getSortedPagesFromRange(int start, int end)
{
return new AbstractIterator<Page>()
{
private int currentPosition;
private int currentPosition = start;
private final PageBuilder pageBuilder = new PageBuilder(types);
private final int[] outputChannels = new int[types.size()];

Expand All @@ -577,7 +596,7 @@ public Iterator<Page> getSortedPages()
@Override
public Page computeNext()
{
currentPosition = buildPage(currentPosition, outputChannels, pageBuilder);
currentPosition = buildPage(currentPosition, end, outputChannels, pageBuilder);
if (pageBuilder.isEmpty()) {
return endOfData();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.function;

import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.common.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.spi.function.table.TableFunctionDataProcessor;
import com.facebook.presto.spi.function.table.TableFunctionProcessorState;
import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Blocked;
import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Processed;
import com.facebook.presto.common.type.Type;

import java.util.List;

import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR;
import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
* This is a class representing empty input to a table function. An EmptyTableFunctionPartition is created
* when the table function has KEEP WHEN EMPTY property, which means that the function should be executed
* even if the input is empty, and all the table arguments are empty relations.
* <p>
* An EmptyTableFunctionPartition is created and processed once per node. To avoid duplicated execution,
* a table function having KEEP WHEN EMPTY property must have single distribution.
*/
public class EmptyTableFunctionPartition
implements TableFunctionPartition
{
private final TableFunctionDataProcessor tableFunction;
private final int properChannelsCount;
private final int passThroughSourcesCount;
private final Type[] passThroughTypes;

public EmptyTableFunctionPartition(TableFunctionDataProcessor tableFunction, int properChannelsCount, int passThroughSourcesCount, List<Type> passThroughTypes)
{
this.tableFunction = requireNonNull(tableFunction, "tableFunction is null");
this.properChannelsCount = properChannelsCount;
this.passThroughSourcesCount = passThroughSourcesCount;
this.passThroughTypes = passThroughTypes.toArray(new Type[] {});
}

@Override
public WorkProcessor<Page> toOutputPages()
{
return WorkProcessor.create(() -> {
TableFunctionProcessorState state = tableFunction.process(null);
if (state == FINISHED) {
return WorkProcessor.ProcessState.finished();
}
if (state instanceof Blocked) {
Blocked blocked = (Blocked) state;
return WorkProcessor.ProcessState.blocked(toListenableFuture(blocked.getFuture()));
}
Processed processed = (Processed) state;
if (processed.getResult() != null) {
return WorkProcessor.ProcessState.ofResult(appendNullsForPassThroughColumns(processed.getResult()));
}
throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "When function got no input, it should either produce output or return Blocked state");
});
}

private Page appendNullsForPassThroughColumns(Page page)
{
if (page.getChannelCount() != properChannelsCount + passThroughSourcesCount) {
throw new PrestoException(
FUNCTION_IMPLEMENTATION_ERROR,
format(
"Table function returned a page containing %s channels. Expected channel number: %s (%s proper columns, %s pass-through index columns)",
page.getChannelCount(),
properChannelsCount + passThroughSourcesCount,
properChannelsCount,
passThroughSourcesCount));
}

Block[] resultBlocks = new Block[properChannelsCount + passThroughTypes.length];

// proper outputs first
for (int channel = 0; channel < properChannelsCount; channel++) {
resultBlocks[channel] = page.getBlock(channel);
}

// pass-through columns next
// because no input was processed, all pass-through indexes in the result page must be null (there are no input rows they could refer to).
// for performance reasons this is not checked. All pass-through columns are filled with nulls.
int channel = properChannelsCount;
for (Type type : passThroughTypes) {
resultBlocks[channel] = RunLengthEncodedBlock.create(type, null, page.getPositionCount());
channel++;
}

// pass the position count so that the Page can be successfully created in the case when there are no output channels (resultBlocks is empty)
return new Page(page.getPositionCount(), resultBlocks);
}
}
Loading