Skip to content

Support loading from vineyard stream/dataframe using names. #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
options:
--shm-size 4096m
strategy:
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -175,7 +175,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11
steps:
- name: Install Dependencies
run: |
Expand Down
8 changes: 5 additions & 3 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})

find_package(Boost REQUIRED COMPONENTS system filesystem
# required by folly
context program_options regex thread)
# required by folly
context program_options regex thread)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
# eliminate a lot of warnings for newer version of boost library.
add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS)

include("cmake/FindGFlags.cmake")
if (GFLAGS_FOUND)
Expand All @@ -105,7 +107,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
46 changes: 32 additions & 14 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ class ArrowFragmentLoader {
int index, int total_parts) {
// a special code path when multiple labeled vertex batches are mixed.
if (vertices.size() == 1 && vertices[0]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[0]->values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[0]->values));
auto read_procedure = [&]()
-> boost::leaf::result<std::vector<std::shared_ptr<arrow::Table>>> {
BOOST_LEAF_AUTO(
tables,
vineyard::GatherVTables(
client_, {vineyard::ObjectIDFromString(vertices[0]->values)},
comm_spec_.local_id(), comm_spec_.local_num()));
BOOST_LEAF_AUTO(tables, vineyard::GatherVTables(
client_, {sourceId}, comm_spec_.local_id(),
comm_spec_.local_num()));
if (tables.size() == 1 && tables[0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
if (tables[0]->schema()->metadata() == nullptr) {
Expand Down Expand Up @@ -487,9 +487,10 @@ class ArrowFragmentLoader {
table = tmp;
} else if (vertices[i]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[i]->values));
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::ObjectIDFromString(vertices[i]->values), table,
comm_spec_.local_id(), comm_spec_.local_num()));
client_, sourceId, table, comm_spec_.local_id(),
comm_spec_.local_num()));
if (table != nullptr) {
VLOG(2) << "schema of vertex table: "
<< table->schema()->ToString();
Expand Down Expand Up @@ -609,15 +610,17 @@ class ArrowFragmentLoader {
// a special code path when multiple labeled edge batches are mixed.
if (edges.size() == 1 && edges[0]->sub_labels.size() == 1 &&
edges[0]->sub_labels[0].protocol == "vineyard") {
LOG(INFO) << "read edge table from vineyard: "
<< edges[0]->sub_labels[0].values;
BOOST_LEAF_AUTO(sourceId,
resolveVYObject(edges[0]->sub_labels[0].values));
auto read_procedure =
[&]() -> boost::leaf::result<
std::vector<std::vector<std::shared_ptr<arrow::Table>>>> {
BOOST_LEAF_AUTO(tables,
vineyard::GatherETables(
client_,
{{vineyard::ObjectIDFromString(
edges[0]->sub_labels[0].values)}},
comm_spec_.local_id(), comm_spec_.local_num()));
vineyard::GatherETables(client_, {{sourceId}},
comm_spec_.local_id(),
comm_spec_.local_num()));
if (tables.size() == 1 && tables[0].size() == 1 &&
tables[0][0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
Expand Down Expand Up @@ -684,9 +687,10 @@ class ArrowFragmentLoader {
} else if (sub_labels[j].protocol == "vineyard") {
LOG(INFO) << "read edge table from vineyard: "
<< sub_labels[j].values;
BOOST_LEAF_AUTO(sourceId, resolveVYObject(sub_labels[j].values));
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::ObjectIDFromString(sub_labels[j].values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
client_, sourceId, table, comm_spec_.local_id(),
comm_spec_.local_num()));
if (table == nullptr) {
VLOG(2) << "edge table is null";
} else {
Expand Down Expand Up @@ -718,6 +722,20 @@ class ArrowFragmentLoader {
return tables;
}

boost::leaf::result<vineyard::ObjectID> resolveVYObject(
std::string const& source) {
vineyard::ObjectID sourceId = vineyard::InvalidObjectID();
// encoding: 'o' prefix for object id, and 's' prefix for object name.
CHECK_OR_RAISE(!source.empty() && (source[0] == 'o' || source[0] == 's'));
if (source[0] == 'o') {
sourceId = vineyard::ObjectIDFromString(source.substr(1));
} else {
VY_OK_OR_RAISE(client_.GetName(source.substr(1), sourceId, true));
}
CHECK_OR_RAISE(sourceId != vineyard::InvalidObjectID());
return sourceId;
}

vineyard::Client& client_;
grape::CommSpec comm_spec_;

Expand Down
11 changes: 2 additions & 9 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,15 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
const std::string& copy_type) override {
auto& meta = fragment_->meta();
auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
vineyard::ObjectMeta obj_meta;
VINEYARD_CHECK_OK(client->GetMetaData(fragment_->id(), obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client->CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto new_frag =
std::static_pointer_cast<fragment_t>(client->GetObject(new_frag_id));
*client, fragment_->id(), comm_spec));
auto dst_graph_def = graph_def_;

dst_graph_def.set_key(dst_graph_name);
dst_graph_def.set_vineyard_id(frag_group_id);

auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, dst_graph_def, new_frag);
dst_graph_name, dst_graph_def, fragment_);
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}

Expand Down
21 changes: 7 additions & 14 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,19 @@ void LoadGraph(
client.GetObject(frag_group_id));
auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id());
auto frag_id = fg->Fragments().at(fid);
vineyard::ObjectMeta obj_meta;
VINEYARD_CHECK_OK(client.GetMetaData(frag_id, obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client.CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client.Persist(new_frag_id));
auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>(
client.GetObject(new_frag_id));

VINEYARD_CHECK_OK(client.Persist(new_frag_id));
BOOST_LEAF_AUTO(
new_frag_group_id,
vineyard::ConstructFragmentGroup(client, new_frag_id, comm_spec));
auto frag =
std::static_pointer_cast<_GRAPH_TYPE>(client.GetObject(frag_id));

BOOST_LEAF_AUTO(new_frag_group_id, vineyard::ConstructFragmentGroup(
client, frag_id, comm_spec));
gs::rpc::GraphDef graph_def;

graph_def.set_key(graph_name);
graph_def.set_vineyard_id(new_frag_group_id);
gs::set_graph_def(new_frag, graph_def);
gs::set_graph_def(frag, graph_def);

auto wrapper = std::make_shared<gs::FragmentWrapper<_GRAPH_TYPE>>(
graph_name, graph_def, new_frag);
graph_name, graph_def, frag);
return std::dynamic_pointer_cast<gs::IFragmentWrapper>(wrapper);
} else {
BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params));
Expand Down
5 changes: 4 additions & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ endif ()

include_directories(${CMAKE_SOURCE_DIR})

# eliminate a lot of warnings for newer version of boost library.
add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS)

# find MPI----------------------------------------------------------------------
find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH})
Expand All @@ -38,7 +41,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON)
find_package(Threads REQUIRED)

# find vineyard-----------------------------------------------------------------
find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ find_package(Threads REQUIRED)
# we need edge src/dst ids in etable.
add_definitions(-DENDPOINT_LISTS)

find_package(vineyard 0.1.8 REQUIRED)
find_package(vineyard 0.1.11 REQUIRED)
add_library(native_store global_store_ffi.cc
htap_ds_impl.cc
graph_builder_ffi.cc
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/tests/function_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function _start {
curl -XPOST http://localhost:${_port} -d 'import graphscope'
curl -XPOST http://localhost:${_port} -d 'graphscope.set_option(show_log=True)'
curl -XPOST http://localhost:${_port} -d 'from graphscope.framework.loader import Loader'
curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp"
curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_etcd_cpu=2, k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp"

echo $curl_sess
code=`sh -c "$curl_sess"`
Expand Down
2 changes: 1 addition & 1 deletion k8s/graphscope.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# the result image includes all runtime stuffs of graphscope, with analytical engine,
# learning engine and interactive engine installed.

ARG BASE_VERSION=v0.1.8
ARG BASE_VERSION=v0.1.11
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:$BASE_VERSION as builder

ARG CI=true
Expand Down
7 changes: 6 additions & 1 deletion k8s/gsvineyard.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN cd /tmp && \
make -j`nproc` && \
make install && \
cd /tmp && \
git clone -b v0.1.8 https://github.com/alibaba/libvineyard.git --depth=1 && \
git clone -b v0.1.11 https://github.com/alibaba/libvineyard.git --depth=1 && \
cd libvineyard && \
git submodule update --init && \
mkdir -p /tmp/libvineyard/build && \
Expand All @@ -32,4 +32,9 @@ RUN cd /tmp && \
mkdir -p /opt/graphscope/dist && \
cp -f wheelhouse/* /opt/graphscope/dist && \
pip3 install wheelhouse/*.whl && \
cd /tmp/libvineyard/modules/io && \
python3 setup.py bdist_wheel && \
cp -f dist/* /opt/graphscope/dist && \
pip3 install dist/* && \
cd /tmp && \
rm -fr /tmp/libvineyard /tmp/libgrape-lite
8 changes: 7 additions & 1 deletion python/graphscope/deploy/tests/test_demo_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def test_demo(data_dir):
k8s_vineyard_mem="512Mi",
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_etcd_cpu=2,
k8s_vineyard_shared_mem="2Gi",
k8s_volumes=get_k8s_volumes(),
)
Expand Down Expand Up @@ -121,6 +122,7 @@ def test_demo_distribute(data_dir, modern_graph_data_dir):
k8s_vineyard_mem="512Mi",
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_etcd_cpu=2,
k8s_vineyard_shared_mem="2Gi",
k8s_volumes=get_k8s_volumes(),
)
Expand Down Expand Up @@ -201,6 +203,7 @@ def test_multiple_session(data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
info = sess.info
Expand All @@ -220,6 +223,7 @@ def test_multiple_session(data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)

Expand All @@ -245,6 +249,7 @@ def test_query_modern_graph(modern_graph_data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = load_modern_graph(sess, modern_graph_data_dir)
Expand Down Expand Up @@ -278,6 +283,7 @@ def test_traversal_modern_graph(modern_graph_data_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = load_modern_graph(sess, modern_graph_data_dir)
Expand Down Expand Up @@ -325,6 +331,7 @@ def test_serialize_roundtrip(p2p_property_dir):
k8s_engine_cpu=0.1,
k8s_engine_mem="1500Mi",
k8s_vineyard_shared_mem="2Gi",
k8s_etcd_cpu=2,
k8s_volumes=get_k8s_volumes(),
)
graph = sess.load_from(
Expand All @@ -346,7 +353,6 @@ def test_serialize_roundtrip(p2p_property_dir):
generate_eid=False,
)
graph.serialize("/tmp/serialize")
graph.unload()
new_graph = Graph.deserialize("/tmp/serialize", sess)
pg = new_graph.project_to_simple(0, 0, 0, 2)
ctx = graphscope.sssp(pg, src=6)
Expand Down
Loading