Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion presto-docs/src/main/sphinx/develop/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
Functions
=========

Functions in Presto can be implemented at Plugin and the Connector level.
The following two sections describe how to implement them.

Plugin Implementation
---------------------

The function framework is used to implement SQL functions. Presto includes a
number of built-in functions. In order to implement new functions, you can
write a plugin that returns one more functions from ``getFunctions()``:
write a plugin that returns one or more functions from ``getFunctions()``:

.. code-block:: java

Expand All @@ -31,10 +34,44 @@ Note that the ``ImmutableSet`` class is a utility class from Guava.
The ``getFunctions()`` method contains all of the classes for the functions
that we will implement below in this tutorial.

Functions registered using this method are available in the default
namespace ``presto.default``.

For a full example in the codebase, see either the ``presto-ml`` module for machine
learning functions or the ``presto-teradata-functions`` module for Teradata-compatible
functions, both in the root of the Presto source.

Connector Functions Implementation
----------------------------------

To implement new functions at the connector level, in your
connector implementation, override the ``getSystemFunctions()`` method that returns one
or more functions:

.. code-block:: java

public class ExampleFunctionsConnector
implements Connector
{
@Override
public Set<Class<?>> getSystemFunctions()
{
return ImmutableSet.<Class<?>>builder()
.add(ExampleNullFunction.class)
.add(IsNullFunction.class)
.add(IsEqualOrNullFunction.class)
.add(ExampleStringFunction.class)
.add(ExampleAverageFunction.class)
.build();
}
}

Functions registered using this interface are available in the namespace
``<catalog-name>.system`` where ``<catalog-name>`` is the catalog name used
in the Presto deployment for this connector type.

At present, connector level functions do not support Window functions and Scalar operators.

Scalar Function Implementation
------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.connector.informationSchema.InformationSchemaConnector;
Expand Down Expand Up @@ -82,6 +83,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions;
import static com.facebook.presto.spi.ConnectorId.createInformationSchemaConnectorId;
import static com.facebook.presto.spi.ConnectorId.createSystemTablesConnectorId;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -308,6 +310,10 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
}
connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider));
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
Set<Class<?>> systemFunctions = connector.getSystemFunctions();
if (!systemFunctions.isEmpty()) {
metadataManager.registerConnectorFunctions(connectorId.getCatalogName(), extractFunctions(systemFunctions, new CatalogSchemaName(connectorId.getCatalogName(), "system")));
}

connector.getAccessControl()
.ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(connectorId, accessControl));
Expand Down Expand Up @@ -390,6 +396,8 @@ private static class MaterializedConnector
private final ConnectorSplitManager splitManager;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove empty line

private final Set<Class<?>> functions;
private final ConnectorPageSourceProvider pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
private final Optional<ConnectorIndexProvider> indexProvider;
Expand Down Expand Up @@ -512,6 +520,10 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
List<PropertyMetadata<?>> analyzeProperties = connector.getAnalyzeProperties();
requireNonNull(analyzeProperties, "Connector %s returned a null analyze properties set");
this.analyzeProperties = ImmutableList.copyOf(analyzeProperties);

Set<Class<?>> systemFunctions = connector.getSystemFunctions();
requireNonNull(systemFunctions, "Connector %s returned a null system function set");
this.functions = ImmutableSet.copyOf(systemFunctions);
}

public ConnectorId getConnectorId()
Expand Down Expand Up @@ -539,6 +551,11 @@ public Set<Procedure> getProcedures()
return procedures;
}

public Set<Class<?>> getSystemFunctions()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name "SystemFunction" seems a bit misleading to me. It suggests they are Presto system functions. How about just call it getFunctions()? cc @tdcmeehan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think system functions makes sense, because they will be added to the following catalog-schema: <CATALOG>.system.fn. It's present on the Connector interface, which makes it obvious that this is not adding a Presto-wide system function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @yingsu00 here that "system" is a bit confusing. It makes me think that these will be system related functions like for getting catalog metadata or something. I also prefer getFunctions().

Copy link
Contributor

@tdcmeehan tdcmeehan Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we need naming that makes the following clear:

  • These are functions
  • They are tied to the underlying connector in some intrinsic way, for example, they are functions that may only be executed on data from the underlying connector or which is bound to be ingested by the underlying connector
  • They will be added to a specific schema, namely, the system schema. e.g. iceberg.system.date.

I think getFunctions is too general because it's not clear that these functions only make sense in the context of the underlying connector. getConnectorFunctions, or getConnectorSystemFunctions perhaps?

{
return functions;
}

public ConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,15 @@ public BuiltInTypeAndFunctionNamespaceManager(
FunctionsConfig functionsConfig,
Set<Type> types,
FunctionAndTypeManager functionAndTypeManager)
{
this(blockEncodingSerde, functionsConfig, types, functionAndTypeManager, true);
}
public BuiltInTypeAndFunctionNamespaceManager(
BlockEncodingSerde blockEncodingSerde,
FunctionsConfig functionsConfig,
Set<Type> types,
FunctionAndTypeManager functionAndTypeManager,
boolean registerFunctions)
{
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
this.magicLiteralFunction = new MagicLiteralFunction(blockEncodingSerde);
Expand Down Expand Up @@ -605,7 +614,9 @@ public BuiltInTypeAndFunctionNamespaceManager(
.expireAfterWrite(1, HOURS)
.build(CacheLoader.from(this::instantiateParametricType));

registerBuiltInFunctions(getBuiltInFunctions(functionsConfig));
if (registerFunctions) {
registerBuiltInFunctions(getBuiltInFunctions(functionsConfig));
}
registerBuiltInTypes(functionsConfig);

for (Type type : requireNonNull(types, "types is null")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public void registerBuiltInFunctions(List<? extends SqlFunction> functionInfos)
delegate.registerBuiltInFunctions(functionInfos);
}

@Override
public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos)
{
delegate.registerConnectorFunctions(catalogName, functionInfos);
}

@Override
public List<String> listSchemaNames(Session session, String catalogName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public class FunctionAndTypeManager
private final AtomicReference<TypeManager> servingTypeManager;
private final AtomicReference<Supplier<Map<String, ParametricType>>> servingTypeManagerParametricTypesSupplier;
private final BuiltInPluginFunctionNamespaceManager builtInPluginFunctionNamespaceManager;
private final FunctionsConfig functionsConfig;
private final Set<Type> types;

@Inject
public FunctionAndTypeManager(
Expand All @@ -162,6 +164,8 @@ public FunctionAndTypeManager(
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.functionsConfig = requireNonNull(functionsConfig, "functionsConfig is null");
this.types = requireNonNull(types, "types is null");
this.builtInTypeAndFunctionNamespaceManager = new BuiltInTypeAndFunctionNamespaceManager(blockEncodingSerde, functionsConfig, types, this);
this.functionNamespaceManagers.put(JAVA_BUILTIN_NAMESPACE.getCatalogName(), builtInTypeAndFunctionNamespaceManager);
this.functionInvokerProvider = new FunctionInvokerProvider(this);
Expand Down Expand Up @@ -449,6 +453,16 @@ public void registerPluginFunctions(List<? extends SqlFunction> functions)
builtInPluginFunctionNamespaceManager.registerPluginFunctions(functions);
}

public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functions)
{
FunctionNamespaceManager builtInPluginFunctionNamespaceManager = functionNamespaceManagers.get(catalogName);
if (builtInPluginFunctionNamespaceManager == null) {
builtInPluginFunctionNamespaceManager = new BuiltInTypeAndFunctionNamespaceManager(blockEncodingSerde, functionsConfig, types, this, false);
addFunctionNamespace(catalogName, builtInPluginFunctionNamespaceManager);
}
((BuiltInTypeAndFunctionNamespaceManager) builtInPluginFunctionNamespaceManager).registerBuiltInFunctions(functions);
}

/**
* likePattern / escape is an opportunistic optimization push down to function namespace managers.
* Not all function namespace managers can handle it, thus the returned function list could
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@
import static com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager.JAVA_BUILTIN_NAMESPACE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;

public final class FunctionExtractor
{
private FunctionExtractor() {}

public static List<SqlFunction> extractFunctions(Collection<Class<?>> classes)
{
return extractFunctions(classes, JAVA_BUILTIN_NAMESPACE);
}

public static List<SqlFunction> extractFunctions(Collection<Class<?>> classes, CatalogSchemaName functionNamespace)
{
return classes.stream()
.map(FunctionExtractor::extractFunctions)
.map(c -> extractFunctions(c, functionNamespace))
.flatMap(Collection::stream)
.collect(toImmutableList());
}
Expand All @@ -53,17 +59,21 @@ public static List<? extends SqlFunction> extractFunctions(Class<?> clazz)
public static List<? extends SqlFunction> extractFunctions(Class<?> clazz, CatalogSchemaName defaultNamespace)
{
if (WindowFunction.class.isAssignableFrom(clazz)) {
checkArgument(defaultNamespace.equals(JAVA_BUILTIN_NAMESPACE), format("Connector specific Window functions are not supported: Class [%s], Namespace [%s]", clazz.getName(), defaultNamespace));
@SuppressWarnings("unchecked")
Class<? extends WindowFunction> windowClazz = (Class<? extends WindowFunction>) clazz;
return WindowAnnotationsParser.parseFunctionDefinition(windowClazz);
}

if (clazz.isAnnotationPresent(AggregationFunction.class)) {
return SqlAggregationFunction.createFunctionsByAnnotations(clazz);
return SqlAggregationFunction.createFunctionsByAnnotations(clazz, defaultNamespace);
}

if (clazz.isAnnotationPresent(ScalarFunction.class) ||
clazz.isAnnotationPresent(ScalarOperator.class)) {
if (clazz.isAnnotationPresent(ScalarFunction.class)) {
return ScalarFromAnnotationsParser.parseFunctionDefinition(clazz, defaultNamespace);
}
if (clazz.isAnnotationPresent(ScalarOperator.class)) {
checkArgument(defaultNamespace.equals(JAVA_BUILTIN_NAMESPACE), format("Connector specific Scalar Operator functions are not supported: Class [%s], Namespace [%s]", clazz.getName(), defaultNamespace));
return ScalarFromAnnotationsParser.parseFunctionDefinition(clazz);
}

Expand All @@ -72,9 +82,9 @@ public static List<? extends SqlFunction> extractFunctions(Class<?> clazz, Catal
}

List<SqlFunction> scalarFunctions = ImmutableList.<SqlFunction>builder()
.addAll(ScalarFromAnnotationsParser.parseFunctionDefinitions(clazz))
.addAll(ScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
.addAll(SqlInvokedScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
.addAll(CodegenScalarFromAnnotationsParser.parseFunctionDefinitions(clazz))
.addAll(CodegenScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
.build();
checkArgument(!scalarFunctions.isEmpty(), "Class [%s] does not define any scalar functions", clazz.getName());
return scalarFunctions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public interface Metadata

void registerBuiltInFunctions(List<? extends SqlFunction> functions);

void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos);

List<String> listSchemaNames(Session session, String catalogName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ public void registerBuiltInFunctions(List<? extends SqlFunction> functionInfos)
functionAndTypeManager.registerBuiltInFunctions(functionInfos);
}

@Override
public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos)
{
functionAndTypeManager.registerConnectorFunctions(catalogName, functionInfos);
}

@Override
public List<String> listSchemaNames(Session session, String catalogName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.metadata;

import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.operator.aggregation.AggregationFromAnnotationsParser;
Expand Down Expand Up @@ -46,7 +47,12 @@ public static List<SqlAggregationFunction> createFunctionByAnnotations(Class<?>

public static List<SqlAggregationFunction> createFunctionsByAnnotations(Class<?> aggregationDefinition)
{
return AggregationFromAnnotationsParser.parseFunctionDefinitions(aggregationDefinition)
return createFunctionsByAnnotations(aggregationDefinition, JAVA_BUILTIN_NAMESPACE);
}

public static List<SqlAggregationFunction> createFunctionsByAnnotations(Class<?> aggregationDefinition, CatalogSchemaName functionNamespace)
{
return AggregationFromAnnotationsParser.parseFunctionDefinitions(aggregationDefinition, functionNamespace)
.stream()
.map(x -> (SqlAggregationFunction) x)
.collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.operator.aggregation;

import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.operator.ParametricImplementationsGroup;
import com.facebook.presto.operator.annotations.FunctionsParserHelper;
Expand All @@ -34,6 +35,7 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager.JAVA_BUILTIN_NAMESPACE;
import static com.facebook.presto.operator.aggregation.AggregationImplementation.Parser.parseImplementation;
import static com.facebook.presto.operator.annotations.FunctionsParserHelper.parseDescription;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -54,7 +56,7 @@ public static ParametricAggregation parseFunctionDefinitionWithTypesConstraint(C
{
requireNonNull(returnType, "returnType is null");
requireNonNull(argumentTypes, "argumentTypes is null");
for (ParametricAggregation aggregation : parseFunctionDefinitions(clazz)) {
for (ParametricAggregation aggregation : parseFunctionDefinitions(clazz, JAVA_BUILTIN_NAMESPACE)) {
if (aggregation.getSignature().getReturnType().equals(returnType) &&
aggregation.getSignature().getArgumentTypes().equals(argumentTypes)) {
return aggregation;
Expand All @@ -64,6 +66,11 @@ public static ParametricAggregation parseFunctionDefinitionWithTypesConstraint(C
}

public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggregationDefinition)
{
return parseFunctionDefinitions(aggregationDefinition, JAVA_BUILTIN_NAMESPACE);
}

public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggregationDefinition, CatalogSchemaName functionNamespace)
{
AggregationFunction aggregationAnnotation = aggregationDefinition.getAnnotation(AggregationFunction.class);
requireNonNull(aggregationAnnotation, "aggregationAnnotation is null");
Expand All @@ -76,7 +83,7 @@ public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggr
for (Method outputFunction : getOutputFunctions(aggregationDefinition, stateClass)) {
for (Method inputFunction : getInputFunctions(aggregationDefinition, stateClass)) {
for (AggregationHeader header : parseHeaders(aggregationDefinition, outputFunction)) {
AggregationImplementation onlyImplementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory);
AggregationImplementation onlyImplementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory, functionNamespace);
ParametricImplementationsGroup<AggregationImplementation> implementations = ParametricImplementationsGroup.of(onlyImplementation);
builder.add(new ParametricAggregation(implementations.getSignature(), header, implementations));
}
Expand All @@ -97,7 +104,7 @@ public static ParametricAggregation parseFunctionDefinition(Class<?> aggregation
Optional<Method> aggregationStateSerializerFactory = getAggregationStateSerializerFactory(aggregationDefinition, stateClass);
Method outputFunction = getOnlyElement(getOutputFunctions(aggregationDefinition, stateClass));
for (Method inputFunction : getInputFunctions(aggregationDefinition, stateClass)) {
AggregationImplementation implementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory);
AggregationImplementation implementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory, JAVA_BUILTIN_NAMESPACE);
implementationsBuilder.addImplementation(implementation);
}
}
Expand Down
Loading
Loading