Skip to content

Commit

Permalink
Support to use SQL to CRUD JavaScript UDF/UDA. close #3072.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunset3000 committed Sep 19, 2023
1 parent 3a7bed3 commit e03595c
Show file tree
Hide file tree
Showing 27 changed files with 1,539 additions and 314 deletions.
170 changes: 2 additions & 168 deletions src/Functions/UserDefined/ExternalUserDefinedFunctionsLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
#include <Functions/UserDefined/UserDefinedFunctionFactory.h>

/// proton: starts
#include <Poco/JSON/Parser.h>
#include <Common/filesystemHelpers.h>
#include <Functions/UserDefined/UDFHelper.h>
/// proton: ends

namespace DB
Expand Down Expand Up @@ -76,172 +75,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedFunctionsLoader::create(const std
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", name);

/// proton: starts
String type = config.getString(key_in_config + ".type");
UserDefinedFunctionConfiguration::FuncType func_type;
String command_value;
String source;
String command;
Poco::URI url;
if (type == "executable")
{
func_type = UserDefinedFunctionConfiguration::FuncType::EXECUTABLE;
command_value = config.getString(key_in_config + ".command", "");
}
else if (type == "remote")
{
func_type = UserDefinedFunctionConfiguration::FuncType::REMOTE;
url = Poco::URI(config.getString(key_in_config + ".url"));
}
else if (type == "javascript")
{
func_type = UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT;
source = config.getString(key_in_config + ".source", "");
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong user defined function type expected 'executable' or 'remote' actual {}",
type);
/// proton: ends

bool execute_direct = config.getBool(key_in_config + ".execute_direct", true);

std::vector<String> command_arguments;

if (execute_direct)
{
boost::split(command_arguments, command_value, [](char c) { return c == ' '; });

command_value = std::move(command_arguments[0]);
auto user_scripts_path = getContext()->getUserScriptsPath();
command = std::filesystem::path(user_scripts_path) / command_value;

if (!fileOrSymlinkPathStartsWith(command, user_scripts_path))
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Executable file {} must be inside user scripts folder {}", command_value, user_scripts_path);

if (!std::filesystem::exists(std::filesystem::path(command)))
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command_value,
user_scripts_path);

command_arguments.erase(command_arguments.begin());
}

DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
bool send_chunk_header = config.getBool(key_in_config + ".send_chunk_header", false);
size_t command_termination_timeout_seconds = config.getUInt64(key_in_config + ".command_termination_timeout", 0);
size_t command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000);
size_t command_write_timeout_milliseconds = config.getUInt64(key_in_config + ".command_write_timeout", 10000);

size_t pool_size = 0;
size_t max_command_execution_time = 0;

max_command_execution_time = config.getUInt64(key_in_config + ".max_command_execution_time", 10);

size_t max_execution_time_seconds = static_cast<size_t>(getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;

ExternalLoadableLifetime lifetime;

if (config.has(key_in_config + ".lifetime"))
lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime");

/// proton: starts
String format = config.getString(key_in_config + ".format", "ArrowStream");
bool is_aggr_function = config.getBool(key_in_config + ".is_aggregation", false);
pool_size = config.getUInt64(key_in_config + ".pool_size", 1);

/// Below implementation only available for JSON configuration, because Poco::Util::AbstractConfiguration cannot work well with Array
std::vector<UserDefinedFunctionConfiguration::Argument> arguments;
String arg_str = config.getRawString(key_in_config + ".arguments", "");
if (!arg_str.empty())
{
Poco::JSON::Parser parser;
try
{
auto json_arguments = parser.parse(arg_str).extract<Poco::JSON::Array::Ptr>();
for (unsigned int i = 0; i < json_arguments->size(); i++)
{
UserDefinedFunctionConfiguration::Argument argument;
argument.name = json_arguments->getObject(i)->get("name").toString();
argument.type = DataTypeFactory::instance().get(json_arguments->getObject(i)->get("type").toString());
arguments.emplace_back(std::move(argument));
}
}
catch (const std::exception &)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid UDF config");
}
}

/// handler auth_method
RemoteUserDefinedFunctionConfiguration::AuthMethod auth_method = RemoteUserDefinedFunctionConfiguration::AuthMethod::NONE;
RemoteUserDefinedFunctionConfiguration::AuthContext auth_ctx;
String method = config.getString(key_in_config + ".auth_method", "none");
if (method == "none")
{
auth_method = RemoteUserDefinedFunctionConfiguration::AuthMethod::NONE;
}
else if (method == "auth_header")
{
auth_ctx.key_name = config.getString(key_in_config + ".auth_context.key_name", "");
auth_ctx.key_value = config.getString(key_in_config + ".auth_context.key_value", "");
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong 'auth_method' expected 'none' or 'auth_header' actual {}",
method);

auto init_config = [&](UserDefinedFunctionConfigurationPtr cfg) {
cfg->type = std::move(func_type);
cfg->arguments = std::move(arguments);
cfg->is_aggregation = is_aggr_function;
cfg->name = name;
cfg->result_type = std::move(result_type);
cfg->max_command_execution_time_seconds = max_command_execution_time;
};

switch (func_type)
{
case UserDefinedFunctionConfiguration::FuncType::EXECUTABLE: {
auto udf_config = std::make_shared<ExecutableUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->command = std::move(command);
udf_config->command_arguments = std::move(command_arguments);
udf_config->format = std::move(format);
udf_config->command_termination_timeout_seconds = command_termination_timeout_seconds;
udf_config->command_read_timeout_milliseconds = command_read_timeout_milliseconds;
udf_config->command_write_timeout_milliseconds = command_write_timeout_milliseconds;
udf_config->pool_size = pool_size;
udf_config->is_executable_pool = true;
udf_config->send_chunk_header = send_chunk_header;
udf_config->execute_direct = execute_direct;
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::REMOTE: {
auto udf_config = std::make_shared<RemoteUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->command_read_timeout_milliseconds = command_read_timeout_milliseconds;
udf_config->url = url;
udf_config->auth_method = std::move(auth_method);
udf_config->auth_context = std::move(auth_ctx);
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT: {
auto udf_config = std::make_shared<JavaScriptUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->source = std::move(source);
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::UNKNOWN:
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Wrong user defined function type expected 'executable', 'remote' or 'javascript' actual {}",
type);
}
return Streaming::createUserDefinedExecutableFunction(getContext(), name, config);
/// proton: ends
}

Expand Down
Loading

0 comments on commit e03595c

Please sign in to comment.