Skip to content

Commit

Permalink
Possibility to add builtin Pulsar Functions with a shared jar copied …
Browse files Browse the repository at this point in the history
…inside the function folder (#6003)
  • Loading branch information
nicolopaganin committed May 6, 2020
1 parent 00bd430 commit b96197a
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 15 deletions.
1 change: 1 addition & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,4 @@ brokerClientTrustCertsFilePath:
########################

connectorsDirectory: ./connectors
functionsDirectory: ./functions
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.functions;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Basic information about a Pulsar function.
*/
@Data
@NoArgsConstructor
public class FunctionDefinition {

/**
* The name of the function type.
*/
private String name;

/**
* Description to be used for user help.
*/
private String description;

/**
* The class name for the function implementation.
*
* <p>If not defined, it will be assumed this function cannot act as a data.
*/
private String functionClass;
}
3 changes: 3 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ message FunctionDetails {
string runtimeFlags = 17;
ComponentType componentType = 18;
string customRuntimeOptions = 19;
/* If specified, this will refer to an archive that is
* already present in the server */
string builtin = 20;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final String CATEGORY_STATE = "State Management";
@Category
private static final String CATEGORY_CONNECTORS = "Connectors";
@Category
private static final String CATEGORY_FUNCTIONS = "Functions";

@FieldContext(
category = CATEGORY_WORKER,
Expand Down Expand Up @@ -134,6 +136,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin connectors"
)
private String connectorsDirectory = "./connectors";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
public class FunctionConfigUtils {
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {

boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
Expand Down Expand Up @@ -243,6 +245,11 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
}

if (isBuiltin) {
String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
functionDetailsBuilder.setBuiltin(builtin);
}

return functionDetailsBuilder.build();
}

Expand Down Expand Up @@ -582,12 +589,6 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}

if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
&& functionConfig.getJar().startsWith(BUILTIN)) {
if (!new File(functionConfig.getJar()).exists()) {
throw new IllegalArgumentException("The supplied jar file does not exist");
}
}
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
if (!new File(functionConfig.getPy()).exists()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils.functions;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.functions.api.Function;


@UtilityClass
@Slf4j
public class FunctionUtils {

private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";

/**
* Extract the Pulsar Function class from a functionctor archive.
*/
public static String getFunctionClass(ClassLoader classLoader) throws IOException {
NarClassLoader ncl = (NarClassLoader) classLoader;
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
FunctionDefinition.class);
if (StringUtils.isEmpty(conf.getFunctionClass())) {
throw new IOException(
String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
}

try {
// Try to load source class and check it implements Function interface
Class functionClass = ncl.loadClass(conf.getFunctionClass());
if (!(Function.class.isAssignableFrom(functionClass))) {
throw new IOException(
"Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName());
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
}

return conf.getFunctionClass();
}

public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
}
}

public static Functions searchForFunctions(String functionsDirectory) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);

Functions functions = new Functions();

if (!path.toFile().exists()) {
log.warn("Functions archive directory not found");
return functions;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
log.info("Found function {} from {}", cntDef, archive);
log.error(cntDef.getName());
log.error(cntDef.getFunctionClass());
if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
functions.functions.put(cntDef.getName(), archive);
}

functions.functionsDefinitions.add(cntDef);
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
}
}
}

Collections.sort(functions.functionsDefinitions,
(c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));

return functions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils.functions;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import lombok.Data;

import org.apache.pulsar.common.functions.FunctionDefinition;

@Data
public class Functions {
final List<FunctionDefinition> functionsDefinitions = new ArrayList<>();
final Map<String, Path> functions = new TreeMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,18 @@ public class FunctionActioner {
private final RuntimeFactory runtimeFactory;
private final Namespace dlogNamespace;
private final ConnectorsManager connectorsManager;
private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;

public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
}

Expand Down Expand Up @@ -421,6 +423,10 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I
}
}

if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
}

throw new IOException("Could not find built in archive definition");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public int size() {


public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager connectorsManager,
MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
FunctionMetaDataManager functionMetaDataManager) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
Expand Down Expand Up @@ -194,7 +194,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);

this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());

this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.functions.Functions;
@Slf4j
public class FunctionsManager {

private Functions functions;

public FunctionsManager(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
}

public List<FunctionDefinition> getFunctions() {
return functions.getFunctionsDefinitions();
}

public Path getFunctionArchive(String functionType) {
return functions.getFunctions().get(functionType);
}

public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class WorkerService {
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private ConnectorsManager connectorsManager;
private FunctionsManager functionsManager;
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
Expand Down Expand Up @@ -170,6 +171,7 @@ public void start(URI dlogUri,
this.workerConfig, this.schedulerManager, this.client);

this.connectorsManager = new ConnectorsManager(workerConfig);
this.functionsManager = new FunctionsManager(workerConfig);

//create membership manager
String coordinationTopic = workerConfig.getClusterCoordinationTopic();
Expand All @@ -180,7 +182,7 @@ public void start(URI dlogUri,

// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager);
this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);

// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
Expand Down
Loading

0 comments on commit b96197a

Please sign in to comment.