Skip to content

Commit

Permalink
serialize and deserialize
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Jan 29, 2021
1 parent 6865c1b commit bdd7916
Show file tree
Hide file tree
Showing 26 changed files with 212 additions and 71 deletions.
15 changes: 11 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
options:
--shm-size 4096m
strategy:
Expand Down Expand Up @@ -133,7 +133,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -173,7 +173,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.7
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -375,6 +375,7 @@ jobs:
tar -xf ./gle-${{ github.sha }}/gle.tar
tar -xf ./gie-${{ github.sha }}/gie.tar
tar -xf ./manager-${{ github.sha }}/manager.tar
sudo docker pull registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-runtime:latest
sudo docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
--network=host \
-f ./graphscope.Dockerfile .
Expand All @@ -387,6 +388,11 @@ jobs:
python3 -m pip install pytest-cov --user
pushd python && sudo -E python3 setup.py develop && popd
# Copy Vineyard drivers to /usr/local
sudo rm -f ./artifacts/opt/graphscope/bin/vineyard-codegen || true
sudo cp -rvf ./artifacts/opt/graphscope/bin/* /usr/local/bin/
sudo cp -rvf ./artifacts/opt/graphscope/share/vineyard /usr/local/share/
- name: Kubernetes test
env:
CHANGE_MINIKUBE_NONE_USER: true
Expand Down Expand Up @@ -453,6 +459,7 @@ jobs:
tar -xf ./gle-${{ github.sha }}/gle.tar
tar -xf ./gie-${{ github.sha }}/gie.tar
tar -xf ./manager-${{ github.sha }}/manager.tar
sudo docker pull registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-runtime:latest
sudo docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
--network=host \
-f ./graphscope.Dockerfile .
Expand All @@ -471,7 +478,7 @@ jobs:
cd interactive_engine/tests
./function_test.sh 8111 1 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
registry.cn-hongkong.aliyuncs.com/graphscope/maxgraph_standalone_manager:${{ github.sha }}
./function_test.sh 8111 2 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
./function_test.sh 8112 2 registry.cn-hongkong.aliyuncs.com/graphscope/graphscope:${{ github.sha }} \
registry.cn-hongkong.aliyuncs.com/graphscope/maxgraph_standalone_manager:${{ github.sha }}
- name: Clean
run: |
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.2
0.1.3
4 changes: 2 additions & 2 deletions analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)
if ("${GRAPHSCOPE_VERSION}" STREQUAL "")
set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION 0)
set(GRAPHSCOPE_ANALYTICAL_MINOR_VERSION 1)
set(GRAPHSCOPE_ANALYTICAL_PATCH_VERSION 2)
set(GRAPHSCOPE_ANALYTICAL_PATCH_VERSION 3)
set(GRAPHSCOPE_ANALYTICAL_VERSION ${GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION}.${GRAPHSCOPE_ANALYTICAL_MINOR_VERSION}.${GRAPHSCOPE_ANALYTICAL_PATCH_VERSION})
else ()
set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION ${GRAPHSCOPE_MAJOR_VERSION})
Expand Down Expand Up @@ -105,7 +105,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/benchmarks/projected_graph_benchmarks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int main(int argc, char** argv) {

LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

vineyard::ObjectID fragment_id = vineyard::VYObjectIDFromString(frag_id_str);
vineyard::ObjectID fragment_id = vineyard::ObjectIDFromString(frag_id_str);

MPI_Barrier(comm_spec.comm());

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/benchmarks/property_graph_benchmarks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int main(int argc, char** argv) {

LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

vineyard::ObjectID fragment_id = vineyard::VYObjectIDFromString(frag_id_str);
vineyard::ObjectID fragment_id = vineyard::ObjectIDFromString(frag_id_str);

MPI_Barrier(comm_spec.comm());

Expand Down
6 changes: 3 additions & 3 deletions analytical_engine/benchmarks/property_graph_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int main(int argc, char** argv) {
LOG(INFO) << "[property graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(fragment_id);
<< "]: " << vineyard::ObjectIDToString(fragment_id);

MPI_Barrier(comm_spec.comm());

Expand All @@ -141,14 +141,14 @@ int main(int argc, char** argv) {
LOG(INFO) << "[empty graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(empty_frag_id);
<< "]: " << vineyard::ObjectIDToString(empty_frag_id);
MPI_Barrier(comm_spec.comm());

if (comm_spec.worker_id() == 0) {
LOG(INFO) << "[ed graph ids]:";
}
LOG(INFO) << "\n[frag-" << comm_spec.fid()
<< "]: " << vineyard::VYObjectIDToString(ed_frag_id);
<< "]: " << vineyard::ObjectIDToString(ed_frag_id);

MPI_Barrier(comm_spec.comm());

Expand Down
32 changes: 14 additions & 18 deletions analytical_engine/core/context/mpi_object_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <vector>

#include "grape/worker/comm_spec.h"
#include "vineyard/basic/ds/object_set.h"
#include "vineyard/client/client.h"

namespace gs {
Expand All @@ -41,43 +40,40 @@ class MPIObjectSync {

void GatherWorkerObjectID(vineyard::Client& client,
grape::CommSpec const& comm_spec,
vineyard::ObjectID const object_id,
vineyard::ObjectSetBuilder& target_chunk_map) {
vineyard::ObjectID object_id,
std::vector<vineyard::ObjectID>& assembled_ids) {
// gather chunk id per worker, and add to the target chunkmap
if (comm_spec.worker_id() == 0) {
target_chunk_map.AddObject(client.instance_id(), object_id);
assembled_ids.push_back(object_id);
for (int src_worker_id = 1; src_worker_id < comm_spec.worker_num();
++src_worker_id) {
std::pair<vineyard::InstanceID, vineyard::ObjectID> chunk;
grape::recv_buffer(&chunk, 1, src_worker_id, comm_spec.comm(), 0x10);
target_chunk_map.AddObject(chunk.first, chunk.second);
vineyard::ObjectID recv_object_id;
grape::recv_buffer(&recv_object_id, 1, src_worker_id, comm_spec.comm(),
0x10);
assembled_ids.push_back(recv_object_id);
}
} else {
auto chunk = std::make_pair(client.instance_id(), object_id);
grape::send_buffer(&chunk, 1, 0, comm_spec.comm(), 0x10);
grape::send_buffer(&object_id, 1, 0, comm_spec.comm(), 0x10);
}
}

void GatherWorkerObjectIDs(vineyard::Client& client,
grape::CommSpec const& comm_spec,
std::vector<vineyard::ObjectID> const& object_ids,
vineyard::ObjectSetBuilder& target_chunk_map) {
// gather chunk id per worker, and add to the target chunkmap
std::vector<vineyard::ObjectID>& assembled_ids) {
// gather chunk id vector per worker, and add to the target chunkmap
if (comm_spec.worker_id() == 0) {
target_chunk_map.AddObjects(client.instance_id(), object_ids);
assembled_ids.insert(assembled_ids.end(), object_ids.begin(),
object_ids.end());
for (int src_worker_id = 1; src_worker_id < comm_spec.worker_num();
++src_worker_id) {
vineyard::InstanceID instance_id = vineyard::UnspecifiedInstanceID();
std::vector<vineyard::ObjectID> recv_object_ids;
grape::recv_buffer(&instance_id, 1, src_worker_id, comm_spec.comm(),
0x11);
grape::RecvVector(recv_object_ids, src_worker_id, comm_spec.comm(),
0x12);
target_chunk_map.AddObjects(instance_id, recv_object_ids);
assembled_ids.insert(assembled_ids.end(), recv_object_ids.begin(),
recv_object_ids.end());
}
} else {
auto instance_id = client.instance_id();
grape::send_buffer(&instance_id, 1, 0, comm_spec.comm(), 0x11);
grape::SendVector(object_ids, 0, comm_spec.comm(), 0x12);
}
}
Expand Down
18 changes: 6 additions & 12 deletions analytical_engine/core/context/tensor_dataframe_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ class MPIGlobalTensorBuilder : public vineyard::GlobalTensorBuilder,
}

vineyard::Status Build(vineyard::Client& client) override {
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_,
this->partitions_builder_);
if (comm_spec_.worker_id() == 0) {
this->set_partitions_(this->partitions_builder_.Seal(client));
}
std::vector<vineyard::ObjectID> all_ids;
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_, all_ids);
this->AddPartitions(all_ids);
MPI_Barrier(comm_spec_.comm());
return vineyard::Status::OK();
}
Expand Down Expand Up @@ -123,8 +121,6 @@ class MPIGlobalDataFrameBuilder : public vineyard::GlobalDataFrameBuilder,
}
SyncGlobalObjectID(comm_spec_, id); // this sync can be seen as a barrier
if (comm_spec_.worker_id() != 0) {
// FIXME: the aim of `Construct` is to fillup the ObjectSet, needs better
// design.
df = std::make_shared<vineyard::GlobalDataFrame>();
vineyard::ObjectMeta meta;
VINEYARD_CHECK_OK(client.GetMetaData(id, meta, true));
Expand All @@ -134,11 +130,9 @@ class MPIGlobalDataFrameBuilder : public vineyard::GlobalDataFrameBuilder,
}

vineyard::Status Build(vineyard::Client& client) override {
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_,
object_set_builder_);
if (comm_spec_.worker_id() == 0) {
this->set_objects_(object_set_builder_.Seal(client));
}
std::vector<vineyard::ObjectID> all_ids;
GatherWorkerObjectIDs(client, comm_spec_, local_chunk_ids_, all_ids);
this->AddPartitions(all_ids);
MPI_Barrier(comm_spec_.comm());
return vineyard::Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ bl::result<std::string> GrapeInstance::contextToVineyardTensor(
CHECK(false);
}

auto s_id = vineyard::VYObjectIDToString(id);
auto s_id = vineyard::ObjectIDToString(id);

client_->PutName(id, s_id);

Expand Down Expand Up @@ -501,7 +501,7 @@ bl::result<std::string> GrapeInstance::contextToVineyardDataFrame(
CHECK(false);
}

auto s_id = vineyard::VYObjectIDToString(id);
auto s_id = vineyard::ObjectIDToString(id);

client_->PutName(id, s_id);

Expand Down
10 changes: 5 additions & 5 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(
tables,
vineyard::GatherVTables(
client_, {vineyard::VYObjectIDFromString(vertices[0]->values)},
client_, {vineyard::ObjectIDFromString(vertices[0]->values)},
comm_spec_.local_id(), comm_spec_.local_num()));
if (tables.size() == 1 && tables[0] != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> meta;
Expand Down Expand Up @@ -487,8 +487,8 @@ class ArrowFragmentLoader {
} else if (vertices[i]->protocol == "vineyard") {
VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values;
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::VYObjectIDFromString(vertices[i]->values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
client_, vineyard::ObjectIDFromString(vertices[i]->values), table,
comm_spec_.local_id(), comm_spec_.local_num()));
if (table != nullptr) {
VLOG(2) << "schema of vertex table: "
<< table->schema()->ToString();
Expand Down Expand Up @@ -614,7 +614,7 @@ class ArrowFragmentLoader {
BOOST_LEAF_AUTO(tables,
vineyard::GatherETables(
client_,
{{vineyard::VYObjectIDFromString(
{{vineyard::ObjectIDFromString(
edges[0]->sub_labels[0].values)}},
comm_spec_.local_id(), comm_spec_.local_num()));
if (tables.size() == 1 && tables[0].size() == 1 &&
Expand Down Expand Up @@ -684,7 +684,7 @@ class ArrowFragmentLoader {
LOG(INFO) << "read edge table from vineyard: "
<< sub_labels[j].values;
VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard(
client_, vineyard::VYObjectIDFromString(sub_labels[j].values),
client_, vineyard::ObjectIDFromString(sub_labels[j].values),
table, comm_spec_.local_id(), comm_spec_.local_num()));
if (table == nullptr) {
VLOG(2) << "edge table is null";
Expand Down
1 change: 1 addition & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ void LoadGraph(
VINEYARD_CHECK_OK(client.GetMetaData(frag_id, obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client.CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client.Persist(new_frag_id));
auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>(
client.GetObject(new_frag_id));

Expand Down
4 changes: 2 additions & 2 deletions analytical_engine/test/run_load_from_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static std::shared_ptr<gs::detail::Graph> make_graph_info(
for (auto const& e : ess) {
auto sublabel = gs::detail::Edge::SubLabel();
auto pstream = client.GetObject<vineyard::ParallelStream>(
vineyard::VYObjectIDFromString(e));
vineyard::ObjectIDFromString(e));
VINEYARD_ASSERT(pstream != nullptr,
"The pstream " + e + " doesn't exist");
auto stream = pstream->GetStream<vineyard::DataframeStream>(0);
Expand All @@ -64,7 +64,7 @@ static std::shared_ptr<gs::detail::Graph> make_graph_info(
for (auto const& v : vstreams) {
auto vertex = std::make_shared<gs::detail::Vertex>();
auto pstream = client.GetObject<vineyard::ParallelStream>(
vineyard::VYObjectIDFromString(v));
vineyard::ObjectIDFromString(v));
VINEYARD_ASSERT(pstream != nullptr, "The stream " + v + " doesn't exist");
auto stream = pstream->GetStream<vineyard::DataframeStream>(0);
auto params = stream->GetParams();
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON)
find_package(Threads REQUIRED)

# find vineyard-----------------------------------------------------------------
find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
__version__ = fp.read().strip()
__version_tuple__ = (int(v) for v in __version__.split("."))
else:
__version__ = "0.1.2"
__version_tuple__ = (0, 1, 2)
__version__ = "0.1.3"
__version_tuple__ = (0, 1, 3)

del version_file_path
2 changes: 1 addition & 1 deletion docs/loading_graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ directly be passed to corresponding storage class. Like `host` and `port` to `HD

.. code:: python
from graphscope import Loader
from graphscope.framework.loader import Loader
ds1 = Loader("file:///var/datafiles/group.e")
ds2 = Loader("oss://graphscope_bucket/datafiles/group.e", key='access-id', secret='secret-access-key', endpoint='oss-cn-hangzhou.aliyuncs.com')
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/loading_graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ GraphScope 以

.. code:: python
from graphscope import Loader
from graphscope.framework.loader import Loader
ds1 = Loader("file:///var/datafiles/group.e")
ds2 = Loader("oss://graphscope_bucket/datafiles/group.e", key='access-id', secret='secret-access-key', endpoint='oss-cn-hangzhou.aliyuncs.com')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ find_package(Threads REQUIRED)
# we need edge src/dst ids in etable.
add_definitions(-DENDPOINT_LISTS)

find_package(vineyard 0.1.7 REQUIRED)
find_package(vineyard 0.1.8 REQUIRED)
add_library(native_store global_store_ffi.cc
htap_ds_impl.cc
graph_builder_ffi.cc
Expand Down
Loading

0 comments on commit bdd7916

Please sign in to comment.