From cff87da21675da8cf40c6dede934c54f1ad4e498 Mon Sep 17 00:00:00 2001
From: shirly121
Date: Fri, 8 Sep 2023 18:00:37 +0800
Subject: [PATCH] [GIE Compiler] fix bugs of columnId in schema

refactor(flex): Replace the Adhoc csv reader with Arrow CSV reader (#3154)

1. Use the Arrow CSV reader to replace the current ad hoc CSV reader, to
   support more configurable options in `bulk_load.yaml`.
2. Introduce `CSVFragmentLoader` and `BasicFragmentLoader` for
   `MutablePropertyFragment`.

With this PR merged, `MutablePropertyFragment` will support loading a fragment
from CSV with the following options (see the config sketch after the tables
below):
- delimiter: default '|'
- header_row: default true
- quoting: default false
- quoting_char: default '"'
- escaping: default false
- escaping_char: default '\\'
- batch_size: the batch size used when reading the file into memory, default 1MB
- batch_reader: default false. If set to true, `arrow::csv::StreamingReader`
  will be used to parse the input file; otherwise, `arrow::csv::TableReader`
  will be used.

With this PR merged, the performance of graph loading is improved. In the
tables below, "Adhoc Reader" denotes the previously implemented CSV parser, and
1, 2, 4, 8 denote the parallelism of graph loading, i.e. how many vertex/edge
labels are processed concurrently. Note that TableReader is around 10x faster
than StreamingReader in the ReadFile phase, most likely because it reads with
multiple threads. See [arrow-csv-doc](https://arrow.apache.org/docs/cpp/csv.html)
for details.

| Reader           | Phase              | 1    | 2    | 4    | 8    |
| ---------------- | ------------------ | ---- | ---- | ---- | ---- |
| Adhoc Reader     | ReadFile+LoadGraph | 805s | 468s | 349s | 313s |
| Adhoc Reader     | Serialization      | 126s | 126s | 126s | 126s |
| Adhoc Reader     | **Total**          | 931s | 594s | 475s | 439s |
| Table Reader     | ReadFile           | 9s   | 9s   | 9s   | 9s   |
| Table Reader     | LoadGraph          | 455s | 280s | 211s | 182s |
| Table Reader     | Serialization      | 126s | 126s | 126s | 126s |
| Table Reader     | **Total**          | 600s | 415s | 346s | 317s |
| Streaming Reader | ReadFile           | 91s  | 91s  | 91s  | 91s  |
| Streaming Reader | LoadGraph          | 555s | 289s | 196s | 149s |
| Streaming Reader | Serialization      | 126s | 126s | 126s | 126s |
| Streaming Reader | **Total**          | 772s | 506s | 413s | 366s |

| Reader           | Phase              | 1     | 2     | 4     | 8     |
| ---------------- | ------------------ | ----- | ----- | ----- | ----- |
| Adhoc Reader     | ReadFile+LoadGraph | 2720s | 1548s | 1176s | 948s  |
| Adhoc Reader     | Serialization      | 409s  | 409s  | 409s  | 409s  |
| Adhoc Reader     | **Total**          | 3129s | 1957s | 1585s | 1357s |
| Table Reader     | ReadFile           | 24s   | 24s   | 24s   | 24s   |
| Table Reader     | LoadGraph          | 1576s | 949s  | 728s  | 602s  |
| Table Reader     | Serialization      | 409s  | 409s  | 409s  | 409s  |
| Table Reader     | **Total**          | 2009s | 1382s | 1161s | 1035s |
| Streaming Reader | ReadFile           | 300s  | 300s  | 300s  | 300s  |
| Streaming Reader | LoadGraph          | 1740s | 965s  | 669s  | 497s  |
| Streaming Reader | Serialization      | 409s  | 409s  | 409s  | 409s  |
| Streaming Reader | **Total**          | 2539s | 1674s | 1378s | 1206s |

| Reader           | Phase              | 1     | 2     | 4     | 8     |
| ---------------- | ------------------ | ----- | ----- | ----- | ----- |
| Adhoc Reader     | ReadFile+LoadGraph | 8260s | 4900s | 3603s | 2999s |
| Adhoc Reader     | Serialization      | 1201s | 1201s | 1201s | 1201s |
| Adhoc Reader     | **Total**          | 9461s | 6101s | 4804s | 4200s |
| Table Reader     | ReadFile           | 73s   | 73s   | 96s   | 96s   |
| Table Reader     | LoadGraph          | 4650s | 2768s | 2155s | 1778s |
| Table Reader     | Serialization      | 1201s | 1201s | 1201s | 1201s |
| Table Reader     | **Total**          | 5924s | 4042s | 3452s | 3075s |
| Streaming Reader | ReadFile           | 889s  | 889s  | 889s  | 889s  |
| Streaming Reader | LoadGraph          | 5589s | 3005s | 2200s | 1712s |
| Streaming Reader | Serialization      | 1201s | 1201s | 1201s | 1201s |
| Streaming Reader | **Total**          | 7679s | 5095s | 4290s | 3802s |
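For reference, a `bulk_load.yaml` using these options might look like the sketch
below. The option names and defaults are the ones listed above; the exact key
nesting is illustrative rather than the authoritative schema:

```yaml
loading_config:
  format:
    type: csv
    metadata:
      delimiter: '|'        # column separator
      header_row: true      # first line carries the column names
      quoting: false
      quoting_char: '"'
      escaping: false
      escaping_char: '\'
      batch_size: 1MB       # size of each block read into memory
      batch_reader: false   # true: arrow::csv::StreamingReader, false: arrow::csv::TableReader
```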
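To make the TableReader/StreamingReader comparison concrete, here is a minimal,
self-contained Arrow C++ sketch (not code from this PR; the file path and
delimiter are placeholders). `TableReader` materializes the whole file as an
`arrow::Table` and parses blocks in parallel when `ReadOptions::use_threads` is
on, which is consistent with the much faster ReadFile numbers above;
`StreamingReader` hands back one `arrow::RecordBatch` at a time:

```cpp
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/record_batch.h>
#include <arrow/table.h>

arrow::Status ReadCsvBothWays(const std::string& path) {
  auto io_context = arrow::io::default_io_context();
  auto read_opts = arrow::csv::ReadOptions::Defaults();   // use_threads is true by default
  auto parse_opts = arrow::csv::ParseOptions::Defaults();
  parse_opts.delimiter = '|';
  auto convert_opts = arrow::csv::ConvertOptions::Defaults();

  // TableReader: reads and parses the whole file into a Table, multi-threaded.
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(path));
  ARROW_ASSIGN_OR_RAISE(auto table_reader,
                        arrow::csv::TableReader::Make(io_context, file, read_opts,
                                                      parse_opts, convert_opts));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, table_reader->Read());

  // StreamingReader: incremental, one RecordBatch at a time, lower memory footprint.
  ARROW_ASSIGN_OR_RAISE(auto file2, arrow::io::ReadableFile::Open(path));
  ARROW_ASSIGN_OR_RAISE(auto streaming_reader,
                        arrow::csv::StreamingReader::Make(io_context, file2, read_opts,
                                                          parse_opts, convert_opts));
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(streaming_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of input
    // hand `batch` to the fragment loader here
  }
  return arrow::Status::OK();
}
```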
Fix #3116
---
 .github/workflows/flex.yml                    |    4 +-
 .gitignore                                    |    6 +-
 flex/bin/load_plan_and_gen.sh                 |   61 +-
 flex/bin/sync_server.cc                       |   66 +-
 .../graph_db/grin/src/topology/structure.cc   |    7 +-
 flex/engines/graph_db/grin/test/test.c        |  120 +-
 .../hqps_db/core/operator/edge_expand.h       |   49 +-
 flex/engines/hqps_db/core/utils/keyed.h       |   15 +
 flex/engines/hqps_db/database/adj_list.h      |   22 +-
 .../hqps_db/database/mutable_csr_interface.h  |   19 +-
 .../multi_edge_set/untyped_edge_set.h         |   97 +-
 flex/engines/http_server/stored_procedure.cc  |   39 +-
 flex/engines/http_server/stored_procedure.h   |   16 +-
 flex/interactive/README.md                    |    8 +
 flex/interactive/bin/db_admin.sh              |  654 -------
 flex/interactive/bin/gs_interactive           | 1651 +++++++++++++++++
 flex/interactive/conf/engine_config.yaml      |   13 +
 flex/interactive/conf/interactive.properties  |    8 -
 flex/interactive/conf/interactive.yaml        |   26 +-
 flex/interactive/data/ldbc/graph.json         |  128 --
 flex/interactive/data/ldbc/graph.yaml         |   70 -
 .../docker/interactive-runtime.Dockerfile     |    5 +-
 flex/interactive/examples/modern_graph       |    1 -
 .../examples}/modern_graph/bulk_load.yaml     |    1 +
 .../modern_graph/count_vertex_num.cypher      |    1 +
 .../examples}/modern_graph/modern_graph.yaml  |    4 -
 .../examples}/modern_graph/person.csv         |    0
 .../modern_graph/person_created_software.csv  |    0
 .../modern_graph/person_knows_person.csv      |    0
 .../examples}/modern_graph/software.csv       |    0
 flex/storages/rt_mutable_graph/schema.cc      |   49 +-
 flex/storages/rt_mutable_graph/schema.h       |    5 +
 flex/tests/hqps/match_query.h                 |   54 +
 flex/tests/hqps/query_test.cc                 |  290 +--
 flex/utils/property/types.h                   |   22 +
 flex/utils/yaml_utils.cc                      |   32 +
 flex/utils/yaml_utils.h                       |    5 +
 .../groot/common/util/IrSchemaParser.java     |    4 +-
 .../graphscope/common/config/YamlConfigs.java |    1 +
 .../ir/meta/reader/LocalMetaDataReader.java   |   29 +-
 .../common/ir/meta/schema/IrGraphSchema.java  |    7 +-
 41 files changed, 2375 insertions(+), 1214 deletions(-)
 delete mode 100755 flex/interactive/bin/db_admin.sh
 create mode 100755 flex/interactive/bin/gs_interactive
 create mode 100644 flex/interactive/conf/engine_config.yaml
 delete mode 100755 flex/interactive/conf/interactive.properties
 delete mode 100755 flex/interactive/data/ldbc/graph.json
 delete mode 100755 flex/interactive/data/ldbc/graph.yaml
 delete mode 120000 flex/interactive/examples/modern_graph
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/bulk_load.yaml (98%)
 create mode 100644 flex/interactive/examples/modern_graph/count_vertex_num.cypher
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/modern_graph.yaml (94%)
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/person.csv (100%)
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/person_created_software.csv (100%)
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/person_knows_person.csv (100%)
 rename flex/{storages/rt_mutable_graph => interactive/examples}/modern_graph/software.csv (100%)
 create mode 100644 flex/utils/yaml_utils.cc

diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml
index 77267b8e6463..6f5a798f5fa1 100644
--- a/.github/workflows/flex.yml
+++ b/.github/workflows/flex.yml
@@ -55,7 +55,9 @@ jobs:
           mkdir build && cd build
          cmake .. && sudo make -j$(nproc)
          export FLEX_DATA_DIR=../../../../storages/rt_mutable_graph/modern_graph/
-         ./run_grin_test
+         ./run_grin_test flex://../../../../interactive/examples/modern_graph/ \
+           ../../../../interactive/examples/modern_graph/modern_graph.yaml \
+           ../../../../interactive/examples/modern_graph/bulk_load.yaml

      - name: Test Graph Loading on modern graph
        env:
diff --git a/.gitignore b/.gitignore
index 50f7e42c339a..477bf177b5ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -90,8 +90,8 @@ core.*

 # Flex related
 flex/docs/
-flex/interactive/data/*/indices/
-flex/interactive/data/*/plugins/
+flex/interactive/data/*
 flex/interactive/logs/*
 flex/interactive/examples/sf0.1-raw/
-flex/interactive/.running
\ No newline at end of file
+flex/interactive/.running
+flex/interactive/.env
\ No newline at end of file
diff --git a/flex/bin/load_plan_and_gen.sh b/flex/bin/load_plan_and_gen.sh
index 354112fe9499..aac1920a122f 100755
--- a/flex/bin/load_plan_and_gen.sh
+++ b/flex/bin/load_plan_and_gen.sh
@@ -68,9 +68,11 @@ fi
 #fi

 cypher_to_plan() {
-  if [ $# -ne 7 ]; then
+  if [ $# -ne 9 ]; then
     echo "Usage: $0 <query_name> <input_path> <output_pb_path> <output_yaml_path>"
-    echo "       <ir_compiler_properties> <graph_schema_path> <gie_home>, but receive: "$#
+    echo "       <ir_compiler_properties> <graph_schema_path> <gie_home>"
+    echo "       <procedure_name> <procedure_description>"
+    echo "       but receive: "$#
     exit 1
   fi
   query_name=$1
@@ -150,8 +152,10 @@ cypher_to_plan() {

 compile_hqps_so() {
   #check input params size eq 2 or 3
-  if [ $# -ne 5 ] && [ $# -ne 6 ]; then
-    echo "Usage: $0 <input_path> <work_dir> <ir_compiler_properties> <graph_schema_path> <gie_home> [output_dir]"
+  if [ $# -gt 8 ] || [ $# -lt 5 ]; then
+    echo "Usage: $0 <input_path> <work_dir>"
+    echo "       <ir_compiler_properties> <graph_schema_path> <gie_home>"
+    echo "       [output_dir] [stored_procedure_name] [stored_procedure_description]"
     exit 1
   fi
   input_path=$1
@@ -159,17 +163,32 @@ compile_hqps_so() {
   ir_compiler_properties=$3
   graph_schema_path=$4
   gie_home=$5
-  if [ $# -eq 6 ]; then
+  if [ $# -ge 6 ]; then
     output_dir=$6
   else
     output_dir=${work_dir}
   fi
+
+  if [ $# -ge 7 ]; then
+    procedure_name=$7
+  else
+    procedure_name=""
+  fi
+
+  if [ $# -ge 8 ]; then
+    procedure_description=$8
+  else
+    procedure_description=""
+  fi
+
   echo "Input path = ${input_path}"
   echo "Work dir = ${work_dir}"
   echo "ir compiler properties = ${ir_compiler_properties}"
   echo "graph schema path = ${graph_schema_path}"
   echo "GIE_HOME = ${gie_home}"
   echo "Output dir = ${output_dir}"
+  echo "Procedure name = ${procedure_name}"
+  echo "Procedure description = ${procedure_description}"

   last_file_name=$(basename ${input_path})
@@ -188,9 +207,18 @@ compile_hqps_so() {
     echo "Expect a .pb or .cc file"
     exit 1
   fi
+  # if procedure_name is not set, use query_name
+  if [ -z ${procedure_name} ]; then
+    procedure_name=${query_name}
+  fi
+  # if procedure_description is not set, use query_name
+  if [ -z ${procedure_description} ]; then
+    procedure_description="\"Stored procedure for ${query_name}\""
+  fi
   cur_dir=${work_dir}
   mkdir -p ${cur_dir}
   output_cc_path="${cur_dir}/${query_name}.cc"
+  dst_yaml_path="${output_dir}/${query_name}.yaml"
   if [[ $(uname) == "Linux" ]]; then
     output_so_path="${cur_dir}/lib${query_name}.so"
     dst_so_path="${output_dir}/lib${query_name}.so"
@@ -213,7 +241,10 @@ compile_hqps_so() {
   # first do .cypher to .pb
   output_pb_path="${cur_dir}/${query_name}.pb"
   output_yaml_path="${cur_dir}/${query_name}.yaml"
-  cypher_to_plan ${query_name} ${input_path} ${output_pb_path} ${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home}
+  cypher_to_plan ${query_name} ${input_path} ${output_pb_path} \
+    ${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home} \
+    ${procedure_name} '${procedure_description}'
+
   echo "----------------------------"
   echo "Codegen from cypher query done."
echo "----------------------------" @@ -294,6 +325,12 @@ compile_hqps_so() { echo "Copy failed, ${dst_so_path} not exists." exit 1 fi + # copy the generated yaml + cp ${output_yaml_path} ${output_dir} + if [ ! -f ${dst_yaml_path} ]; then + echo "Copy failed, ${dst_yaml_path} not exists." + exit 1 + fi echo "Finish copying, output to ${dst_so_path}" } @@ -461,6 +498,14 @@ run() { OUTPUT_DIR="${i#*=}" shift # past argument=value ;; + --procedure_name=*) + PROCEDURE_NAME="${i#*=}" + shift # past argument=value + ;; + --procedure_desc=*) + PROCEDURE_DESCRIPTION="${i#*=}" + shift # past argument=value + ;; -* | --*) echo "Unknown option $i" exit 1 @@ -477,6 +522,8 @@ run() { echo "graph_schema_path ="${GRAPH_SCHEMA_PATH} echo "GIE_HOME ="${GIE_HOME} echo "Output path ="${OUTPUT_DIR} + echo "Procedure name ="${PROCEDURE_NAME} + echo "Procedure description ="${PROCEDURE_DESCRIPTION} # check input exist if [ ! -f ${INPUT} ]; then @@ -487,7 +534,7 @@ run() { # if engine_type equals hqps if [ ${ENGINE_TYPE} == "hqps" ]; then echo "Engine type is hqps, generating dynamic library for hqps engine." - compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR} + compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR} ${PROCEDURE_NAME} \"${PROCEDURE_DESCRIPTION}\" # else if engine_type equals pegasus elif [ ${ENGINE_TYPE} == "pegasus" ]; then diff --git a/flex/bin/sync_server.cc b/flex/bin/sync_server.cc index 67b3d5c59f3d..13c8736e735a 100644 --- a/flex/bin/sync_server.cc +++ b/flex/bin/sync_server.cc @@ -120,11 +120,11 @@ std::tuple parse_from_server_config( << engine_type_str; } } - auto shard_num_node = engine_node["shared_num"]; + auto shard_num_node = engine_node["shard_num"]; if (shard_num_node) { shard_num = shard_num_node.as(); } else { - LOG(INFO) << "shared_num not found, use default value " + LOG(INFO) << "shard_num not found, use default value " << DEFAULT_SHARD_NUM; } auto host_node = engine_node["hosts"]; @@ -151,23 +151,9 @@ std::tuple parse_from_server_config( } } -void load_plugins(const bpo::variables_map& vm) { - if (vm.count("plugin-dir") == 0) { - LOG(INFO) << "plugin-dir is not specified"; - return; - } - std::string plugin_dir = vm["plugin-dir"].as(); - if (!std::filesystem::exists(plugin_dir)) { - LOG(FATAL) << "plugin dir not exists: " << plugin_dir; - } - LOG(INFO) << "plugin dir: " << plugin_dir; - if (!plugin_dir.empty()) { - LOG(INFO) << "Load plugins from dir: " << plugin_dir; - server::StoredProcedureManager::get().LoadFromPluginDir(plugin_dir); - } -} - -void init_codegen_proxy(const bpo::variables_map& vm) { +void init_codegen_proxy(const bpo::variables_map& vm, + const std::string& graph_schema_file, + const std::string& engine_config_file) { std::string codegen_dir = parse_codegen_dir(vm); std::string codegen_bin; std::string gie_home; @@ -181,25 +167,6 @@ void init_codegen_proxy(const bpo::variables_map& vm) { LOG(FATAL) << "codegen bin not exists: " << codegen_bin; } } - std::string ir_compiler_properties; - std::string compiler_graph_schema; - if (vm.count("ir-compiler-prop") == 0) { - LOG(FATAL) << "ir-compiler-prop is not specified"; - } else { - ir_compiler_properties = vm["ir-compiler-prop"].as(); - if (!std::filesystem::exists(ir_compiler_properties)) { - LOG(FATAL) << "ir-compiler-prop not exists: " << ir_compiler_properties; - } - } - if (vm.count("compiler-graph-schema") == 0) { - LOG(FATAL) << "compiler-graph-schema is not specified"; - } else { - compiler_graph_schema = 
vm["compiler-graph-schema"].as(); - if (!std::filesystem::exists(compiler_graph_schema)) { - LOG(FATAL) << "compiler-graph-schema not exists: " - << compiler_graph_schema; - } - } if (vm.count("gie-home") == 0) { LOG(FATAL) << "gie-home is not specified"; } else { @@ -208,9 +175,8 @@ void init_codegen_proxy(const bpo::variables_map& vm) { LOG(FATAL) << "gie-home not exists: " << gie_home; } } - server::CodegenProxy::get().Init(codegen_dir, codegen_bin, - ir_compiler_properties, - compiler_graph_schema, gie_home); + server::CodegenProxy::get().Init(codegen_dir, codegen_bin, graph_schema_file, + engine_config_file, gie_home); } } // namespace gs @@ -226,12 +192,7 @@ int main(int argc, char** argv) { "graph-config,g", bpo::value(), "graph schema config file")( "data-path,a", bpo::value(), "data directory path")( "bulk-load,l", bpo::value(), "bulk-load config file")( - "plugin-dir,p", bpo::value(), "plugin directory path")( - "gie-home,h", bpo::value(), "path to gie home")( - "ir-compiler-prop,i", bpo::value(), - "ir compiler property file")("compiler-graph-schema,z", - bpo::value(), - "compiler graph schema file"); + "gie-home,h", bpo::value(), "path to gie home"); setenv("TZ", "Asia/Shanghai", 1); tzset(); @@ -251,9 +212,10 @@ int main(int argc, char** argv) { std::string data_path; std::string bulk_load_config_path; std::string plugin_dir; + std::string server_config_path; if (vm.count("server-config") != 0) { - std::string server_config_path = vm["server-config"].as(); + server_config_path = vm["server-config"].as(); // check file exists if (!std::filesystem::exists(server_config_path)) { LOG(ERROR) << "server-config not exists: " << server_config_path; @@ -294,8 +256,12 @@ int main(int argc, char** argv) { LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s"; // loading plugin - gs::load_plugins(vm); - gs::init_codegen_proxy(vm); + if (!schema.GetPluginDir().empty() && !schema.GetPluginsList().empty()) { + server::StoredProcedureManager::get().LoadFromPluginDir( + schema.GetPluginDir(), schema.GetPluginsList()); + } + + gs::init_codegen_proxy(vm, graph_schema_path, server_config_path); server::HQPSService::get().init(shard_num, http_port, false); server::HQPSService::get().run_and_wait_for_exit(); diff --git a/flex/engines/graph_db/grin/src/topology/structure.cc b/flex/engines/graph_db/grin/src/topology/structure.cc index 5698fdc5291a..ba72fd91b75a 100644 --- a/flex/engines/graph_db/grin/src/topology/structure.cc +++ b/flex/engines/graph_db/grin/src/topology/structure.cc @@ -24,7 +24,8 @@ limitations under the License. * flex://{path_to_yaml} * @return A graph handle. 
*/ -GRIN_GRAPH grin_get_graph_from_storage(const char* uri) { +GRIN_GRAPH grin_get_graph_from_storage(const char* uri, const char* schema_file, + const char* bulk_load_file) { std::string _uri(uri); std::string::size_type pos = _uri.find("://"); if (pos == std::string::npos) { @@ -35,9 +36,9 @@ GRIN_GRAPH grin_get_graph_from_storage(const char* uri) { return GRIN_NULL_GRAPH; } _uri = _uri.substr(pos + 3); - std::string graph_schema_path = _uri + "/modern_graph.yaml"; + std::string graph_schema_path = schema_file; std::string data_path = uri; - std::string bulk_load_config_path = _uri + "/bulk_load.yaml"; + std::string bulk_load_config_path = bulk_load_file; if (!std::filesystem::exists(graph_schema_path) || !(std::filesystem::exists(bulk_load_config_path))) { return GRIN_NULL_GRAPH; diff --git a/flex/engines/graph_db/grin/test/test.c b/flex/engines/graph_db/grin/test/test.c index 4612d041885b..19c95e43b701 100644 --- a/flex/engines/graph_db/grin/test/test.c +++ b/flex/engines/graph_db/grin/test/test.c @@ -1,8 +1,8 @@ #include +#include #include #include #include -#include #include "grin/predefine.h" @@ -126,7 +126,8 @@ const char* v_names[][4] = {{"josh", "vadas", "peter", "marko"}, {"lop", "ripple", "wrong", "wrong"}}; // TODO align with order in local graph -GRIN_GRAPH get_graph(const char* uri_str, int p) { +GRIN_GRAPH get_graph(const char* uri_str, const char* schema_file, + const char* bulk_load_file, int p) { #ifdef GRIN_ENABLE_GRAPH_PARTITION GRIN_PARTITIONED_GRAPH pg = grin_get_partitioned_graph_from_storage(argv[1]); GRIN_PARTITION_LIST local_partitions = grin_get_local_partition_list(pg); @@ -144,7 +145,7 @@ GRIN_GRAPH get_graph(const char* uri_str, int p) { grin_destroy_partition_list(pg, local_partitions); grin_destroy_partitioned_graph(pg); #else - GRIN_GRAPH g = grin_get_graph_from_storage(uri_str); + GRIN_GRAPH g = grin_get_graph_from_storage(uri_str, schema_file, bulk_load_file); #endif return g; } @@ -181,10 +182,11 @@ GRIN_VERTEX get_one_person(GRIN_GRAPH g) { return v; } -void test_property_type(const char* uri_str) { +void test_property_type(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { printf("+++++++++++++++++++++ Test property/type +++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); printf("------------ Vertex Type ------------\n"); GRIN_VERTEX_TYPE_LIST vtl = grin_get_vertex_type_list(g); @@ -334,9 +336,11 @@ void test_property_type(const char* uri_str) { grin_destroy_graph(g); } -void test_property_vertex_property_value(const char* uri_str) { +void test_property_vertex_property_value(const char* uri_str, + const char* schema_file, + const char* bulk_load_file) { printf("------------ Test Vertex property value ------------\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); // value check printf("------ check value ------\n"); @@ -469,9 +473,11 @@ void test_property_vertex_property_value(const char* uri_str) { } void test_property_edge_property_value(const char* uri_str, + const char* schema_file, + const char* bulk_load_file, GRIN_DIRECTION dir) { printf("------------ Test Edge property value ------------\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); // value check printf("------ check value ------\n"); @@ -626,11 +632,11 @@ void test_property_edge_property_value(const char* uri_str, } #ifdef GRIN_ENABLE_VERTEX_PRIMARY_KEYS 
-void test_property_primary_key(const char* uri_str) { +void test_property_primary_key(const char* uri_str, const char*schema_file, const char* bulk_load_file) { printf( "+++++++++++++++++++++ Test property/primary key " "+++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); GRIN_VERTEX_TYPE_LIST vtl = grin_get_vertex_types_with_primary_keys(g); size_t vtl_size = grin_get_vertex_type_list_size(g, vtl); printf("vertex type num with primary key: %zu\n", vtl_size); @@ -683,9 +689,9 @@ void test_property_primary_key(const char* uri_str) { } #endif -void test_error_code(const char* uri_str) { +void test_error_code(const char* uri_str,const char* schema_file, const char* bulk_load_file) { printf("+++++++++++++++++++++ Test error code +++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); GRIN_VERTEX_TYPE vt1 = grin_get_vertex_type_by_name(g, "person"); GRIN_VERTEX_TYPE vt2 = grin_get_vertex_type_by_name(g, "software"); @@ -700,24 +706,25 @@ void test_error_code(const char* uri_str) { assert(grin_get_last_error_code() == INVALID_VALUE); } -void test_property(const char* uri_str) { - test_property_type(uri_str); - test_property_vertex_property_value(uri_str); - test_property_edge_property_value(uri_str, OUT); - test_property_edge_property_value(uri_str, IN); +void test_property(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { + test_property_type(uri_str, schema_file, bulk_load_file); + test_property_vertex_property_value(uri_str, schema_file, bulk_load_file); + test_property_edge_property_value(uri_str, schema_file, bulk_load_file, OUT); + test_property_edge_property_value(uri_str, schema_file, bulk_load_file, IN); #ifdef GRIN_ENABLE_VERTEX_PRIMARY_KEYS - test_property_primary_key(uri_str); + test_property_primary_key(uri_str,schema_file, bulk_load_file); #endif #ifdef GRIN_WITH_VERTEX_PROPERTY_NAME - test_error_code(uri_str); + test_error_code(uri_str,schema_file, bulk_load_file); #endif } /** void test_partition_reference(const char* uri_str) { printf("+++++++++++++++++++++ Test partition/reference -+++++++++++++++++++++\n"); ++++++++++++++++++++++\n"); GRIN_PARTITIONED_GRAPH pg = -grin_get_partitioned_graph_from_storage(argv[1]); +grin_get_partitioned_graph_from_storage(argv[1]); GRIN_PARTITION_LIST local_partitions = grin_get_local_partition_list(pg); assert(grin_get_partition_list_size(pg, local_partitions) >= 2); @@ -827,10 +834,11 @@ void test_partition(const char* uri_str) { #endif } */ -void test_topology_structure(const char* uri_str) { +void test_topology_structure(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { printf( "+++++++++++++++++++++ Test topology/structure +++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); #ifndef GRIN_WITH_VERTEX_PROPERTY printf("vertex num: %zu\n", grin_get_vertex_num(g)); #endif @@ -841,11 +849,12 @@ void test_topology_structure(const char* uri_str) { grin_destroy_graph(g); } -void test_topology_vertex_list(const char* uri_str) { +void test_topology_vertex_list(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { printf( "+++++++++++++++++++++ Test topology/vertex_list " "+++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); FOR_VERTEX_LIST_BEGIN(g, vl) 
FOR_VERTEX_BEGIN(g, vl, v) @@ -860,7 +869,8 @@ void test_topology_vertex_list(const char* uri_str) { grin_destroy_graph(g); } -void test_topology_adjacent_list(const char* uri_str, GRIN_DIRECTION dir) { +void test_topology_adjacent_list(const char* uri_str, const char* schema_file, + const char* bulk_load_file, GRIN_DIRECTION dir) { if (dir == IN) { printf( "+++++++++++++++++++++ Test topology/adjacent_list IN " @@ -871,7 +881,7 @@ void test_topology_adjacent_list(const char* uri_str, GRIN_DIRECTION dir) { "+++++++++++++++++++++\n"); } - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); FOR_VERTEX_LIST_BEGIN(g, vl) FOR_VERTEX_BEGIN(g, vl, v) @@ -938,18 +948,20 @@ void test_topology_adjacent_list(const char* uri_str, GRIN_DIRECTION dir) { grin_destroy_graph(g); } -void test_topology(const char* uri_str) { - test_topology_structure(uri_str); - test_topology_vertex_list(uri_str); - test_topology_adjacent_list(uri_str, OUT); - test_topology_adjacent_list(uri_str, IN); +void test_topology(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { + test_topology_structure(uri_str, schema_file, bulk_load_file); + test_topology_vertex_list(uri_str, schema_file, bulk_load_file); + test_topology_adjacent_list(uri_str, schema_file, bulk_load_file, OUT); + test_topology_adjacent_list(uri_str, schema_file, bulk_load_file, IN); } #if defined(GRIN_ASSUME_ALL_VERTEX_LIST_SORTED) && \ defined(GRIN_ENABLE_VERTEX_LIST_ARRAY) -void test_index_order(const char* uri_str) { +void test_index_order(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { printf("+++++++++++++++++++++ Test index order +++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); FOR_VERTEX_LIST_BEGIN(g, vl) FOR_VERTEX_BEGIN(g, vl, v) @@ -996,10 +1008,11 @@ void test_index_order(const char* uri_str) { } #endif -void test_index_internal_id(const char* uri_str) { +void test_index_internal_id(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { printf( "+++++++++++++++++++++ Test index internal id +++++++++++++++++++++\n"); - GRIN_GRAPH g = get_graph(uri_str, 0); + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); FOR_VERTEX_LIST_BEGIN(g, vl) long long int min = grin_get_vertex_internal_id_lower_bound_by_type(g, __vt); @@ -1018,18 +1031,19 @@ void test_index_internal_id(const char* uri_str) { grin_destroy_graph(g); } -void test_index(const char* uri_str) { +void test_index(const char* uri_str, const char* schema_file, + const char* bulk_load_file) { #if defined(GRIN_ASSUME_ALL_VERTEX_LIST_SORTED) && \ defined(GRIN_ENABLE_VERTEX_LIST_ARRAY) - test_index_order(uri_str); + test_index_order(uri_str, schema_file, bulk_load_file); #endif #ifdef GRIN_ENABLE_VERTEX_INTERNAL_ID_INDEX - test_index_internal_id(uri_str); + test_index_internal_id(uri_str, schema_file, bulk_load_file); #endif } -void test_vertex_property_value(const char* uri_str) { - GRIN_GRAPH g = get_graph(uri_str, 0); +void test_vertex_property_value(const char* uri_str, const char* schema_file,const char* bulk_load_file) { + GRIN_GRAPH g = get_graph(uri_str, schema_file,bulk_load_file, 0); GRIN_VERTEX_TYPE vt = grin_get_vertex_type_by_name(g, "person"); GRIN_VERTEX_PROPERTY vp = grin_get_vertex_property_by_name(g, vt, "age"); GRIN_VERTEX v = get_one_person(g); @@ -1049,17 +1063,23 @@ void test_vertex_property_value(const char* uri_str) { grin_destroy_graph(g); } -void 
test_perf(const char* uri_str) { test_vertex_property_value(uri_str); }
-
+void test_perf(const char* uri_str, const char* schema_file, const char* bulk_load_file) { test_vertex_property_value(uri_str, schema_file, bulk_load_file); }
+// uri_str =
+//  "flex://"
+//  "../../../../storages/rt_mutable_graph/modern_graph/";
 int main(int argc, char** argv) {
-  const char* uri_str =
-      "flex://"
-      "../../../../storages/rt_mutable_graph/modern_graph/";
+  if (argc != 4) {
+    printf("Usage: %s <uri> <graph_schema_file> <bulk_load_file>\n", argv[0]);
+    return 1;
+  }
+  const char* uri_str = argv[1];
+  const char* schema_file = argv[2];
+  const char* bulk_load_file = argv[3];

-  test_index(uri_str);
-  test_property(uri_str);
+  test_index(uri_str, schema_file, bulk_load_file);
+  test_property(uri_str, schema_file, bulk_load_file);
   // test_partition(uri_str);
-  test_topology(uri_str);
-  test_perf(uri_str);
+  test_topology(uri_str, schema_file, bulk_load_file);
+  test_perf(uri_str, schema_file, bulk_load_file);
   return 0;
 }
diff --git a/flex/engines/hqps_db/core/operator/edge_expand.h b/flex/engines/hqps_db/core/operator/edge_expand.h
index 4d365ed38aa9..cfa6cf081b0d 100644
--- a/flex/engines/hqps_db/core/operator/edge_expand.h
+++ b/flex/engines/hqps_db/core/operator/edge_expand.h
@@ -731,20 +731,25 @@ class EdgeExpand {
     // Expand from multi-label vertices and through multiple edge labels.
     // Result in a general edge set.
     auto src_label = cur_vertex_set.GetLabel();
-    LOG(INFO) << "[EdgeExpandEMultiTriplet] real labels: "
-              << gs::to_string(edge_labels);
+    LOG(INFO) << "[EdgeExpandEMultiTriplet] real labels: ";
+    for (auto i = 0; i < edge_labels.size(); ++i) {
+      LOG(INFO) << std::to_string(edge_labels[i][0]) << " "
+                << std::to_string(edge_labels[i][1]) << " "
+                << std::to_string(edge_labels[i][2]);
+    }
     // for each triplet, returns a vector of edge iters.
     auto& vertices = cur_vertex_set.GetVertices();
     using sub_graph_t = typename GRAPH_INTERFACE::sub_graph_t;
     using edge_iter_t = typename sub_graph_t::iterator;
     std::vector<sub_graph_t> sub_graphs;
+    auto prop_names_vec = prop_names_to_vec(prop_names);
     for (auto i = 0; i < edge_labels.size(); ++i) {
       // Check whether the edge triplet matches the input vertices;
       // return a handler to get edges.
-      auto sub_graph_vec =
-          graph.GetSubGraph(edge_labels[i][0], edge_labels[i][1],
-                            edge_labels[i][2], gs::to_string(direction));
+      auto sub_graph_vec = graph.GetSubGraph(
+          edge_labels[i][0], edge_labels[i][1], edge_labels[i][2],
+          gs::to_string(direction), prop_names_vec[i]);
       for (auto sub_graph : sub_graph_vec) {
         sub_graphs.emplace_back(sub_graph);
       }
@@ -811,7 +816,8 @@ class EdgeExpand {
     }
     auto set = UnTypedEdgeSet(
-        vertices, label_indices, label_vec, std::move(label_to_subgraphs));
+        vertices, label_indices, label_vec, std::move(label_to_subgraphs),
+        direction);
     return std::make_pair(std::move(set), std::move(offsets));
   }
@@ -854,12 +860,13 @@ class EdgeExpand {
     using sub_graph_t = typename GRAPH_INTERFACE::sub_graph_t;
     using edge_iter_t = typename sub_graph_t::iterator;
     std::vector<sub_graph_t> sub_graphs;
+    auto prop_names_vec = prop_names_to_vec(prop_names);
     for (auto i = 0; i < edge_labels.size(); ++i) {
       // Check whether the edge triplet matches the input vertices.
// return a hanlder to get edges - auto sub_graph_vec = - graph.GetSubGraph(edge_labels[i][0], edge_labels[i][1], - edge_labels[i][2], gs::to_string(direction)); + auto sub_graph_vec = graph.GetSubGraph( + edge_labels[i][0], edge_labels[i][1], edge_labels[i][2], + gs::to_string(direction), prop_names_vec[i]); for (auto sub_graph : sub_graph_vec) { sub_graphs.emplace_back(sub_graph); } @@ -936,7 +943,8 @@ class EdgeExpand { } auto set = UnTypedEdgeSet( - vertices, label_indices, label_vec, std::move(label_to_subgraphs)); + vertices, label_indices, label_vec, std::move(label_to_subgraphs), + direction); return std::make_pair(std::move(set), std::move(offsets)); } @@ -1490,7 +1498,26 @@ class EdgeExpand { << gs::to_string(edge_label_id) << ", new vertices count: " << tmp_offset.back(); } -}; // namespace gs + + template + static void emplace_prop_names_to_vec( + std::vector>& vec_vec_prop_names, + std::tuple...>& prop_names, + std::index_sequence) { + (vec_vec_prop_names.emplace_back(array_to_vec(std::get(prop_names))), + ...); + } + template + static std::vector> prop_names_to_vec( + std::tuple...>& prop_names) { + std::vector> vec_vec_prop_names; + vec_vec_prop_names.reserve(sizeof...(PropTuple)); + emplace_prop_names_to_vec( + vec_vec_prop_names, prop_names, + std::make_index_sequence()); + return vec_vec_prop_names; + } +}; } // namespace gs diff --git a/flex/engines/hqps_db/core/utils/keyed.h b/flex/engines/hqps_db/core/utils/keyed.h index 2bd8dff06708..58dda429a2e3 100644 --- a/flex/engines/hqps_db/core/utils/keyed.h +++ b/flex/engines/hqps_db/core/utils/keyed.h @@ -383,6 +383,21 @@ struct KeyedAggT, } }; +template +struct KeyedAggT, AggFunc::COUNT, + std::tuple, + std::integer_sequence> { + using agg_res_t = Collection; + using aggregate_res_builder_t = CountBuilder; + + static aggregate_res_builder_t create_agg_builder( + const FlatEdgeSet& set, const GI& graph, + std::tuple>& selectors) { + return CountBuilder(); + } +}; + template static inline auto insert_into_builder_v2_impl( diff --git a/flex/engines/hqps_db/database/adj_list.h b/flex/engines/hqps_db/database/adj_list.h index 2f28afcdde78..fb8f8e996a01 100644 --- a/flex/engines/hqps_db/database/adj_list.h +++ b/flex/engines/hqps_db/database/adj_list.h @@ -49,9 +49,14 @@ class EdgeIter { inline label_id_t GetSrcLabel() const { return label_triplet_[0]; } inline Any GetData() const { return ptr1_->get_data(); } - inline bool IsValid() const { return ptr1_->is_valid(); } + inline bool IsValid() const { return ptr1_ && ptr1_->is_valid(); } - size_t Size() const { return ptr1_->size(); } + size_t Size() const { + if (ptr1_) { + return ptr1_->size(); + } + return 0; + } private: std::shared_ptr ptr1_; @@ -66,21 +71,28 @@ class SubGraph { using iterator = EdgeIter; using label_id_t = LabelT; SubGraph(const MutableCsrBase* first, - const std::array& label_triplet) - : first_(first), label_triplet_(label_triplet) {} + const std::array& label_triplet, + const std::vector& prop_names) + : first_(first), label_triplet_(label_triplet), prop_names_(prop_names) {} inline iterator get_edges(VID_T vid) const { - return iterator(label_triplet_, first_->edge_iter(vid)); + if (first_) { + return iterator(label_triplet_, first_->edge_iter(vid)); + } + return iterator(label_triplet_, nullptr); } label_id_t GetSrcLabel() const { return label_triplet_[0]; } label_id_t GetEdgeLabel() const { return label_triplet_[2]; } label_id_t GetDstLabel() const { return label_triplet_[1]; } + const std::vector& GetPropNames() const { return prop_names_; } + private: 
const MutableCsrBase* first_; // We assume first is out edge, second is in edge. std::array label_triplet_; + std::vector prop_names_; }; template diff --git a/flex/engines/hqps_db/database/mutable_csr_interface.h b/flex/engines/hqps_db/database/mutable_csr_interface.h index 5531246ddbf2..0f1b9f3c89fa 100644 --- a/flex/engines/hqps_db/database/mutable_csr_interface.h +++ b/flex/engines/hqps_db/database/mutable_csr_interface.h @@ -498,21 +498,21 @@ class MutableCSRInterface { // get edges with input vids. return a edge list. std::vector> GetSubGraph(const label_id_t src_label_id, const label_id_t dst_label_id, - const label_id_t edge_label_id, - const std::string& direction_str) const { + const label_id_t edge_label_id, const std::string& direction_str, + const std::vector& prop_names) const { const MutableCsrBase *csr = nullptr, *other_csr = nullptr; if (direction_str == "out" || direction_str == "Out" || direction_str == "OUT") { csr = db_session_.graph().get_oe_csr(src_label_id, dst_label_id, edge_label_id); - return std::vector{ - sub_graph_t{csr, {src_label_id, dst_label_id, edge_label_id}}}; + return std::vector{sub_graph_t{ + csr, {src_label_id, dst_label_id, edge_label_id}, prop_names}}; } else if (direction_str == "in" || direction_str == "In" || direction_str == "IN") { csr = db_session_.graph().get_ie_csr(src_label_id, dst_label_id, edge_label_id); - return std::vector{ - sub_graph_t{csr, {src_label_id, dst_label_id, edge_label_id}}}; + return std::vector{sub_graph_t{ + csr, {src_label_id, dst_label_id, edge_label_id}, prop_names}}; } else if (direction_str == "both" || direction_str == "Both" || direction_str == "BOTH") { csr = db_session_.graph().get_oe_csr(src_label_id, dst_label_id, @@ -520,8 +520,11 @@ class MutableCSRInterface { other_csr = db_session_.graph().get_ie_csr(src_label_id, dst_label_id, edge_label_id); return std::vector{ - sub_graph_t{csr, {src_label_id, dst_label_id, edge_label_id}}, - sub_graph_t{other_csr, {dst_label_id, src_label_id, edge_label_id}}}; + sub_graph_t{ + csr, {src_label_id, dst_label_id, edge_label_id}, prop_names}, + sub_graph_t{other_csr, + {dst_label_id, src_label_id, edge_label_id}, + prop_names}}; } else { throw std::runtime_error("Not implemented - " + direction_str); } diff --git a/flex/engines/hqps_db/structures/multi_edge_set/untyped_edge_set.h b/flex/engines/hqps_db/structures/multi_edge_set/untyped_edge_set.h index 64ff2f1501c3..fcdc9d1a42a8 100644 --- a/flex/engines/hqps_db/structures/multi_edge_set/untyped_edge_set.h +++ b/flex/engines/hqps_db/structures/multi_edge_set/untyped_edge_set.h @@ -171,12 +171,14 @@ class UnTypedEdgeSet { const std::vector& src_v, const std::vector& label_indices, const std::vector& labels, - std::unordered_map>&& adj_lists) + std::unordered_map>&& adj_lists, + const Direction& direction) : src_vertices_(src_v), label_indices_(label_indices), src_labels_(labels), adj_lists_(std::move(adj_lists)), - size_(0) { + size_(0), + direction_(direction) { sanity_check(); } @@ -368,6 +370,67 @@ class UnTypedEdgeSet { LOG(FATAL) << "not implemented, and should not be called"; } + template ::type* = nullptr> + auto ProjectWithRepeatArray(const std::vector& repeat_array, + KeyAlias& key_alias) const { + using dst_ele_tuple_t = std::tuple; + CHECK(repeat_array.size() == Size()); + size_t real_size = 0; + for (auto v : repeat_array) { + real_size += v; + } + std::vector dst_eles; + dst_eles.reserve(real_size); + auto edge_label_triplets = get_edge_triplets(); + auto edge_iters = generate_iters(); + std::vector 
label_triplet_indices; + label_triplet_indices.reserve(real_size); + std::vector sizes; + sizes.emplace_back(0); + for (auto i = 0; i < edge_label_triplets.size(); ++i) { + sizes.emplace_back(sizes.back() + edge_label_triplets[i].size()); + } + + // 0,2,4 + size_t cur_ind = 0; + for (auto i = 0; i < src_vertices_.size(); ++i) { + auto src_vid = src_vertices_[i]; + auto& cur_edge_iters = edge_iters[i]; + auto src_label_ind = label_indices_[i]; + auto src_label = src_labels_[src_label_ind]; + auto cur_triplets_vec = edge_label_triplets[src_label_ind]; + CHECK(cur_triplets_vec.size() == cur_edge_iters.size()); + + for (auto j = 0; j < cur_edge_iters.size(); ++j) { + auto& cur_iter = cur_edge_iters[j]; + while (cur_iter.IsValid()) { + auto dst_vid = cur_iter.GetDstId(); + auto data = cur_iter.GetData(); + for (auto k = 0; k < repeat_array[cur_ind]; ++k) { + dst_eles.emplace_back(std::make_tuple(src_vid, dst_vid, data)); + label_triplet_indices.emplace_back(sizes[src_label_ind] + j); + } + cur_iter.Next(); + cur_ind += 1; + } + } + } + std::vector> res_label_triplets; + // put edge_label_triplets into res_label_triplets + for (auto i = 0; i < edge_label_triplets.size(); ++i) { + auto& cur_triplets_vec = edge_label_triplets[i]; + for (auto j = 0; j < cur_triplets_vec.size(); ++j) { + res_label_triplets.emplace_back(cur_triplets_vec[j]); + } + } + std::vector> prop_names = get_prop_namees(); + CHECK(prop_names.size() == res_label_triplets.size()); + return FlatEdgeSet( + std::move(dst_eles), std::move(res_label_triplets), prop_names, + std::move(label_triplet_indices), direction_); + } + private: std::pair, std::unordered_map> preprocess_getting_labels(const std::vector& req_labels, @@ -420,6 +483,35 @@ class UnTypedEdgeSet { << " vertices, with " << edge_iter_vecs.size() << " iters"; return edge_iter_vecs; } + + std::vector>> get_edge_triplets() const { + std::vector>> ret; + for (auto iter : adj_lists_) { + auto& sub_graphs = iter.second; + std::vector> tmp; + for (auto i = 0; i < sub_graphs.size(); ++i) { + auto& sub_graph = sub_graphs[i]; + tmp.emplace_back(std::array({sub_graph.GetSrcLabel(), + sub_graph.GetDstLabel(), + sub_graph.GetEdgeLabel()})); + } + ret.emplace_back(std::move(tmp)); + } + return ret; + } + + std::vector> get_prop_namees() const { + std::vector> ret; + for (auto iter : adj_lists_) { + auto& sub_graphs = iter.second; + for (auto i = 0; i < sub_graphs.size(); ++i) { + auto& sub_graph = sub_graphs[i]; + ret.push_back(sub_graph.GetPropNames()); + } + } + return ret; + } + void sanity_check() { CHECK(src_vertices_.size() == label_indices_.size()); for (auto v : label_indices_) { @@ -437,6 +529,7 @@ class UnTypedEdgeSet { std::unordered_map> adj_lists_; // match src_label to all triplet. 
mutable size_t size_; // computed lazily + Direction direction_; }; } // namespace gs diff --git a/flex/engines/http_server/stored_procedure.cc b/flex/engines/http_server/stored_procedure.cc index 76f1635dd327..a4f63faa4a58 100644 --- a/flex/engines/http_server/stored_procedure.cc +++ b/flex/engines/http_server/stored_procedure.cc @@ -83,22 +83,10 @@ void close_lib(void* handle, const char* lib_path) { } } -std::vector get_yaml_files(const std::string& plugin_dir) { - std::filesystem::path dir_path = plugin_dir; - std::string suffix = ".yaml"; - std::vector res_yaml_files; - - for (auto& entry : std::filesystem::directory_iterator(dir_path)) { - if (entry.is_regular_file() && entry.path().extension() == suffix) { - res_yaml_files.emplace_back(entry.path()); - } - } - return res_yaml_files; -} - std::vector parse_from_multiple_yamls( - const std::string& plugin_dir, - const std::vector& stored_procedure_yamls) { + const std::string& plugin_dir, + const std::vector& stored_procedure_yamls, + const std::vector& valid_procedure_names) { std::vector stored_procedures; for (auto cur_yaml : stored_procedure_yamls) { LOG(INFO) << "Loading for: " << cur_yaml; @@ -109,18 +97,21 @@ std::vector parse_from_multiple_yamls( LOG(ERROR) << "Expect path in pre_installed procedure"; } else { std::string name = root["name"].as(); - std::string path = root["library"].as(); - if (!std::filesystem::exists(path)) { - // in case the path is relative to plugin_dir, prepend plugin_dir - path = plugin_dir + "/" +path; + if (find(valid_procedure_names.begin(), valid_procedure_names.end(), + name) != valid_procedure_names.end()) { + VLOG(10) << "Find valid procedure: " << name; + std::string path = root["library"].as(); if (!std::filesystem::exists(path)) { - LOG(ERROR) << "plugin - " << path << " file not found..."; - } - else { + // in case the path is relative to plugin_dir, prepend plugin_dir + path = plugin_dir + "/" + path; + if (!std::filesystem::exists(path)) { + LOG(ERROR) << "plugin - " << path << " file not found..."; + } else { + stored_procedures.push_back({name, path}); + } + } else { stored_procedures.push_back({name, path}); } - } else { - stored_procedures.push_back({name, path}); } } } diff --git a/flex/engines/http_server/stored_procedure.h b/flex/engines/http_server/stored_procedure.h index 3f6fcf85fc0d..7aad2cce6eed 100644 --- a/flex/engines/http_server/stored_procedure.h +++ b/flex/engines/http_server/stored_procedure.h @@ -34,6 +34,7 @@ #include "flex/engines/hqps_db/app/hqps_app_base.h" #include "flex/engines/hqps_db/database/mutable_csr_interface.h" #include "flex/utils/app_utils.h" +#include "flex/utils/yaml_utils.h" #include @@ -83,7 +84,9 @@ struct StoredProcedureMeta { std::vector parse_stored_procedures( const std::string& stored_procedure_yaml); std::vector parse_from_multiple_yamls( - const std::string& plugin_dir, const std::vector& stored_procedure_yamls); + const std::string& plugin_dir, + const std::vector& stored_procedure_yamls, + const std::vector& valid_procedure_names); enum class StoredProcedureType { kCypher = 0, @@ -142,17 +145,18 @@ class CypherStoredProcedure; std::shared_ptr create_stored_procedure_impl( int32_t procedure_id, const std::string& procedure_path); -std::vector get_yaml_files(const std::string& plugin_dir); - class StoredProcedureManager { public: static StoredProcedureManager& get(); StoredProcedureManager() {} // expect multiple query.yaml under this directory. 
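  // For reference, each such yaml is expected to carry a `name` and a
  // `library` field, matching what parse_from_multiple_yamls reads above,
  // e.g. (illustrative values; `library` may be absolute or relative to
  // plugin_dir):
  //   name: count_vertex_num
  //   library: libcount_vertex_num.so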
-  void LoadFromPluginDir(const std::string& plugin_dir) {
-    auto yaml_files = get_yaml_files(plugin_dir);
-    auto stored_procedures = parse_from_multiple_yamls(plugin_dir, yaml_files);
+  void LoadFromPluginDir(
+      const std::string& plugin_dir,
+      const std::vector<std::string>& valid_procedure_names) {
+    auto yaml_files = gs::get_yaml_files(plugin_dir);
+    auto stored_procedures = parse_from_multiple_yamls(plugin_dir, yaml_files,
+                                                       valid_procedure_names);
     CreateStoredProcedures(stored_procedures);
   }
diff --git a/flex/interactive/README.md b/flex/interactive/README.md
index fd6ca0e3e239..34be2f2596d4 100755
--- a/flex/interactive/README.md
+++ b/flex/interactive/README.md
@@ -3,3 +3,11 @@
 GraphScope Interactive is a specialized construction of [GraphScope Flex](https://github.com/alibaba/GraphScope/tree/main/flex), designed to handle concurrent graph queries at an impressive speed. Its primary goal is to process as many queries as possible within a given timeframe, emphasizing a high query throughput rate.

 For the full documentation of GraphScope Interactive, please refer to [GraphScope-Interactive](https://graphscope.io/docs/interactive_engine/graphscope_interactive).
+
+
+# problems
+- Do not configure anything `graph_name`-related in any `.yaml`; everything `graph_name`-related must be specified on the command line
+- compiler:
+  - drop the implicit concatenation of workspace + data + graph.name; the path must be specified explicitly
+- gs interactive logs get server_log/compiler_log
+
diff --git a/flex/interactive/bin/db_admin.sh b/flex/interactive/bin/db_admin.sh
deleted file mode 100755
index 8f98befcc53d..000000000000
--- a/flex/interactive/bin/db_admin.sh
+++ /dev/null
@@ -1,654 +0,0 @@
-#!/bin/bash
-# Copyright 2020 Alibaba Group Holding Limited.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# The product name
-DB_PROD_NAME="interactive"
-
-# colored error and info functions to wrap messages.
-RED='\033[0;31m' -GREEN='\033[0;32m' -NC='\033[0m' # No Color -err() { - echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2 -} - -info() { - echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}" -} - -################## Some Util Functions ################## -function parse_yaml { - local prefix=$2 - local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034') - sed -ne "s|^\($s\):|\1|" \ - -e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \ - -e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 | - awk -F$fs '{ - indent = length($1)/2; - vname[indent] = $2; - for (i in vname) {if (i > indent) {delete vname[i]}} - if (length($3) > 0) { - vn=""; for (i=0; i/dev/null 2>&1 - pwd -P -)" -info "HOST_DB_HOME = ${HOST_DB_HOME}" - -#################### DEFINE CONSTANTS #################### -GRAPHSCOPE_GROUP_ID=1001 - -# the configuration directory -HOST_DB_CONF_DIR="${HOST_DB_HOME}/conf" -# the data directory -HOST_DB_DATA_DIR="${HOST_DB_HOME}/data" -# the log directory -HOST_DB_LOG_DIR="${HOST_DB_HOME}/logs" -HOST_DB_SERVER_OUTPUT_LOG="${HOST_DB_LOG_DIR}/server.log" -HOST_DB_COMPILER_OUTPUT_LOG="${HOST_DB_LOG_DIR}/compiler.log" -HOST_DB_INTERACTIVE_YAML="${HOST_DB_CONF_DIR}/interactive.yaml" -HOST_DB_EXAMPLE_DATASET_DIR=${HOST_DB_HOME}/"examples/sf0.1-raw/" -HOST_DB_RUNNING_FILE="${HOST_DB_HOME}/.running" -# will export DOCKER_DB_HOME, if not set, exist -get_docker_workspace_from_yaml "${HOST_DB_INTERACTIVE_YAML}" - -DOCKER_DB_GRAPHSCOPE_HOME="/home/graphscope/GraphScope" -DOCKER_DB_DATA_DIR="${DOCKER_DB_HOME}/data" -DOCKER_DB_LOG_DIR="${DOCKER_DB_HOME}/logs" -DOCKER_DB_CONF_DIR="${DOCKER_DB_HOME}/conf" -DOCKER_DB_IR_CONF_FILE="${DOCKER_DB_HOME}/conf/interactive.properties" -DOCKER_DB_GIE_HOME="${DOCKER_DB_GRAPHSCOPE_HOME}/interactive_engine/" -DOCKER_DB_INTERACTIVE_YAML="${DOCKER_DB_HOME}/conf/interactive.yaml" -DOCKER_DB_SERVER_BIN="${DOCKER_DB_GRAPHSCOPE_HOME}/flex/build/bin/sync_server" -DOCKER_DB_COMPILER_BIN="com.alibaba.graphscope.GraphServer" -DOCKER_DB_GEN_BIN="${DOCKER_DB_GRAPHSCOPE_HOME}/flex/bin/load_plan_and_gen.sh" -DOCKER_DB_SERVER_OUTPUT_LOG=${DOCKER_DB_LOG_DIR}/server.log -DOCKER_DB_COMPILER_OUTPUT_LOG=${DOCKER_DB_LOG_DIR}/compiler.log -export DOCKER_DB_CONNECTOR_PORT=7687 -DB_CONNECT_DEFAULT_PORT=7687 -# update the port by parsing the yaml file -DOCKER_DB_CONNECTOR_PORT=$(parse_yaml "${HOST_DB_INTERACTIVE_YAML}" | grep "compiler_endpoint_boltConnector_port" | awk -F "=" '{print $2}') -#remove "" and space -DOCKER_DB_CONNECTOR_PORT=$(echo "${DOCKER_DB_CONNECTOR_PORT}" | sed 's/^"//' | sed 's/"$//') - -EXAMPLE_DATA_SET_URL="https://github.com/GraphScope/gstest.git" - -################### IMAGE VERSION ################### -GIE_DB_IMAGE_VERSION="v0.0.1" -GIE_DB_IMAGE_NAME="registry.cn-hongkong.aliyuncs.com/graphscope/${DB_PROD_NAME}" -GIE_DB_CONTAINER_NAME="${DB_PROD_NAME}-server" - - -#################### DEFINE FUNCTIONS #################### -function check_running_containers_and_exit(){ - # check if there is any running containers - info "Check running containers and exit" - running_containers=$(docker ps -a --format "{{.Names}}" | grep "${GIE_DB_CONTAINER_NAME}") - if [ -n "${running_containers}" ]; then - err "There are running containers: ${running_containers}, please stop them first." 
- exit 1 - fi - info "finish check" -} - -function check_container_running(){ - if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then - info "container ${GIE_DB_CONTAINER_NAME} is running" - else - info "container ${GIE_DB_CONTAINER_NAME} is not running" - # start the container - docker start "${GIE_DB_CONTAINER_NAME}" - fi -} - -function ensure_container_running(){ - if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then - info "container ${GIE_DB_CONTAINER_NAME} is running" - else - info "container ${GIE_DB_CONTAINER_NAME} is not running" - # start the container - docker start "${GIE_DB_CONTAINER_NAME}" - fi -} - -function check_process_running_in_container(){ - local container_name=$1 - local process_name=$2 - local error_msg=$3 - local process_id=$(docker top "${container_name}" | grep "${process_name}" | awk '{print $2}\') - if [ -z "${process_id}" ]; then - err "process ${process_name} is not running in container ${container_name}" - err "${error_msg}" - exit 1 - fi - info "process ${process_name} is running in container ${container_name}, process id is ${process_id}" -} - - -#################### DEFINE USAGE #################### -# parse the args and set the variables. -function usage() { - init_usage - start_usage - stop_usage - restart_usage - compile_usage - show_stored_procedure_usage - download_dataset_usage - destroy_usage -} - -function init_usage() { - cat << EOF - db_admin.sh init -p[---publish] - -v[--volume] - --version - Init the database, create the containers. --publish and --volume can be used multiple times. -EOF -} - -function start_usage() { - cat << EOF - db_admin.sh start -n [--name] -b [--bulk-load] -r[--root-data-dir] - Start the database with the given graph. graph schema file should be placed at ./data/{graph_name}/graph.yaml. - If mode is override, we need to clear the data directory first. -EOF -} - -function stop_usage() { - cat << EOF - db_admin.sh stop - Stop the database with the given graph. -EOF -} - -function restart_usage() { - cat << EOF - db_admin.sh restart - Restart the database with current running graph. 
-EOF -} - -function compile_usage(){ - cat << EOF - db_admin.sh compile -g[--graph] -i ${DOCKER_DB_COMPILER_OUTPUT_LOG} 2>&1 &" - cmd=${cmd}"\"" - info "Running cmd: ${cmd}" - eval ${cmd} - sleep 6 - check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_COMPILER_BIN} "check ${HOST_DB_COMPILER_OUTPUT_LOG} to see more details" - info "Successfuly start compiler" - info "DataBase service is running..., port is open on :${DOCKER_DB_CONNECTOR_PORT}" - - # if do_start success, we should write current args to ${HOST_DB_RUNNING_FILE} - echo "GRAPH_NAME=${GRAPH_NAME}" > ${HOST_DB_RUNNING_FILE} - echo "BULK_LOAD_FILE=${BULK_LOAD_FILE}" >> ${HOST_DB_RUNNING_FILE} - echo "ROOT_DATA_DIR=${root_data_dir}" >> ${HOST_DB_RUNNING_FILE} -# info "Successfuly write running args to ${HOST_DB_RUNNING_FILE}" -} - - -#################### Stop database #################### -function do_stop(){ - # stop the container - docker stop ${GIE_DB_CONTAINER_NAME} - info "Successfuly stop database" -} - - -#################### Get database status #################### -function do_status() { - if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then - info "container ${GIE_DB_CONTAINER_NAME} is running" - else - info "container ${GIE_DB_CONTAINER_NAME} is not running" - info "Please start database first" - fi - # the container is running but the process is not running - check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_SERVER_BIN} "check ${HOST_DB_SERVER_OUTPUT_LOG} to see more details" - check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_COMPILER_BIN} "check ${HOST_DB_COMPILER_OUTPUT_LOG} to see more details" - info "Database service is running..., port is open on :${DOCKER_DB_CONNECTOR_PORT}" -} - - -#################### Download dataset #################### -function do_download_dataset(){ - git clone ${EXAMPLE_DATA_SET_URL} ${HOST_DB_EXAMPLE_DATASET_DIR} - info "Successfuly download dataset to: ${HOST_DB_EXAMPLE_DATASET_DIR}" -} - - -#################### Restart #################### -function do_restart() { - # if the container is not running, exit - if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "false" ]; then - info "container ${GIE_DB_CONTAINER_NAME} is not running" - info "Please start database first" - exit 1 - fi - info "Stopping database first..." - do_stop - info "Successfuly stop database" - # read args from cached file. 
- # get num lines in file ${HOST_DB_RUNNING_FILE} - num_lines=$(wc -l < ${HOST_DB_RUNNING_FILE}) - if [ ${num_lines} -ne 3 ]; then - err "Error: ${HOST_DB_RUNNING_FILE} should have 3 lines, but got ${num_lines}, something wrong with the file ${HOST_DB_RUNNING_FILE}" - exit 1 - fi - # read args from file - GRAPH_NAME=$(sed -n '1p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2) - BULK_LOAD_FILE=$(sed -n '2p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2) - ROOT_DATA_DIR=$(sed -n '3p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2) - do_start -n ${GRAPH_NAME} -b ${BULK_LOAD_FILE} -r ${ROOT_DATA_DIR} - info "Finish restart database" -} - -# the compiled dynamic libs will be placed at data/${graph_name}/plugins/ -# after compilation, the user need to write the cooresponding yaml, telling the compiler about -# the input and output of the stored procedure -function do_compile() { - # check args num == 4 - # start container - ensure_container_running - if [ $# -ne 4 ]; then - err "stored_procedure command need 2 args, but got $#" - compile_usage - exit 1 - fi - graph_name="" - file_path="" # file path - output_dir="" - - while [[ $# -gt 0 ]]; do - key="$1" - case $key in - -g | --graph) - graph_name="$2" - info "graph_name = ${graph_name}" - shift # past argument - shift - ;; - -i | --input) - file_path="$2" - shift # past argument - shift - ;; - *) - err "unknown option $1" - compile_usage - exit 1 - ;; - esac - done - - # check graph_name - if [ -z "${graph_name}" ]; then - err "graph_name is empty" - compile_usage - exit 1 - fi - - # check file_path - if [ -z "${file_path}" ]; then - err "file_path is empty" - compile_usage - exit 1 - fi - - # get real file_path - file_name=$(basename "${file_path}") - real_file_path=$(realpath "${file_path}") - # check exists - if [ ! -f "${real_file_path}" ]; then - err "file ${real_file_path} not exist" - exit 1 - fi - # check graph dir exists - graph_dir="${HOST_DB_HOME}/data/${graph_name}" - if [ ! -d "${graph_dir}" ]; then - err "graph ${graph_name} not exist" - exit 1 - fi - mkdir -p "${graph_dir}/plugins" - - DOCKER_OUTPUT_DIR="${DOCKER_DB_HOME}/data/${graph_name}/plugins" - HOST_OUTPUT_DIR="${HOST_DB_HOME}/data/${graph_name}/plugins" - DOCKER_DB_GRAPH_SCHEMA="${DOCKER_DB_HOME}/data/${graph_name}/graph.json" - DOCKER_REAL_FILE_PATH="/tmp/${file_name}" - # docker cp file to container - cmd="docker cp ${real_file_path} ${GIE_DB_CONTAINER_NAME}:${DOCKER_REAL_FILE_PATH}" - eval ${cmd} || exit 1 - - cmd="docker exec ${GIE_DB_CONTAINER_NAME} bash -c \"" - cmd=${cmd}" ${DOCKER_DB_GEN_BIN}" - cmd=${cmd}" --engine_type=hqps" - cmd=${cmd}" --input=${DOCKER_REAL_FILE_PATH}" - cmd=${cmd}" --work_dir=/tmp/codegen/" - cmd=${cmd}" --ir_conf=${DOCKER_DB_IR_CONF_FILE}" - cmd=${cmd}" --graph_schema_path=${DOCKER_DB_GRAPH_SCHEMA}" - cmd=${cmd}" --gie_home=${DOCKER_DB_GIE_HOME}" - cmd=${cmd}" --output_dir=${DOCKER_OUTPUT_DIR}" - cmd=${cmd}" \"" - - echo "Running cmd: ${cmd}" - eval ${cmd} || exit 1 - # check output exists - # get the file_name of file_path - file_name="${file_name%.*}" - output_file="${HOST_OUTPUT_DIR}/lib${file_name}.so" - - if [ ! -f "${output_file}" ]; then - err "output file ${output_file} not exist, compilation failed" - exit 1 - fi - info "success generate dynamic lib ${output_file}, please create the cooresponding yaml file ${HOST_OUTPUT_DIR}/${file_name}.yaml." 
-} - -#################### Entry #################### -if [ $# -eq 0 ]; then - usage - exit 1 -fi - -while [[ $# -gt 0 ]]; do - key="$1" - - case $key in - -h | --help) - usage - exit - ;; - init) - shift - info "Start initiating database..." - do_init "$@" - exit 0 - ;; - start) - shift - info "Start database service..." - do_start "$@" - exit 0 - ;; - status) - shift - do_status "$@" - exit 0 - ;; - stop) - shift - do_stop "$@" - exit 0 - ;; - restart) - shift - do_restart # restart current graph - exit 0 - ;; - compile) - shift - do_compile "$@" - exit 0 - ;; - show_stored_procedure) - shift - do_show_stored_procedure "$@" - exit 0 - ;; - destroy) - shift - do_destroy "$@" - exit 0 - ;; - download_dataset) - shift - do_download_dataset - exit 0 - ;; - *) # unknown option - err "unknown option $1" - usage - exit 1 - ;; - esac -done - - - - diff --git a/flex/interactive/bin/gs_interactive b/flex/interactive/bin/gs_interactive new file mode 100755 index 000000000000..bc2e8bb2d61b --- /dev/null +++ b/flex/interactive/bin/gs_interactive @@ -0,0 +1,1651 @@ +#!/bin/bash +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# The product name +DB_PROD_NAME="interactive" + +# colored error and info functions to wrap messages. 
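+# For example, `info "service started"` prints a green, timestamped line to
+# stdout, while `err "bad argument"` prints a red one to stderr.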
+RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' # No Color +err() { + echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2 +} + +info() { + echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}" +} + +################## Some Util Functions ################## + +# source : https://github.com/mrbaseman/parse_yaml +function parse_yaml { + local prefix=$2 + local separator=${3:-_} + + local indexfix + # Detect awk flavor + if awk --version 2>&1 | grep -q "GNU Awk" ; then + # GNU Awk detected + indexfix=-1 + elif awk -Wv 2>&1 | grep -q "mawk" ; then + # mawk detected + indexfix=0 + fi + + local s='[[:space:]]*' sm='[ \t]*' w='[a-zA-Z0-9_]*' fs=${fs:-$(echo @|tr @ '\034')} i=${i:- } + cat $1 | \ + awk -F$fs "{multi=0; + if(match(\$0,/$sm\|$sm$/)){multi=1; sub(/$sm\|$sm$/,\"\");} + if(match(\$0,/$sm>$sm$/)){multi=2; sub(/$sm>$sm$/,\"\");} + while(multi>0){ + str=\$0; gsub(/^$sm/,\"\", str); + indent=index(\$0,str); + indentstr=substr(\$0, 0, indent+$indexfix) \"$i\"; + obuf=\$0; + getline; + while(index(\$0,indentstr)){ + obuf=obuf substr(\$0, length(indentstr)+1); + if (multi==1){obuf=obuf \"\\\\n\";} + if (multi==2){ + if(match(\$0,/^$sm$/)) + obuf=obuf \"\\\\n\"; + else obuf=obuf \" \"; + } + getline; + } + sub(/$sm$/,\"\",obuf); + print obuf; + multi=0; + if(match(\$0,/$sm\|$sm$/)){multi=1; sub(/$sm\|$sm$/,\"\");} + if(match(\$0,/$sm>$sm$/)){multi=2; sub(/$sm>$sm$/,\"\");} + } + print}" | \ + sed -e "s|^\($s\)?|\1-|" \ + -ne "s|^$s#.*||;s|$s#[^\"']*$||;s|^\([^\"'#]*\)#.*|\1|;t1;t;:1;s|^$s\$||;t2;p;:2;d" | \ + sed -ne "s|,$s\]$s\$|]|" \ + -e ":1;s|^\($s\)\($w\)$s:$s\(&$w\)\?$s\[$s\(.*\)$s,$s\(.*\)$s\]|\1\2: \3[\4]\n\1$i- \5|;t1" \ + -e "s|^\($s\)\($w\)$s:$s\(&$w\)\?$s\[$s\(.*\)$s\]|\1\2: \3\n\1$i- \4|;" \ + -e ":2;s|^\($s\)-$s\[$s\(.*\)$s,$s\(.*\)$s\]|\1- [\2]\n\1$i- \3|;t2" \ + -e "s|^\($s\)-$s\[$s\(.*\)$s\]|\1-\n\1$i- \2|;p" | \ + sed -ne "s|,$s}$s\$|}|" \ + -e ":1;s|^\($s\)-$s{$s\(.*\)$s,$s\($w\)$s:$s\(.*\)$s}|\1- {\2}\n\1$i\3: \4|;t1" \ + -e "s|^\($s\)-$s{$s\(.*\)$s}|\1-\n\1$i\2|;" \ + -e ":2;s|^\($s\)\($w\)$s:$s\(&$w\)\?$s{$s\(.*\)$s,$s\($w\)$s:$s\(.*\)$s}|\1\2: \3 {\4}\n\1$i\5: \6|;t2" \ + -e "s|^\($s\)\($w\)$s:$s\(&$w\)\?$s{$s\(.*\)$s}|\1\2: \3\n\1$i\4|;p" | \ + sed -e "s|^\($s\)\($w\)$s:$s\(&$w\)\(.*\)|\1\2:\4\n\3|" \ + -e "s|^\($s\)-$s\(&$w\)\(.*\)|\1- \3\n\2|" | \ + sed -ne "s|^\($s\):|\1|" \ + -e "s|^\($s\)\(---\)\($s\)||" \ + -e "s|^\($s\)\(\.\.\.\)\($s\)||" \ + -e "s|^\($s\)-$s[\"']\(.*\)[\"']$s\$|\1$fs$fs\2|p;t" \ + -e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p;t" \ + -e "s|^\($s\)-$s\(.*\)$s\$|\1$fs$fs\2|" \ + -e "s|^\($s\)\($w\)$s:$s[\"']\?\(.*\)$s\$|\1$fs\2$fs\3|" \ + -e "s|^\($s\)[\"']\?\([^&][^$fs]\+\)[\"']$s\$|\1$fs$fs$fs\2|" \ + -e "s|^\($s\)[\"']\?\([^&][^$fs]\+\)$s\$|\1$fs$fs$fs\2|" \ + -e "s|$s\$||p" | \ + awk -F$fs "{ + gsub(/\t/,\" \",\$1); + if(NF>3){if(value!=\"\"){value = value \" \";}value = value \$4;} + else { + if(match(\$1,/^&/)){anchor[substr(\$1,2)]=full_vn;getline}; + indent = length(\$1)/length(\"$i\"); + vname[indent] = \$2; + value= \$3; + for (i in vname) {if (i > indent) {delete vname[i]; idx[i]=0}} + if(length(\$2)== 0){ vname[indent]= ++idx[indent] }; + vn=\"\"; for (i=0; i0)&&index(val, ref)==1){ + tmpval=assignment[val]; + sub(ref,full_vn,val); + if(match(val,\"$separator\$\")){ + gsub(ref,full_vn,tmpval); + } else if (length(tmpval) > 0) { + printf(\"%s=\\\"%s\\\"\n\", val, tmpval); + } + assignment[val]=tmpval; + } + } + } + } else if (length(value) > 0) { + printf(\"%s=\\\"%s\\\"\n\", full_vn, value); + 
}
+  }END{
+     for(val in assignment){
+        if(match(val,\"$separator\$\"))
+           printf(\"%s=\\\"%s\\\"\n\", val, assignment[val]);
+     }
+  }"
+}
+
+# check if the file exists, if not, exit.
+function check_file_exists(){
+  if [ ! -f "$1" ]; then
+    err "file $1 does not exist"
+    exit 1
+  fi
+}
+function check_directory_exists(){
+  if [ ! -d "$1" ]; then
+    err "directory $1 does not exist"
+    exit 1
+  fi
+}
+
+HOST_DB_HOME="$(
+  cd "$(dirname "$0")/../" >/dev/null 2>&1
+  pwd -P
+)"
+info "HOST_DB_HOME = ${HOST_DB_HOME}"
+
+################### GET USER INFO ###################
+# get uid
+uid=$(id -u)
+# get group id
+gid=$(id -g)
+
+
+#################### DEFINE CONSTANTS ####################
+
+# the log directory
+# HOST_DB_INTERACTIVE_YAML="${HOST_DB_CONF_DIR}/interactive.yaml"
+HOST_DB_RUNNING_FILE="${HOST_DB_HOME}/.running"
+HOST_DB_ENV_FILE="${HOST_DB_HOME}/.env"
+
+DOCKER_DB_GRAPHSCOPE_HOME="/home/graphscope/GraphScope"
+DOCKER_DB_GIE_HOME="${DOCKER_DB_GRAPHSCOPE_HOME}/interactive_engine/"
+DOCKER_DB_SERVER_BIN="${DOCKER_DB_GRAPHSCOPE_HOME}/flex/build/bin/sync_server"
+DOCKER_DB_GRAPH_IMPORT_BIN="${DOCKER_DB_GRAPHSCOPE_HOME}/flex/build/tests/rt_mutable_graph/test_graph_loading"
+DOCKER_DB_COMPILER_BIN="com.alibaba.graphscope.GraphServer"
+DOCKER_DB_GEN_BIN="${DOCKER_DB_GRAPHSCOPE_HOME}/flex/bin/load_plan_and_gen.sh"
+HOST_DB_TMP_DIR="/tmp"
+
+#################### DEFINE DEFAULT CONSTANTS ####################
+DATABASE_VERSION="v0.0.2"
+DATABASE_DEFAULT_GRAPH_NAME="modern"
+DATABASE_CURRENT_GRAPH_NAME="modern"
+DATABASE_DEFAULT_GRAPH_DOCKER_PATH="/home/graphscope/default_graph/${DATABASE_DEFAULT_GRAPH_NAME}"
+DATABASE_DEFAULT_GRAPH_MOUNT_CMD="${HOST_DB_HOME}/examples/modern_graph/:${DATABASE_DEFAULT_GRAPH_DOCKER_PATH}"
+DATABASE_VOLUMES="${DATABASE_DEFAULT_GRAPH_MOUNT_CMD}"
+DATABASE_LOG_LEVEL="INFO"
+DATABASE_PORTS=""
+
+## compiler related default configuration
+DATABASE_COMPILER_PLANNER_IS_ON="true"
+DATABASE_COMPILER_PLANNER_OPT="RBO"
+DATABASE_COMPILER_PLANNER_RULES="FilterMatchRule,FilterIntoJoinRule,NotExistToAntiJoinRule"
+DATABASE_COMPILER_ENDPOINT_ADDRESS="localhost"
+DATABASE_COMPILER_BOLT_PORT="7687"
+DATABASE_COMPILER_QUERY_TIMEOUT="20000"
+
+## hiactor related default configuration
+DATABASE_COMPUTE_ENGINE_PORT="10000"
+DATABASE_COMPUTE_ENGINE_SHARD_NUM=1
+
+## directories
+DATABASE_WORKSPACE="/home/graphscope/workspace/"
+DATABASE_DATA_DIR_NAME="data"
+DATABASE_LOG_DIR_NAME="logs"
+DATABASE_CONF_DIR_NAME="conf"
+
+
+################### IMAGE VERSION ###################
+GIE_DB_IMAGE_VERSION="v0.0.2"
+#GIE_DB_IMAGE_NAME="registry.cn-hongkong.aliyuncs.com/graphscope/${DB_PROD_NAME}"
+GIE_DB_IMAGE_NAME="interactive"
+GIE_DB_CONTAINER_NAME="${DB_PROD_NAME}-server"
+
+
+#################### Prepare uncreated directories ####################
+
+info "Finished creating log dir"
+
+#################### DEFINE FUNCTIONS ####################
+function check_running_containers_and_exit(){
+  # check if there are any running containers
+  info "Check running containers and exit"
+  running_containers=$(docker ps -a --format "{{.Names}}" | grep "${GIE_DB_CONTAINER_NAME}")
+  if [ -n "${running_containers}" ]; then
+    err "There are running containers: ${running_containers}, please stop them first."
+    exit 1
+  fi
+  info "Finished check"
+}
+
+function check_container_running(){
+  if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then
+    info "container ${GIE_DB_CONTAINER_NAME} is running"
+  else
+    info "container ${GIE_DB_CONTAINER_NAME} is not running"
+    # start the container
+    docker start "${GIE_DB_CONTAINER_NAME}"
+  fi
+}
+
+function ensure_container_running(){
+  if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then
+    info "container ${GIE_DB_CONTAINER_NAME} is running"
+  else
+    info "container ${GIE_DB_CONTAINER_NAME} is not running"
+    # start the container
+    docker start "${GIE_DB_CONTAINER_NAME}"
+  fi
+}
+
+function check_process_running_in_container(){
+  local container_name=$1
+  local process_name=$2
+  local error_msg=$3
+  local process_id=$(docker top "${container_name}" | grep "${process_name}" | awk '{print $2}')
+  if [ -z "${process_id}" ]; then
+    err "process ${process_name} is not running in container ${container_name}"
+    err "${error_msg}"
+    exit 1
+  fi
+  info "process ${process_name} is running in container ${container_name}, process id is ${process_id}"
+}
+
+function check_process_not_running_in_container(){
+  local container_name=$1
+  local process_name=$2
+  local error_msg=$3
+  local process_id=$(docker top "${container_name}" | grep "${process_name}" | awk '{print $2}')
+  if [ -z "${process_id}" ]; then
+    info "process ${process_name} is not running in container ${container_name}"
+  else
+    err "process ${process_name} is running in container ${container_name}, process id is ${process_id}"
+    err "${error_msg}"
+    exit 1
+  fi
+}
+
+# check whether the given graph is locked.
+function check_graph_not_running(){
+  info "Check that the graph is not running"
+  if [ $# -ne 1 ]; then
+    err "Expect graph name given."
+    exit 1
+  fi
+  local graph_name=$1
+  # check whether .lock is present in the container's data/${graph_name}/ directory
+  . ${HOST_DB_ENV_FILE}
+  local lock_file="${DATABASE_WORKSPACE}/data/${graph_name}/.lock"
+  info "Check lock file ${lock_file}"
+  # return 0 if the lock file does not exist in the container, 1 otherwise
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ ! -f ${lock_file} ]"
+}
+
+function update_init_config_from_yaml(){
+  if [ $# -ne 1 ]; then
+    err "Expect a configuration file"
+    exit 1
+  fi
+  config_file=$1
+  eval $(parse_yaml "${config_file}")
+  # update workspace if it exists
+  if [ -n "${workspace}" ]; then
+    DATABASE_WORKSPACE="${workspace}"
+  fi
+  # update database version if it exists
+  if [ -n "${version}" ]; then
+    DATABASE_VERSION="${version}"
+  fi
+  # append the found volumes to DATABASE_VOLUMES
+  # map the HOST_DB_HOME/data/ to ${DATABASE_WORKSPACE}/data
+  DATABASE_VOLUMES="${DATABASE_VOLUMES},${HOST_DB_HOME}/data:${DATABASE_WORKSPACE}/data"
+
+  x=1
+  while true; do
+    volume_x_key="volume_${x}"
+    volume_x=$(eval echo "\$${volume_x_key}")
+    if [ -z "${volume_x}" ]; then
+      break
+    fi
+    DATABASE_VOLUMES="${DATABASE_VOLUMES},${volume_x}"
+    x=$((x + 1))
+  done
+  # append compiler port and engine port to DATABASE_PORTS
+  DATABASE_PORTS="${DATABASE_COMPILER_BOLT_PORT}:${DATABASE_COMPILER_BOLT_PORT}"
+  DATABASE_PORTS="${DATABASE_PORTS},${DATABASE_COMPUTE_ENGINE_PORT}:${DATABASE_COMPUTE_ENGINE_PORT}"
+}
+
+function update_engine_config_from_yaml(){
+  if [ $# -ne 1 ]; then
+    err "Expect a configuration file"
+    exit 1
+  fi
+  config_file=$1
+  eval $(parse_yaml "${config_file}")
+  if [ -n "${log_level}" ]; then
+    DATABASE_LOG_LEVEL="${log_level}"
+  fi
+  # default_graph
+  if [ -n "${default_graph}" ]; then
+    DATABASE_CURRENT_GRAPH_NAME="${default_graph}"
+  fi
+  # compiler
+  if [ -n "${compiler_planner_is_on}" ]; then
+    DATABASE_COMPILER_PLANNER_IS_ON="${compiler_planner_is_on}"
+  fi
+  info "Found compiler planner is_on: ${compiler_planner_is_on}, ${DATABASE_COMPILER_PLANNER_IS_ON}"
+  if [ -n "${compiler_planner_opt}" ]; then
+    DATABASE_COMPILER_PLANNER_OPT="${compiler_planner_opt}"
+  fi
+  # append the found compiler planner rules to DATABASE_COMPILER_PLANNER_RULES
+  x=1
+  while true; do
+    compiler_planner_rules_x_key="compiler_planner_rules_${x}"
+    compiler_planner_rules_x=$(eval echo "\$${compiler_planner_rules_x_key}")
+    if [ -z "${compiler_planner_rules_x}" ]; then
+      break
+    fi
+    # if compiler_planner_rules_x is not yet present in DATABASE_COMPILER_PLANNER_RULES, append it
+    if [[ ! "${DATABASE_COMPILER_PLANNER_RULES}" =~ "${compiler_planner_rules_x}" ]]; then
+      DATABASE_COMPILER_PLANNER_RULES="${DATABASE_COMPILER_PLANNER_RULES},${compiler_planner_rules_x}"
+    fi
+    x=$((x + 1))
+  done
+  if [ -n "${compiler_endpoint_address}" ]; then
+    DATABASE_COMPILER_ENDPOINT_ADDRESS="${compiler_endpoint_address}"
+  fi
+  if [ -n "${compiler_endpoint_bolt_connector_port}" ]; then
+    DATABASE_COMPILER_BOLT_PORT="${compiler_endpoint_bolt_connector_port}"
+  fi
+  if [ -n "${compiler_query_timeout}" ]; then
+    DATABASE_COMPILER_QUERY_TIMEOUT="${compiler_query_timeout}"
+  fi
+}
+
+
+#################### DEFINE USAGE ####################
+
+function init_usage() {
+  cat << EOF
+    gs_interactive init -c [--config] <config_file>
+    Initialize the database and create the container. Specify the database version and volume mounting in the config yaml.
+EOF
+}
+
+function destroy_usage() {
+  cat << EOF
+    gs_interactive destroy
+    Destroy the current database and remove the container.
+EOF
+}
+
+function create_usage() {
+  cat << EOF
+    gs_interactive database create -g [--graph] <graph_name> -c [--config] <schema_file>
+    Create a graph in the database with the provided schema file.
+    The user should then import data to the created graph.
+EOF
+}
+
+function remove_usage() {
+  cat << EOF
+    gs_interactive database remove -g [--graph] <graph_name>
+    Remove the given graph from the database.
+EOF
+}
+
+function import_usage() {
+  cat << EOF
+    gs_interactive database import -g [--graph] <graph_name> -c [--config] <bulk_load_file>
+    Load the raw data specified in the bulk load file into the specified graph.
+EOF
+}
+
+function database_usage(){
+  create_usage
+  remove_usage
+  import_usage
+}
+
+
+function start_usage() {
+  cat << EOF
+    gs_interactive service start -g [--graph] <graph_name> -c [--config] <engine_config_file>
+    Start the graph service on the specified graph, with the provided engine config file.
+EOF
+}
+
+function stop_usage() {
+  cat << EOF
+    gs_interactive service stop
+    Stop the currently running database service.
+EOF
+}
+
+function restart_usage() {
+  cat << EOF
+    gs_interactive service restart -c [--config] <engine_config_file>
+    Restart the database with the currently running graph. A new engine config file can be supplied.
+EOF
+}
+
+function get_log_usage() {
+  cat << EOF
+    gs_interactive service get_log -o [--output] <output_dir>
+    Copy the server and compiler logs to the output directory.
+EOF
+}
+
+function services_usage(){
+  start_usage
+  stop_usage
+  restart_usage
+  get_log_usage
+}
+
+function compile_usage(){
+  cat << EOF
+    gs_interactive procedure compile -g [--graph] <graph_name> -i [--input] <input_file> -n [--name] <procedure_name>
+                -d [--description] <description> --compile_only
+    Compile a cypher/.cc file into a dynamic library, according to the schema of the graph. The output library will be placed at ./data/{graph_name}/plugins.
+    If --compile_only is specified, the library will not be loaded into the graph.
+EOF
+}
+
+## the .enable and .disable files control which stored procedures are enabled/disabled
+
+function enable_proc_usage(){
+  cat << EOF
+    gs_interactive procedure enable -g [--graph] <graph_name> -n [--name] <procedure_names>
+                -c [--config] <stored_procedures.yaml>
+    Enable the stored procedures in the given graph.
+    stored_procedures.yaml contains the stored procedure names, one per line.
+EOF
+}
+
+function disable_proc_usage(){
+  cat << EOF
+    gs_interactive procedure disable -g [--graph] <graph_name> -n [--name] <procedure_names>
+                -c [--config] <stored_procedures.yaml> -a [--all]
+    Disable the stored procedures in the given graph.
+    stored_procedures.yaml contains the stored procedure names, one per line.
+EOF
+}
+
+function show_stored_procedure_usage(){
+  cat << EOF
+    gs_interactive procedure show -g [--graph] <graph_name>
+    Show all enabled stored procedures for the given graph.
+EOF
+}
+
+
+function procedure_usage(){
+  compile_usage
+  enable_proc_usage
+  disable_proc_usage
+  show_stored_procedure_usage
+}
+
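+# An illustrative end-to-end session for the procedure subcommands; the graph
+# name and file paths below are examples only and need not exist:
+#   gs_interactive procedure compile -g modern -i ./examples/modern_graph/count_vertex_num.cypher -d "count vertices"
+#   gs_interactive procedure enable -g modern -n count_vertex_num
+#   gs_interactive procedure show -g modern
+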
+# print the usage of every subcommand.
+function usage() {
+  init_usage
+  destroy_usage
+  database_usage
+  services_usage
+  procedure_usage
+}
+
+################### Generate config file ###################
+function do_gen_conf(){
+  # receives one arg: the output config file
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -o | --output)
+      output_config_file="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      exit 1
+      ;;
+    esac
+  done
+
+  # if output_config_file exists, remove it
+  if [ -f "${output_config_file}" ]; then
+    rm "${output_config_file}"
+  fi
+
+  # echo directories
+  echo "directories:" >> ${output_config_file}
+  echo "  workspace: ${DATABASE_WORKSPACE}" >> ${output_config_file}
+  echo "  subdirs:" >> ${output_config_file}
+  echo "    data: ${DATABASE_DATA_DIR_NAME}" >> ${output_config_file}
+  echo "    logs: ${DATABASE_LOG_DIR_NAME}" >> ${output_config_file}
+  echo "    conf: ${DATABASE_CONF_DIR_NAME}" >> ${output_config_file}
+
+  # log level
+  echo "log_level: ${DATABASE_LOG_LEVEL}" >> ${output_config_file}
+
+  # current graph
+  echo "default_graph: ${DATABASE_CURRENT_GRAPH_NAME}" >> ${output_config_file}
+
+
+  # compute_engine
+  echo "compute_engine:" >> ${output_config_file}
+  echo "  type: hiactor" >> ${output_config_file}
+  echo "  hosts:" >> ${output_config_file}
+  echo "    - localhost:${DATABASE_COMPUTE_ENGINE_PORT}" >> ${output_config_file}
+  echo "  shard_num: ${DATABASE_COMPUTE_ENGINE_SHARD_NUM}" >> ${output_config_file}
+
+
+  # compiler
+  echo "compiler:" >> ${output_config_file}
+  echo "  planner:" >> ${output_config_file}
+  echo "    is_on: ${DATABASE_COMPILER_PLANNER_IS_ON}" >> ${output_config_file}
+  echo "    opt: ${DATABASE_COMPILER_PLANNER_OPT}" >> ${output_config_file}
+  # split compiler planner rules and put them as a sequence in yaml
+  echo "    rules:" >> ${output_config_file}
+  IFS=',' read -ra RULES_ARRAY <<<"${DATABASE_COMPILER_PLANNER_RULES}"
+  for rule in "${RULES_ARRAY[@]}"; do
+    echo "      - ${rule}" >> ${output_config_file}
+  done
+  echo "  endpoint:" >> ${output_config_file}
+  echo "    default_listen_address: ${DATABASE_COMPILER_ENDPOINT_ADDRESS}" >> ${output_config_file}
+  echo "    bolt_connector:" >> ${output_config_file}
+  echo "      port: ${DATABASE_COMPILER_BOLT_PORT}" >> ${output_config_file}
+  echo "    gremlin_connector:" >> ${output_config_file}
+  echo "      disabled: true" >> ${output_config_file}
+  echo "      port: 8182" >> ${output_config_file}
+  echo "  query_timeout: ${DATABASE_COMPILER_QUERY_TIMEOUT}" >> ${output_config_file}
+  info "Finished generating config file ${output_config_file}"
+}
+
+function generate_real_engine_conf(){
+  # expect one or two args
+  if [ $# -gt 2 ] || [ $# -eq 0 ]; then
+    err "Expect one or two args, but got $#"
+    exit 1
+  fi
+  if [ $# -eq 1 ]; then
+    real_engine_config_file=$1
+    info "engine config file is not specified, using default engine config"
+    do_gen_conf -o ${real_engine_config_file}
+  else
+    engine_config_file=$1
+    real_engine_config_file=$2
+    check_file_exists "${engine_config_file}"
+    update_engine_config_from_yaml "${engine_config_file}"
+    do_gen_conf -o ${real_engine_config_file}
+  fi
+}
+
+function update_graph_yaml_with_procedure_enabling(){
+  # expect one arg: the graph name
+  if [ $# -ne 1 ]; then
+    err "Expect one arg, but got $#"
+    exit 1
+  fi
+  graph_name=$1
+
+  # gather the .enable entries and append them to the graph yaml
+
+  . ${HOST_DB_ENV_FILE}
+  # copy graph_name's graph0.yaml to ${HOST_DB_TMP_DIR}
+  docker cp "${GIE_DB_CONTAINER_NAME}:${DATABASE_WORKSPACE}/data/${graph_name}/graph0.yaml" "${HOST_DB_TMP_DIR}/graph0.yaml"
+  echo "" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+  echo "stored_procedures:" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+  echo "  enable_lists:" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+  # copy graph_name's .enable file to ${HOST_DB_TMP_DIR}
+  docker cp "${GIE_DB_CONTAINER_NAME}:${DATABASE_WORKSPACE}/data/${graph_name}/.enable" "${HOST_DB_TMP_DIR}/.enable" || true
+  # get the .enable file
+  local enable_file="${HOST_DB_TMP_DIR}/.enable"
+  any_stored_procedures=false
+  # check whether the enable file exists
+  if [ ! -f "${enable_file}" ]; then
+    info "enable file ${enable_file} does not exist, no stored procedures will be enabled"
+  else
+    # read the enable file and split it by ','
+    IFS=',' read -ra ENABLED_ARRAY <<<"$(cat ${enable_file})"
+    # if ENABLED_ARRAY is empty, all stored procedures are disabled
+    if [ ${#ENABLED_ARRAY[@]} -eq 0 ]; then
+      info "enable file ${enable_file} is empty, all stored procedures are disabled"
+    else
+      # append all enabled stored procedures to the graph yaml
+      for enabled in "${ENABLED_ARRAY[@]}"; do
+        echo "    - ${enabled}" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+        any_stored_procedures=true
+      done
+    fi
+  fi
+  if [ "${any_stored_procedures}" = false ]; then
+    echo "  directory: not-a-directory" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+  else
+    echo "  directory: plugins" >> ${HOST_DB_TMP_DIR}/graph0.yaml
+  fi
+
+  # copy graph0.yaml back to the container as graph.yaml
+  docker cp "${HOST_DB_TMP_DIR}/graph0.yaml" "${GIE_DB_CONTAINER_NAME}:${DATABASE_WORKSPACE}/data/${graph_name}/graph.yaml"
+  info "Finished updating graph yaml with procedure enabling; added ${#ENABLED_ARRAY[@]} stored procedures."
+}
+
+
+#################### Init database ####################
+# Initialize the database.
+# create a user with the same user id in the container
+function do_init(){
+  # check running containers and exit if any are found
+  check_running_containers_and_exit
+  info "Ok, no running instance found, start init database..."
+  # if no containers are running, proceed to init
+
+  # check args num is 1, and get the first arg as CONFIG_FILE
+  if [ $# -eq 0 ]; then
+    err "init command needs 1 arg, but got $#"
+    init_usage
+    exit 1
+  fi
+
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -c | --config)
+      config_file="$2"
+      shift # past argument
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      init_usage
+      exit 1
+      ;;
+    esac
+  done
+
+  check_file_exists "${config_file}"
+
+  # parse yaml config
+  # eval $(parse_yaml "${config_file}")
+
+  # Parse the configuration presented in yaml, and override the default values.
+  update_init_config_from_yaml "${config_file}"
+
+  #0. Found workspace
+  info "Found docker db home: ${DATABASE_WORKSPACE}"
+  # put docker_workspace into env
+  echo "export DATABASE_WORKSPACE=${DATABASE_WORKSPACE}" >> ${HOST_DB_ENV_FILE}
+  echo "export DATABASE_DATA_DIR_NAME=${DATABASE_DATA_DIR_NAME}" >> ${HOST_DB_ENV_FILE}
+  info "Found database version: ${DATABASE_VERSION}"
+
+  #2. Found mounting volumes from yaml file
+  mount_cmd=""
+  # split DATABASE_VOLUMES and append to mount_cmd
+  IFS=',' read -ra VOLUME_ARRAY <<<"${DATABASE_VOLUMES}"
+  for volume in "${VOLUME_ARRAY[@]}"; do
+    # split on : and check the host path exists
+    volume_value_array=(${volume//:/ })
+    # if volume_value_array length is not 2, error
+    if [ ${#volume_value_array[@]} -ne 2 ]; then
+      err "volume ${volume_value_array} is not valid, should be <host_path>:<docker_path>"
+      exit 1
+    fi
+    # get host_path
+    host_path=${volume_value_array[0]}
+    docker_path=${volume_value_array[1]}
+    # check host_path exists
+    info "Found host path: ${host_path}"
+    check_directory_exists "${host_path}" || { err "host path ${host_path} does not exist"; exit 1; }
+    mount_cmd="${mount_cmd} -v ${volume}"
+  done
+#  mount_cmd="${mount_cmd} -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro"
+
+  info "Found docker volumes: ${mount_cmd}"
+
+  #3. get mapped ports
+  port_cmd=""
+  # split DATABASE_PORTS and append to port_cmd
+  IFS=',' read -ra DATABASE_PORTS_ARRAY <<<"${DATABASE_PORTS}"
+  for ports in "${DATABASE_PORTS_ARRAY[@]}"; do
+    port_x_value_array=(${ports//:/ })
+    # if port_x_value_array length is not 2, error
+    if [ ${#port_x_value_array[@]} -ne 2 ]; then
+      err "port ${port_x_value_array} is not valid, should be <host_port>:<docker_port>"
+      exit 1
+    fi
+    # get host_port
+    host_port=${port_x_value_array[0]}
+    docker_port=${port_x_value_array[1]}
+    # check that the ports are integers
+    if ! [[ "${host_port}" =~ ^[0-9]+$ ]]; then
+      err "host port ${host_port} is not valid"
+      exit 1
+    fi
+    if ! [[ "${docker_port}" =~ ^[0-9]+$ ]]; then
+      err "docker port ${docker_port} is not valid"
+      exit 1
+    fi
+    port_cmd="${port_cmd} -p ${host_port}:${docker_port}"
+  done
+  info "Found docker ports: ${port_cmd}"
+
+  # get uid
+  local uid=$(id -u)
+  local gid=$(id -g)
+  # get group name
+  local group_name=$(id -gn)
+  # get username
+  local username=$(id -un)
+
+  GIE_DB_IMAGE_NAME_TAG="${GIE_DB_IMAGE_NAME}:${DATABASE_VERSION}"
+  cmd="docker run -it -d --privileged --name ${GIE_DB_CONTAINER_NAME}"
+  # create user in container
+  cmd="${cmd} ${port_cmd} ${mount_cmd} ${GIE_DB_IMAGE_NAME_TAG} bash"
+
+  info "Running cmd: ${cmd}"
+  eval ${cmd} || docker rm "${GIE_DB_CONTAINER_NAME}"
+
+  info "Finished init database"
+
+  # create the workspace directory in the container
+  docker exec -u graphscope "${GIE_DB_CONTAINER_NAME}" bash -c "mkdir -p ${DATABASE_WORKSPACE}" || exit 1
+  docker exec -u graphscope "${GIE_DB_CONTAINER_NAME}" bash -c "sudo chown -R graphscope:graphscope ${DATABASE_WORKSPACE}" || exit 1
+  docker exec -u graphscope "${GIE_DB_CONTAINER_NAME}" bash -c "mkdir -p ${DATABASE_WORKSPACE}/logs" || exit 1
+  docker exec -u graphscope "${GIE_DB_CONTAINER_NAME}" bash -c "mkdir -p ${DATABASE_WORKSPACE}/conf" || exit 1
+}
+
+
+#################### Create graph ####################
+function do_create(){
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    -c | --config)
+      schema_file="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      create_usage
+      exit 1
+      ;;
+    esac
+  done
+  # check graph_name is set
+  if [ -z "${graph_name}" ]; then
+    err "graph name is not specified"
+    create_usage
+    exit 1
+  fi
+  check_file_exists "${schema_file}"
+  # check the graph is not running inside docker
+  check_graph_not_running ${graph_name} || { err "Can not create graph ${graph_name}, since a graph with the same name is running."; exit 1; }
+  # create the graph directory in the docker's workspace
+  . ${HOST_DB_ENV_FILE}
+  docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}"
+  docker_graph_schema_file="${docker_graph_dir}/graph.yaml"
+  docker_graph_schema_file_back="${docker_graph_dir}/graph0.yaml" # used for later adding/removing stored procedures
+  # check whether docker_graph_schema_file exists in the container; if it exists, tell the user to remove it first
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -f ${docker_graph_schema_file} ] && echo \"graph ${graph_name} already exists, please remove it first\" && exit 1 || exit 0" || exit 1
+  # create the graph directory in the docker's workspace
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "mkdir -p ${docker_graph_dir}" || exit 1
+  # create plugins dir
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "mkdir -p ${docker_graph_dir}/plugins" || exit 1
+  # copy the schema file to the docker's workspace
+  docker cp "${schema_file}" "${GIE_DB_CONTAINER_NAME}:${docker_graph_schema_file}" || exit 1
+  docker cp "${schema_file}" "${GIE_DB_CONTAINER_NAME}:${docker_graph_schema_file_back}" || exit 1
+  info "Successfully created graph ${graph_name}"
+  #TODO: support creating an empty graph
+}
+
+#################### Remove graph ####################
+function do_remove(){
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      remove_usage
+      exit 1
+      ;;
+    esac
+  done
+  # check graph_name is set
+  if [ -z "${graph_name}" ]; then
+    err "graph name is not specified"
+    remove_usage
+    exit 1
+  fi
+  # check the graph is not running inside docker
+  check_graph_not_running ${graph_name} || { err "Can not remove graph ${graph_name}, since a graph with the same name is running."; exit 1; }
+  . ${HOST_DB_ENV_FILE}
+  docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}"
+  # rm -rf the graph directory in the docker's workspace
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "rm -rf ${docker_graph_dir}" || exit 1
+}
+
+#################### Import ####################
+function do_import(){
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    -c | --config)
+      bulk_load_file="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      import_usage
+      exit 1
+      ;;
+    esac
+  done
+  info "Import data to graph ${graph_name} from ${bulk_load_file}"
+  # check if the container is running
+  check_container_running
+  # check if the bulk_load_file exists
+  check_file_exists "${bulk_load_file}"
+  info "bulk_load_file ${bulk_load_file} exists"
+
+  check_graph_not_running ${graph_name} || { err "Can not import data to graph ${graph_name}, since it is running."; exit 1; }
+  . ${HOST_DB_ENV_FILE}
+  # check graph_schema_file exists in the container; if not, let the user create the graph first
+  docker_graph_schema_file="${DATABASE_WORKSPACE}/data/${graph_name}/graph.yaml"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -f ${docker_graph_schema_file} ] || (echo \"graph ${graph_name} does not exist, please create it first\" && exit 1)"
+  info "Graph schema exists"
+  # copy the bulk_load_file to the container
+  bulk_load_file_name=$(basename "${bulk_load_file}")
+  docker_bulk_load_file="/tmp/${bulk_load_file_name}"
+  docker cp "${bulk_load_file}" "${GIE_DB_CONTAINER_NAME}:${docker_bulk_load_file}"
+
+  docker_graph_data_dir="${DATABASE_WORKSPACE}/data/${graph_name}/indices"
+  # currently we can only overwrite the indices, so if the directory exists, remove it first
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -d ${docker_graph_data_dir} ] && rm -rf ${docker_graph_data_dir} || exit 0" || exit 1
+
+  cmd="docker exec ${GIE_DB_CONTAINER_NAME} bash -c \""
+  cmd="${cmd} ${DOCKER_DB_GRAPH_IMPORT_BIN} ${docker_graph_schema_file} ${docker_bulk_load_file} ${docker_graph_data_dir}"
+  cmd="${cmd} \""
+
+  info "Running cmd: ${cmd}"
+  eval ${cmd} || { err "Failed to import graph to database"; exit 1; }
+  info "Successfully imported data to graph ${graph_name}"
+}
+
+#################### Destroy ####################
+function do_destroy() {
+  info "Destroy database"
+  docker stop "${GIE_DB_CONTAINER_NAME}"
+  docker rm "${GIE_DB_CONTAINER_NAME}"
+  . ${HOST_DB_ENV_FILE}
+  # rm host data/*
+  sudo rm -rf ${HOST_DB_HOME}/data/*
+
+  # rm the .running and .env files
+  rm ${HOST_DB_RUNNING_FILE}
+  rm ${HOST_DB_ENV_FILE}
+
+
+  info "Finished destroying database"
+}
+
+#################### Start database ####################
+function do_start(){
+  . ${HOST_DB_ENV_FILE}
+  info "Starting database..."
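+  # Start sequence (a descriptive summary of the steps below): (1) make sure no
+  # server/compiler process is already running in the container, (2) resolve
+  # the graph name and generate the real engine config, (3) create and import
+  # the default graph if it does not exist yet, (4) launch sync_server and the
+  # compiler, then record the running args in ${HOST_DB_RUNNING_FILE}.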
+
+  # check whether the server/compiler processes are already running; if so, exit
+  check_process_not_running_in_container "${GIE_DB_CONTAINER_NAME}" "${DOCKER_DB_SERVER_BIN}" "Database is already running"
+  check_process_not_running_in_container "${GIE_DB_CONTAINER_NAME}" "${DOCKER_DB_COMPILER_BIN}" "Compiler is already running"
+
+  # set trap to do_stop
+  trap do_stop SIGINT SIGTERM
+
+  graph_name=""
+  engine_config_file=""
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    -c | --config)
+      engine_config_file="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      start_usage
+      exit 1
+      ;;
+    esac
+  done
+  # try to parse default_graph from engine_config_file
+  # generate the real engine config file and put it at /tmp/real_engine_config.yaml
+  if [ -z "${graph_name}" ]; then
+    graph_name=${DATABASE_CURRENT_GRAPH_NAME}
+    info "No graph specified, using default graph ${graph_name}"
+  else
+    DATABASE_CURRENT_GRAPH_NAME=${graph_name}
+    info "Using user specified graph ${graph_name}"
+  fi
+
+  real_engine_config_file="/tmp/real_engine_config.yaml"
+  if [ -z "${engine_config_file}" ]; then
+    generate_real_engine_conf "${real_engine_config_file}"
+  else
+    generate_real_engine_conf "${engine_config_file}" "${real_engine_config_file}"
+  fi
+
+  # copy the engine config file to the container
+  dst_engine_config_file="${DATABASE_WORKSPACE}/conf/engine_config.yaml"
+  docker cp "${real_engine_config_file}" "${GIE_DB_CONTAINER_NAME}:${dst_engine_config_file}" || { err "fail to copy ${engine_config_file} to container"; exit 1; }
+
+  # check whether the graph exists in the container, and get the result as a bool
+  docker_graph_schema_file="${DATABASE_WORKSPACE}/data/${graph_name}/graph.yaml"
+  wal_file="${DATABASE_WORKSPACE}/data/${graph_name}/indices/init_snapshot.bin"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "( [ -f ${docker_graph_schema_file} ] && [ -f ${wal_file} ] && echo \"true\" ) || echo \"false\"" > /tmp/graph_exists
+  graph_exists=$(cat /tmp/graph_exists)
+  if [ "${graph_exists}" = "false" ]; then
+    info "graph ${graph_name} does not exist, creating it first"
+    # remove the data/${graph_name} directory in the container
+    docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "rm -rf ${DATABASE_WORKSPACE}/data/${graph_name}"
+    do_create -g ${graph_name} -c ${HOST_DB_HOME}/examples/modern_graph/modern_graph.yaml
+    do_import -g ${graph_name} -c ${HOST_DB_HOME}/examples/modern_graph/bulk_load.yaml
+    info "Successfully created and imported graph ${graph_name}"
+  else
+    info "graph ${graph_name} exists, skip create and import"
+  fi
+
+  do_stop
+  ensure_container_running
+  # regenerate graph.yaml from graph0.yaml, overriding graph.yaml with the stored procedure enabling/disabling
+  update_graph_yaml_with_procedure_enabling ${graph_name}
+
+  # the bulk_load_file should be placed inside ${DATABASE_WORKSPACE} and should use a relative path
+  . ${HOST_DB_ENV_FILE}
+  info "In start database, received graph_name = ${graph_name}, engine_config_file = ${engine_config_file}"
+  docker_server_log_path="${DATABASE_WORKSPACE}/logs/server.log"
+  graph_schema_file="${DATABASE_WORKSPACE}/data/${graph_name}/graph.yaml"
+  csr_data_dir="${DATABASE_WORKSPACE}/data/${graph_name}/indices"
+  cmd="docker exec ${GIE_DB_CONTAINER_NAME} bash -c \""
+  cmd="${cmd} ${DOCKER_DB_SERVER_BIN} -c ${dst_engine_config_file}"
+  cmd="${cmd} -g ${graph_schema_file} --data-path ${csr_data_dir}"
+  cmd="${cmd} --gie-home ${DOCKER_DB_GIE_HOME}"
+  cmd="${cmd} > ${docker_server_log_path} 2>&1 & \""
+  echo "Running cmd: ${cmd}"
+  # eval the command; exit if it fails
+  eval ${cmd} || { err "Failed to launch hqps server"; exit 1; }
+  sleep 4
+  # check whether the process is running
+  check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_SERVER_BIN} ", use gs_interactive service get_log -o [dir] to see logs"
+  info "Successfully started server"
+
+  # start compiler
+  docker_compiler_log_path="${DATABASE_WORKSPACE}/logs/compiler.log"
+  cmd="docker exec ${GIE_DB_CONTAINER_NAME} bash -c \""
+  cmd=${cmd}"java -cp \"${DOCKER_DB_GIE_HOME}/compiler/target/libs/*:${DOCKER_DB_GIE_HOME}/compiler/target/compiler-0.0.1-SNAPSHOT.jar\" "
+  cmd=${cmd}" -Djna.library.path=${DOCKER_DB_GIE_HOME}/executor/ir/target/release"
+  cmd=${cmd}" -Dgraph.schema=${graph_schema_file}"
+  # should an error be reported here?
+  # cmd=${cmd}" -Dgraph.stored.procedures.uri=file:${docker_graph_plugin_dir}"
+  cmd=${cmd}" ${DOCKER_DB_COMPILER_BIN} ${dst_engine_config_file} > ${docker_compiler_log_path} 2>&1 &"
+  cmd=${cmd}"\""
+  info "Running cmd: ${cmd}"
+  eval ${cmd}
+  sleep 6
+  check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_COMPILER_BIN} ", use gs_interactive service get_log -o [dir] to see more details"
+  info "Successfully started compiler"
+  # get the cypher port from the engine config file
+  # bolt_connector_port=$(parse_yaml "${engine_config_file}" | grep "compiler_endpoint_bolt_connector_port" | awk -F "=" '{print $2}')
+  info "Database service is running, port is open on :${DATABASE_COMPILER_BOLT_PORT}"
+
+  # if do_start succeeds, write the current args to ${HOST_DB_RUNNING_FILE}
+  echo "GRAPH_NAME=${graph_name}" > ${HOST_DB_RUNNING_FILE}
+  echo "ENGINE_CONFIG_FILE=${engine_config_file}" >> ${HOST_DB_RUNNING_FILE}
+  # create the .lock file
+  docker_graph_lock_file="${DATABASE_WORKSPACE}/data/${graph_name}/.lock"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "touch ${docker_graph_lock_file}" || exit 1
+}
+
+
+#################### Stop database ####################
+function do_stop(){
+  # if no database is running, do nothing
+  if [ -f "${HOST_DB_RUNNING_FILE}" ]; then
+    . ${HOST_DB_ENV_FILE}
+  else
+    info "No running database found, do nothing"
+    return 0
+  fi
+  # get graph_name from ${HOST_DB_RUNNING_FILE}
+  local graph_name=$(sed -n '1p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2)
+  docker_graph_lock_file="${DATABASE_WORKSPACE}/data/${graph_name}/.lock"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "rm -f ${docker_graph_lock_file}" || exit 1
+  info "Successfully removed ${docker_graph_lock_file} file"
+  # stop the SERVER_BIN process and the compiler process
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "pkill -f ${DOCKER_DB_SERVER_BIN}"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "pkill -f ${DOCKER_DB_COMPILER_BIN}"
+  sleep 6
+  info "Successfully stopped database"
+}
+
+
+#################### Get database status ####################
+function do_status() {
+  if [ "$(docker inspect -f '{{.State.Running}}' "${GIE_DB_CONTAINER_NAME}")" = "true" ]; then
+    info "container ${GIE_DB_CONTAINER_NAME} is running"
+  else
+    info "container ${GIE_DB_CONTAINER_NAME} is not running"
+    info "Please start database first"
+  fi
+  . ${HOST_DB_ENV_FILE}
+  # the container may be running while the processes are not
+  check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_SERVER_BIN} "The service is stopped or down. Use gs_interactive service get_log -o [dir] to see more details"
+  check_process_running_in_container ${GIE_DB_CONTAINER_NAME} ${DOCKER_DB_COMPILER_BIN} "The service is stopped or down. Use gs_interactive service get_log -o [dir] to see more details"
+  # get the cypher port from the engine config file in the container
+
+  docker_engine_config_file="${DATABASE_WORKSPACE}/conf/engine_config.yaml"
+  # copy the engine config file to the host's tmp directory
+  docker cp "${GIE_DB_CONTAINER_NAME}:${docker_engine_config_file}" "${HOST_DB_TMP_DIR}/engine_config.yaml" || exit 1
+  eval $(parse_yaml "${HOST_DB_TMP_DIR}/engine_config.yaml")
+  info "Database service is running, port is open on :${compiler_endpoint_bolt_connector_port}"
+}
+
+
+
+#################### Restart ####################
+function do_restart() {
+  # read args from the cached file.
+  # get the number of lines in ${HOST_DB_RUNNING_FILE}
+  num_lines=$(wc -l < ${HOST_DB_RUNNING_FILE})
+  if [ ${num_lines} -ne 2 ]; then
+    err "Error: ${HOST_DB_RUNNING_FILE} should have 2 lines, but got ${num_lines}, something is wrong with the file ${HOST_DB_RUNNING_FILE}"
+    exit 1
+  fi
+  # read args from file
+  GRAPH_NAME=$(sed -n '1p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2)
+  ENGINE_CONFIG_FILE=$(sed -n '2p' ${HOST_DB_RUNNING_FILE} | cut -d '=' -f 2)
+  # parse the current args, overriding the args from file
+  info "Restarting database..."
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      GRAPH_NAME="$2"
+      shift # past argument
+      shift
+      ;;
+    -c | --config)
+      ENGINE_CONFIG_FILE="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      restart_usage
+      exit 1
+      ;;
+    esac
+  done
+  do_stop
+  info "Successfully stopped database"
+  do_start -g ${GRAPH_NAME} -c ${ENGINE_CONFIG_FILE}
+  info "Finished restarting database"
+}
+
+#################### Get log ####################
+function do_log(){
+  . ${HOST_DB_ENV_FILE}
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -o | --output)
+      directory="$2"
+      shift # past argument
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      get_log_usage
+      exit 1
+      ;;
+    esac
+  done
+  # check directory is set
+  if [ -z "${directory}" ]; then
+    err "output directory is not specified"
+    get_log_usage
+    exit 1
+  fi
+  # get the log directory in the container
+  docker_log_dir="${DATABASE_WORKSPACE}/logs"
+  # copy ${docker_log_dir}/compiler.log and ${docker_log_dir}/server.log to ${directory}
+  docker_compiler_log="${docker_log_dir}/compiler.log"
+  docker_server_log="${docker_log_dir}/server.log"
+  # docker cp
+  docker cp "${GIE_DB_CONTAINER_NAME}:${docker_compiler_log}" "${directory}/compiler.log" || exit 1
+  docker cp "${GIE_DB_CONTAINER_NAME}:${docker_server_log}" "${directory}/server.log" || exit 1
+  info "Successfully copied logs to ${directory}, please check compiler.log and server.log"
+}
+
+# the compiled dynamic libs will be placed at data/${graph_name}/plugins/
+# after compilation, the user needs to write the corresponding yaml, telling the compiler about
+# the input and output of the stored procedure
+function do_compile() {
+  ensure_container_running
+  if [ $# -lt 4 ]; then
+    err "procedure compile command needs at least 4 args, but got $#"
+    compile_usage
+    exit 1
+  fi
+  compile_only=false
+
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      info "graph_name = ${graph_name}"
+      shift # past argument
+      shift
+      ;;
+    -i | --input)
+      file_path="$2"
+      shift # past argument
+      shift
+      ;;
+    -n | --name)
+      stored_procedure_name="$2"
+      shift
+      shift
+      ;;
+    -d | --description)
+      stored_procedure_desc="$2"
+      shift
+      shift
+      ;;
+    --compile_only)
+      compile_only=true
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      compile_usage
+      exit 1
+      ;;
+    esac
+  done
+
+  # check graph_name
+  if [ -z "${graph_name}" ]; then
+    err "graph_name is empty"
+    compile_usage
+    exit 1
+  fi
+
+  # check file_path
+  check_file_exists "${file_path}"
+  # get the real file_path
+  file_name=$(basename "${file_path}")
+  # assign file_name to stored_procedure_name if stored_procedure_name is not set
+  if [ -z "${stored_procedure_name}" ]; then
+    stored_procedure_name="${file_name%.*}"
+  fi
+  real_file_path=$(realpath "${file_path}")
+  # check it exists
+  if [ ! -f "${real_file_path}" ]; then
+    err "file ${real_file_path} does not exist"
+    exit 1
+  fi
+
+  . ${HOST_DB_ENV_FILE}
+
+  real_engine_config_file="/tmp/real_engine_config.yaml"
+  # update default graph name
+  DATABASE_CURRENT_GRAPH_NAME=${graph_name}
+  generate_real_engine_conf "${real_engine_config_file}"
+  # copy to container
+  docker_engine_config="${DATABASE_WORKSPACE}/conf/engine_config.yaml"
+  docker cp "${real_engine_config_file}" "${GIE_DB_CONTAINER_NAME}:${docker_engine_config}" || exit 1
+
+  docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}"
+  docker_graph_schema="${docker_graph_dir}/graph.yaml"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -d ${docker_graph_dir} ] || (echo \"graph ${graph_name} does not exist, please create it first\" && exit 1)"
+
+  container_output_dir="${DATABASE_WORKSPACE}/data/${graph_name}/plugins"
+  container_input_path="/tmp/${file_name}"
+  # docker cp the file to the container
+  cmd="docker cp ${real_file_path} ${GIE_DB_CONTAINER_NAME}:${container_input_path}"
+  eval ${cmd} || exit 1
+
+  cmd="docker exec ${GIE_DB_CONTAINER_NAME} bash -c \""
+  cmd=${cmd}" ${DOCKER_DB_GEN_BIN}"
+  cmd=${cmd}" --engine_type=hqps"
+  cmd=${cmd}" --input=${container_input_path}"
+  cmd=${cmd}" --work_dir=/tmp/codegen/"
+  cmd=${cmd}" --ir_conf=${docker_engine_config}"
+  cmd=${cmd}" --graph_schema_path=${docker_graph_schema}"
+  cmd=${cmd}" --gie_home=${DOCKER_DB_GIE_HOME}"
+  cmd=${cmd}" --output_dir=${container_output_dir}"
+  cmd=${cmd}" --procedure_name=${stored_procedure_name}"
+  if [ ! -z "${stored_procedure_desc}" ]; then
+    cmd=${cmd}" --procedure_desc=\"${stored_procedure_desc}\""
+  fi
+  cmd=${cmd}" \""
+
+  echo "Running cmd: ${cmd}"
+  eval ${cmd} || exit 1
+  # check the output exists
+  output_file="${HOST_DB_HOME}/data/${graph_name}/plugins/lib${stored_procedure_name}.so"
+
+  if [ ! -f "${output_file}" ]; then
+    err "output file ${output_file} does not exist, compilation failed"
+    exit 1
+  fi
+  info "Successfully generated dynamic lib ${output_file}."
+
+  # if not compile_only, add the stored_procedure_name to .enable
+  docker_graph_enable_file="${docker_graph_dir}/plugins/.enable"
+  # copy from the container to the host
+  rm -f /tmp/.enable
+  docker cp "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" "/tmp/.enable" || true
+  if [ ! -f "/tmp/.enable" ]; then
+    touch "/tmp/.enable"
+  fi
+  # if compile_only is false
+  if [ "${compile_only}" = false ]; then
+    echo "${stored_procedure_name}" >> /tmp/.enable
+  fi
+  # copy back
+  docker cp "/tmp/.enable" "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" || exit 1
+}
+
+function do_enable(){
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    -n | --name)
+      stored_procedure_names="$2"
+      shift
+      shift
+      ;;
+    -c | --config)
+      stored_procedure_names_yaml="$2"
+      shift
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      enable_proc_usage
+      exit 1
+      ;;
+    esac
+  done
+
+  # --name and --config can not be set at the same time
+  if [ ! -z "${stored_procedure_names}" ] && [ ! -z "${stored_procedure_names_yaml}" ]; then
+    err "--name and --config can not be set at the same time"
+    enable_proc_usage
+    exit 1
+  fi
+  # use stored_procedure_names_yaml if it is set
+  if [ ! -z "${stored_procedure_names_yaml}" ]; then
+    check_file_exists "${stored_procedure_names_yaml}"
+    # read the file, taking each line as a stored_procedure_name, and join them with ','
+    stored_procedure_names=$(cat "${stored_procedure_names_yaml}" | tr '\n' ',' | sed 's/,$//')
+  fi
+  info "stored_procedure_names = ${stored_procedure_names}"
+  # add the names to the .enable file for graph_name
+  . ${HOST_DB_ENV_FILE}
+  docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -d ${docker_graph_dir} ] || (echo \"graph ${graph_name} does not exist, please create it first\" && exit 1)"
+  docker_graph_plugin_dir="${docker_graph_dir}/plugins"
+  docker_graph_enable_file="${docker_graph_plugin_dir}/.enable"
+  rm -f /tmp/.enable
+  # copy the .enable file to the host and append the stored_procedure_names to it; names that already exist are skipped
+  docker cp "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" "/tmp/.enable" || true
+  if [ ! -f "/tmp/.enable" ]; then
+    touch "/tmp/.enable"
+  fi
+  old_line_num=$(wc -l < /tmp/.enable)
+  # split the stored_procedure_names by ',' and append them to the .enable file
+  IFS=',' read -ra stored_procedure_names_array <<< "${stored_procedure_names}"
+  for stored_procedure_name in "${stored_procedure_names_array[@]}"; do
+    # check if the stored_procedure_name already exists in the .enable file
+    if grep -q "${stored_procedure_name}" "/tmp/.enable"; then
+      info "stored_procedure_name ${stored_procedure_name} already exists in .enable file, skip"
+    else
+      echo "${stored_procedure_name}" >> /tmp/.enable
+    fi
+  done
+  # copy the .enable file back to the container
+  docker cp "/tmp/.enable" "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" || exit 1
+  new_line_num=$(wc -l < /tmp/.enable)
+  info "Successfully enabled stored procedures ${stored_procedure_names} for graph ${graph_name}, ${old_line_num} -> ${new_line_num}"
+}
+
+function do_disable(){
+  disable_all=false
+  while [[ $# -gt 0 ]]; do
+    key="$1"
+    case $key in
+    -g | --graph)
+      graph_name="$2"
+      shift # past argument
+      shift
+      ;;
+    -n | --name)
+      stored_procedure_names="$2"
+      shift
+      shift
+      ;;
+    -c | --config)
+      stored_procedure_names_yaml="$2"
+      shift
+      shift
+      ;;
+    -a | --all)
+      disable_all=true
+      shift
+      ;;
+    *)
+      err "unknown option $1"
+      disable_proc_usage
+      exit 1
+      ;;
+    esac
+  done
+
+  # --name, --config and --all are mutually exclusive
+  if { [ ! -z "${stored_procedure_names}" ] && [ ! -z "${stored_procedure_names_yaml}" ]; } || \
+     { [ "${disable_all}" = true ] && { [ ! -z "${stored_procedure_names}" ] || [ ! -z "${stored_procedure_names_yaml}" ]; }; }; then
+    err "--name, --config and --all can not be set at the same time"
+    disable_proc_usage
+    exit 1
+  fi
+  . ${HOST_DB_ENV_FILE}
+  # check graph_name is not empty
+  if [ -z "${graph_name}" ]; then
+    err "graph_name is empty"
+    disable_proc_usage
+    exit 1
+  fi
+  info "graph_name = ${graph_name}"
+  docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}"
+  docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -d ${docker_graph_dir} ] || (echo \"graph ${graph_name} does not exist, please create it first\" && exit 1)"
+  docker_graph_plugin_dir="${docker_graph_dir}/plugins"
+  docker_graph_enable_file="${docker_graph_plugin_dir}/.enable"
+  echo "disable_all = ${disable_all}"
+  if [ "${disable_all}" = true ]; then
+    # truncate the .enable file
+    info "disable all stored procedures for graph ${graph_name}"
+    docker exec "${GIE_DB_CONTAINER_NAME}" bash -c ": > ${docker_graph_enable_file}" || exit 1
+    info "Successfully disabled all stored procedures for graph ${graph_name}"
+    exit 0
+  fi
+
+  # use stored_procedure_names_yaml if it is set
+  if [ ! -z "${stored_procedure_names_yaml}" ]; then
-z "${stored_procedure_names_yaml}" ]; then + check_file_exists "${stored_procedure_names_yaml}" + # cat the file and get each line as a stored_procedure_name, join them with ',' + stored_procedure_names=$(cat "${stored_procedure_names_yaml}" | tr '\n' ',' | sed 's/,$//') + fi + info "stored_procedure_names = ${stored_procedure_names}" + # add the names to .enable file for graph_name + + # copy the .enable file to host, and remove the stored_procedure_names from it + docker cp "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" "/tmp/.enable" || exit 1 + old_line_num=$(wc -l < /tmp/.enable) + # split the stored_procedure_names by ',' and remove them from .enable file + IFS=',' read -ra stored_procedure_names_array <<< "${stored_procedure_names}" + for stored_procedure_name in "${stored_procedure_names_array[@]}"; do + sed -i "/${stored_procedure_name}/d" /tmp/.enable + done + # copy the .enable file back to container + docker cp "/tmp/.enable" "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" || exit 1 + new_line_num=$(wc -l < /tmp/.enable) + info "Successfuly disable stored_procedures ${stored_procedure_names} for graph ${graph_name}, ${old_line_num} -> ${new_line_num}" +} + +function do_show(){ + while [[ $# -gt 0 ]]; do + key="$1" + case $key in + -g | --graph) + graph_name="$2" + shift + shift + ;; + *) + err "unknown option $1" + show_stored_procedure_usage + exit 1 + ;; + esac + done + . ${HOST_DB_ENV_FILE} + # check graph_name not empty + if [ -z "${graph_name}" ]; then + err "graph_name is empty" + exit 1 + fi + info "graph_name = ${graph_name}" + docker_graph_dir="${DATABASE_WORKSPACE}/data/${graph_name}" + docker exec "${GIE_DB_CONTAINER_NAME}" bash -c "[ -d ${docker_graph_dir} ] || (echo \"graph ${graph_name} not exists, please create it first\" && exit 1)" || exit 1 + docker_graph_plugin_dir="${docker_graph_dir}/plugins" + docker_graph_enable_file="${docker_graph_plugin_dir}/.enable" + docker cp "${GIE_DB_CONTAINER_NAME}:${docker_graph_enable_file}" "/tmp/.enable" || exit 1 + info "Enabled stored_procedures for graph ${graph_name}:" + cat /tmp/.enable +} + +function do_database(){ + while [[ $# -gt 0 ]]; do + key="$1" + case $key in + create) + shift + do_create "$@" + exit 0 + ;; + remove) + shift + do_remove "$@" + exit 0 + ;; + import) + shift + do_import "$@" + exit 0 + ;; + *) + err "unknown option $1" + database_usage + exit 1 + ;; + esac + done +} + +function do_service(){ + while [[ $# -gt 0 ]]; do + key="$1" + case $key in + start) + shift + do_start "$@" + exit 0 + ;; + stop) + shift + do_stop "$@" + exit 0 + ;; + restart) + shift + do_restart "$@" + exit 0 + ;; + status) + shift + do_status "$@" + exit 0 + ;; + get_log) + shift + do_log "$@" + exit 0 + ;; + *) + err "unknown option $1" + services_usage + exit 1 + ;; + esac + done +} + +function do_procedure(){ + while [[ $# -gt 0 ]]; do + key="$1" + case $key in + compile) + shift + do_compile "$@" + exit 0 + ;; + enable) + shift + do_enable "$@" + exit 0 + ;; + disable) + shift + do_disable "$@" + exit 0 + ;; + show) + shift + do_show "$@" + exit 1 + ;; + *) + err "unknown option $1" + procedure_usage + exit 1 + ;; + esac + done + procedure_usage +} + +#################### Entry #################### +if [ $# -eq 0 ]; then + usage + exit 1 +fi + +while [[ $# -gt 0 ]]; do + key="$1" + + case $key in + -h | --help) + usage + exit + ;; + init) + shift + info "Start initiating database..." 
+    do_init "$@"
+    exit 0
+    ;;
+  database)
+    shift
+    do_database "$@"
+    exit 0
+    ;;
+  service)
+    shift
+    do_service "$@"
+    exit 0
+    ;;
+  procedure)
+    shift
+    do_procedure "$@"
+    exit 0
+    ;;
+  destroy)
+    shift
+    do_destroy "$@"
+    exit 0
+    ;;
+  gen_conf)
+    shift
+    do_gen_conf "$@"
+    exit 0
+    ;;
+  *) # unknown option
+    err "unknown option $1"
+    usage
+    exit 1
+    ;;
+  esac
+done
+
+
+
+
diff --git a/flex/interactive/conf/engine_config.yaml b/flex/interactive/conf/engine_config.yaml
new file mode 100644
index 000000000000..0bb4487bda46
--- /dev/null
+++ b/flex/interactive/conf/engine_config.yaml
@@ -0,0 +1,13 @@
+log_level: INFO # default INFO
+default_graph: modern # the graph to load at service start-up, if no graph name is specified
+compute_engine:
+  shard_num: 1 # the number of shards, default 1
+compiler:
+  planner:
+    is_on: true
+    opt: RBO
+    rules:
+      - FilterMatchRule
+      - FilterIntoJoinRule
+      - NotExistToAntiJoinRule
+  query_timeout: 20000 # query timeout in milliseconds, default 20000
\ No newline at end of file
diff --git a/flex/interactive/conf/interactive.properties b/flex/interactive/conf/interactive.properties
deleted file mode 100755
index bd1ee716ef00..000000000000
--- a/flex/interactive/conf/interactive.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-engine.type: hiactor
-hiactor.hosts: localhost:10000
-graph.store: exp
-graph.schema: file:../data/ldbc/graph.json
-graph.stored.procedures.uri: file:/tmp
-graph.planner: {"isOn":true,"opt":"RBO","rules":["FilterMatchRule"]}
-gremlin.server.disabled: true
-neo4j.bolt.server.port: 7687
\ No newline at end of file
diff --git a/flex/interactive/conf/interactive.yaml b/flex/interactive/conf/interactive.yaml
index 969b6ca80401..12ca707b2fab 100755
--- a/flex/interactive/conf/interactive.yaml
+++ b/flex/interactive/conf/interactive.yaml
@@ -1,23 +1,3 @@
----
-version: 0.0.1
-directories:
-  workspace: /home/graphscope/interactive/
-  subdirs:
-    data: data # by default data, relative to ${workspace}
-    conf: conf # by default conf, relative to ${workspace}
-    logs: logs # by default logs, relative to ${workspace}
-logLevel: INFO # default INFO
-default_graph: modern # configure the graph to be loaded while starting the service, if graph name not specified
-  # may include other configuration items of other engines
-compute_engine:
-  type: hiactor
-  hosts:
-    - localhost:10000 # currently only one host can be specified
-  shared_num: 1 # the number of shared workers, default 1
-compiler:
-  planner: {"isOn":true,"opt":"RBO","rules":["FilterMatchRule"]} # Confirm this configuration
-  endpoint:
-    default_listen_address: localhost # default localhost
-    bolt_connector: # for cypher, there may be other connectors, such as bolt_connector, https_connector
-      enabled: true # default false
-      port: 7687
+version: v0.0.2
+volumes:
+  - /home/zhanglei/code/lei/gie-db/GraphScope/flex/interactive/examples/modern_graph:/home/modern_graph/
\ No newline at end of file
diff --git a/flex/interactive/data/ldbc/graph.json b/flex/interactive/data/ldbc/graph.json
deleted file mode 100755
index f16dd1710336..000000000000
--- a/flex/interactive/data/ldbc/graph.json
+++ /dev/null
@@ -1,128 +0,0 @@
-{
-  "entities": [
-    {
-      "label": {
-        "id": 1,
-        "name": "software"
-      },
-      "columns": [
-        {
-          "key": {
-            "id": 4,
-            "name": "id"
-          },
-          "data_type": 1,
-          "is_primary_key": false
-        },
-        {
-          "key": {
-            "id": 0,
-            "name": "name"
-          },
-          "data_type": 4,
-          "is_primary_key": false
-        },
-        {
-          "key": {
-            "id": 2,
-            "name": "lang"
-          },
-          "data_type": 4,
-          "is_primary_key": false
-        }
-      ]
-    
}, - { - "label": { - "id": 0, - "name": "person" - }, - "columns": [ - { - "key": { - "id": 4, - "name": "id" - }, - "data_type": 1, - "is_primary_key": false - }, - { - "key": { - "id": 0, - "name": "name" - }, - "data_type": 4, - "is_primary_key": false - }, - { - "key": { - "id": 1, - "name": "age" - }, - "data_type": 1, - "is_primary_key": false - } - ] - } - ], - "relations": [ - { - "label": { - "id": 0, - "name": "knows" - }, - "entity_pairs": [ - { - "src": { - "id": 0, - "name": "person" - }, - "dst": { - "id": 0, - "name": "person" - } - } - ], - "columns": [ - { - "key": { - "id": 3, - "name": "weight" - }, - "data_type": 3, - "is_primary_key": false - } - ] - }, - { - "label": { - "id": 1, - "name": "created" - }, - "entity_pairs": [ - { - "src": { - "id": 0, - "name": "person" - }, - "dst": { - "id": 1, - "name": "software" - } - } - ], - "columns": [ - { - "key": { - "id": 3, - "name": "weight" - }, - "data_type": 3, - "is_primary_key": false - } - ] - } - ], - "is_table_id": true, - "is_column_id": false -} \ No newline at end of file diff --git a/flex/interactive/data/ldbc/graph.yaml b/flex/interactive/data/ldbc/graph.yaml deleted file mode 100755 index c37d4731b071..000000000000 --- a/flex/interactive/data/ldbc/graph.yaml +++ /dev/null @@ -1,70 +0,0 @@ -name: modern # then must have a modern dir under ${data} directory -store_type: mutable_csr # v6d, groot, gart -stored_procedures: - directory: plugins # default plugins, relative to ${workspace}/${name} -schema: - vertex_types: - - type_name: person - x_csr_params: - max_vertex_num: 100 - properties: - - property_id: 0 - property_name: id - property_type: - primitive_type: DT_SIGNED_INT64 - - property_id: 1 - property_name: name - property_type: - primitive_type: DT_STRING - - property_id: 2 - property_name: age - property_type: - primitive_type: DT_SIGNED_INT32 - primary_keys: - - id - - type_name: software - x_csr_params: - max_vertex_num: 100 - properties: - - property_id: 0 - property_name: id - property_type: - primitive_type: DT_SIGNED_INT64 - x_csr_params: - - property_id: 1 - property_name: name - property_type: - primitive_type: DT_STRING - - property_id: 2 - property_name: lang - property_type: - primitive_type: DT_STRING - primary_keys: - - id - edge_types: - - type_name: knows - x_csr_params: - incoming_edge_strategy: None - outgoing_edge_strategy: Multiple - vertex_type_pair_relations: - source_vertex: person - destination_vertex: person - relation: MANY_TO_MANY - properties: - - property_id: 0 - property_name: weight - property_type: - primitive_type: DT_DOUBLE - - type_name: created - x_csr_params: - incoming_edge_strategy: None - outgoing_edge_strategy: Single - vertex_type_pair_relations: - source_vertex: person - destination_vertex: software - relation: ONE_TO_MANY - properties: - - property_id: 0 - property_name: weight - property_type: - primitive_type: DT_DOUBLE diff --git a/flex/interactive/docker/interactive-runtime.Dockerfile b/flex/interactive/docker/interactive-runtime.Dockerfile index 1b8ee02de93e..fb55465f7266 100755 --- a/flex/interactive/docker/interactive-runtime.Dockerfile +++ b/flex/interactive/docker/interactive-runtime.Dockerfile @@ -5,8 +5,9 @@ ARG CI=false SHELL ["/bin/bash", "-c"] # install graphscope -RUN cd /home/graphscope/ && git clone -b main --single-branch https://github.com/alibaba/GraphScope.git && \ +RUN cd /home/graphscope/ && git clone -b interactive_dev --single-branch https://github.com/zhanglei1949/GraphScope.git && \ cd GraphScope/flex && mkdir build && cd build && 
cmake .. -DBUILD_DOC=OFF && sudo make -j install # install graphscope GIE -RUN . /home/graphscope/.cargo/env && cd /home/graphscope/GraphScope/interactive_engine/compiler && make build +RUN . /home/graphscope/.cargo/env && cd /home/graphscope/GraphScope/interactive_engine && \ + mvn clean install -DskipTests -Drevision=0.0.1-SNAPSHOT -Pexperimental diff --git a/flex/interactive/examples/modern_graph b/flex/interactive/examples/modern_graph deleted file mode 120000 index 8ed59122aab3..000000000000 --- a/flex/interactive/examples/modern_graph +++ /dev/null @@ -1 +0,0 @@ -../../storages/rt_mutable_graph/modern_graph/ \ No newline at end of file diff --git a/flex/storages/rt_mutable_graph/modern_graph/bulk_load.yaml b/flex/interactive/examples/modern_graph/bulk_load.yaml similarity index 98% rename from flex/storages/rt_mutable_graph/modern_graph/bulk_load.yaml rename to flex/interactive/examples/modern_graph/bulk_load.yaml index 8d9085d81aca..4601075c8266 100644 --- a/flex/storages/rt_mutable_graph/modern_graph/bulk_load.yaml +++ b/flex/interactive/examples/modern_graph/bulk_load.yaml @@ -3,6 +3,7 @@ loading_config: data_source: scheme: file # file, oss, s3, hdfs; only file is supported now # location: # specify it or use FLEX_DATA_DIR env. + location: /home/graphscope/default_graph/ import_option: init # append, overwrite, only init is supported now format: type: csv diff --git a/flex/interactive/examples/modern_graph/count_vertex_num.cypher b/flex/interactive/examples/modern_graph/count_vertex_num.cypher new file mode 100644 index 000000000000..cca16c40269d --- /dev/null +++ b/flex/interactive/examples/modern_graph/count_vertex_num.cypher @@ -0,0 +1 @@ +MATCH(v:person { id: $personId}) RETURN COUNT(v); \ No newline at end of file diff --git a/flex/storages/rt_mutable_graph/modern_graph/modern_graph.yaml b/flex/interactive/examples/modern_graph/modern_graph.yaml similarity index 94% rename from flex/storages/rt_mutable_graph/modern_graph/modern_graph.yaml rename to flex/interactive/examples/modern_graph/modern_graph.yaml index 7823b3fd7561..7d6308bb96b0 100644 --- a/flex/storages/rt_mutable_graph/modern_graph/modern_graph.yaml +++ b/flex/interactive/examples/modern_graph/modern_graph.yaml @@ -1,9 +1,5 @@ name: modern # then must have a modern dir under ${data} directory store_type: mutable_csr # v6d, groot, gart -stored_procedures: - directory: plugins # default plugins, relative to ${workspace}/${name} - enable_lists: - - ldbc_ic1 schema: vertex_types: - type_id: 0 diff --git a/flex/storages/rt_mutable_graph/modern_graph/person.csv b/flex/interactive/examples/modern_graph/person.csv similarity index 100% rename from flex/storages/rt_mutable_graph/modern_graph/person.csv rename to flex/interactive/examples/modern_graph/person.csv diff --git a/flex/storages/rt_mutable_graph/modern_graph/person_created_software.csv b/flex/interactive/examples/modern_graph/person_created_software.csv similarity index 100% rename from flex/storages/rt_mutable_graph/modern_graph/person_created_software.csv rename to flex/interactive/examples/modern_graph/person_created_software.csv diff --git a/flex/storages/rt_mutable_graph/modern_graph/person_knows_person.csv b/flex/interactive/examples/modern_graph/person_knows_person.csv similarity index 100% rename from flex/storages/rt_mutable_graph/modern_graph/person_knows_person.csv rename to flex/interactive/examples/modern_graph/person_knows_person.csv diff --git a/flex/storages/rt_mutable_graph/modern_graph/software.csv 
b/flex/interactive/examples/modern_graph/software.csv
similarity index 100%
rename from flex/storages/rt_mutable_graph/modern_graph/software.csv
rename to flex/interactive/examples/modern_graph/software.csv
diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc
index b42f344dd519..6fe69a417440 100644
--- a/flex/storages/rt_mutable_graph/schema.cc
+++ b/flex/storages/rt_mutable_graph/schema.cc
@@ -714,21 +714,60 @@ static bool parse_schema_config_file(const std::string& path, Schema& schema) {
       return false;
     }
   }
+  // get the directory of path
+  auto parent_dir = std::filesystem::path(path).parent_path().string();
   if (graph_node["stored_procedures"]) {
     auto stored_procedure_node = graph_node["stored_procedures"];
     auto directory = stored_procedure_node["directory"].as<std::string>();
     // check is directory
     if (!std::filesystem::exists(directory)) {
-      LOG(WARNING) << "plugin directory - " << directory << " not found...";
+      LOG(ERROR) << "plugin directory - " << directory
+                 << " not found, try with parent dir:" << parent_dir;
+      directory = parent_dir + "/" + directory;
+      if (!std::filesystem::exists(directory)) {
+        LOG(ERROR) << "plugin directory - " << directory << " not found...";
+        return true;
+      }
     }
+    schema.SetPluginDir(directory);
     std::vector<std::string> files_got;
     if (!get_sequence(stored_procedure_node, "enable_lists", files_got)) {
       LOG(ERROR) << "stored_procedures is not set properly";
+      return true;
     }
+    std::vector<std::string> all_procedure_yamls = get_yaml_files(directory);
+    std::vector<std::string> all_procedure_names;
+    {
+      // get all procedure names
+      for (auto& f : all_procedure_yamls) {
+        YAML::Node procedure_node = YAML::LoadFile(f);
+        if (!procedure_node || !procedure_node.IsMap()) {
+          LOG(ERROR) << "procedure is not set properly";
+          return false;
+        }
+        std::string procedure_name;
+        if (!get_scalar(procedure_node, "name", procedure_name)) {
+          LOG(ERROR) << "name is not set properly for " << f;
+          return false;
+        }
+        all_procedure_names.push_back(procedure_name);
+      }
+    }
+
     for (auto& f : files_got) {
-      if (!std::filesystem::exists(f)) {
-        LOG(ERROR) << "plugin - " << f << " file not found...";
+      auto real_file = directory + "/" + f;
+      if (!std::filesystem::exists(real_file)) {
+        LOG(ERROR) << "plugin - " << real_file << " file not found...";
+        // it seems that f is not the filename, but the plugin name; try to
+        // find the plugin in the directory
+        if (std::find(all_procedure_names.begin(), all_procedure_names.end(),
+                      f) == all_procedure_names.end()) {
+          LOG(ERROR) << "plugin - " << f << " not found...";
+        } else {
+          VLOG(1) << "plugin - " << f << " found...";
+          schema.EmplacePlugin(f);
+        }
       } else {
         schema.EmplacePlugin(std::filesystem::canonical(f));
       }
@@ -748,6 +787,10 @@ void Schema::EmplacePlugin(const std::string& plugin) {
   plugin_list_.emplace_back(plugin);
 }
 
+void Schema::SetPluginDir(const std::string& dir) { plugin_dir_ = dir; }
+
+std::string Schema::GetPluginDir() const { return plugin_dir_; }
+
 // check whether prop in vprop_names, or is the primary key
 bool Schema::vertex_has_property(const std::string& label,
                                  const std::string& prop) const {
diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h
index c0a017088d79..81e6bff50ce7 100644
--- a/flex/storages/rt_mutable_graph/schema.h
+++ b/flex/storages/rt_mutable_graph/schema.h
@@ -148,6 +148,10 @@ class Schema {
   void EmplacePlugin(const std::string& plugin_name);
 
+  void SetPluginDir(const std::string& plugin_dir);
+
+  std::string GetPluginDir() const;
+
  private:
   label_t
vertex_label_to_index(const std::string& label); @@ -169,6 +173,7 @@ class Schema { std::map ie_strategy_; std::vector max_vnum_; std::vector plugin_list_; + std::string plugin_dir_; }; } // namespace gs diff --git a/flex/tests/hqps/match_query.h b/flex/tests/hqps/match_query.h index 189885b12428..a5aa3c1c2686 100644 --- a/flex/tests/hqps/match_query.h +++ b/flex/tests/hqps/match_query.h @@ -519,5 +519,59 @@ class MatchQuery9 : public HqpsAppBase { } }; +class MatchQuery10 : public HqpsAppBase { + public: + using Engine = SyncEngine; + using label_id_t = typename gs::MutableCSRInterface::label_id_t; + using vertex_id_t = typename gs::MutableCSRInterface::vertex_id_t; + // Query function for query class + results::CollectiveResults Query(const gs::MutableCSRInterface& graph) const { + auto ctx0 = Engine::template ScanVertex( + graph, 1, Filter()); + + auto edge_expand_opt0 = + gs::make_edge_expand_multie_opt, + std::tuple>( + gs::Direction::Out, + std::array, 2>{ + std::array{1, 1, 8}, + std::array{1, 2, 9}}, + std::tuple{PropTupleArrayT>{"creationDate"}, + PropTupleArrayT>{"creationDate"}}); + auto ctx1 = + Engine::template EdgeExpandE( + graph, std::move(ctx0), std::move(edge_expand_opt0)); + + auto get_v_opt1 = make_getv_opt( + gs::VOpt::End, + std::array{(label_id_t) 0, (label_id_t) 1}); + auto ctx2 = Engine::template GetV( + graph, std::move(ctx1), std::move(get_v_opt1)); + auto ctx3 = Engine::Project( + graph, std::move(ctx2), + std::tuple{gs::make_mapper_with_variable( + gs::PropertySelector("")), + gs::make_mapper_with_variable( + gs::PropertySelector(""))}); + auto agg_func2 = gs::make_aggregate_prop( + std::tuple{gs::PropertySelector("None")}, + std::integer_sequence{}); + + auto ctx4 = Engine::GroupByWithoutKey(graph, std::move(ctx3), + std::tuple{std::move(agg_func2)}); + for (auto iter : ctx4) { + VLOG(10) << "ctx4: " << gs::to_string(iter.GetAllElement()); + } + return Engine::Sink(ctx4, std::array{2}); + } + // Wrapper query function for query class + results::CollectiveResults Query(const gs::MutableCSRInterface& graph, + Decoder& decoder) const override { + // decoding params from decoder, and call real query func + + return Query(graph); + } +}; + } // namespace gs #endif // TESTS_HQPS_MATCH_QUERY_H_ \ No newline at end of file diff --git a/flex/tests/hqps/query_test.cc b/flex/tests/hqps/query_test.cc index 13d1850d3e49..38eb0cea291c 100644 --- a/flex/tests/hqps/query_test.cc +++ b/flex/tests/hqps/query_test.cc @@ -36,145 +36,159 @@ int main(int argc, char** argv) { db.Init(schema, loading_config, data_dir, 1); auto& sess = gs::GraphDB::get().GetSession(0); - { - auto& graph = sess.graph(); - auto max_v_num = graph.vertex_num(1); - std::vector vids(max_v_num); - for (gs::MutableCSRInterface::vertex_id_t i = 0; i < max_v_num; ++i) { - vids[i] = i; - } - gs::MutableCSRInterface interface(sess); - std::array prop_names{"creationDate"}; - auto edges = - interface.GetEdges(1, 1, 8, vids, "Both", INT_MAX, prop_names); - double t = -grape::GetCurrentTime(); - size_t cnt = 0; - for (auto i = 0; i < vids.size(); ++i) { - auto adj_list = edges.get(i); - for (auto iter : adj_list) { - VLOG(10) << iter.neighbor() << ", " << gs::to_string(iter.properties()); - cnt += 1; - } - } - t += grape::GetCurrentTime(); - LOG(INFO) << "visiting edges: cost: " << t << ", num edges: " << cnt; - - // visiting vertices properties - auto vertex_prop = - interface.GetVertexPropsFromVid(1, vids, {"id"}); - for (auto i = 0; i < 10; ++i) { - VLOG(10) << "vid: " << vids[i] - << ", prop: " << 
gs::to_string(vertex_prop[i]); - } - } - - { - gs::SampleQuery query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - input_encoder.put_long(19791209300143); - input_encoder.put_long(1354060800000); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish Sample query"; - } - { - gs::MatchQuery query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery test"; - } - - { - gs::MatchQuery1 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - auto res = query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery1 test"; - } - - { - gs::MatchQuery2 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery2 test"; - } - - { - gs::MatchQuery3 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery3 test"; - } - - { - gs::MatchQuery4 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery4 test"; - } - - { - gs::MatchQuery5 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery5 test"; - } - - { - gs::MatchQuery7 query; - std::vector encoder_array; - gs::Encoder input_encoder(encoder_array); - std::vector output_array; - gs::Encoder output(output_array); - gs::Decoder input(encoder_array.data(), encoder_array.size()); - - gs::MutableCSRInterface graph(sess); - query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery7 test"; - } + // { + // auto& graph = sess.graph(); + // auto max_v_num = graph.vertex_num(1); + // std::vector vids(max_v_num); + // for (gs::MutableCSRInterface::vertex_id_t i = 0; i < max_v_num; ++i) { + // vids[i] = i; + // } + // gs::MutableCSRInterface interface(sess); + // std::array prop_names{"creationDate"}; + // auto edges = + // interface.GetEdges(1, 1, 8, vids, "Both", INT_MAX, + // prop_names); + // double t = -grape::GetCurrentTime(); + // size_t cnt = 0; + // for (auto i = 0; i < vids.size(); ++i) { + // auto adj_list = edges.get(i); + // for (auto iter : adj_list) { + // VLOG(10) << iter.neighbor() << ", " << + // 
gs::to_string(iter.properties()); cnt += 1; + // } + // } + // t += grape::GetCurrentTime(); + // LOG(INFO) << "visiting edges: cost: " << t << ", num edges: " << cnt; + + // // visiting vertices properties + // auto vertex_prop = + // interface.GetVertexPropsFromVid(1, vids, {"id"}); + // for (auto i = 0; i < 10; ++i) { + // VLOG(10) << "vid: " << vids[i] + // << ", prop: " << gs::to_string(vertex_prop[i]); + // } + // } + + // { + // gs::SampleQuery query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // input_encoder.put_long(19791209300143); + // input_encoder.put_long(1354060800000); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish Sample query"; + // } + // { + // gs::MatchQuery query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery test"; + // } + + // { + // gs::MatchQuery1 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // auto res = query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery1 test"; + // } + + // { + // gs::MatchQuery2 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery2 test"; + // } + + // { + // gs::MatchQuery3 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery3 test"; + // } + + // { + // gs::MatchQuery4 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery4 test"; + // } + + // { + // gs::MatchQuery5 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery5 test"; + // } + + // { + // gs::MatchQuery7 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish 
MatchQuery7 test"; + // } + + // { + // gs::MatchQuery9 query; + // std::vector encoder_array; + // gs::Encoder input_encoder(encoder_array); + // std::vector output_array; + // gs::Encoder output(output_array); + // gs::Decoder input(encoder_array.data(), encoder_array.size()); + + // gs::MutableCSRInterface graph(sess); + // query.Query(graph, input); + // LOG(INFO) << "Finish MatchQuery9 test"; + // } { - gs::MatchQuery9 query; + gs::MatchQuery10 query; std::vector encoder_array; gs::Encoder input_encoder(encoder_array); std::vector output_array; @@ -183,7 +197,7 @@ int main(int argc, char** argv) { gs::MutableCSRInterface graph(sess); query.Query(graph, input); - LOG(INFO) << "Finish MatchQuery9 test"; + LOG(INFO) << "Finish MatchQuery10 test"; } LOG(INFO) << "Finish context test."; diff --git a/flex/utils/property/types.h b/flex/utils/property/types.h index aaac320a767f..39a3b4887fa5 100644 --- a/flex/utils/property/types.h +++ b/flex/utils/property/types.h @@ -153,6 +153,28 @@ struct Any { return AnyConverter::to_any(value); } + bool operator==(const Any& other) const { + if (type == other.type) { + if (type == PropertyType::kInt32) { + return value.i == other.value.i; + } else if (type == PropertyType::kInt64) { + return value.l == other.value.l; + } else if (type == PropertyType::kDate) { + return value.d.milli_second == other.value.d.milli_second; + } else if (type == PropertyType::kString) { + return value.s == other.value.s; + } else if (type == PropertyType::kEmpty) { + return true; + } else if (type == PropertyType::kDouble) { + return value.db == other.value.db; + } else { + return false; + } + } else { + return false; + } + } + PropertyType type; AnyValue value; }; diff --git a/flex/utils/yaml_utils.cc b/flex/utils/yaml_utils.cc new file mode 100644 index 000000000000..2413e3b07fab --- /dev/null +++ b/flex/utils/yaml_utils.cc @@ -0,0 +1,32 @@ + +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "flex/utils/yaml_utils.h"
+
+namespace gs {
+std::vector<std::string> get_yaml_files(const std::string& plugin_dir) {
+  std::filesystem::path dir_path = plugin_dir;
+  std::vector<std::string> res_yaml_files;
+
+  for (auto& entry : std::filesystem::directory_iterator(dir_path)) {
+    if (entry.is_regular_file() && ((entry.path().extension() == ".yaml") ||
+                                    (entry.path().extension() == ".yml"))) {
+      res_yaml_files.emplace_back(entry.path());
+    }
+  }
+  return res_yaml_files;
+}
+
+}  // namespace gs
diff --git a/flex/utils/yaml_utils.h b/flex/utils/yaml_utils.h
index ea0f37d8b7aa..fc4572fa1c40 100644
--- a/flex/utils/yaml_utils.h
+++ b/flex/utils/yaml_utils.h
@@ -21,7 +21,12 @@
 #include <vector>
 #include <yaml-cpp/yaml.h>
 
+#include "glog/logging.h"
+
 namespace gs {
+
+std::vector<std::string> get_yaml_files(const std::string& plugin_dir);
+
 namespace config_parsing {
 template <typename T>
 bool get_scalar(YAML::Node node, const std::string& key, T& value) {
diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java
index b9994a984a92..bdcedca48c1d 100644
--- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java
+++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/IrSchemaParser.java
@@ -38,7 +38,7 @@ public static IrSchemaParser getInstance() {
 
     private IrSchemaParser() {}
 
-    public String parse(GraphSchema graphSchema) {
+    public String parse(GraphSchema graphSchema, boolean isColumnId) {
         List<GraphVertex> vertices = graphSchema.getVertexList();
         List<GraphEdge> edges = graphSchema.getEdgeList();
         List entities = new ArrayList();
@@ -49,7 +49,7 @@ public String parse(GraphSchema graphSchema) {
         schemaMap.put("entities", entities);
         schemaMap.put("relations", relations);
         schemaMap.put("is_table_id", true);
-        schemaMap.put("is_column_id", true);
+        schemaMap.put("is_column_id", isColumnId);
         return JSON.toJson(schemaMap);
     }
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java
index 460e0ad2fc0b..a87192612e03 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/YamlConfigs.java
@@ -225,6 +225,7 @@ private static void flattenAndConvert(
         if (value instanceof Map) {
             flattenAndConvert((Map) value, properties, key);
         } else {
+            System.out.println("key: " + key + ", value: " + value);
             properties.put(key, value.toString());
         }
     }
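Note: the schema.cc change above and the LocalMetaDataReader.java change below implement the same two-step lookup for an `enable_lists` entry: treat it first as a file under the plugin directory, and only if no such file exists, match it against the `name:` fields of the directory's .yaml files. The standalone C++17 sketch below condenses that resolution order; `resolve_plugin` is an illustrative helper, not part of this patch, and assumes yaml-cpp is available.

#include <filesystem>
#include <optional>
#include <string>

#include <yaml-cpp/yaml.h>

// Resolve an enable_lists entry roughly the way parse_schema_config_file
// does: first as a concrete file, then as a procedure name declared in one
// of the directory's yaml files.
std::optional<std::string> resolve_plugin(const std::string& dir,
                                          const std::string& entry) {
  namespace fs = std::filesystem;
  fs::path candidate = fs::path(dir) / entry;
  if (fs::exists(candidate)) {
    // The entry names a real file: register its canonical path.
    return fs::canonical(candidate).string();
  }
  // Otherwise interpret the entry as a procedure name and search the
  // directory's yaml files for a matching `name` field.
  for (const auto& f : fs::directory_iterator(dir)) {
    auto ext = f.path().extension();
    if (!f.is_regular_file() || (ext != ".yaml" && ext != ".yml")) {
      continue;
    }
    YAML::Node node = YAML::LoadFile(f.path().string());
    if (node && node.IsMap() && node["name"] &&
        node["name"].as<std::string>() == entry) {
      return entry;  // register by name, as schema.EmplacePlugin(f) does
    }
  }
  return std::nullopt;  // neither a file nor a declared procedure name
}
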
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalMetaDataReader.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalMetaDataReader.java
index 09ffb4b5eede..fd288206645e 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalMetaDataReader.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/reader/LocalMetaDataReader.java
@@ -58,20 +58,29 @@ public List<InputStream> getStoredProcedures() throws IOException {
         List<InputStream> procedureInputs = Lists.newArrayList();
         if (enableProcedureList.isEmpty()) {
             for (File file : procedureDir.listFiles()) {
-                procedureInputs.add(new FileInputStream(file));
+                // only load .yaml or .yml files
+                logger.info("load procedure {}", file.getName());
+                if (file.getName().endsWith(".yaml") || file.getName().endsWith(".yml")) {
+                    procedureInputs.add(new FileInputStream(file));
+                }
             }
         } else {
             Map<String, InputStream> procedureInputMap = getProcedureNameWithInputStream(procedureDir);
             for (String enableProcedure : enableProcedureList) {
                 InputStream enableInput = procedureInputMap.get(enableProcedure);
-                Preconditions.checkArgument(
-                        enableInput != null,
-                        "can not find procedure with name=%s under directory=%s, candidates are %s",
-                        enableProcedure,
-                        procedureDir,
-                        procedureInputMap.keySet());
-                procedureInputs.add(enableInput);
+                // check that enableProcedure exists in procedureInputMap's key set
+                if (procedureInputMap.containsKey(enableProcedure)) {
+                    Preconditions.checkArgument(
+                            enableInput != null,
+                            "procedure %s does not exist in %s",
+                            enableProcedure,
+                            procedurePath);
+                    procedureInputs.add(enableInput);
+                } else {
+                    logger.warn(
+                            "enabled procedure {} does not exist in {}",
+                            enableProcedure,
+                            procedurePath);
+                }
             }
         }
         return Collections.unmodifiableList(procedureInputs);
@@ -81,8 +90,12 @@ private Map<String, InputStream> getProcedureNameWithInputStream(File procedureD
             throws IOException {
         Map<String, InputStream> procedureInputMap = Maps.newHashMap();
         for (File file : procedureDir.listFiles()) {
+            if (!file.getName().endsWith(".yaml") && !file.getName().endsWith(".yml")) {
+                continue;
+            }
             String procedureName = getProcedureName(file);
             procedureInputMap.put(procedureName, new FileInputStream(file));
+            logger.info("load procedure {}", procedureName);
         }
         return procedureInputMap;
     }
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java
index 9e5229b7a44c..bdec124ad9fe 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/schema/IrGraphSchema.java
@@ -36,6 +36,7 @@ public class IrGraphSchema implements GraphSchema {
     private final boolean isColumnId;
 
     public IrGraphSchema(MetaDataReader dataReader) throws Exception {
+        this.isColumnId = false;
         SchemaInputStream schemaInputStream = dataReader.getGraphSchema();
         String content =
                 new String(
@@ -44,19 +45,19 @@ public IrGraphSchema(MetaDataReader dataReader) throws Exception {
         switch (schemaInputStream.getFormatType()) {
             case YAML:
                 this.graphSchema = Utils.buildSchemaFromYaml(content);
-                this.schemeJson = IrSchemaParser.getInstance().parse(this.graphSchema);
+                this.schemeJson =
+                        IrSchemaParser.getInstance().parse(this.graphSchema, this.isColumnId);
                 break;
             case JSON:
             default:
                 this.graphSchema = Utils.buildSchemaFromJson(content);
                 this.schemeJson = content;
         }
-        this.isColumnId = false;
     }
 
     public IrGraphSchema(GraphSchema graphSchema, boolean isColumnId) {
         this.graphSchema = graphSchema;
-        this.schemeJson = IrSchemaParser.getInstance().parse(graphSchema);
+        this.schemeJson = IrSchemaParser.getInstance().parse(graphSchema, isColumnId);
         this.isColumnId = isColumnId;
     }
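
One closing note on the parenthesization in get_yaml_files above: since `&&` binds tighter than `||`, the unparenthesized form `entry.is_regular_file() && ext == ".yaml" || ext == ".yml"` would accept any path ending in ".yml", regular file or not (for example a directory named `x.yml`). The self-contained program below, illustrative only and not part of this patch, makes the difference observable.

#include <filesystem>
#include <iostream>

int main() {
  namespace fs = std::filesystem;
  fs::create_directory("x.yml");  // a directory, not a regular file

  for (const auto& entry : fs::directory_iterator(".")) {
    // Buggy: `&&` binds tighter than `||`, so the .yml test stands alone
    // and matches regardless of the file type.
    bool buggy = entry.is_regular_file() &&
                     entry.path().extension() == ".yaml" ||
                 entry.path().extension() == ".yml";
    // Fixed: a path must be a regular file AND have a yaml/yml extension.
    bool fixed = entry.is_regular_file() &&
                 (entry.path().extension() == ".yaml" ||
                  entry.path().extension() == ".yml");
    if (buggy != fixed) {
      std::cout << entry.path() << " accepted only by the buggy check\n";
    }
  }

  fs::remove("x.yml");
  return 0;
}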