Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serialize and deserialize method for graph. #103

Merged
merged 1 commit into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
options:
--shm-size 4096m
strategy:
Expand Down Expand Up @@ -133,7 +133,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -173,7 +173,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -375,6 +375,7 @@ jobs:
tar -xf ./gle-${{ github.sha }}/gle.tar
tar -xf ./gie-${{ github.sha }}/gie.tar
tar -xf ./manager-${{ github.sha }}/manager.tar
sudo docker pull registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-runtime:latest
sudo docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
--network=host \
-f ./graphscope.Dockerfile .
Expand All @@ -387,6 +388,11 @@ jobs:
python3 -m pip install pytest-cov --user
pushd python && sudo -E python3 setup.py develop && popd

# Copy Vineyard drivers to /usr/local
sudo rm -f ./artifacts/opt/graphscope/bin/vineyard-codegen || true
sudo cp -rvf ./artifacts/opt/graphscope/bin/* /usr/local/bin/
sudo cp -rvf ./artifacts/opt/graphscope/share/vineyard /usr/local/share/

- name: Kubernetes test
env:
CHANGE_MINIKUBE_NONE_USER: true
Expand Down Expand Up @@ -453,6 +459,7 @@ jobs:
tar -xf ./gle-${{ github.sha }}/gle.tar
tar -xf ./gie-${{ github.sha }}/gie.tar
tar -xf ./manager-${{ github.sha }}/manager.tar
sudo docker pull registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-runtime:latest
sudo docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
--network=host \
-f ./graphscope.Dockerfile .
Expand All @@ -471,7 +478,7 @@ jobs:
cd interactive_engine/tests
./function_test.sh 8111 1 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
registry.cn-hongkong.aliyuncs.com/graphscope/maxgraph_standalone_manager:${{ github.sha }}
./function_test.sh 8111 2 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
./function_test.sh 8112 2 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
registry.cn-hongkong.aliyuncs.com/graphscope/maxgraph_standalone_manager:${{ github.sha }}
- name: Clean
run: |
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.2
0.1.3
4 changes: 2 additions & 2 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)
if ("${GRAPHSCOPE_VERSION}" STREQUAL "")
set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION 0)
set(GRAPHSCOPE_ANALYTICAL_MINOR_VERSION 1)
set(GRAPHSCOPE_ANALYTICAL_PATCH_VERSION 2)
set(GRAPHSCOPE_ANALYTICAL_PATCH_VERSION 3)
set(GRAPHSCOPE_ANALYTICAL_VERSION ${GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION}.${GRAPHSCOPE_ANALYTICAL_MINOR_VERSION}.${GRAPHSCOPE_ANALYTICAL_PATCH_VERSION})
else ()
set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION ${GRAPHSCOPE_MAJOR_VERSION})
Expand Down Expand Up @@ -105,7 +105,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/benchmarks/projected_graph_benchmarks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int main(int argc, char** argv) {

LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

vineyard::ObjectID fragment_id = vineyard::VYObjectIDFromString(frag_id_str);
vineyard::ObjectID fragment_id = vineyard::ObjectIDFromString(frag_id_str);

MPI_Barrier(comm_spec.comm());

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/benchmarks/property_graph_benchmarks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int main(int argc, char** argv) {

LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

vineyard::ObjectID fragment_id = vineyard::VYObjectIDFromString(frag_id_str);
vineyard::ObjectID fragment_id = vineyard::ObjectIDFromString(frag_id_str);

MPI_Barrier(comm_spec.comm());

Expand Down
6 changes: 3 additions & 3 deletions analytical_engine/benchmarks/property_graph_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int main(int argc, char** argv) {
LOG(INFO) << "[property graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(fragment_id);
<< "]: " << vineyard::ObjectIDToString(fragment_id);

MPI_Barrier(comm_spec.comm());

Expand All @@ -141,14 +141,14 @@ int main(int argc, char** argv) {
LOG(INFO) << "[empty graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(empty_frag_id);
<< "]: " << vineyard::ObjectIDToString(empty_frag_id);
MPI_Barrier(comm_spec.comm());

if (comm_spec.worker_id() == 0) {
LOG(INFO) << "[ed graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(ed_frag_id);
<< "]: " << vineyard::ObjectIDToString(ed_frag_id);

MPI_Barrier(comm_spec.comm());

Expand Down
32 changes: 14 additions & 18 deletions analytical_engine/core/context/mpi_object_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <vector>

#include "grape/worker/comm_spec.h"
#include "vineyard/basic/ds/object_set.h"
#include "vineyard/client/client.h"

namespace gs {
Expand All @@ -41,43 +40,40 @@ class MPIObjectSync {

void GatherWorkerObjectID(vineyard::Client& client,
grape::CommSpec const& comm_spec,
vineyard::ObjectID const object_id,
vineyard::ObjectSetBuilder& target_chunk_map) {
vineyard::ObjectID object_id,
std::vector<vineyard::ObjectID>& assembled_ids) {
// gather chunk id per worker, and add to the target chunkmap
if (comm_spec.worker_id() == 0) {
target_chunk_map.AddObject(client.instance_id(), object_id);
assembled_ids.push_back(object_id);
for (int src_worker_id = 1; src_worker_id < comm_spec.worker_num();
++src_worker_id) {
std::pair<vineyard::InstanceID, vineyard::ObjectID> chunk;
grape::recv_buffer(&chunk, 1, src_worker_id, comm_spec.comm(), 0x10);
target_chunk_map.AddObject(chunk.first, chunk.second);
vineyard::ObjectID recv_object_id;
grape::recv_buffer(&recv_object_id, 1, src_worker_id, comm_spec.comm(),
0x10);
assembled_ids.push_back(recv_object_id);
}
} else {
auto chunk = std::make_pair(client.instance_id(), object_id);
grape::send_buffer(&chunk, 1, 0, comm_spec.comm(), 0x10);
grape::send_buffer(&object_id, 1, 0, comm_spec.comm(), 0x10);
}
}

void GatherWorkerObjectIDs(vineyard::Client& client,
grape::CommSpec const& comm_spec,
std::vector<vineyard::ObjectID> const& object_ids,
vineyard::ObjectSetBuilder& target_chunk_map) {
// gather chunk id per worker, and add to the target chunkmap
std::vector<vineyard::ObjectID>& assembled_ids) {
// gather chunk id vector per worker, and add to the target chunkmap
if (comm_spec.worker_id() == 0) {
target_chunk_map.AddObjects(client.instance_id(), object_ids);
assembled_ids.insert(assembled_ids.end(), object_ids.begin(),
object_ids.end());
for (int src_worker_id = 1; src_worker_id < comm_spec.worker_num();
++src_worker_id) {
vineyard::InstanceID instance_id = vineyard::UnspecifiedInstanceID();
std::vector<vineyard::ObjectID> recv_object_ids;
grape::recv_buffer(&instance_id, 1, src_worker_id, comm_spec.comm(),
0x11);
grape::RecvVector(recv_object_ids, src_worker_id, comm_spec.comm(),
0x12);
target_chunk_map.AddObjects(instance_id, recv_object_ids);
assembled_ids.insert(assembled_ids.end(), recv_object_ids.begin(),
recv_object_ids.end());
}
} else {
auto instance_id = client.instance_id();
grape::send_buffer(&instance_id, 1, 0, comm_spec.comm(), 0x11);
grape::SendVector(object_ids, 0, comm_spec.comm(), 0x12);
}
}
Expand Down
18 changes: 6 additions & 12 deletions analytical_engine/core/context/tensor_dataframe_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ class MPIGlobalTensorBuilder : public vineyard::GlobalTensorBuilder,
}

vineyard::Status Build(vineyard::Client& client) override {
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_,
this->partitions_builder_);
if (comm_spec_.worker_id() == 0) {
this->set_partitions_(this->partitions_builder_.Seal(client));
}
std::vector<vineyard::ObjectID> all_ids;
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_, all_ids);
this->AddPartitions(all_ids);
MPI_Barrier(comm_spec_.comm());
return vineyard::Status::OK();
}
Expand Down Expand Up @@ -123,8 +121,6 @@ class MPIGlobalDataFrameBuilder : public vineyard::GlobalDataFrameBuilder,
}
SyncGlobalObjectID(comm_spec_, id); // this sync can be seen as a barrier
if (comm_spec_.worker_id() != 0) {
// FIXME: the aim of `Construct` is to fillup the ObjectSet, needs better
// design.
df = std::make_shared<vineyard::GlobalDataFrame>();
vineyard::ObjectMeta meta;
VINEYARD_CHECK_OK(client.GetMetaData(id, meta, true));
Expand All @@ -134,11 +130,9 @@ class MPIGlobalDataFrameBuilder : public vineyard::GlobalDataFrameBuilder,
}

vineyard::Status Build(vineyard::Client& client) override {
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_,
object_set_builder_);
if (comm_spec_.worker_id() == 0) {
this->set_objects_(object_set_builder_.Seal(client));
}
std::vector<vineyard::ObjectID> all_ids;
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_, all_ids);
this->AddPartitions(all_ids);
MPI_Barrier(comm_spec_.comm());
return vineyard::Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ bl::result<std::string> GrapeInstance::contextToVineyardTensor(
CHECK(false);
}

auto s_id = vineyard::VYObjectIDToString(id);
auto s_id = vineyard::ObjectIDToString(id);

client_->PutName(id, s_id);

Expand Down Expand Up @@ -501,7 +501,7 @@ bl::result<std::string> GrapeInstance::contextToVineyardDataFrame(
CHECK(false);
}

auto s_id = vineyard::VYObjectIDToString(id);
auto s_id = vineyard::ObjectIDToString(id);

client_->PutName(id, s_id);

Expand Down
10 changes: 5 additions & 5 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(
tables,
vineyard::GatherVTables(
client_, {vineyard::VYObjectIDFromString(vertices[0]->values)},
client_, {vineyard::ObjectIDFromString(vertices[0]->values)},
comm_spec_.local_id(), comm_spec_.local_num()));
if (tables.size() == 1 && tables[0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
Expand Down Expand Up @@ -487,8 +487,8 @@ class ArrowFragmentLoader {
} else if (vertices[i]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values;
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::VYObjectIDFromString(vertices[i]->values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
client_, vineyard::ObjectIDFromString(vertices[i]->values), table,
comm_spec_.local_id(), comm_spec_.local_num()));
if (table != nullptr) {
VLOG(2) << "schema of vertex table: "
<< table->schema()->ToString();
Expand Down Expand Up @@ -614,7 +614,7 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(tables,
vineyard::GatherETables(
client_,
{{vineyard::VYObjectIDFromString(
{{vineyard::ObjectIDFromString(
edges[0]->sub_labels[0].values)}},
comm_spec_.local_id(), comm_spec_.local_num()));
if (tables.size() == 1 && tables[0].size() == 1 &&
Expand Down Expand Up @@ -684,7 +684,7 @@ class ArrowFragmentLoader {
LOG(INFO) << "read edge table from vineyard: "
<< sub_labels[j].values;
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::VYObjectIDFromString(sub_labels[j].values),
client_, vineyard::ObjectIDFromString(sub_labels[j].values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
if (table == nullptr) {
VLOG(2) << "edge table is null";
Expand Down
1 change: 1 addition & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ void LoadGraph(
VINEYARD_CHECK_OK(client.GetMetaData(frag_id, obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client.CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client.Persist(new_frag_id));
auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>(
client.GetObject(new_frag_id));

Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/test/run_load_from_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static std::shared_ptr<gs::detail::Graph> make_graph_info(
for (auto const& e : ess) {
auto sublabel = gs::detail::Edge::SubLabel();
auto pstream = client.GetObject<vineyard::ParallelStream>(
vineyard::VYObjectIDFromString(e));
vineyard::ObjectIDFromString(e));
VINEYARD_ASSERT(pstream != nullptr,
"The pstream " + e + " doesn't exist");
auto stream = pstream->GetStream<vineyard::DataframeStream>(0);
Expand All @@ -64,7 +64,7 @@ static std::shared_ptr<gs::detail::Graph> make_graph_info(
for (auto const& v : vstreams) {
auto vertex = std::make_shared<gs::detail::Vertex>();
auto pstream = client.GetObject<vineyard::ParallelStream>(
vineyard::VYObjectIDFromString(v));
vineyard::ObjectIDFromString(v));
VINEYARD_ASSERT(pstream != nullptr, "The stream " + v + " doesn't exist");
auto stream = pstream->GetStream<vineyard::DataframeStream>(0);
auto params = stream->GetParams();
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON)
find_package(Threads REQUIRED)

# find vineyard-----------------------------------------------------------------
find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
__version__ = fp.read().strip()
__version_tuple__ = (int(v) for v in __version__.split("."))
else:
__version__ = "0.1.2"
__version_tuple__ = (0, 1, 2)
__version__ = "0.1.3"
__version_tuple__ = (0, 1, 3)

del version_file_path
2 changes: 1 addition & 1 deletion docs/loading_graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ directly be passed to corresponding storage class. Like `host` and `port` to `HD

.. code:: python

from graphscope import Loader
from graphscope.framework.loader import Loader

ds1 = Loader("file:///var/datafiles/group.e")
ds2 = Loader("oss://graphscope_bucket/datafiles/group.e", key='access-id', secret='secret-access-key', endpoint='oss-cn-hangzhou.aliyuncs.com')
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/loading_graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ GraphScope 以

.. code:: python

from graphscope import Loader
from graphscope.framework.loader import Loader

ds1 = Loader("file:///var/datafiles/group.e")
ds2 = Loader("oss://graphscope_bucket/datafiles/group.e", key='access-id', secret='secret-access-key', endpoint='oss-cn-hangzhou.aliyuncs.com')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ find_package(Threads REQUIRED)
# we need edge src/dst ids in etable.
add_definitions(-DENDPOINT_LISTS)

find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
add_library(native_store global_store_ffi.cc
htap_ds_impl.cc
graph_builder_ffi.cc
Expand Down
Loading