Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 5262 support to create remote udf in sql #802

Merged
Merged
12 changes: 12 additions & 0 deletions src/Interpreters/InterpreterCreateFunctionQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ BlockIO InterpreterCreateFunctionQuery::execute()
/// proton: starts. Handle javascript UDF
if (create_function_query.isJavaScript())
return handleJavaScriptUDF(throw_if_exists, replace_if_exists);
else if (create_function_query.isRemote())
return handleRemoteUDF(throw_if_exists, replace_if_exists);
/// proton: ends

UserDefinedSQLFunctionFactory::instance().registerFunction(current_context, function_name, query_ptr, throw_if_exists, replace_if_exists);
Expand All @@ -75,5 +77,15 @@ BlockIO InterpreterCreateFunctionQuery::handleJavaScriptUDF(bool throw_if_exists

return {};
}

BlockIO InterpreterCreateFunctionQuery::handleRemoteUDF(bool throw_if_exists, bool replace_if_exists)
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
{
ASTCreateFunctionQuery & create = query_ptr->as<ASTCreateFunctionQuery &>();
assert(create.isRemote());
const auto func_name = create.getFunctionName();
Poco::JSON::Object::Ptr func = create.toJSON();
UserDefinedFunctionFactory::instance().registerFunction(getContext(), func_name, func, throw_if_exists, replace_if_exists);
return {};
}
/// proton: ends
}
1 change: 1 addition & 0 deletions src/Interpreters/InterpreterCreateFunctionQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class InterpreterCreateFunctionQuery : public IInterpreter, WithMutableContext

/// proton: starts
BlockIO handleJavaScriptUDF(bool throw_if_exists, bool replace_if_exists);
BlockIO handleRemoteUDF(bool throw_if_exists, bool replace_if_exists);
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
/// proton: ends
};

Expand Down
42 changes: 39 additions & 3 deletions src/Parsers/ASTCreateFunctionQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <Parsers/formatAST.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTLiteral.h>

#include <boost/algorithm/string/case_conv.hpp>
/// proton: ends


Expand All @@ -24,6 +26,7 @@ ASTPtr ASTCreateFunctionQuery::clone() const

res->function_core = function_core->clone();
res->children.push_back(res->function_core);
res->payload = payload;
return res;
}

Expand All @@ -50,7 +53,8 @@ void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, I

/// proton: starts
bool is_javascript_func = isJavaScript();
if (is_javascript_func)
bool is_remote = isRemote();
if (is_javascript_func || is_remote)
{
/// arguments
arguments->formatImpl(settings, state, frame);
Expand All @@ -63,6 +67,17 @@ void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, I

formatOnCluster(settings);

/// proton: starts
if (is_remote)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << fmt::format("\nTYPE Remote \n") << (settings.hilite ? hilite_none : "");
settings.ostr << fmt::format("URL '{}'\n", payload->get("AUTH_METHOD").toString());
settings.ostr << fmt::format("AUTH_METHOD '{}'\n", payload->has("AUTH_METHOD") ? payload->get("AUTH_METHOD").toString() : "none");
settings.ostr << fmt::format("AUTH_HEADER '{}'\n", payload->has("AUTH_HEADER") ? payload->get("AUTH_HEADER").toString() : "none");
settings.ostr << fmt::format("AUTH_KEY '{}'\n", payload->has("AUTH_KEY") ? payload->get("AUTH_KEY").toString() : "none");
return;
}
/// proton: ends
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");

/// proton: starts. Do not format the source of JavaScript UDF
Expand All @@ -89,7 +104,8 @@ Poco::JSON::Object::Ptr ASTCreateFunctionQuery::toJSON() const
Poco::JSON::Object::Ptr func = new Poco::JSON::Object(Poco::JSON_PRESERVE_KEY_ORDER);
Poco::JSON::Object::Ptr inner_func = new Poco::JSON::Object(Poco::JSON_PRESERVE_KEY_ORDER);
inner_func->set("name", getFunctionName());
if (!isJavaScript())
bool is_remote = isRemote();
if (!isJavaScript() && !isRemote())
{
WriteBufferFromOwnString source_buf;
formatAST(*function_core, source_buf, false);
Expand All @@ -116,7 +132,9 @@ Poco::JSON::Object::Ptr ASTCreateFunctionQuery::toJSON() const
inner_func->set("arguments", json_args);

/// type
inner_func->set("type", "javascript");
auto type = lang;
boost::to_lower(type);
inner_func->set("type", type);

/// is_aggregation
inner_func->set("is_aggregation", is_aggregation);
Expand All @@ -126,6 +144,24 @@ Poco::JSON::Object::Ptr ASTCreateFunctionQuery::toJSON() const
formatAST(*return_type, return_buf, false);
inner_func->set("return_type", return_buf.str());

/// remote functio
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
if (is_remote)
{
inner_func->set("url", payload->get("URL").toString());
// auth
if (payload->has("AUTH_METHOD"))
{
inner_func->set("auth_method", payload->get("AUTH_METHOD").toString());
Poco::JSON::Object::Ptr auth_context = new Poco::JSON::Object();
auth_context->set("key_name", payload->get("AUTH_HEADER").toString());
auth_context->set("key_value", payload->get("AUTH_KEY").toString());
inner_func->set("auth_context", auth_context);
}
func->set("function", inner_func);
/// Remote function don't have source, return early.
return func;
}

/// source
ASTLiteral * js_src = function_core->as<ASTLiteral>();
inner_func->set("source", js_src->value.safeGet<String>());
Expand Down
4 changes: 4 additions & 0 deletions src/Parsers/ASTCreateFunctionQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ASTCreateFunctionQuery : public IAST, public ASTQueryWithOnCluster
public:
ASTPtr function_name;
ASTPtr function_core;
Poco::JSON::Object::Ptr payload;
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved

bool or_replace = false;
bool if_not_exists = false;
Expand All @@ -41,6 +42,9 @@ class ASTCreateFunctionQuery : public IAST, public ASTQueryWithOnCluster

/// If it is a JavaScript UDF
bool isJavaScript() const noexcept { return lang == "JavaScript"; }

/// If it is a JavaScript UDF
bool isRemote() const noexcept { return lang == "remote"; }
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
/// proton: ends
};

Expand Down
53 changes: 51 additions & 2 deletions src/Parsers/ParserCreateFunctionQuery.cpp
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
/// proton: starts
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/Streaming/ParserArguments.h>
#include <Parsers/ASTLiteral.h>

#include <Poco/JSON/Object.h>
/// proton: ends


namespace DB
{

chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
namespace ErrorCodes
{
extern const int AGGREGATE_FUNCTION_NOT_APPLICABLE;
}
bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected, [[ maybe_unused ]] bool hint)
{
ParserKeyword s_create("CREATE");
Expand All @@ -25,6 +32,16 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ParserKeyword s_aggr_function("AGGREGATE FUNCTION");
ParserKeyword s_returns("RETURNS");
ParserKeyword s_javascript_type("LANGUAGE JAVASCRIPT");
ParserKeyword s_type_remote("TYPE Remote");
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
ParserKeyword s_url("URL");
ParserKeyword s_auth_method("AUTH_METHOD");
ParserKeyword s_auth_header("AUTH_HEADER");
ParserKeyword s_auth_key("AUTH_KEY");
ParserLiteral value;
ASTPtr url;
ASTPtr auth_method;
ASTPtr auth_header;
ASTPtr auth_key;
ParserArguments arguments_p;
ParserDataType return_p;
ParserStringLiteral js_src_p;
Expand All @@ -46,6 +63,7 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
bool is_aggregation = false;
bool is_javascript_func = false;
bool is_new_syntax = false;
bool is_remote = false;
/// proton: ends

String cluster_str;
Expand Down Expand Up @@ -93,8 +111,11 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp

if (s_javascript_type.ignore(pos, expected))
is_javascript_func = true;
else if (s_type_remote.ignore(pos,expected)){
is_remote = true;
}

if (!s_as.ignore(pos, expected))
if (!is_remote && !s_as.ignore(pos, expected))
return false;

/// Parse source code and function_core will be 'ASTLiteral'
Expand All @@ -110,6 +131,33 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
if (!lambda_p.parse(pos, function_core, expected))
return false;
}
Poco::JSON::Object::Ptr payload = new Poco::JSON::Object();
if (is_remote){
if (is_aggregation){
throw Exception("Remote udf can not be an aggregate function",ErrorCodes::AGGREGATE_FUNCTION_NOT_APPLICABLE);
}
if (!s_url.ignore(pos,expected))
return false;
if (!value.parse(pos, url, expected))
return false;
function_core = std::make_shared<ASTLiteral>(Field());
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
payload->set("URL", url->as<ASTLiteral>()->value.safeGet<String>());
if (s_auth_method.ignore(pos,expected)){
if (!value.parse(pos, auth_method, expected))
chhtimeplus marked this conversation as resolved.
Show resolved Hide resolved
return false;
if (!s_auth_header.ignore(pos, expected))
return false;
if (!value.parse(pos, auth_header, expected))
return false;
if (!s_auth_key.ignore(pos, expected))
return false;
if (!value.parse(pos, auth_key, expected))
return false;
payload->set("AUTH_METHOD", auth_method->as<ASTLiteral>()->value.safeGet<String>());
payload->set("AUTH_HEADER", auth_header->as<ASTLiteral>()->value.safeGet<String>());
payload->set("AUTH_KEY", auth_key->as<ASTLiteral>()->value.safeGet<String>());
}
}
/// proton: ends

auto create_function_query = std::make_shared<ASTCreateFunctionQuery>();
Expand All @@ -127,9 +175,10 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp

/// proton: starts
create_function_query->is_aggregation = is_aggregation;
create_function_query->lang = is_javascript_func ? "JavaScript" : "SQL";
create_function_query->lang = is_javascript_func ? "JavaScript" : is_remote ? "remote" : "SQL";
create_function_query->arguments = std::move(arguments);
create_function_query->return_type = std::move(return_type);
create_function_query->payload = payload;
/// proton: ends

return true;
Expand Down
98 changes: 98 additions & 0 deletions tests/stream/test_stream_smoke/0022_udf3_create_remote_func.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
test_suite_name: udf3_create_remote_func
tag: smoke
test_suite_config:
tests_2_run:
ids_2_run:
- all
tags_2_run: [ ]
tags_2_skip:
default:
- todo
- to_support
- change
- bug
- sample
cluster:
- view
- cluster_table_bug
comments: Test SQL to create remote udf

tests:
- id: 0
tags:
- udf3_create_remote_func
name: create remote udf
description: SQL - remote UDF
steps:
- statements:
- client: python
wait: 1
query: DROP FUNCTION IF EXISTS ip_lookup;

- client: python
query_type: table
query_id: udf-29-0
wait: 1
query: |
CREATE FUNCTION ip_lookup(ip string) RETURNS string
TYPE Remote
URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/';

- client: python
query_id: udf-29-1
query_end_timer: 7
depends_on_done: udf-29-0
query_type: table
wait: 5
query: |
select ip_lookup('1.1.1.1');

- client: python
query_id: udf-29-2
query_end_timer: 7
depends_on_done: udf-29-0
query_type: table
wait: 5
query: |
select ip_lookup('1');

- client: python
query_type: table
query_id: udf-29-3
depends_on_done: udf-29-0
wait: 1
query: |
DROP FUNCTION ip_lookup;

expected_results:
- query_id: udf-29-1
expected_results:
- [ '{"ip":"1.1.1.1","hostname":"one.one.one.one","anycast":true,"city":"Englewood","region":"Colorado","country":"US","loc":"39.6123,-104.8799","org":"AS13335 Cloudflare, Inc.","postal":"80111","timezone":"America/Denver"}' ]
- query_id: udf-29-2
expected_results:
- [ '{"status":404,"error":{"title":"Wrong ip","message":"Please provide a valid IP address"}}']

- id: 1
tags:
- udf3_create_remote_func
name: create remote uda
description: create remote uda failed
steps:
- statements:
- client: python
wait: 1
query: DROP FUNCTION IF EXISTS ip_lookup;

- client: python
query_type: table
query_id: udf-30-0
wait: 1
query: |
CREATE AGGREGATE FUNCTION ip_lookup(ip string) RETURNS string
TYPE Remote
URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/';
expected_results:
- query_id: udf-30-0
expected_results: "error_code:154"


Loading