Skip to content

Commit

Permalink
Support to use SQL to CRUD JavaScript UDF/UDA. close #3072. (#109)
Browse files Browse the repository at this point in the history
* Support to use SQL to CRUD JavaScript UDF/UDA. close #3072.
* Reuse 'HereDoc' as type for JavaScript UDF source and fix review comments.
  • Loading branch information
sunset3000 authored Oct 12, 2023
1 parent 72cd9c2 commit 924b98d
Show file tree
Hide file tree
Showing 26 changed files with 1,426 additions and 339 deletions.
21 changes: 12 additions & 9 deletions src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,30 +234,33 @@ void JavaScriptAggrFunctionState::reinitCache()
}

AggregateFunctionJavaScriptAdapter::AggregateFunctionJavaScriptAdapter(
const JavaScriptUserDefinedFunctionConfiguration & config_, const DataTypes & types, const Array & params_, size_t max_v8_heap_size_in_bytes_)
JavaScriptUserDefinedFunctionConfigurationPtr config_,
const DataTypes & types,
const Array & params_,
size_t max_v8_heap_size_in_bytes_)
: IAggregateFunctionHelper<AggregateFunctionJavaScriptAdapter>(types, params_)
, config(config_)
, num_arguments(types.size())
, max_v8_heap_size_in_bytes(max_v8_heap_size_in_bytes_)
, blueprint(config.name, config.source)
, blueprint(config->name, config->source)
{
}

String AggregateFunctionJavaScriptAdapter::getName() const
{
return config.name;
return config->name;
}

DataTypePtr AggregateFunctionJavaScriptAdapter::getReturnType() const
{
return config.result_type;
return config->result_type;
}

/// create instance of UDF via function_builder
void AggregateFunctionJavaScriptAdapter::create(AggregateDataPtr __restrict place) const
{
V8::checkHeapLimit(blueprint.isolate.get(), max_v8_heap_size_in_bytes);
new (place) Data(blueprint, config.arguments);
new (place) Data(blueprint, config->arguments);
}

/// destroy instance of UDF
Expand Down Expand Up @@ -372,17 +375,17 @@ size_t AggregateFunctionJavaScriptAdapter::flush(AggregateDataPtr __restrict pla
v8::Local<v8::Function> local_func = v8::Local<v8::Function>::New(isolate_, data.process_func);

/// Second, convert the input column into the corresponding object used by UDF
auto argv = V8::prepareArguments(isolate_, config.arguments, data.columns);
auto argv = V8::prepareArguments(isolate_, config->arguments, data.columns);

/// Third, execute the UDF and get aggregate state (only support the final state now, intermediate state is not supported
v8::Local<v8::Value> res;
if (!local_func->Call(ctx, local_obj, static_cast<int>(config.arguments.size()), argv.data()).ToLocal(&res))
if (!local_func->Call(ctx, local_obj, static_cast<int>(config->arguments.size()), argv.data()).ToLocal(&res))
V8::throwException(
isolate_,
try_catch,
ErrorCodes::AGGREGATE_FUNCTION_THROW,
"Failed to invoke JavaScript user defined aggregation function : {}",
config.name);
config->name);

/// Forth, check if the UDA should emit only it has emit strategy
if (blueprint.has_user_defined_emit_strategy && !res->IsUndefined())
Expand Down Expand Up @@ -497,7 +500,7 @@ void AggregateFunctionJavaScriptAdapter::insertResultInto(AggregateDataPtr __res
num_of_results);
}

V8::insertResult(isolate_, to, config.result_type, res, hasUserDefinedEmit());
V8::insertResult(isolate_, to, config->result_type, res, hasUserDefinedEmit());
};

V8::run(blueprint.isolate.get(), blueprint.global_context, std::move(finalize_func));
Expand Down
4 changes: 2 additions & 2 deletions src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
static Data & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const Data *>(place); }

const JavaScriptUserDefinedFunctionConfiguration & config;
const JavaScriptUserDefinedFunctionConfigurationPtr config;
size_t num_arguments;
size_t max_v8_heap_size_in_bytes;
JavaScriptBlueprint blueprint;

public:
AggregateFunctionJavaScriptAdapter(
const JavaScriptUserDefinedFunctionConfiguration & config_,
JavaScriptUserDefinedFunctionConfigurationPtr config_,
const DataTypes & types,
const Array & params_,
size_t max_v8_heap_size_in_bytes_);
Expand Down
18 changes: 9 additions & 9 deletions src/AggregateFunctions/tests/gtest_uda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ v8::Local<v8::Value> createV8Array(v8::Isolate * isolate, bool is_empty_array)
return scope.Escape(result);
}

JavaScriptUserDefinedFunctionConfiguration
JavaScriptUserDefinedFunctionConfigurationPtr
createUDFConfig(const String & name, const String & arg_str, const String & return_type, const String & source)
{
DataTypePtr result_type = DataTypeFactory::instance().get(return_type);
Expand All @@ -172,13 +172,13 @@ createUDFConfig(const String & name, const String & arg_str, const String & retu
}
}

JavaScriptUserDefinedFunctionConfiguration function_configuration;
function_configuration.source = source;
function_configuration.is_aggregation = true;
function_configuration.name = name;
function_configuration.result_type = std::move(result_type);
function_configuration.type = UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT;
function_configuration.arguments = std::move(arguments);
auto function_configuration = std::make_shared<JavaScriptUserDefinedFunctionConfiguration>();
function_configuration->source = source;
function_configuration->is_aggregation = true;
function_configuration->name = name;
function_configuration->result_type = std::move(result_type);
function_configuration->type = UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT;
function_configuration->arguments = std::move(arguments);

return function_configuration;
};
Expand Down Expand Up @@ -542,7 +542,7 @@ void checkPrepareArguments(String type, CREATE_DATA_FUNC create_fn, CHECK_V8_DAT

MutableColumns columns;
columns.emplace_back(std::move(col_ptr));
auto argv = V8::prepareArguments(isolate, config.arguments, columns);
auto argv = V8::prepareArguments(isolate, config->arguments, columns);

ASSERT_EQ(argv.size(), 1);
v8::Local<v8::Array> v8_arr = argv[0].As<v8::Array>();
Expand Down
4 changes: 2 additions & 2 deletions src/Common/queryStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void queryOneStream(ContextMutablePtr query_context, const String &database_name
String cols = "database, name, engine, mode, uuid, dependencies_table, create_table_query, engine_full, partition_key, sorting_key, "
"primary_key, sampling_key, storage_policy";
String query = fmt::format(
"SELECT {} FROM system.tables WHERE database = '{}' AND name = '{}' settings "
"SELECT {} FROM system.tables WHERE database = '{}' AND name = '{}' AND engine != 'Dictionary' settings " /// 'Dictionary' does not have columns which will cause REST api crashes.
"show_table_uuid_in_table_create_query_if_not_nil = 1, _tp_internal_system_open_sesame=true",
cols,
database_name,name);
Expand All @@ -42,7 +42,7 @@ void queryStreamsByDatabasse(ContextMutablePtr query_context, const String &data
String cols = "database, name, engine, mode, uuid, dependencies_table, create_table_query, engine_full, partition_key, sorting_key, "
"primary_key, sampling_key, storage_policy";
String query = fmt::format(
"SELECT {} FROM system.tables WHERE database = '{}' settings "
"SELECT {} FROM system.tables WHERE database = '{}' AND engine != 'Dictionary' settings " /// 'Dictionary' does not have columns which will cause REST api crashes.
"show_table_uuid_in_table_create_query_if_not_nil = 1, _tp_internal_system_open_sesame=true",
cols,
database_name);
Expand Down
170 changes: 2 additions & 168 deletions src/Functions/UserDefined/ExternalUserDefinedFunctionsLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
#include <Functions/UserDefined/UserDefinedFunctionFactory.h>

/// proton: starts
#include <Poco/JSON/Parser.h>
#include <Common/filesystemHelpers.h>
#include <Functions/UserDefined/UDFHelper.h>
/// proton: ends

namespace DB
Expand Down Expand Up @@ -76,172 +75,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedFunctionsLoader::create(const std
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", name);

/// proton: starts
String type = config.getString(key_in_config + ".type");
UserDefinedFunctionConfiguration::FuncType func_type;
String command_value;
String source;
String command;
Poco::URI url;
if (type == "executable")
{
func_type = UserDefinedFunctionConfiguration::FuncType::EXECUTABLE;
command_value = config.getString(key_in_config + ".command", "");
}
else if (type == "remote")
{
func_type = UserDefinedFunctionConfiguration::FuncType::REMOTE;
url = Poco::URI(config.getString(key_in_config + ".url"));
}
else if (type == "javascript")
{
func_type = UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT;
source = config.getString(key_in_config + ".source", "");
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong user defined function type expected 'executable' or 'remote' actual {}",
type);
/// proton: ends

bool execute_direct = config.getBool(key_in_config + ".execute_direct", true);

std::vector<String> command_arguments;

if (execute_direct)
{
boost::split(command_arguments, command_value, [](char c) { return c == ' '; });

command_value = std::move(command_arguments[0]);
auto user_scripts_path = getContext()->getUserScriptsPath();
command = std::filesystem::path(user_scripts_path) / command_value;

if (!fileOrSymlinkPathStartsWith(command, user_scripts_path))
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Executable file {} must be inside user scripts folder {}", command_value, user_scripts_path);

if (!std::filesystem::exists(std::filesystem::path(command)))
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exist inside user scripts folder {}",
command_value,
user_scripts_path);

command_arguments.erase(command_arguments.begin());
}

DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
bool send_chunk_header = config.getBool(key_in_config + ".send_chunk_header", false);
size_t command_termination_timeout_seconds = config.getUInt64(key_in_config + ".command_termination_timeout", 0);
size_t command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000);
size_t command_write_timeout_milliseconds = config.getUInt64(key_in_config + ".command_write_timeout", 10000);

size_t pool_size = 0;
size_t max_command_execution_time = 0;

max_command_execution_time = config.getUInt64(key_in_config + ".max_command_execution_time", 10);

size_t max_execution_time_seconds = static_cast<size_t>(getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;

ExternalLoadableLifetime lifetime;

if (config.has(key_in_config + ".lifetime"))
lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime");

/// proton: starts
String format = config.getString(key_in_config + ".format", "ArrowStream");
bool is_aggr_function = config.getBool(key_in_config + ".is_aggregation", false);
pool_size = config.getUInt64(key_in_config + ".pool_size", 1);

/// Below implementation only available for JSON configuration, because Poco::Util::AbstractConfiguration cannot work well with Array
std::vector<UserDefinedFunctionConfiguration::Argument> arguments;
String arg_str = config.getRawString(key_in_config + ".arguments", "");
if (!arg_str.empty())
{
Poco::JSON::Parser parser;
try
{
auto json_arguments = parser.parse(arg_str).extract<Poco::JSON::Array::Ptr>();
for (unsigned int i = 0; i < json_arguments->size(); i++)
{
UserDefinedFunctionConfiguration::Argument argument;
argument.name = json_arguments->getObject(i)->get("name").toString();
argument.type = DataTypeFactory::instance().get(json_arguments->getObject(i)->get("type").toString());
arguments.emplace_back(std::move(argument));
}
}
catch (const std::exception &)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid UDF config");
}
}

/// handler auth_method
RemoteUserDefinedFunctionConfiguration::AuthMethod auth_method = RemoteUserDefinedFunctionConfiguration::AuthMethod::NONE;
RemoteUserDefinedFunctionConfiguration::AuthContext auth_ctx;
String method = config.getString(key_in_config + ".auth_method", "none");
if (method == "none")
{
auth_method = RemoteUserDefinedFunctionConfiguration::AuthMethod::NONE;
}
else if (method == "auth_header")
{
auth_ctx.key_name = config.getString(key_in_config + ".auth_context.key_name", "");
auth_ctx.key_value = config.getString(key_in_config + ".auth_context.key_value", "");
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong 'auth_method' expected 'none' or 'auth_header' actual {}",
method);

auto init_config = [&](UserDefinedFunctionConfigurationPtr cfg) {
cfg->type = std::move(func_type);
cfg->arguments = std::move(arguments);
cfg->is_aggregation = is_aggr_function;
cfg->name = name;
cfg->result_type = std::move(result_type);
cfg->max_command_execution_time_seconds = max_command_execution_time;
};

switch (func_type)
{
case UserDefinedFunctionConfiguration::FuncType::EXECUTABLE: {
auto udf_config = std::make_shared<ExecutableUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->command = std::move(command);
udf_config->command_arguments = std::move(command_arguments);
udf_config->format = std::move(format);
udf_config->command_termination_timeout_seconds = command_termination_timeout_seconds;
udf_config->command_read_timeout_milliseconds = command_read_timeout_milliseconds;
udf_config->command_write_timeout_milliseconds = command_write_timeout_milliseconds;
udf_config->pool_size = pool_size;
udf_config->is_executable_pool = true;
udf_config->send_chunk_header = send_chunk_header;
udf_config->execute_direct = execute_direct;
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::REMOTE: {
auto udf_config = std::make_shared<RemoteUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->command_read_timeout_milliseconds = command_read_timeout_milliseconds;
udf_config->url = url;
udf_config->auth_method = std::move(auth_method);
udf_config->auth_context = std::move(auth_ctx);
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::JAVASCRIPT: {
auto udf_config = std::make_shared<JavaScriptUserDefinedFunctionConfiguration>();
init_config(udf_config);
udf_config->source = std::move(source);
return std::make_shared<UserDefinedExecutableFunction>(std::move(udf_config), lifetime);
}
case UserDefinedFunctionConfiguration::FuncType::UNKNOWN:
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Wrong user defined function type expected 'executable', 'remote' or 'javascript' actual {}",
type);
}
return Streaming::createUserDefinedExecutableFunction(getContext(), name, config);
/// proton: ends
}

Expand Down
Loading

0 comments on commit 924b98d

Please sign in to comment.