Skip to content

Commit

Permalink
Added to Localrunner the possibility to execute builtin functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolopaganin committed May 22, 2020
1 parent 4f0bd52 commit 69e3046
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
24 changes: 14 additions & 10 deletions pulsar-functions/localrun-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -71,10 +71,11 @@
</filter>
</filters>
<relocations>
<relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<!-- <relocation>
<pattern>com.typesafe.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
Expand Down Expand Up @@ -179,10 +180,11 @@
<pattern>org.apache.distributedlog</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
</relocation>
<relocation>
<!-- Jackson cannot be shaded, this is causing java.lang.NoSuchMethodError when calling getThreadLocalYaml-->
<!-- <relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>org.inferred</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
Expand All @@ -199,10 +201,11 @@
<pattern>dlshade</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
</relocation>
<relocation>
<!-- This refers to an older version of Jackson -->
<!-- <relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>net.java.dev.jna</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
Expand Down Expand Up @@ -315,10 +318,11 @@
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
</relocation>
<relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<!-- <relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
</relocation>
</relocation> -->
<relocation>
<pattern>org.hamcrest</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
Expand Down Expand Up @@ -357,4 +361,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.functions.worker.WorkerConfig;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -212,6 +215,14 @@ public void start(boolean blocking) throws Exception {
.getProtectionDomain().getCodeSource().getLocation().getFile();
}

boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
if(isBuiltin){
WorkerConfig workerConfig = WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
Functions functions = FunctionUtils.searchForFunctions(System.getenv("PULSAR_HOME") + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
userCodeFile = functions.getFunctions().get(functionType).toString();
}

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionConfigUtils.validate(functionConfig, file);
Expand Down Expand Up @@ -436,9 +447,10 @@ private String isBuiltInSource(String sourceType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();

if (connectors.getSources().containsKey(sourceType)) {
String source = sourceType.replaceFirst("^builtin://", "");
if (connectors.getSources().containsKey(source)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
return connectors.getSources().get(sourceType).toString();
return connectors.getSources().get(source).toString();
} else {
return null;
}
Expand All @@ -448,9 +460,10 @@ private String isBuiltInSink(String sinkType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();

if (connectors.getSinks().containsKey(sinkType)) {
String sink = sinkType.replaceFirst("^builtin://", "");
if (connectors.getSinks().containsKey(sink)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
return connectors.getSinks().get(sinkType).toString();
return connectors.getSinks().get(sink).toString();
} else {
return null;
}
Expand Down

0 comments on commit 69e3046

Please sign in to comment.