Skip to content

Commit c7a444c

Browse files
mohsakakasiafixin-zhang2
committed
Add SPI support for TableFunctions
Changes adapted from trino/PR#11336, 12350, 12407, 12476, 12531, 12813 Original commits: 395fd91c6480a993241eeabd9599873d0d05b24b 998315075343beecef962657b8cbf440d53cc13b 712b8e98ff8a726f95295ac539159fc532628273 131dc44af97b31a2fa8115028d98d06671641bfa 0da095e14b0855f89af3c4f254a5a60280fc7170 5310671f80291394b12ba2ea746e4e60051aaff4 18bb60262cb0850cf839c2b20b434344921f5122 4a7d72afb64f93a9748a4c6b4defc2d42bbae000 Author: kasiafi Modifications were made to adapt to Presto including: Removal of ConnectorExpression. Co-authored-by: kasiafi <[email protected]> Co-authored-by: Xin Zhang <[email protected]>
1 parent 9548caf commit c7a444c

34 files changed

+1913
-2
lines changed

presto-function-server/src/main/java/com/facebook/presto/server/FunctionServerModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata;
2727
import com.facebook.presto.metadata.FunctionAndTypeManager;
2828
import com.facebook.presto.metadata.HandleResolver;
29+
import com.facebook.presto.metadata.TableFunctionRegistry;
2930
import com.facebook.presto.sql.analyzer.FeaturesConfig;
3031
import com.facebook.presto.sql.analyzer.FunctionsConfig;
3132
import com.facebook.presto.transaction.NoOpTransactionManager;
@@ -53,6 +54,7 @@ protected void setup(Binder binder)
5354
{
5455
jaxrsBinder(binder).bind(FunctionResource.class);
5556
binder.bind(FunctionAndTypeManager.class).in(Scopes.SINGLETON);
57+
binder.bind(TableFunctionRegistry.class).in(Scopes.SINGLETON);
5658
binder.bind(TransactionManager.class).to(NoOpTransactionManager.class).in(Scopes.SINGLETON);
5759
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
5860
install(new InternalCommunicationModule());

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
5252
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
5353
import com.facebook.presto.spi.connector.ConnectorSplitManager;
54+
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
5455
import com.facebook.presto.spi.procedure.Procedure;
5556
import com.facebook.presto.spi.relation.DeterminismEvaluator;
5657
import com.facebook.presto.spi.relation.DomainTranslator;
@@ -323,6 +324,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
323324
metadataManager.getSchemaPropertyManager().addProperties(connectorId, connector.getSchemaProperties());
324325
metadataManager.getAnalyzePropertyManager().addProperties(connectorId, connector.getAnalyzeProperties());
325326
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(connectorId, connector.getSessionProperties());
327+
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().addTableFunctions(connectorId, connector.getTableFunctions());
326328
}
327329

328330
public synchronized void dropConnection(String catalogName)
@@ -334,6 +336,7 @@ public synchronized void dropConnection(String catalogName)
334336
removeConnectorInternal(connectorId);
335337
removeConnectorInternal(createInformationSchemaConnectorId(connectorId));
336338
removeConnectorInternal(createSystemTablesConnectorId(connectorId));
339+
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().removeTableFunctions(connectorId);
337340
});
338341
}
339342

@@ -398,6 +401,7 @@ private static class MaterializedConnector
398401
private final Set<Procedure> procedures;
399402

400403
private final Set<Class<?>> functions;
404+
private final Set<ConnectorTableFunction> connectorTableFunctions;
401405
private final ConnectorPageSourceProvider pageSourceProvider;
402406
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
403407
private final Optional<ConnectorIndexProvider> indexProvider;
@@ -427,6 +431,10 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
427431
requireNonNull(procedures, "Connector %s returned a null procedures set");
428432
this.procedures = ImmutableSet.copyOf(procedures);
429433

434+
Set<ConnectorTableFunction> connectorTableFunctions = connector.getTableFunctions();
435+
requireNonNull(connectorTableFunctions, format("Connector '%s' returned a null table functions set", connectorId));
436+
this.connectorTableFunctions = ImmutableSet.copyOf(connectorTableFunctions);
437+
430438
ConnectorPageSourceProvider connectorPageSourceProvider = null;
431439
try {
432440
connectorPageSourceProvider = connector.getPageSourceProvider();
@@ -615,5 +623,10 @@ public Optional<ConnectorCodecProvider> getConnectorCodecProvider()
615623
{
616624
return connectorCodecProvider;
617625
}
626+
627+
public Set<ConnectorTableFunction> getTableFunctions()
628+
{
629+
return connectorTableFunctions;
630+
}
618631
}
619632
}

presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public class FunctionAndTypeManager
133133
{
134134
private static final Pattern DEFAULT_NAMESPACE_PREFIX_PATTERN = Pattern.compile("[a-z]+\\.[a-z]+");
135135
private final TransactionManager transactionManager;
136+
private final TableFunctionRegistry tableFunctionRegistry;
136137
private final BlockEncodingSerde blockEncodingSerde;
137138
private final BuiltInTypeAndFunctionNamespaceManager builtInTypeAndFunctionNamespaceManager;
138139
private final FunctionInvokerProvider functionInvokerProvider;
@@ -156,13 +157,15 @@ public class FunctionAndTypeManager
156157
@Inject
157158
public FunctionAndTypeManager(
158159
TransactionManager transactionManager,
160+
TableFunctionRegistry tableFunctionRegistry,
159161
BlockEncodingSerde blockEncodingSerde,
160162
FeaturesConfig featuresConfig,
161163
FunctionsConfig functionsConfig,
162164
HandleResolver handleResolver,
163165
Set<Type> types)
164166
{
165167
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
168+
this.tableFunctionRegistry = requireNonNull(tableFunctionRegistry, "tableFunctionRegistry is null");
166169
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
167170
this.functionsConfig = requireNonNull(functionsConfig, "functionsConfig is null");
168171
this.types = requireNonNull(types, "types is null");
@@ -192,6 +195,7 @@ public static FunctionAndTypeManager createTestFunctionAndTypeManager()
192195
{
193196
return new FunctionAndTypeManager(
194197
createTestTransactionManager(),
198+
new TableFunctionRegistry(),
195199
new BlockEncodingManager(),
196200
new FeaturesConfig(),
197201
new FunctionsConfig(),
@@ -421,6 +425,11 @@ public void addFunctionNamespaceFactory(FunctionNamespaceManagerFactory factory)
421425
handleResolver.addFunctionNamespace(name, factory.getHandleResolver());
422426
}
423427

428+
public TableFunctionRegistry getTableFunctionRegistry()
429+
{
430+
return tableFunctionRegistry;
431+
}
432+
424433
public void loadTypeManager(String typeManagerName)
425434
{
426435
requireNonNull(typeManagerName, "typeManagerName is null");

presto-main-base/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public static MetadataManager createTestMetadataManager(TransactionManager trans
243243
{
244244
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
245245
return new MetadataManager(
246-
new FunctionAndTypeManager(transactionManager, blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
246+
new FunctionAndTypeManager(transactionManager, new TableFunctionRegistry(), blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
247247
blockEncodingManager,
248248
createTestingSessionPropertyManager(),
249249
new SchemaPropertyManager(),
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorId;
17+
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
public class TableFunctionMetadata
22+
{
23+
private final ConnectorId connectorId;
24+
private final ConnectorTableFunction function;
25+
26+
public TableFunctionMetadata(ConnectorId connectorId, ConnectorTableFunction function)
27+
{
28+
this.connectorId = requireNonNull(connectorId, "connectorId is null");
29+
this.function = requireNonNull(function, "function is null");
30+
}
31+
32+
public ConnectorId getConnectorId()
33+
{
34+
return connectorId;
35+
}
36+
37+
public ConnectorTableFunction getFunction()
38+
{
39+
return function;
40+
}
41+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.Session;
17+
import com.facebook.presto.spi.ConnectorId;
18+
import com.facebook.presto.spi.PrestoException;
19+
import com.facebook.presto.spi.StandardErrorCode;
20+
import com.facebook.presto.spi.function.CatalogSchemaFunctionName;
21+
import com.facebook.presto.spi.function.SchemaFunctionName;
22+
import com.facebook.presto.spi.function.table.ArgumentSpecification;
23+
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
24+
import com.facebook.presto.spi.function.table.TableArgumentSpecification;
25+
import com.facebook.presto.sql.analyzer.SemanticException;
26+
import com.facebook.presto.sql.tree.QualifiedName;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.common.collect.ImmutableMap;
29+
import com.google.errorprone.annotations.ThreadSafe;
30+
31+
import java.util.Collection;
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
import java.util.Set;
37+
import java.util.concurrent.ConcurrentHashMap;
38+
39+
import static com.facebook.presto.spi.StandardErrorCode.SESSION_CATALOG_NOT_SET;
40+
import static com.facebook.presto.spi.function.table.Preconditions.checkArgument;
41+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
42+
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
43+
import static com.google.common.base.Preconditions.checkState;
44+
import static java.util.Locale.ENGLISH;
45+
import static java.util.Objects.requireNonNull;
46+
47+
@ThreadSafe
48+
public class TableFunctionRegistry
49+
{
50+
// catalog name in the original case; schema and function name in lowercase
51+
private final Map<ConnectorId, Map<SchemaFunctionName, TableFunctionMetadata>> tableFunctions = new ConcurrentHashMap<>();
52+
53+
public void addTableFunctions(ConnectorId catalogName, Collection<ConnectorTableFunction> functions)
54+
{
55+
requireNonNull(catalogName, "catalogName is null");
56+
requireNonNull(functions, "functions is null");
57+
checkState(!tableFunctions.containsKey(catalogName), "Table functions already registered for catalog: " + catalogName);
58+
59+
functions.stream()
60+
.forEach(TableFunctionRegistry::validateTableFunction);
61+
62+
ImmutableMap.Builder<SchemaFunctionName, TableFunctionMetadata> builder = ImmutableMap.builder();
63+
for (ConnectorTableFunction function : functions) {
64+
builder.put(
65+
new SchemaFunctionName(
66+
function.getSchema().toLowerCase(ENGLISH),
67+
function.getName().toLowerCase(ENGLISH)),
68+
new TableFunctionMetadata(catalogName, function));
69+
}
70+
tableFunctions.putIfAbsent(catalogName, builder.buildOrThrow());
71+
}
72+
73+
public void removeTableFunctions(ConnectorId catalogName)
74+
{
75+
tableFunctions.remove(catalogName);
76+
}
77+
78+
public static List<CatalogSchemaFunctionName> toPath(Session session, QualifiedName name)
79+
{
80+
List<String> parts = name.getParts();
81+
if (parts.size() > 3) {
82+
throw new PrestoException(StandardErrorCode.FUNCTION_NOT_FOUND, "Invalid function name: " + name);
83+
}
84+
if (parts.size() == 3) {
85+
return ImmutableList.of(new CatalogSchemaFunctionName(parts.get(0), parts.get(1), parts.get(2)));
86+
}
87+
88+
if (parts.size() == 2) {
89+
String currentCatalog = session.getCatalog()
90+
.orElseThrow(() -> new PrestoException(SESSION_CATALOG_NOT_SET, "Session default catalog must be set to resolve a partial function name: " + name));
91+
return ImmutableList.of(new CatalogSchemaFunctionName(currentCatalog, parts.get(0), parts.get(1)));
92+
}
93+
94+
ImmutableList.Builder<CatalogSchemaFunctionName> names = ImmutableList.builder();
95+
96+
String currentCatalog = session.getCatalog()
97+
.orElseThrow(() -> new SemanticException(CATALOG_NOT_SPECIFIED, "Catalog must be specified when session catalog is not set"));
98+
String currentSchema = session.getSchema()
99+
.orElseThrow(() -> new SemanticException(SCHEMA_NOT_SPECIFIED, "Schema must be specified when session schema is not set"));
100+
101+
// add resolved path items
102+
names.add(new CatalogSchemaFunctionName(currentCatalog, currentSchema, parts.get(0)));
103+
return names.build();
104+
}
105+
106+
/**
107+
* Resolve table function with given qualified name.
108+
* Table functions are resolved case-insensitive for consistency with existing scalar function resolution.
109+
*/
110+
public Optional<TableFunctionMetadata> resolve(Session session, QualifiedName qualifiedName)
111+
{
112+
for (CatalogSchemaFunctionName name : toPath(session, qualifiedName)) {
113+
ConnectorId connectorId = new ConnectorId(name.getCatalogName());
114+
Map<SchemaFunctionName, TableFunctionMetadata> catalogFunctions = tableFunctions.get(connectorId);
115+
if (catalogFunctions != null) {
116+
String lowercasedSchemaName = name.getSchemaFunctionName().getSchemaName().toLowerCase(ENGLISH);
117+
String lowercasedFunctionName = name.getSchemaFunctionName().getFunctionName().toLowerCase(ENGLISH);
118+
TableFunctionMetadata function = catalogFunctions.get(new SchemaFunctionName(lowercasedSchemaName, lowercasedFunctionName));
119+
if (function != null) {
120+
return Optional.of(function);
121+
}
122+
}
123+
}
124+
125+
return Optional.empty();
126+
}
127+
128+
private static void validateTableFunction(ConnectorTableFunction tableFunction)
129+
{
130+
requireNonNull(tableFunction, "tableFunction is null");
131+
requireNonNull(tableFunction.getName(), "table function name is null");
132+
requireNonNull(tableFunction.getSchema(), "table function schema name is null");
133+
requireNonNull(tableFunction.getArguments(), "table function arguments is null");
134+
requireNonNull(tableFunction.getReturnTypeSpecification(), "table function returnTypeSpecification is null");
135+
136+
checkArgument(!tableFunction.getName().isEmpty(), "table function name is empty");
137+
checkArgument(!tableFunction.getSchema().isEmpty(), "table function schema name is empty");
138+
139+
Set<String> argumentNames = new HashSet<>();
140+
int tableArgumentsWithRowSemantics = 0;
141+
for (ArgumentSpecification specification : tableFunction.getArguments()) {
142+
if (!argumentNames.add(specification.getName())) {
143+
throw new IllegalArgumentException("duplicate argument name: " + specification.getName());
144+
}
145+
146+
if (specification instanceof TableArgumentSpecification &&
147+
((TableArgumentSpecification) specification).isRowSemantics()) {
148+
tableArgumentsWithRowSemantics++;
149+
}
150+
}
151+
checkArgument(tableArgumentsWithRowSemantics <= 1, "more than one table argument with row semantics");
152+
// The 'keep when empty' or 'prune when empty' property must not be explicitly specified for a table argument with row semantics.
153+
// Such a table argument is implicitly 'prune when empty'. The TableArgumentSpecification.Builder enforces the 'prune when empty' property
154+
// for a table argument with row semantics.
155+
}
156+
}

presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import com.facebook.presto.metadata.QualifiedTablePrefix;
110110
import com.facebook.presto.metadata.SchemaPropertyManager;
111111
import com.facebook.presto.metadata.Split;
112+
import com.facebook.presto.metadata.TableFunctionRegistry;
112113
import com.facebook.presto.metadata.TablePropertyManager;
113114
import com.facebook.presto.nodeManager.PluginNodeManager;
114115
import com.facebook.presto.operator.Driver;
@@ -433,7 +434,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
433434
featuresConfig.setIgnoreStatsCalculatorFailures(false);
434435

435436
this.metadata = new MetadataManager(
436-
new FunctionAndTypeManager(transactionManager, blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
437+
new FunctionAndTypeManager(transactionManager, new TableFunctionRegistry(), blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
437438
blockEncodingManager,
438439
createTestingSessionPropertyManager(
439440
new SystemSessionProperties(

0 commit comments

Comments
 (0)