Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Reorganize the http handler of interactive #4037

Merged
merged 52 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
92f6537
fix handler
zhanglei1949 Jul 11, 2024
0700196
fix
zhanglei1949 Jul 11, 2024
89f16c7
Merge branch 'main' into reorg-server
zhanglei1949 Jul 11, 2024
9289469
merge graph_db_ic_handler into hqps_ic_handler
zhanglei1949 Jul 12, 2024
ce99f0d
Merge branch 'main' into reorg-server
zhanglei1949 Jul 12, 2024
cfcde39
todo: refine java sdk impl
zhanglei1949 Jul 16, 2024
9666680
replace http client in compiler with sdk
zhanglei1949 Jul 17, 2024
ece125d
todo: pass ci
zhanglei1949 Jul 17, 2024
c884054
merge
zhanglei1949 Jul 17, 2024
38795a9
interactive-sdk version upgrade to 0.4-SNAPSHOT
zhanglei1949 Jul 17, 2024
5b62a5a
use 0.4
zhanglei1949 Jul 17, 2024
9cddf79
minor
zhanglei1949 Jul 17, 2024
8368dc4
call with raw okHttp3 interface
liulx20 Jul 17, 2024
b6a269b
format
liulx20 Jul 17, 2024
4bd44da
fixing CI
zhanglei1949 Jul 18, 2024
d0d62b7
Merge branch 'main' into reorg-server
zhanglei1949 Jul 18, 2024
9021295
impl stored procedure only interface
zhanglei1949 Jul 18, 2024
1225d11
fix
zhanglei1949 Jul 18, 2024
ca4ea54
refine graph_db_handler
zhanglei1949 Jul 18, 2024
cc64099
fix
liulx20 Jul 18, 2024
3e2cce9
fix
liulx20 Jul 18, 2024
20a9cb0
Merge branch 'main' into reorg-server
zhanglei1949 Jul 18, 2024
4ff86fe
refactor call_procedure related code
zhanglei1949 Jul 18, 2024
5a946c9
formatting
zhanglei1949 Jul 18, 2024
89c73b4
remove unused files
zhanglei1949 Jul 18, 2024
c03823a
add config for httpclient
liulx20 Jul 19, 2024
a7d4d14
refactor(interactive): Order procedures by default based on creation …
zhanglei1949 Jul 18, 2024
20ca959
todo: passing ci
zhanglei1949 Jul 22, 2024
c3c75ea
fix(interactive): Improve the codegen error propagation from server t…
zhanglei1949 Jul 19, 2024
1c81c17
todo: fix ci
zhanglei1949 Jul 22, 2024
62634ce
merge
zhanglei1949 Jul 22, 2024
554f80a
fixing
zhanglei1949 Jul 22, 2024
5023b5b
remove comment
zhanglei1949 Jul 22, 2024
9ef3bf7
fix
zhanglei1949 Jul 22, 2024
2ae67a3
remove unused consts
zhanglei1949 Jul 22, 2024
fede30f
remove BUILD_HQPS and reuse start/stop scope
zhanglei1949 Jul 22, 2024
cc846d3
minor
zhanglei1949 Jul 22, 2024
9075652
merge
zhanglei1949 Jul 23, 2024
70d839d
fixing ci
zhanglei1949 Jul 23, 2024
9e9a677
Merge branch 'main' into reorg-server
zhanglei1949 Jul 23, 2024
ae1b187
fixing ci
zhanglei1949 Jul 23, 2024
887ec89
fixing
zhanglei1949 Jul 24, 2024
cc90e98
fixing
zhanglei1949 Jul 24, 2024
6afa711
fixing
zhanglei1949 Jul 24, 2024
d8a8099
remove dummy code
zhanglei1949 Jul 25, 2024
74cd639
fix
liulx20 Jul 25, 2024
a29c731
fix
liulx20 Jul 25, 2024
ad91184
fixing
zhanglei1949 Jul 25, 2024
28cf0ee
Merge branch 'main' into reorg-server
zhanglei1949 Jul 25, 2024
dea2dab
handle re-cancel exception
zhanglei1949 Jul 25, 2024
17817bc
fixing
zhanglei1949 Jul 25, 2024
548b65a
Merge branch 'main' into reorg-server
zhanglei1949 Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,17 @@ class GraphDBSession {
public:
enum class InputFormat : uint8_t {
kCppEncoder = 0,
#ifdef BUILD_HQPS
kCypherJson = 1, // External usage format
kCypherInternalAdhoc = 2, // Internal format for adhoc query
kCypherInternalProcedure = 3, // Internal format for procedure
#endif // BUILD_HQPS
kCypherJson = 1, // Json format for cypher query
kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
kCypherProtoProcedure = 3, // Protobuf format for procedure query
};

static constexpr int32_t MAX_RETRY = 3;
static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8)
#ifdef BUILD_HQPS
static constexpr const char* kCppEncoder = "\x00";
static constexpr const char* kCypherJson = "\x01";
static constexpr const char* kCypherInternalAdhoc = "\x02";
static constexpr const char* kCypherInternalProcedure = "\x03";
#endif // BUILD_HQPS
static constexpr const char* kCppEncoderStr = "\x00";
static constexpr const char* kCypherJsonStr = "\x01";
static constexpr const char* kCypherProtoAdhocStr = "\x02";
static constexpr const char* kCypherProtoProcedureStr = "\x03";
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
const std::string& work_dir, int thread_id)
: db_(db),
Expand Down Expand Up @@ -157,7 +153,7 @@ class GraphDBSession {
}
#ifdef BUILD_HQPS
else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalAdhoc)) {
static_cast<uint8_t>(InputFormat::kCypherProtoAdhoc)) {
// For cypher internal adhoc, the query id is the
// second last byte,which is fixed to 255, and other bytes are a string
// representing the path to generated dynamic lib.
Expand All @@ -169,7 +165,7 @@ class GraphDBSession {
std::string_view str_view(input.data(), len - 1);
return parse_query_type_from_cypher_json(str_view);
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalProcedure)) {
static_cast<uint8_t>(InputFormat::kCypherProtoProcedure)) {
// For cypher internal procedure, the query_name is
// provided in the protobuf message.
std::string_view str_view(input.data(), len - 1);
Expand Down
21 changes: 3 additions & 18 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1068,12 +1068,6 @@ seastar::future<admin_query_result> admin_actor::start_service(
// now start the compiler
auto schema_path =
server::WorkDirManipulator::GetGraphSchemaPath(graph_name);
if (!hqps_service.start_compiler_subprocess(schema_path)) {
LOG(ERROR) << "Fail to start compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(gs::StatusCode::InternalError,
"Fail to start compiler")));
}
LOG(INFO) << "Successfully started service with graph: " << graph_name;
hqps_service.reset_start_time();
return seastar::make_ready_future<admin_query_result>(
Expand Down Expand Up @@ -1112,18 +1106,9 @@ seastar::future<admin_query_result> admin_actor::stop_service(
"Fail to clear running graph")));
}
}

if (hqps_service.stop_compiler_subprocess()) {
LOG(INFO) << "Successfully stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
to_message_json("Successfully stop service")));
} else {
LOG(ERROR) << "Fail to stop compiler";
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::InternalError, "Fail to stop compiler")));
}
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
to_message_json("Successfully stop service")));
}
});
}
Expand Down
40 changes: 20 additions & 20 deletions flex/engines/http_server/handler/admin_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ seastar::future<> admin_http_handler::set_routes() {
{
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
Expand All @@ -701,7 +701,7 @@ seastar::future<> admin_http_handler::set_routes() {
{
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure");
Expand All @@ -712,7 +712,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
Expand All @@ -725,7 +725,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
Expand All @@ -737,7 +737,7 @@ seastar::future<> admin_http_handler::set_routes() {
// Each procedure's handling
auto match_rule =
new seastar::httpd::match_rule(new admin_http_procedure_handler_impl(
interactive_admin_group_id, shard_admin_procedure_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/procedure")
Expand All @@ -750,24 +750,24 @@ seastar::future<> admin_http_handler::set_routes() {
// List all graphs.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/graph"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
shard_admin_concurrency));
// Create a new Graph
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/graph"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
shard_admin_concurrency));

// Delete a graph
r.add(SEASTAR_DELETE,
seastar::httpd::url("/v1/graph").remainder("graph_id"),
new admin_http_graph_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
shard_admin_concurrency));
{
// uploading file to server
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/v1/file/upload"),
new admin_file_upload_handler_impl(interactive_admin_group_id,
shard_admin_graph_concurrency));
shard_admin_concurrency));
}

// Get graph metadata
Expand All @@ -776,7 +776,7 @@ seastar::future<> admin_http_handler::set_routes() {
// /v1/graph/{graph_id}/schema
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id", false);
// Get graph schema
r.add(match_rule, seastar::httpd::operation_type::GET);
Expand All @@ -785,7 +785,7 @@ seastar::future<> admin_http_handler::set_routes() {
{ // load data to graph
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/dataloading");
Expand All @@ -794,15 +794,15 @@ seastar::future<> admin_http_handler::set_routes() {
{ // Get Graph Schema
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph").add_param("graph_id").add_str("/schema");
r.add(match_rule, seastar::httpd::operation_type::GET);
}
{
// Get running graph statistics
auto match_rule =
new seastar::httpd::match_rule(new admin_http_graph_handler_impl(
interactive_admin_group_id, shard_admin_graph_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/graph")
.add_param("graph_id")
.add_str("/statistics");
Expand All @@ -814,18 +814,18 @@ seastar::future<> admin_http_handler::set_routes() {
r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/node/status"),
new admin_http_node_handler_impl(interactive_admin_group_id,
shard_admin_node_concurrency));
shard_admin_concurrency));

auto match_rule =
new seastar::httpd::match_rule(new admin_http_service_handler_impl(
interactive_admin_group_id, shard_admin_service_concurrency));
interactive_admin_group_id, shard_admin_concurrency));
match_rule->add_str("/v1/service").add_param("action");
r.add(match_rule, seastar::httpd::operation_type::POST);

r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/v1/service/status"),
new admin_http_service_handler_impl(
interactive_admin_group_id, shard_admin_service_concurrency));
new admin_http_service_handler_impl(interactive_admin_group_id,
shard_admin_concurrency));
}

{
Expand Down Expand Up @@ -897,17 +897,17 @@ seastar::future<> admin_http_handler::set_routes() {
// job request handling.
r.add(seastar::httpd::operation_type::GET, seastar::httpd::url("/v1/job"),
new admin_http_job_handler_impl(interactive_admin_group_id,
shard_admin_job_concurrency));
shard_admin_concurrency));
auto match_rule =
new seastar::httpd::match_rule(new admin_http_job_handler_impl(
interactive_admin_group_id, shard_admin_job_concurrency));
interactive_admin_group_id, shard_admin_concurrency));

match_rule->add_str("/v1/job").add_param("job_id");
r.add(match_rule, seastar::httpd::operation_type::GET);

r.add(SEASTAR_DELETE, seastar::httpd::url("/v1/job").remainder("job_id"),
new admin_http_job_handler_impl(interactive_admin_group_id,
shard_admin_job_concurrency));
shard_admin_concurrency));
}

return seastar::make_ready_future<>();
Expand Down
Loading
Loading