Skip to content

Commit

Permalink
[apache#6003][pulsar-functions] Possibility to add builtin Functions (a…
Browse files Browse the repository at this point in the history
…pache#6895)

Master Issue: apache#6003

### Motivation

This pull request implements the possibility to add builtin functions (in the same way of the build in connectors). 

The builtin function must include a `pulsar-io.yml` file with the following content

```yml
name: <function-name>
description: <function-desciption>
functionClass: <function-class>
```

e.g.
```yml
name: test-function
description: test function description
functionClass: it.oncode.pulsar.functions.TestFunction
```

it is possible to create a builtin function in the same way of the builtin sinks/sources.

Example in scala
```scala
val functionConfigBuilder: FunctionConfigBuilder = FunctionConfig.builder()
    val function =
      functionConfigBuilder
        .tenant("public")
        .namespace("default")
        .jar("builtin://test-function")
        .name("test-function-name")
        .className("it.oncode.pulsar.functions.TestFunction")
        .inputs(Seq("channel_in").asJava)
        .output("channel_out")
        .runtime(FunctionConfig.Runtime.JAVA)
        .build()

Pulsar.admin.functions
      .createFunction(function, null)

```

Function folder to be specified in the `conf/functions_worker.yml` conf file

e.g.
`functionsDirectory: ./functions`

Function package must be in `*.nar` format like for source/sink connectors

### Modifications

I modified the `pulsar-function-utils`, `pulsar-functions-worker` and `pulsar-common` modules on the basis of the built in connectors implementation.
Also `Function.proto` has been modified in order to include the `builtin` property

#### What this MR does not include

- modification of pulsar-admin to fetch the available buildin functions
- the related documentation

This is a feature that is critical for us, I think we could open an issue for the remaining points and consider to merge this PR.
  • Loading branch information
oncodeit authored and cdbartholomew committed Jul 24, 2020
1 parent 8a362c0 commit e978244
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 35 deletions.
1 change: 1 addition & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,4 @@ brokerClientTrustCertsFilePath:
########################

connectorsDirectory: ./connectors
functionsDirectory: ./functions
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.functions;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Basic information about a Pulsar function.
*/
@Data
@NoArgsConstructor
public class FunctionDefinition {

/**
* The name of the function type.
*/
private String name;

/**
* Description to be used for user help.
*/
private String description;

/**
* The class name for the function implementation.
*
* <p>If not defined, it will be assumed this function cannot act as a data.
*/
private String functionClass;
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ public class NarClassLoader extends URLClassLoader {

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
try {
return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IOException(e);
}
return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
}

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
}

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
Expand Down
24 changes: 14 additions & 10 deletions pulsar-functions/localrun-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -71,10 +71,11 @@
</filter>
</filters>
<relocations>
<relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<!-- <relocation>
<pattern>com.typesafe.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
Expand Down Expand Up @@ -179,10 +180,11 @@
<pattern>org.apache.distributedlog</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
</relocation>
<relocation>
<!-- Jackson cannot be shaded, this is causing java.lang.NoSuchMethodError when calling getThreadLocalYaml-->
<!-- <relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>org.inferred</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
Expand All @@ -199,10 +201,11 @@
<pattern>dlshade</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
</relocation>
<relocation>
<!-- This refers to an older version of Jackson -->
<!-- <relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>net.java.dev.jna</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
Expand Down Expand Up @@ -315,10 +318,11 @@
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
</relocation>
<relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<!-- <relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>org.hamcrest</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
Expand Down Expand Up @@ -357,4 +361,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.functions.worker.WorkerConfig;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -212,6 +215,14 @@ public void start(boolean blocking) throws Exception {
.getProtectionDomain().getCodeSource().getLocation().getFile();
}

boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
if(isBuiltin){
WorkerConfig workerConfig = WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
Functions functions = FunctionUtils.searchForFunctions(System.getenv("PULSAR_HOME") + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
userCodeFile = functions.getFunctions().get(functionType).toString();
}

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionConfigUtils.validate(functionConfig, file);
Expand Down Expand Up @@ -440,9 +451,10 @@ private String isBuiltInSource(String sourceType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();

if (connectors.getSources().containsKey(sourceType)) {
String source = sourceType.replaceFirst("^builtin://", "");
if (connectors.getSources().containsKey(source)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
return connectors.getSources().get(sourceType).toString();
return connectors.getSources().get(source).toString();
} else {
return null;
}
Expand All @@ -452,9 +464,10 @@ private String isBuiltInSink(String sinkType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();

if (connectors.getSinks().containsKey(sinkType)) {
String sink = sinkType.replaceFirst("^builtin://", "");
if (connectors.getSinks().containsKey(sink)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
return connectors.getSinks().get(sinkType).toString();
return connectors.getSinks().get(sink).toString();
} else {
return null;
}
Expand Down
3 changes: 3 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ message FunctionDetails {
string runtimeFlags = 17;
ComponentType componentType = 18;
string customRuntimeOptions = 19;
/* If specified, this will refer to an archive that is
* already present in the server */
string builtin = 20;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final String CATEGORY_STATE = "State Management";
@Category
private static final String CATEGORY_CONNECTORS = "Connectors";
@Category
private static final String CATEGORY_FUNCTIONS = "Functions";

@FieldContext(
category = CATEGORY_WORKER,
Expand Down Expand Up @@ -145,6 +147,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "Should we validate connector config during submission"
)
private Boolean validateConnectorConfig = false;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class FunctionConfigUtils {

public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {

boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
Expand Down Expand Up @@ -249,6 +251,11 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
}

if (isBuiltin) {
String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
functionDetailsBuilder.setBuiltin(builtin);
}

return functionDetailsBuilder.build();
}

Expand Down Expand Up @@ -596,12 +603,6 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}

if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
&& functionConfig.getJar().startsWith(BUILTIN)) {
if (!new File(functionConfig.getJar()).exists()) {
throw new IllegalArgumentException("The supplied jar file does not exist");
}
}
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
if (!new File(functionConfig.getPy()).exists()) {
Expand Down Expand Up @@ -698,6 +699,10 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
mergedConfig.setClassName(newConfig.getClassName());
}

if (!StringUtils.isEmpty(newConfig.getJar())) {
mergedConfig.setJar(newConfig.getJar());
}

if (newConfig.getInputSpecs() == null) {
newConfig.setInputSpecs(new HashMap<>());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils.functions;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.functions.api.Function;


@UtilityClass
@Slf4j
public class FunctionUtils {

private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";

/**
* Extract the Pulsar Function class from a functionctor archive.
*/
public static String getFunctionClass(ClassLoader classLoader) throws IOException {
NarClassLoader ncl = (NarClassLoader) classLoader;
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
FunctionDefinition.class);
if (StringUtils.isEmpty(conf.getFunctionClass())) {
throw new IOException(
String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
}

try {
// Try to load source class and check it implements Function interface
Class functionClass = ncl.loadClass(conf.getFunctionClass());
if (!(Function.class.isAssignableFrom(functionClass))) {
throw new IOException(
"Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName());
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
}

return conf.getFunctionClass();
}

public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
}
}

public static Functions searchForFunctions(String functionsDirectory) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);

Functions functions = new Functions();

if (!path.toFile().exists()) {
log.warn("Functions archive directory not found");
return functions;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
log.info("Found function {} from {}", cntDef, archive);
log.error(cntDef.getName());
log.error(cntDef.getFunctionClass());
if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
functions.functions.put(cntDef.getName(), archive);
}

functions.functionsDefinitions.add(cntDef);
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
}
}
}

Collections.sort(functions.functionsDefinitions,
(c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));

return functions;
}
}
Loading

0 comments on commit e978244

Please sign in to comment.