Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.metadata.ProcedureRegistry;
import io.trino.metadata.SchemaPropertyManager;
import io.trino.metadata.SessionPropertyManager;
import io.trino.metadata.TableFunctionRegistry;
import io.trino.metadata.TableProceduresPropertyManager;
import io.trino.metadata.TableProceduresRegistry;
import io.trino.metadata.TablePropertyManager;
Expand All @@ -60,6 +61,7 @@
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.TypeManager;
import io.trino.split.PageSinkManager;
Expand Down Expand Up @@ -119,6 +121,7 @@ public class ConnectorManager
private final TypeManager typeManager;
private final ProcedureRegistry procedureRegistry;
private final TableProceduresRegistry tableProceduresRegistry;
private final TableFunctionRegistry tableFunctionRegistry;
private final SessionPropertyManager sessionPropertyManager;
private final SchemaPropertyManager schemaPropertyManager;
private final ColumnPropertyManager columnPropertyManager;
Expand Down Expand Up @@ -158,6 +161,7 @@ public ConnectorManager(
TypeManager typeManager,
ProcedureRegistry procedureRegistry,
TableProceduresRegistry tableProceduresRegistry,
TableFunctionRegistry tableFunctionRegistry,
SessionPropertyManager sessionPropertyManager,
SchemaPropertyManager schemaPropertyManager,
ColumnPropertyManager columnPropertyManager,
Expand Down Expand Up @@ -186,6 +190,7 @@ public ConnectorManager(
this.typeManager = typeManager;
this.procedureRegistry = procedureRegistry;
this.tableProceduresRegistry = tableProceduresRegistry;
this.tableFunctionRegistry = tableFunctionRegistry;
this.sessionPropertyManager = sessionPropertyManager;
this.schemaPropertyManager = schemaPropertyManager;
this.columnPropertyManager = columnPropertyManager;
Expand Down Expand Up @@ -333,6 +338,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
procedureRegistry.addProcedures(catalogName, connector.getProcedures());
Set<TableProcedureMetadata> tableProcedures = connector.getTableProcedures();
tableProceduresRegistry.addTableProcedures(catalogName, tableProcedures);
tableFunctionRegistry.addTableFunctions(catalogName, connector.getTableFunctions());

connector.getAccessControl()
.ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(catalogName, accessControl));
Expand Down Expand Up @@ -369,6 +375,7 @@ private synchronized void removeConnectorInternal(CatalogName catalogName)
nodePartitioningManager.removePartitioningProvider(catalogName);
procedureRegistry.removeProcedures(catalogName);
tableProceduresRegistry.removeProcedures(catalogName);
tableFunctionRegistry.removeTableFunctions(catalogName);
accessControlManager.removeCatalogAccessControl(catalogName);
tablePropertyManager.removeProperties(catalogName);
materializedViewPropertyManager.removeProperties(catalogName);
Expand Down Expand Up @@ -495,6 +502,7 @@ private static class MaterializedConnector
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final Optional<ConnectorSplitManager> splitManager;
private final Optional<ConnectorPageSourceProvider> pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
Expand Down Expand Up @@ -527,6 +535,10 @@ public MaterializedConnector(CatalogName catalogName, Connector connector, Runna
requireNonNull(tableProcedures, format("Connector '%s' returned a null table procedures set", catalogName));
this.tableProcedures = ImmutableSet.copyOf(tableProcedures);

Set<ConnectorTableFunction> connectorTableFunctions = connector.getTableFunctions();
requireNonNull(connectorTableFunctions, format("Connector '%s' returned a null table functions set", catalogName));
this.connectorTableFunctions = ImmutableSet.copyOf(connectorTableFunctions);

ConnectorSplitManager splitManager = null;
try {
splitManager = connector.getSplitManager();
Expand Down Expand Up @@ -642,6 +654,11 @@ public Set<TableProcedureMetadata> getTableProcedures()
return tableProcedures;
}

public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

public Optional<ConnectorSplitManager> getSplitManager()
{
return splitManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ FunctionBinding resolveFunction(
throw new TrinoException(FUNCTION_NOT_FOUND, message);
}

private static List<CatalogSchemaFunctionName> toPath(Session session, QualifiedName name)
public static List<CatalogSchemaFunctionName> toPath(Session session, QualifiedName name)
{
List<String> parts = name.getParts();
checkArgument(parts.size() <= 3, "Function name can only have 3 parts: " + name);
Expand Down
3 changes: 3 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.expression.ConnectorExpression;
Expand Down Expand Up @@ -473,6 +474,8 @@ Optional<TopNApplicationResult<TableHandle>> applyTopN(
List<SortItem> sortItems,
Map<String, ColumnHandle> assignments);

Optional<TableFunctionApplicationResult<TableHandle>> applyTableFunction(Session session, TableFunctionHandle handle);

default void validateScan(Session session, TableHandle table) {}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.expression.ConnectorExpression;
Expand Down Expand Up @@ -1649,6 +1650,18 @@ public Optional<TopNApplicationResult<TableHandle>> applyTopN(
result.isPrecalculateStatistics()));
}

@Override
public Optional<TableFunctionApplicationResult<TableHandle>> applyTableFunction(Session session, TableFunctionHandle handle)
{
CatalogName catalogName = handle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);

return metadata.applyTableFunction(session.toConnectorSession(catalogName), handle.getFunctionHandle())
.map(result -> new TableFunctionApplicationResult<>(
new TableHandle(catalogName, result.getTableHandle(), handle.getTransactionHandle()),
result.getColumnHandles()));
}

private void verifyProjection(TableHandle table, List<ConnectorExpression> projections, List<Assignment> assignments, int expectedProjectionSize)
{
projections.forEach(projection -> requireNonNull(projection, "one of the projections is null"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.connector.CatalogName;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;

import static java.util.Objects.requireNonNull;

public class TableFunctionHandle
{
private final CatalogName catalogName;
private final ConnectorTableFunctionHandle functionHandle;
private final ConnectorTransactionHandle transactionHandle;

@JsonCreator
public TableFunctionHandle(
@JsonProperty("catalogName") CatalogName catalogName,
@JsonProperty("functionHandle") ConnectorTableFunctionHandle functionHandle,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
}

@JsonProperty
public CatalogName getCatalogName()
{
return catalogName;
}

@JsonProperty
public ConnectorTableFunctionHandle getFunctionHandle()
{
return functionHandle;
}

@JsonProperty
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.metadata;

import io.trino.connector.CatalogName;
import io.trino.spi.ptf.ConnectorTableFunction;

import static java.util.Objects.requireNonNull;

public class TableFunctionMetadata
{
private final CatalogName catalogName;
private final ConnectorTableFunction function;

public TableFunctionMetadata(CatalogName catalogName, ConnectorTableFunction function)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.function = requireNonNull(function, "function is null");
}

public CatalogName getCatalogName()
{
return catalogName;
}

public ConnectorTableFunction getFunction()
{
return function;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.metadata;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.sql.tree.QualifiedName;

import javax.annotation.concurrent.ThreadSafe;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.metadata.FunctionResolver.toPath;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class TableFunctionRegistry
{
// catalog name in the original case; schema and function name in lowercase
private final Map<CatalogName, Map<SchemaFunctionName, TableFunctionMetadata>> tableFunctions = new ConcurrentHashMap<>();

public void addTableFunctions(CatalogName catalogName, Collection<ConnectorTableFunction> functions)
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(functions, "functions is null");

ImmutableMap.Builder<SchemaFunctionName, TableFunctionMetadata> builder = ImmutableMap.builder();
for (ConnectorTableFunction function : functions) {
builder.put(
new SchemaFunctionName(
function.getSchema().toLowerCase(ENGLISH),
Comment thread
Praveen2112 marked this conversation as resolved.
Outdated
function.getName().toLowerCase(ENGLISH)),
new TableFunctionMetadata(catalogName, function));
}
checkState(tableFunctions.putIfAbsent(catalogName, builder.buildOrThrow()) == null, "Table functions already registered for catalog: " + catalogName);
}

public void removeTableFunctions(CatalogName catalogName)
{
tableFunctions.remove(catalogName);
}

/**
* Resolve table function with given qualified name.
* Table functions are resolved case-insensitive for consistency with existing scalar function resolution.
*/
public TableFunctionMetadata resolve(Session session, QualifiedName qualifiedName)
{
for (CatalogSchemaFunctionName name : toPath(session, qualifiedName)) {
CatalogName catalogName = new CatalogName(name.getCatalogName());
Map<SchemaFunctionName, TableFunctionMetadata> catalogFunctions = tableFunctions.get(catalogName);
if (catalogFunctions != null) {
String lowercasedSchemaName = name.getSchemaFunctionName().getSchemaName().toLowerCase(ENGLISH);
String lowercasedFunctionName = name.getSchemaFunctionName().getFunctionName().toLowerCase(ENGLISH);
TableFunctionMetadata function = catalogFunctions.get(new SchemaFunctionName(lowercasedSchemaName, lowercasedFunctionName));
if (function != null) {
return function;
}
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import io.trino.metadata.StaticCatalogStoreConfig;
import io.trino.metadata.SystemFunctionBundle;
import io.trino.metadata.SystemSecurityMetadata;
import io.trino.metadata.TableFunctionRegistry;
import io.trino.metadata.TableProceduresPropertyManager;
import io.trino.metadata.TableProceduresRegistry;
import io.trino.metadata.TablePropertyManager;
Expand Down Expand Up @@ -391,6 +392,7 @@ protected void setup(Binder binder)
newExporter(binder).export(TypeOperatorsCache.class).withGeneratedName();
binder.bind(ProcedureRegistry.class).in(Scopes.SINGLETON);
binder.bind(TableProceduresRegistry.class).in(Scopes.SINGLETON);
binder.bind(TableFunctionRegistry.class).in(Scopes.SINGLETON);
binder.bind(PlannerContext.class).in(Scopes.SINGLETON);

// function
Expand Down
Loading