Skip to content

Commit

Permalink
refactor(flex): Refine the schema definition of rt_mutable_graph (#…
Browse files Browse the repository at this point in the history
…3079)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

- Adapt to the new schema definition and update `modern_graph.yaml` and
`bulk_load.yaml` accordingly.
- Introduce a new test to verify graph loading.
- All CSR-related params are put in `x_csr_params`.
- Our schema definition specifies that each vertex label has at least one
primary key, but the current implementation doesn't support primary
keys yet (they may be supported later).

Other repos affected by this PR: gstest, flex_ldbc_snb

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3077
  • Loading branch information
zhanglei1949 authored Aug 17, 2023
1 parent bc2edb8 commit 1b9903e
Show file tree
Hide file tree
Showing 24 changed files with 1,912 additions and 656 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,27 @@ jobs:
cmake .. && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../storages/rt_mutable_graph/modern_graph/
./run_grin_test
- name: Test Graph Loading on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/storages/rt_mutable_graph/modern_graph/
run: |
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=../storages/rt_mutable_graph/modern_graph/modern_graph.yaml
BULK_LOAD_FILE=../storages/rt_mutable_graph/modern_graph/bulk_load.yaml
GLOG_v=10 ./tests/rt_mutable_graph/test_graph_loading ${SCHEMA_FILE} ${BULK_LOAD_FILE} /tmp/csr-data-dir/
- name: Test Graph Loading on LDBC SNB sf0.1
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest/
FLEX_DATA_DIR: ${{ github.workspace }}/gstest/flex/ldbc-sf01-long-date/
run: |
# remove modern graph indices
rm -rf /tmp/csr-data-dir/
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${FLEX_DATA_DIR}/audit_graph_schema.yaml
BULK_LOAD_FILE=${FLEX_DATA_DIR}/audit_bulk_load.yaml
GLOG_v=10 ./tests/rt_mutable_graph/test_graph_loading ${SCHEMA_FILE} ${BULK_LOAD_FILE} /tmp/csr-data-dir/ 2
7 changes: 4 additions & 3 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ int main(int argc, char** argv) {
double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
db.Init(std::get<0>(ret), std::get<1>(ret), std::get<2>(ret),
std::get<3>(ret), data_path, shard_num);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);
db.Init(schema, loading_config, data_path, shard_num);

t0 += grape::GetCurrentTime();

Expand Down
8 changes: 5 additions & 3 deletions flex/bin/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "flex/engines/hqps_db/database/mutable_csr_interface.h"
#include "flex/engines/http_server/codegen_proxy.h"
#include "flex/engines/http_server/stored_procedure.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"

#include <yaml-cpp/yaml.h>
#include <boost/program_options.hpp>
Expand Down Expand Up @@ -205,9 +206,10 @@ int main(int argc, char** argv) {
double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
db.Init(std::get<0>(ret), std::get<1>(ret), std::get<2>(ret),
std::get<3>(ret), data_path, shard_num);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);
db.Init(schema, loading_config, data_path, shard_num);

t0 += grape::GetCurrentTime();

Expand Down
21 changes: 9 additions & 12 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,33 @@ GraphDB& GraphDB::get() {
return db;
}

void GraphDB::Init(
const Schema& schema,
const std::vector<std::pair<std::string, std::string>>& vertex_files,
const std::vector<std::tuple<std::string, std::string, std::string,
std::string>>& edge_files,
const std::vector<std::string>& plugins, const std::string& data_dir,
int thread_num) {
void GraphDB::Init(const Schema& schema, const LoadingConfig& load_config,
const std::string& data_dir, int thread_num) {
std::filesystem::path data_dir_path(data_dir);
if (!std::filesystem::exists(data_dir_path)) {
std::filesystem::create_directory(data_dir_path);
}

std::filesystem::path serial_path = data_dir_path / "init_snapshot.bin";
if (!std::filesystem::exists(serial_path)) {
if (!vertex_files.empty() || !edge_files.empty()) {
if (!load_config.GetVertexLoadingMeta().empty() ||
!load_config.GetEdgeLoadingMeta().empty()) {
LOG(INFO) << "Initializing graph db through bulk loading";
{
MutablePropertyFragment graph;
graph.Init(schema, vertex_files, edge_files, thread_num);
graph.Init(schema, load_config, thread_num);
graph.Serialize(data_dir_path.string());
}
graph_.Deserialize(data_dir_path.string());
} else {
LOG(INFO) << "Initializing empty graph db";
graph_.Init(schema, vertex_files, edge_files, thread_num);
graph_.Init(schema, load_config, thread_num);
graph_.Serialize(data_dir_path.string());
}
} else {
LOG(INFO) << "Initializing graph db from data files of work directory";
if (!vertex_files.empty() || !edge_files.empty()) {
if (!load_config.GetVertexLoadingMeta().empty() ||
!load_config.GetEdgeLoadingMeta().empty()) {
LOG(WARNING) << "Bulk loading is ignored because data files of work "
"directory exist";
}
Expand Down Expand Up @@ -109,7 +106,7 @@ void GraphDB::Init(
contexts_[i].logger.open(wal_dir.string(), i);
}

initApps(plugins);
initApps(schema.GetPluginsList());
}

ReadTransaction GraphDB::GetReadTransaction() {
Expand Down
10 changes: 3 additions & 7 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "flex/engines/graph_db/database/single_vertex_insert_transaction.h"
#include "flex/engines/graph_db/database/update_transaction.h"
#include "flex/engines/graph_db/database/version_manager.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"

namespace gs {
Expand All @@ -45,13 +46,8 @@ class GraphDB {

static GraphDB& get();

void Init(
const Schema& schema,
const std::vector<std::pair<std::string, std::string>>& vertex_files,
const std::vector<std::tuple<std::string, std::string, std::string,
std::string>>& edge_files,
const std::vector<std::string>& plugins, const std::string& data_dir,
int thread_num = 1);
void Init(const Schema& schema, const LoadingConfig& config,
const std::string& data_dir, int thread_num = 1);

/** @brief Create a transaction to read vertices and edges.
*
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/grin/src/predefine.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <vector>

#include "grin/predefine.h"
#include "storages/rt_mutable_graph/loading_config.h"
#include "storages/rt_mutable_graph/mutable_property_fragment.h"

typedef gs::oid_t GRIN_OID_T;
Expand Down
10 changes: 4 additions & 6 deletions flex/engines/graph_db/grin/src/topology/structure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ GRIN_GRAPH grin_get_graph_from_storage(const char* uri) {
!(std::filesystem::exists(bulk_load_config_path))) {
return GRIN_NULL_GRAPH;
}
auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
const auto& schema = std::get<0>(ret);
auto& vertex_files = std::get<1>(ret);

auto& edge_files = std::get<2>(ret);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);

GRIN_GRAPH_T* g = new GRIN_GRAPH_T();
g->g.Init(schema, vertex_files, edge_files);
g->g.Init(schema, loading_config);
init_cache(g);
return g;
}
Expand Down
120 changes: 72 additions & 48 deletions flex/storages/rt_mutable_graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,64 +25,88 @@ The configuration file ([modern graph example](./modern_graph/modern_graph.yaml)
Here is an example of a configuration file:

```yaml
graph:
graph_store: mutable_csr
vertex:
- label_name: person
name: modern
store_type: mutable_csr
stored_procedures:
directory: plugins
enable_lists:
- libxxx.so
schema:
vertex_types:
- type_name: person
x_csr_params:
max_vertex_num: 100
properties:
- name: _ID
type: int64
- name: name
type: String
- name: age
type: int32
max_vertex_num: 100
- label_name: software
- property_id: 0
property_name: id
property_type:
primitive_type: DT_SIGNED_INT64
- property_id: 1
property_name: name
property_type:
primitive_type: DT_STRING
- property_id: 2
property_name: age
property_type:
primitive_type: DT_SIGNED_INT32
primary_keys:
- id
- type_name: software
x_csr_params:
max_vertex_num: 100
properties:
- name: _ID
type: int64
- name: name
type: String
- name: lang
type: String
max_vertex_num: 100
edge:
- src_label_name: person
dst_label_name: software
edge_label_name: created
- property_id: 0
property_name: id
property_type:
primitive_type: DT_SIGNED_INT64
x_csr_params:
- property_id: 1
property_name: name
property_type:
primitive_type: DT_STRING
- property_id: 2
property_name: lang
property_type:
primitive_type: DT_STRING
primary_keys:
- id
edge_types:
- type_name: knows
x_csr_params:
incoming_edge_strategy: None
outgoing_edge_strategy: Multiple
vertex_type_pair_relations:
source_vertex: person
destination_vertex: person
relation: MANY_TO_MANY
properties:
- name: _SRC
type: int64
- name: _DST
type: int64
- name: weight
type: double
incoming_edge_strategy: None
outgoing_edge_strategy: Single
- src_label_name: person
dst_label_name: person
edge_label_name: knows
- property_id: 0
property_name: weight
property_type:
primitive_type: DT_DOUBLE
- type_name: created
x_csr_params:
incoming_edge_strategy: None
outgoing_edge_strategy: Single
vertex_type_pair_relations:
source_vertex: person
destination_vertex: software
relation: ONE_TO_MANY
properties:
- name: _SRC
type: int64
- name: _DST
type: int64
- name: weight
type: double
incoming_edge_strategy: None
outgoing_edge_strategy: Multiple

stored_procedures:
- libxxx.so
- property_id: 0
property_name: weight
property_type:
primitive_type: DT_DOUBLE
```
Notes:
- `_ID`, `_SRC`, `_DST` are reserved words, they are the external id of vertices, only int64 type is supported.
- `max_vertex_num` limits the number of vertices of this type:
- Currently we only support one primary key, and the type has to be `DT_SIGNED_INT64`.
- All implementation-related configurations are put under `x_csr_params`.
- `max_vertex_num` limits the number of vertices of this type:
- The limit number is used to `mmap` memory, so it only takes virtual memory before vertices are actually inserted.
- If `max_vertex_num` is not set, a default large number (e.g.: 2^48) will be used.
- `incoming/outgoing_edge_strategy` specifies the storing strategy of the incoming or outgoing edges of this type. There are 3 kinds of strategies:
- `incoming/outgoing_edge_strategy` specifies the storing strategy of the incoming or outgoing edges of this type. There are 3 kinds of strategies:
- None: no edge will be stored
- Single: only one edge will be stored
- Multiple(default): multiple edges will be stored
Expand Down
Loading

0 comments on commit 1b9903e

Please sign in to comment.