Skip to content

Commit

Permalink
Support loading from vineyard stream/dataframe using names. (#166)
Browse files Browse the repository at this point in the history
* And drop the vineyardd connection in subgraph query.
* Define BOOST_BIND_GLOBAL_PLACEHOLDERS to silence the warnings emitted by newer versions of the Boost library.
* Drop some unnecessary work in the "copy graph" part.
* Don't unload graph in serialization tests.
* Use more cpu resources for etcd in CI.

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Feb 25, 2021
1 parent d0251fa commit 70bb6e5
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 142 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
options:
--shm-size 4096m
strategy:
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -175,7 +175,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
steps:
- name: Install Dependencies
run: |
Expand Down
8 changes: 5 additions & 3 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})

find_package(Boost REQUIRED COMPONENTS system filesystem
# required by folly
context program_options regex thread)
# required by folly
context program_options regex thread)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
# Eliminate a lot of warnings emitted by newer versions of the Boost library.
add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS)

include("cmake/FindGFlags.cmake")
if (GFLAGS_FOUND)
Expand All @@ -105,7 +107,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
46 changes: 32 additions & 14 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ class ArrowFragmentLoader {
int index, int total_parts) {
// a special code path when multiple labeled vertex batches are mixed.
if (vertices.size() == 1 && vertices[0]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[0]->values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[0]->values));
auto read_procedure = [&]()
-> boost::leaf::result<std::vector<std::shared_ptr<arrow::Table>>> {
BOOST_LEAF_AUTO(
tables,
vineyard::GatherVTables(
client_, {vineyard::ObjectIDFromString(vertices[0]->values)},
comm_spec_.local_id(), comm_spec_.local_num()));
BOOST_LEAF_AUTO(tables, vineyard::GatherVTables(
client_, {sourceId}, comm_spec_.local_id(),
comm_spec_.local_num()));
if (tables.size() == 1 && tables[0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
if (tables[0]->schema()->metadata() == nullptr) {
Expand Down Expand Up @@ -487,9 +487,10 @@ class ArrowFragmentLoader {
table = tmp;
} else if (vertices[i]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[i]->values));
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::ObjectIDFromString(vertices[i]->values), table,
comm_spec_.local_id(), comm_spec_.local_num()));
client_, sourceId, table, comm_spec_.local_id(),
comm_spec_.local_num()));
if (table != nullptr) {
VLOG(2) << "schema of vertex table: "
<< table->schema()->ToString();
Expand Down Expand Up @@ -609,15 +610,17 @@ class ArrowFragmentLoader {
// a special code path when multiple labeled edge batches are mixed.
if (edges.size() == 1 && edges[0]->sub_labels.size() == 1 &&
edges[0]->sub_labels[0].protocol == "vineyard") {
LOG(INFO) << "read edge table from vineyard: "
<< edges[0]->sub_labels[0].values;
BOOST_LEAF_AUTO(sourceId,
resolveVYObject(edges[0]->sub_labels[0].values));
auto read_procedure =
[&]() -> boost::leaf::result<
std::vector<std::vector<std::shared_ptr<arrow::Table>>>> {
BOOST_LEAF_AUTO(tables,
vineyard::GatherETables(
client_,
{{vineyard::ObjectIDFromString(
edges[0]->sub_labels[0].values)}},
comm_spec_.local_id(), comm_spec_.local_num()));
vineyard::GatherETables(client_, {{sourceId}},
comm_spec_.local_id(),
comm_spec_.local_num()));
if (tables.size() == 1 && tables[0].size() == 1 &&
tables[0][0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
Expand Down Expand Up @@ -684,9 +687,10 @@ class ArrowFragmentLoader {
} else if (sub_labels[j].protocol == "vineyard") {
LOG(INFO) << "read edge table from vineyard: "
<< sub_labels[j].values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(sub_labels[j].values));
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::ObjectIDFromString(sub_labels[j].values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
client_, sourceId, table, comm_spec_.local_id(),
comm_spec_.local_num()));
if (table == nullptr) {
VLOG(2) << "edge table is null";
} else {
Expand Down Expand Up @@ -718,6 +722,20 @@ class ArrowFragmentLoader {
return tables;
}

// Resolves a vineyard source descriptor into an object id.
//
// Encoding: the first character is a tag -- 'o' means the remainder is an
// object id string, 's' means the remainder is an object name that is looked
// up through the vineyard client.
//
// Returns the resolved object id. Raises (via CHECK_OR_RAISE /
// VY_OK_OR_RAISE) when the descriptor is malformed, when the name lookup
// fails, or when the resolved id equals vineyard::InvalidObjectID().
boost::leaf::result<vineyard::ObjectID> resolveVYObject(
    std::string const& source) {
  // A valid descriptor needs the tag plus a non-empty payload; a bare "o" or
  // "s" would otherwise hand an empty string to the id parser / name lookup.
  CHECK_OR_RAISE(source.size() > 1 && (source[0] == 'o' || source[0] == 's'));
  std::string const payload = source.substr(1);
  vineyard::ObjectID sourceId = vineyard::InvalidObjectID();
  if (source[0] == 'o') {
    sourceId = vineyard::ObjectIDFromString(payload);
  } else {
    // NOTE(review): the trailing `true` is presumably GetName's "wait" flag --
    // confirm against the vineyard client API.
    VY_OK_OR_RAISE(client_.GetName(payload, sourceId, true));
  }
  CHECK_OR_RAISE(sourceId != vineyard::InvalidObjectID());
  return sourceId;
}

vineyard::Client& client_;
grape::CommSpec comm_spec_;

Expand Down
11 changes: 2 additions & 9 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,15 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
const std::string& copy_type) override {
auto& meta = fragment_->meta();
auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
vineyard::ObjectMeta obj_meta;
VINEYARD_CHECK_OK(client->GetMetaData(fragment_->id(), obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client->CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto new_frag =
std::static_pointer_cast<fragment_t>(client->GetObject(new_frag_id));
*client, fragment_->id(), comm_spec));
auto dst_graph_def = graph_def_;

dst_graph_def.set_key(dst_graph_name);
dst_graph_def.set_vineyard_id(frag_group_id);

auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, dst_graph_def, new_frag);
dst_graph_name, dst_graph_def, fragment_);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}

Expand Down
21 changes: 7 additions & 14 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,19 @@ void LoadGraph(
client.GetObject(frag_group_id));
auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id());
auto frag_id = fg->Fragments().at(fid);
vineyard::ObjectMeta obj_meta;
VINEYARD_CHECK_OK(client.GetMetaData(frag_id, obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client.CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client.Persist(new_frag_id));
auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>(
client.GetObject(new_frag_id));

VINEYARD_CHECK_OK(client.Persist(new_frag_id));
BOOST_LEAF_AUTO(
new_frag_group_id,
vineyard::ConstructFragmentGroup(client, new_frag_id, comm_spec));
auto frag =
std::static_pointer_cast<_GRAPH_TYPE>(client.GetObject(frag_id));

BOOST_LEAF_AUTO(new_frag_group_id, vineyard::ConstructFragmentGroup(
client, frag_id, comm_spec));
gs::rpc::GraphDef graph_def;

graph_def.set_key(graph_name);
graph_def.set_vineyard_id(new_frag_group_id);
gs::set_graph_def(new_frag, graph_def);
gs::set_graph_def(frag, graph_def);

auto wrapper = std::make_shared<gs::FragmentWrapper<_GRAPH_TYPE>>(
graph_name, graph_def, new_frag);
graph_name, graph_def, frag);
return std::dynamic_pointer_cast<gs::IFragmentWrapper>(wrapper);
} else {
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
Expand Down
5 changes: 4 additions & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ endif ()

include_directories(${CMAKE_SOURCE_DIR})

# Eliminate a lot of warnings emitted by newer versions of the Boost library.
add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS)

# find MPI----------------------------------------------------------------------
find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})
Expand All @@ -38,7 +41,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON)
find_package(Threads REQUIRED)

# find vineyard-----------------------------------------------------------------
find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ find_package(Threads REQUIRED)
# we need edge src/dst ids in etable.
add_definitions(-DENDPOINT_LISTS)

find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
add_library(native_store global_store_ffi.cc
htap_ds_impl.cc
graph_builder_ffi.cc
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/tests/function_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function _start {
curl -XPOST http://localhost:${_port} -d 'import graphscope'
curl -XPOST http://localhost:${_port} -d 'graphscope.set_option(show_log=True)'
curl -XPOST http://localhost:${_port} -d 'from graphscope.framework.loader import Loader'
curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp"
curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_etcd_cpu=2, k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp"

echo $curl_sess
code=`sh -c "$curl_sess"`
Expand Down
2 changes: 1 addition & 1 deletion k8s/graphscope.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# the result image includes all runtime stuffs of graphscope, with analytical engine,
# learning engine and interactive engine installed.

ARG BASE_VERSION=v0.1.8
ARG BASE_VERSION=v0.1.11
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:$BASE_VERSION as builder

ARG CI=true
Expand Down
7 changes: 6 additions & 1 deletion k8s/gsvineyard.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN cd /tmp && \
make -j`nproc` && \
make install && \
cd /tmp && \
git clone -b v0.1.8 https://github.com/alibaba/libvineyard.git --depth=1 && \
git clone -b v0.1.11 https://github.com/alibaba/libvineyard.git --depth=1 && \
cd libvineyard && \
git submodule update --init && \
mkdir -p /tmp/libvineyard/build && \
Expand All @@ -32,4 +32,9 @@ RUN cd /tmp && \
mkdir -p /opt/graphscope/dist && \
cp -f wheelhouse/* /opt/graphscope/dist && \
pip3 install wheelhouse/*.whl && \
cd /tmp/libvineyard/modules/io && \
python3 setup.py bdist_wheel && \
cp -f dist/* /opt/graphscope/dist && \
pip3 install dist/* && \
cd /tmp && \
rm -fr /tmp/libvineyard /tmp/libgrape-lite
8 changes: 7 additions & 1 deletion python/graphscope/deploy/tests/test_demo_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def test_demo(data_dir):
k8s_vineyard_mem="512Mi",
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_etcd_cpu=2,
k8s_vineyard_shared_mem="2Gi",
k8s_volumes=get_k8s_volumes(),
)
Expand Down Expand Up @@ -121,6 +122,7 @@ def test_demo_distribute(data_dir, modern_graph_data_dir):
k8s_vineyard_mem="512Mi",
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_etcd_cpu=2,
k8s_vineyard_shared_mem="2Gi",
k8s_volumes=get_k8s_volumes(),
)
Expand Down Expand Up @@ -201,6 +203,7 @@ def test_multiple_session(data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
info = sess.info
Expand All @@ -220,6 +223,7 @@ def test_multiple_session(data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)

Expand All @@ -245,6 +249,7 @@ def test_query_modern_graph(modern_graph_data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = load_modern_graph(sess, modern_graph_data_dir)
Expand Down Expand Up @@ -278,6 +283,7 @@ def test_traversal_modern_graph(modern_graph_data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = load_modern_graph(sess, modern_graph_data_dir)
Expand Down Expand Up @@ -325,6 +331,7 @@ def test_serialize_roundtrip(p2p_property_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = sess.load_from(
Expand All @@ -346,7 +353,6 @@ def test_serialize_roundtrip(p2p_property_dir):
generate_eid=False,
)
graph.serialize("/tmp/serialize")
graph.unload()
new_graph = Graph.deserialize("/tmp/serialize", sess)
pg = new_graph.project_to_simple(0, 0, 0, 2)
ctx = graphscope.sssp(pg, src=6)
Expand Down
Loading

0 comments on commit 70bb6e5

Please sign in to comment.