Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ public Field(Optional<NodeLocation> nodeLocation, Optional<QualifiedName> relati
this.aliased = aliased;
}

public static Field newUnqualified(Optional<String> name, Type type)
{
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");

return new Field(Optional.empty(), Optional.empty(), name, type, false, Optional.empty(), Optional.empty(), false);
}

public Optional<NodeLocation> getNodeLocation()
{
return nodeLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.spi.procedure.BaseProcedure;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions;
import static com.facebook.presto.spi.ConnectorId.createInformationSchemaConnectorId;
Expand Down Expand Up @@ -215,6 +218,12 @@ public synchronized void addConnectorFactory(ConnectorFactory connectorFactory)
ConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(connectorFactory.getName(), connectorFactory);
checkArgument(existingConnectorFactory == null, "Connector %s is already registered", connectorFactory.getName());
handleResolver.addConnectorName(connectorFactory.getName(), connectorFactory.getHandleResolver());
connectorFactory.getTableFunctionHandleResolver().ifPresent(resolver -> {
handleResolver.addTableFunctionNamespace(connectorFactory.getName(), resolver);
});
connectorFactory.getTableFunctionSplitResolver().ifPresent(resolver -> {
handleResolver.addTableFunctionSplitNamespace(connectorFactory.getName(), resolver);
});
}

public synchronized ConnectorId createConnection(String catalogName, String connectorName, Map<String, String> properties)
Expand Down Expand Up @@ -334,6 +343,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
metadataManager.getAnalyzePropertyManager().addProperties(connectorId, connector.getAnalyzeProperties());
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(connectorId, connector.getSessionProperties());
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().addTableFunctions(connectorId, connector.getTableFunctions());
metadataManager.getFunctionAndTypeManager().addTableFunctionProcessorProvider(connectorId, connector.getTableFunctionProcessorProvider());
}

public synchronized void dropConnection(String catalogName)
Expand All @@ -346,6 +356,7 @@ public synchronized void dropConnection(String catalogName)
removeConnectorInternal(createInformationSchemaConnectorId(connectorId));
removeConnectorInternal(createSystemTablesConnectorId(connectorId));
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().removeTableFunctions(connectorId);
metadataManager.getFunctionAndTypeManager().removeTableFunctionProcessorProvider(connectorId);
});
}

Expand Down Expand Up @@ -422,6 +433,7 @@ private static class MaterializedConnector

private final Set<Class<?>> functions;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> connectorTableFunctionProcessorProvider;
private final ConnectorPageSourceProvider pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
private final Optional<ConnectorIndexProvider> indexProvider;
Expand Down Expand Up @@ -459,6 +471,7 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
Set<ConnectorTableFunction> connectorTableFunctions = connector.getTableFunctions();
requireNonNull(connectorTableFunctions, format("Connector '%s' returned a null table functions set", connectorId));
this.connectorTableFunctions = ImmutableSet.copyOf(connectorTableFunctions);
this.connectorTableFunctionProcessorProvider = connector.getTableFunctionProcessorProvider();

ConnectorPageSourceProvider connectorPageSourceProvider = null;
try {
Expand Down Expand Up @@ -660,5 +673,10 @@ public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

public Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> getTableFunctionProcessorProvider()
{
return connectorTableFunctionProcessorProvider;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.operator.table.ExcludeColumns;
import com.facebook.presto.operator.table.Sequence;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
Expand All @@ -34,6 +37,9 @@
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.facebook.presto.transaction.InternalConnector;
Expand All @@ -45,7 +51,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.operator.table.Sequence.getSequenceFunctionSplitSource;
import static java.util.Objects.requireNonNull;

public class GlobalSystemConnector
Expand All @@ -56,12 +64,14 @@ public class GlobalSystemConnector
private final String connectorId;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<ConnectorTableFunction> tableFunctions;

public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures)
public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
}

@Override
Expand Down Expand Up @@ -138,8 +148,22 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
@Override
public ConnectorSplitManager getSplitManager()
{
return (transactionHandle, session, layout, splitSchedulingContext) -> {
throw new UnsupportedOperationException();
return new ConnectorSplitManager() {
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
{
if (function instanceof Sequence.SequenceFunctionHandle) {
Sequence.SequenceFunctionHandle sequenceFunctionHandle = (Sequence.SequenceFunctionHandle) function;
return getSequenceFunctionSplitSource(sequenceFunctionHandle);
}
throw new UnsupportedOperationException();
}
};
}

Expand All @@ -166,4 +190,24 @@ public Set<Procedure> getProcedures()
{
return procedures;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return tableFunctions;
}

@Override
public Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> getTableFunctionProcessorProvider()
{
return connectorTableFunctionHandle -> {
if (connectorTableFunctionHandle instanceof ExcludeColumns.ExcludeColumnsFunctionHandle) {
return ExcludeColumns.getExcludeColumnsFunctionProcessorProvider();
}
else if (connectorTableFunctionHandle instanceof Sequence.SequenceFunctionHandle) {
return Sequence.getSequenceFunctionProcessorProvider();
}
return null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
Expand All @@ -32,12 +33,14 @@ public class GlobalSystemConnectorFactory
{
private final Set<SystemTable> tables;
private final Set<Procedure> procedures;
private final Set<ConnectorTableFunction> tableFunctions;

@Inject
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures)
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions)
{
this.tables = ImmutableSet.copyOf(requireNonNull(tables, "tables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
}

@Override
Expand All @@ -55,6 +58,6 @@ public ConnectorHandleResolver getHandleResolver()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return new GlobalSystemConnector(catalogName, tables, procedures);
return new GlobalSystemConnector(catalogName, tables, procedures, tableFunctions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import com.facebook.presto.connector.system.jdbc.TableTypeJdbcTable;
import com.facebook.presto.connector.system.jdbc.TypesJdbcTable;
import com.facebook.presto.connector.system.jdbc.UdtJdbcTable;
import com.facebook.presto.operator.table.ExcludeColumns;
import com.facebook.presto.operator.table.Sequence;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
Expand Down Expand Up @@ -77,6 +80,10 @@ public void configure(Binder binder)

binder.bind(GlobalSystemConnectorFactory.class).in(Scopes.SINGLETON);
binder.bind(SystemConnectorRegistrar.class).asEagerSingleton();

Multibinder<ConnectorTableFunction> tableFunctions = Multibinder.newSetBinder(binder, ConnectorTableFunction.class);
tableFunctions.addBinding().toProvider(ExcludeColumns.class).in(Scopes.SINGLETON);
tableFunctions.addBinding().toProvider(Sequence.class).in(Scopes.SINGLETON);
}

@ProvidesIntoSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private PlanRoot runCreateLogicalPlanAsync()

private void createQueryScheduler(PlanRoot plan)
{
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager::getSplits);
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager);

// ensure split sources are closed
stateMachine.addStateChangeListener(state -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,10 @@ public ListenableFuture<?> processFor(Duration duration)
@Override
public String getInfo()
{
return (partitionedSplit == null) ? "" : partitionedSplit.getSplit().getInfo().toString();
if (partitionedSplit != null && partitionedSplit.getSplit() != null && partitionedSplit.getSplit().getInfo() != null) {
return partitionedSplit.getSplit().getInfo().toString();
}
return "";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.common.type.TypeWithName;
import com.facebook.presto.common.type.UserDefinedType;
import com.facebook.presto.operator.window.WindowFunctionSupplier;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
Expand All @@ -56,6 +57,8 @@
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlFunctionSupplier;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.spi.type.TypeManagerContext;
import com.facebook.presto.spi.type.TypeManagerFactory;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
Expand Down Expand Up @@ -92,6 +95,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;

import static com.facebook.presto.SystemSessionProperties.isExperimentalFunctionsEnabled;
Expand All @@ -105,6 +109,7 @@
import static com.facebook.presto.metadata.FunctionSignatureMatcher.decideAndThrow;
import static com.facebook.presto.metadata.SessionFunctionHandle.SESSION_NAMESPACE;
import static com.facebook.presto.metadata.SignatureBinder.applyBoundVariables;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.AMBIGUOUS_FUNCTION_CALL;
import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_MISSING;
import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_NOT_FOUND;
Expand Down Expand Up @@ -160,6 +165,7 @@ public class FunctionAndTypeManager
private final AtomicReference<Supplier<Map<String, ParametricType>>> servingTypeManagerParametricTypesSupplier;
private final BuiltInWorkerFunctionNamespaceManager builtInWorkerFunctionNamespaceManager;
private final BuiltInPluginFunctionNamespaceManager builtInPluginFunctionNamespaceManager;
private final ConcurrentHashMap<ConnectorId, Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider>> tableFunctionProcessorProviderMap = new ConcurrentHashMap<>();
private final FunctionsConfig functionsConfig;
private final Set<Type> types;

Expand Down Expand Up @@ -704,6 +710,24 @@ public ScalarFunctionImplementation getScalarFunctionImplementation(FunctionHand
return functionNamespaceManager.get().getScalarFunctionImplementation(functionHandle);
}

public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFunctionHandle tableFunctionHandle)
{
return tableFunctionProcessorProviderMap.get(tableFunctionHandle.getConnectorId()).apply(tableFunctionHandle.getFunctionHandle());
}

public void addTableFunctionProcessorProvider(ConnectorId connectorId, Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> tableFunctionProcessorProvider)
{
if (tableFunctionProcessorProviderMap.putIfAbsent(connectorId, tableFunctionProcessorProvider) != null) {
throw new PrestoException(ALREADY_EXISTS,
format("TableFuncitonProcessorProvider already exists for connectorId %s. Overwriting is not supported.", connectorId.getCatalogName()));
}
}

public void removeTableFunctionProcessorProvider(ConnectorId connectorId)
{
tableFunctionProcessorProviderMap.remove(connectorId);
}

public AggregationFunctionImplementation getAggregateFunctionImplementation(FunctionHandle functionHandle)
{
if (isBuiltInPluginFunctionHandle(functionHandle)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(TableFunctionJacksonHandleModule.class);

if (handleResolver == null) {
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
Expand Down
Loading
Loading