From 70bb6e55effd093120f0f9acc1f1226ff38c35c9 Mon Sep 17 00:00:00 2001 From: Tao He Date: Thu, 25 Feb 2021 22:12:49 +0800 Subject: [PATCH] Support loading from vineyard stream/dataframe using names. (#166) * And drop the vineyardd connection in subgraph query. * Define BOOST_BIND_GLOBAL_PLACEHOLDERS to fixes such warnings. * Drop some necessary work in the "copy graph" part. * Don't unload graph in serialization tests. * Use more cpu resources for etcd in CI. Signed-off-by: Tao He --- .github/workflows/ci.yml | 6 +-- analytical_engine/CMakeLists.txt | 8 ++-- .../core/loader/arrow_fragment_loader.h | 46 +++++++++++++------ .../core/object/fragment_wrapper.h | 11 +---- .../frame/property_graph_frame.cc | 21 +++------ .../template/CMakeLists.template | 5 +- .../executor/runtime/native/CMakeLists.txt | 2 +- interactive_engine/tests/function_test.sh | 2 +- k8s/graphscope.Dockerfile | 2 +- k8s/gsvineyard.Dockerfile | 7 ++- .../deploy/tests/test_demo_script.py | 8 +++- python/graphscope/framework/graph.py | 31 +++++++------ python/graphscope/framework/graph_utils.py | 25 +++++++--- python/graphscope/framework/loader.py | 20 +++++--- .../graphscope/framework/vineyard_object.py | 46 ------------------- python/graphscope/interactive/query.py | 14 +----- python/requirements.txt | 4 +- python/tests/test_create_graph.py | 6 +-- python/tests/test_graph.py | 4 +- 19 files changed, 126 insertions(+), 142 deletions(-) delete mode 100644 python/graphscope/framework/vineyard_object.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ad2c626b109..e2ca7763dfef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: run: shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0} container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11 options: --shm-size 4096m strategy: @@ -135,7 +135,7 @@ jobs: defaults: run: shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0} - container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8 + container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11 steps: - name: Install Dependencies run: | @@ -175,7 +175,7 @@ jobs: defaults: run: shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0} - container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.8 + container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.11 steps: - name: Install Dependencies run: | diff --git a/analytical_engine/CMakeLists.txt b/analytical_engine/CMakeLists.txt index 8946277407aa..28b8002edc63 100644 --- a/analytical_engine/CMakeLists.txt +++ b/analytical_engine/CMakeLists.txt @@ -76,9 +76,11 @@ find_package(MPI REQUIRED) include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) find_package(Boost REQUIRED COMPONENTS system filesystem - # required by folly - context program_options regex thread) + # required by folly + context program_options regex thread) include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) +# eliminate a lot of warnings for newer version of boost library. +add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS) include("cmake/FindGFlags.cmake") if (GFLAGS_FOUND) @@ -105,7 +107,7 @@ endif () find_package(libgrapelite REQUIRED) include_directories(${LIBGRAPELITE_INCLUDE_DIRS}) -find_package(vineyard 0.1.8 REQUIRED) +find_package(vineyard 0.1.11 REQUIRED) include_directories(${VINEYARD_INCLUDE_DIRS}) add_compile_options(-DENABLE_SELECTOR) diff --git a/analytical_engine/core/loader/arrow_fragment_loader.h b/analytical_engine/core/loader/arrow_fragment_loader.h index 4b9ee97f8e27..2e9386e67cff 100644 --- a/analytical_engine/core/loader/arrow_fragment_loader.h +++ b/analytical_engine/core/loader/arrow_fragment_loader.h @@ -431,13 +431,13 @@ class ArrowFragmentLoader { int index, int total_parts) { // a special code path when multiple labeled vertex batches are mixed. if (vertices.size() == 1 && vertices[0]->protocol == "vineyard") { + VLOG(2) << "read vertex table from vineyard: " << vertices[0]->values; + BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[0]->values)); auto read_procedure = [&]() -> boost::leaf::result>> { - BOOST_LEAF_AUTO( - tables, - vineyard::GatherVTables( - client_, {vineyard::ObjectIDFromString(vertices[0]->values)}, - comm_spec_.local_id(), comm_spec_.local_num())); + BOOST_LEAF_AUTO(tables, vineyard::GatherVTables( + client_, {sourceId}, comm_spec_.local_id(), + comm_spec_.local_num())); if (tables.size() == 1 && tables[0] != nullptr) { std::shared_ptr meta; if (tables[0]->schema()->metadata() == nullptr) { @@ -487,9 +487,10 @@ class ArrowFragmentLoader { table = tmp; } else if (vertices[i]->protocol == "vineyard") { VLOG(2) << "read vertex table from vineyard: " << vertices[i]->values; + BOOST_LEAF_AUTO(sourceId, resolveVYObject(vertices[i]->values)); VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard( - client_, vineyard::ObjectIDFromString(vertices[i]->values), table, - comm_spec_.local_id(), comm_spec_.local_num())); + client_, sourceId, table, comm_spec_.local_id(), + comm_spec_.local_num())); if (table != nullptr) { VLOG(2) << "schema of vertex table: " << table->schema()->ToString(); @@ -609,15 +610,17 @@ class ArrowFragmentLoader { // a special code path when multiple labeled edge batches are mixed. if (edges.size() == 1 && edges[0]->sub_labels.size() == 1 && edges[0]->sub_labels[0].protocol == "vineyard") { + LOG(INFO) << "read edge table from vineyard: " + << edges[0]->sub_labels[0].values; + BOOST_LEAF_AUTO(sourceId, + resolveVYObject(edges[0]->sub_labels[0].values)); auto read_procedure = [&]() -> boost::leaf::result< std::vector>>> { BOOST_LEAF_AUTO(tables, - vineyard::GatherETables( - client_, - {{vineyard::ObjectIDFromString( - edges[0]->sub_labels[0].values)}}, - comm_spec_.local_id(), comm_spec_.local_num())); + vineyard::GatherETables(client_, {{sourceId}}, + comm_spec_.local_id(), + comm_spec_.local_num())); if (tables.size() == 1 && tables[0].size() == 1 && tables[0][0] != nullptr) { std::shared_ptr meta; @@ -684,9 +687,10 @@ class ArrowFragmentLoader { } else if (sub_labels[j].protocol == "vineyard") { LOG(INFO) << "read edge table from vineyard: " << sub_labels[j].values; + BOOST_LEAF_AUTO(sourceId, resolveVYObject(sub_labels[j].values)); VY_OK_OR_RAISE(vineyard::ReadTableFromVineyard( - client_, vineyard::ObjectIDFromString(sub_labels[j].values), - table, comm_spec_.local_id(), comm_spec_.local_num())); + client_, sourceId, table, comm_spec_.local_id(), + comm_spec_.local_num())); if (table == nullptr) { VLOG(2) << "edge table is null"; } else { @@ -718,6 +722,20 @@ class ArrowFragmentLoader { return tables; } + boost::leaf::result resolveVYObject( + std::string const& source) { + vineyard::ObjectID sourceId = vineyard::InvalidObjectID(); + // encoding: 'o' prefix for object id, and 's' prefix for object name. + CHECK_OR_RAISE(!source.empty() && (source[0] == 'o' || source[0] == 's')); + if (source[0] == 'o') { + sourceId = vineyard::ObjectIDFromString(source.substr(1)); + } else { + VY_OK_OR_RAISE(client_.GetName(source.substr(1), sourceId, true)); + } + CHECK_OR_RAISE(sourceId != vineyard::InvalidObjectID()); + return sourceId; + } + vineyard::Client& client_; grape::CommSpec comm_spec_; diff --git a/analytical_engine/core/object/fragment_wrapper.h b/analytical_engine/core/object/fragment_wrapper.h index d3ef00d987b4..a7e99ea9bb61 100644 --- a/analytical_engine/core/object/fragment_wrapper.h +++ b/analytical_engine/core/object/fragment_wrapper.h @@ -94,22 +94,15 @@ class FragmentWrapper> const std::string& copy_type) override { auto& meta = fragment_->meta(); auto* client = dynamic_cast(meta.GetClient()); - vineyard::ObjectMeta obj_meta; - VINEYARD_CHECK_OK(client->GetMetaData(fragment_->id(), obj_meta)); - vineyard::ObjectID new_frag_id; - VINEYARD_CHECK_OK(client->CreateMetaData(obj_meta, new_frag_id)); - VINEYARD_CHECK_OK(client->Persist(new_frag_id)); BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup( - *client, new_frag_id, comm_spec)); - auto new_frag = - std::static_pointer_cast(client->GetObject(new_frag_id)); + *client, fragment_->id(), comm_spec)); auto dst_graph_def = graph_def_; dst_graph_def.set_key(dst_graph_name); dst_graph_def.set_vineyard_id(frag_group_id); auto wrapper = std::make_shared>( - dst_graph_name, dst_graph_def, new_frag); + dst_graph_name, dst_graph_def, fragment_); return std::dynamic_pointer_cast(wrapper); } diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index f17eb006e285..58ad286fd00e 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -78,26 +78,19 @@ void LoadGraph( client.GetObject(frag_group_id)); auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id()); auto frag_id = fg->Fragments().at(fid); - vineyard::ObjectMeta obj_meta; - VINEYARD_CHECK_OK(client.GetMetaData(frag_id, obj_meta)); - vineyard::ObjectID new_frag_id; - VINEYARD_CHECK_OK(client.CreateMetaData(obj_meta, new_frag_id)); - VINEYARD_CHECK_OK(client.Persist(new_frag_id)); - auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>( - client.GetObject(new_frag_id)); - - VINEYARD_CHECK_OK(client.Persist(new_frag_id)); - BOOST_LEAF_AUTO( - new_frag_group_id, - vineyard::ConstructFragmentGroup(client, new_frag_id, comm_spec)); + auto frag = + std::static_pointer_cast<_GRAPH_TYPE>(client.GetObject(frag_id)); + + BOOST_LEAF_AUTO(new_frag_group_id, vineyard::ConstructFragmentGroup( + client, frag_id, comm_spec)); gs::rpc::GraphDef graph_def; graph_def.set_key(graph_name); graph_def.set_vineyard_id(new_frag_group_id); - gs::set_graph_def(new_frag, graph_def); + gs::set_graph_def(frag, graph_def); auto wrapper = std::make_shared>( - graph_name, graph_def, new_frag); + graph_name, graph_def, frag); return std::dynamic_pointer_cast(wrapper); } else { BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params)); diff --git a/coordinator/gscoordinator/template/CMakeLists.template b/coordinator/gscoordinator/template/CMakeLists.template index 393bd70facc4..151144d5cc29 100644 --- a/coordinator/gscoordinator/template/CMakeLists.template +++ b/coordinator/gscoordinator/template/CMakeLists.template @@ -29,6 +29,9 @@ endif () include_directories(${CMAKE_SOURCE_DIR}) +# eliminate a lot of warnings for newer version of boost library. +add_compile_options(-DBOOST_BIND_GLOBAL_PLACEHOLDERS) + # find MPI---------------------------------------------------------------------- find_package(MPI REQUIRED) include_directories(SYSTEM ${MPI_CXX_INCLUDE_PATH}) @@ -38,7 +41,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON) find_package(Threads REQUIRED) # find vineyard----------------------------------------------------------------- -find_package(vineyard 0.1.8 REQUIRED) +find_package(vineyard 0.1.11 REQUIRED) include_directories(${VINEYARD_INCLUDE_DIRS}) add_compile_options(-DENABLE_SELECTOR) diff --git a/interactive_engine/src/executor/runtime/native/CMakeLists.txt b/interactive_engine/src/executor/runtime/native/CMakeLists.txt index 236e233bc9a8..cfca9181c5f5 100644 --- a/interactive_engine/src/executor/runtime/native/CMakeLists.txt +++ b/interactive_engine/src/executor/runtime/native/CMakeLists.txt @@ -44,7 +44,7 @@ find_package(Threads REQUIRED) # we need edge src/dst ids in etable. add_definitions(-DENDPOINT_LISTS) -find_package(vineyard 0.1.8 REQUIRED) +find_package(vineyard 0.1.11 REQUIRED) add_library(native_store global_store_ffi.cc htap_ds_impl.cc graph_builder_ffi.cc diff --git a/interactive_engine/tests/function_test.sh b/interactive_engine/tests/function_test.sh index 3765f5f0919d..a975b3529273 100755 --- a/interactive_engine/tests/function_test.sh +++ b/interactive_engine/tests/function_test.sh @@ -34,7 +34,7 @@ function _start { curl -XPOST http://localhost:${_port} -d 'import graphscope' curl -XPOST http://localhost:${_port} -d 'graphscope.set_option(show_log=True)' curl -XPOST http://localhost:${_port} -d 'from graphscope.framework.loader import Loader' - curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp" + curl_sess="curl -XPOST http://localhost:${_port} -d 'session = graphscope.session(num_workers=${workers}, k8s_volumes={\"data\": {\"type\": \"hostPath\", \"field\": {\"path\": \"${GS_TEST_DIR}\", \"type\": \"Directory\"}, \"mounts\": {\"mountPath\": \"/testingdata\"}}}, k8s_coordinator_cpu=1.0, k8s_coordinator_mem='\''4Gi'\'', k8s_vineyard_cpu=1.0, k8s_vineyard_mem='\''4Gi'\'', k8s_vineyard_shared_mem='\''4Gi'\'', k8s_engine_cpu=1.0, k8s_engine_mem='\''4Gi'\'', k8s_etcd_cpu=2, k8s_gie_graph_manager_image='\''${gie_manager_image}'\'', k8s_gs_image='\''${gs_image}'\'')' --write-out %{http_code} --silent --output ./curl.tmp" echo $curl_sess code=`sh -c "$curl_sess"` diff --git a/k8s/graphscope.Dockerfile b/k8s/graphscope.Dockerfile index f678cebb5077..c13f5ebc5682 100644 --- a/k8s/graphscope.Dockerfile +++ b/k8s/graphscope.Dockerfile @@ -4,7 +4,7 @@ # the result image includes all runtime stuffs of graphscope, with analytical engine, # learning engine and interactive engine installed. -ARG BASE_VERSION=v0.1.8 +ARG BASE_VERSION=v0.1.11 FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:$BASE_VERSION as builder ARG CI=true diff --git a/k8s/gsvineyard.Dockerfile b/k8s/gsvineyard.Dockerfile index 43db3306252d..d036c5cd7763 100644 --- a/k8s/gsvineyard.Dockerfile +++ b/k8s/gsvineyard.Dockerfile @@ -14,7 +14,7 @@ RUN cd /tmp && \ make -j`nproc` && \ make install && \ cd /tmp && \ - git clone -b v0.1.8 https://github.com/alibaba/libvineyard.git --depth=1 && \ + git clone -b v0.1.11 https://github.com/alibaba/libvineyard.git --depth=1 && \ cd libvineyard && \ git submodule update --init && \ mkdir -p /tmp/libvineyard/build && \ @@ -32,4 +32,9 @@ RUN cd /tmp && \ mkdir -p /opt/graphscope/dist && \ cp -f wheelhouse/* /opt/graphscope/dist && \ pip3 install wheelhouse/*.whl && \ + cd /tmp/libvineyard/modules/io && \ + python3 setup.py bdist_wheel && \ + cp -f dist/* /opt/graphscope/dist && \ + pip3 install dist/* && \ + cd /tmp && \ rm -fr /tmp/libvineyard /tmp/libgrape-lite diff --git a/python/graphscope/deploy/tests/test_demo_script.py b/python/graphscope/deploy/tests/test_demo_script.py index babd777f8940..6cad2fdf1e1d 100644 --- a/python/graphscope/deploy/tests/test_demo_script.py +++ b/python/graphscope/deploy/tests/test_demo_script.py @@ -82,6 +82,7 @@ def test_demo(data_dir): k8s_vineyard_mem="512Mi", k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", + k8s_etcd_cpu=2, k8s_vineyard_shared_mem="2Gi", k8s_volumes=get_k8s_volumes(), ) @@ -121,6 +122,7 @@ def test_demo_distribute(data_dir, modern_graph_data_dir): k8s_vineyard_mem="512Mi", k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", + k8s_etcd_cpu=2, k8s_vineyard_shared_mem="2Gi", k8s_volumes=get_k8s_volumes(), ) @@ -201,6 +203,7 @@ def test_multiple_session(data_dir): k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", k8s_vineyard_shared_mem="2Gi", + k8s_etcd_cpu=2, k8s_volumes=get_k8s_volumes(), ) info = sess.info @@ -220,6 +223,7 @@ def test_multiple_session(data_dir): k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", k8s_vineyard_shared_mem="2Gi", + k8s_etcd_cpu=2, k8s_volumes=get_k8s_volumes(), ) @@ -245,6 +249,7 @@ def test_query_modern_graph(modern_graph_data_dir): k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", k8s_vineyard_shared_mem="2Gi", + k8s_etcd_cpu=2, k8s_volumes=get_k8s_volumes(), ) graph = load_modern_graph(sess, modern_graph_data_dir) @@ -278,6 +283,7 @@ def test_traversal_modern_graph(modern_graph_data_dir): k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", k8s_vineyard_shared_mem="2Gi", + k8s_etcd_cpu=2, k8s_volumes=get_k8s_volumes(), ) graph = load_modern_graph(sess, modern_graph_data_dir) @@ -325,6 +331,7 @@ def test_serialize_roundtrip(p2p_property_dir): k8s_engine_cpu=0.1, k8s_engine_mem="1500Mi", k8s_vineyard_shared_mem="2Gi", + k8s_etcd_cpu=2, k8s_volumes=get_k8s_volumes(), ) graph = sess.load_from( @@ -346,7 +353,6 @@ def test_serialize_roundtrip(p2p_property_dir): generate_eid=False, ) graph.serialize("/tmp/serialize") - graph.unload() new_graph = Graph.deserialize("/tmp/serialize", sess) pg = new_graph.project_to_simple(0, 0, 0, 2) ctx = graphscope.sssp(pg, src=6) diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index c7227d1d4074..a1f7b56dbb2b 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -21,6 +21,8 @@ import logging from typing import Mapping +import vineyard + from graphscope.client.session import get_session_by_id from graphscope.framework.dag_utils import add_column from graphscope.framework.dag_utils import copy_graph @@ -42,7 +44,6 @@ from graphscope.framework.utils import s_to_attr from graphscope.framework.utils import transform_labeled_vertex_property_data_selector from graphscope.framework.utils import transform_vertex_range -from graphscope.framework.vineyard_object import VineyardObject from graphscope.proto import types_pb2 from graphscope.proto.graph_def_pb2 import GraphDef @@ -94,7 +95,7 @@ def __init__(self, session_id, incoming_data=None): - :class:`GraphDef` - :class:`nx.Graph` - :class:`Graph` - - :class:`VineyardObject` + - :class:`vineyard.Object`, :class:`vineyard.ObjectId` or :class:`vineyard.ObjectName` """ # Don't import the :code:`NXGraph` in top-level statments to improve the @@ -120,7 +121,9 @@ def __init__(self, session_id, incoming_data=None): graph_def = self._from_nx_graph(incoming_data) elif isinstance(incoming_data, Graph): graph_def = self._copy_from(incoming_data) - elif isinstance(incoming_data, VineyardObject): + elif isinstance( + incoming_data, (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName) + ): graph_def = self._from_vineyard(incoming_data) else: raise ValueError( @@ -497,20 +500,24 @@ def _from_vineyard(self, vineyard_object): """Load a graph from a already existed vineyard graph. Args: - vineyard_object (:class:`VineyardObject`): vineyard object, which contains a graph. + vineyard_object (:class:`vineyard.Object`, :class:`vineyard.ObjectID` + or :class:`vineyard.ObjectName`): vineyard object, + which represents a graph. Returns: A graph_def. """ - if vineyard_object.object_id is not None: - return self._from_vineyard_id(vineyard_object.object_id) - elif vineyard_object.object_name is not None: - return self._from_vineyard_name(vineyard_object.object_name) + if isinstance(vineyard_object, vineyard.Object): + return self._from_vineyard_id(vineyard_object.id) + if isinstance(vineyard_object, vineyard.ObjectID): + return self._from_vineyard_id(vineyard_object) + if isinstance(vineyard_object, vineyard.ObjectName): + return self._from_vineyard_name(vineyard_object) def _from_vineyard_id(self, vineyard_id): config = {} config[types_pb2.IS_FROM_VINEYARD_ID] = b_to_attr(True) - config[types_pb2.VINEYARD_ID] = i_to_attr(vineyard_id) + config[types_pb2.VINEYARD_ID] = i_to_attr(int(vineyard_id)) # FIXME(hetao) hardcode oid/vid type for codegen, when loading from vineyard # # the metadata should be retrived from vineyard @@ -523,7 +530,7 @@ def _from_vineyard_id(self, vineyard_id): def _from_vineyard_name(self, vineyard_name): config = {} config[types_pb2.IS_FROM_VINEYARD_ID] = b_to_attr(True) - config[types_pb2.VINEYARD_NAME] = s_to_attr(vineyard_name) + config[types_pb2.VINEYARD_NAME] = s_to_attr(str(vineyard_name)) # FIXME(hetao) hardcode oid/vid type for codegen, when loading from vineyard # # the metadata should be retrived from vineyard @@ -624,9 +631,7 @@ def deserialize(cls, path, sess, **kwargs): deployment=deployment, hosts=hosts, ) - return cls( - sess.session_id, VineyardObject(object_id=int(vineyard.ObjectID(graph_id))) - ) + return cls(sess.session_id, vineyard.ObjectID(graph_id)) def draw(self, vertices, hop=1): """Visualize the graph data in the result cell when the draw functions are invoked diff --git a/python/graphscope/framework/graph_utils.py b/python/graphscope/framework/graph_utils.py index f0f0680ae3b6..615b0f75a385 100644 --- a/python/graphscope/framework/graph_utils.py +++ b/python/graphscope/framework/graph_utils.py @@ -26,6 +26,7 @@ import numpy as np import pandas as pd +import vineyard from graphscope.client.session import get_default_session from graphscope.framework import dag_utils @@ -34,13 +35,23 @@ from graphscope.framework.errors import check_argument from graphscope.framework.graph import Graph from graphscope.framework.loader import Loader -from graphscope.framework.vineyard_object import VineyardObject from graphscope.proto import attr_value_pb2 from graphscope.proto import types_pb2 __all__ = ["load_from"] -LoaderVariants = Union[Loader, str, Sequence[np.ndarray], pd.DataFrame, VineyardObject] + +VineyardObjectTypes = (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName) + +LoaderVariants = Union[ + Loader, + str, + Sequence[np.ndarray], + pd.DataFrame, + vineyard.Object, + vineyard.ObjectID, + vineyard.ObjectName, +] class VertexLabel(object): @@ -405,7 +416,7 @@ def normalize_parameter_edges( """ def process_sub_label(items): - if isinstance(items, (Loader, str, pd.DataFrame, VineyardObject)): + if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): return EdgeSubLabel(items, properties=None, source=None, destination=None) elif isinstance(items, Sequence): if all([isinstance(item, np.ndarray) for item in items]): @@ -422,11 +433,11 @@ def process_sub_label(items): def process_label(label, items): e_label = EdgeLabel(label) - if isinstance(items, (Loader, str, pd.DataFrame, VineyardObject)): + if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): e_label.add_sub_label(process_sub_label(items)) elif isinstance(items, Sequence): if isinstance( - items[0], (Loader, str, pd.DataFrame, VineyardObject, np.ndarray) + items[0], (Loader, str, pd.DataFrame, *VineyardObjectTypes, np.ndarray) ): e_label.add_sub_label(process_sub_label(items)) else: @@ -466,7 +477,7 @@ def normalize_parameter_vertices( """ def process_label(label, items): - if isinstance(items, (Loader, str, pd.DataFrame, VineyardObject)): + if isinstance(items, (Loader, str, pd.DataFrame, *VineyardObjectTypes)): return VertexLabel(label=label, loader=items) elif isinstance(items, Sequence): if all([isinstance(item, np.ndarray) for item in items]): @@ -606,7 +617,7 @@ def load_from( sess = get_default_session() if sess is None: raise ValueError("No default session found.") - if isinstance(edges, (Graph, nx.Graph, VineyardObject)): + if isinstance(edges, (Graph, nx.Graph, *VineyardObjectTypes)): return Graph(sess.session_id, edges) oid_type = utils.normalize_data_type_str(oid_type) e_labels = normalize_parameter_edges(edges) diff --git a/python/graphscope/framework/loader.py b/python/graphscope/framework/loader.py index f58b1f6537f9..004d366c9e73 100644 --- a/python/graphscope/framework/loader.py +++ b/python/graphscope/framework/loader.py @@ -28,7 +28,6 @@ from graphscope.client.session import get_default_session from graphscope.framework import utils from graphscope.framework.errors import check_argument -from graphscope.framework.vineyard_object import VineyardObject from graphscope.proto import attr_value_pb2 from graphscope.proto import types_pb2 @@ -173,9 +172,9 @@ def resolve(self, source): self.process_location(source) elif isinstance(source, pd.DataFrame): self.process_pandas(source) - elif isinstance(source, VineyardObject): - self.process_vy_object(source) - elif vineyard is not None and isinstance(source, vineyard.ObjectID): + elif vineyard is not None and isinstance( + source, (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName) + ): self.process_vy_object(source) elif isinstance(source, Sequence): # Assume a list of numpy array are passed as COO matrix, with length >= 2. @@ -293,10 +292,17 @@ def finish(self): def process_vy_object(self, source): self.protocol = "vineyard" - if isinstance(source, vineyard.ObjectID): - self.source = repr(source) + # encoding: add a `o` prefix to object id, and a `s` prefix to object name. + if isinstance(source, vineyard.Object): + self.source = "o%s" % repr(source.id) + elif isinstance(source, vineyard.ObjectID): + self.source = "o%s" % repr(source) + elif isinstance(source, vineyard.ObjectName): + self.source = "s%s" % str(source) else: - self.source = repr(source.object_id) + raise ValueError( + "Invalid input source: not a vineyard's Object, ObjectID or ObjectName" + ) def select_columns(self, columns: Sequence[Tuple[str, int]], include_all=False): for name, data_type in columns: diff --git a/python/graphscope/framework/vineyard_object.py b/python/graphscope/framework/vineyard_object.py deleted file mode 100644 index 541d80816b80..000000000000 --- a/python/graphscope/framework/vineyard_object.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -class VineyardObject: - """A vineyard object may hold a id, or a name. - - Attributes: - object_id - object_name - """ - - def __init__(self, object_id=None, object_name=None): - self._object_id = object_id - self._object_name = object_name - - @property - def object_id(self): - return self._object_id - - @object_id.setter - def object_id(self, object_id): - self._object_id = object_id - - @property - def object_name(self): - return self._object_name - - @object_name.setter - def object_name(self, object_name): - self._object_name = object_name diff --git a/python/graphscope/interactive/query.py b/python/graphscope/interactive/query.py index a160fbf29e7c..bc4aac33dccb 100644 --- a/python/graphscope/interactive/query.py +++ b/python/graphscope/interactive/query.py @@ -98,22 +98,12 @@ def subgraph(self, gremlin_script): def load_subgraph(name): import vineyard - host, port = self._graphscope_session.info["engine_config"][ - "vineyard_rpc_endpoint" - ].split(":") - client = vineyard.connect(host, int(port)) - - # get vertex/edge stream id - vstream = client.get_name("__%s_vertex_stream" % name, True) - estream = client.get_name("__%s_edge_stream" % name, True) - # invoke load_from g = self._graphscope_session.load_from( - edges=[Loader(estream)], - vertices=[Loader(vstream)], + edges=[Loader(vineyard.ObjectName("__%s_edge_stream" % name))], + vertices=[Loader(vineyard.ObjectName("__%s_vertex_stream" % name))], generate_eid=False, ) - client.put_name(vineyard.ObjectID(g.vineyard_id), graph_name) logger.info("subgraph has been loaded") return g diff --git a/python/requirements.txt b/python/requirements.txt index f955fe860029..602dbca85355 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -10,5 +10,5 @@ pandas protobuf>=3.12.0 PyYAML scipy -vineyard==0.1.8 -vineyard-io==0.1.8 +vineyard==0.1.11 +vineyard-io==0.1.11 diff --git a/python/tests/test_create_graph.py b/python/tests/test_create_graph.py index b0af3cd8b13c..00fdf32fc358 100644 --- a/python/tests/test_create_graph.py +++ b/python/tests/test_create_graph.py @@ -21,11 +21,11 @@ import numpy as np import pandas as pd import pytest +import vineyard import graphscope from graphscope.framework.errors import AnalyticalEngineInternalError from graphscope.framework.loader import Loader -from graphscope.framework.vineyard_object import VineyardObject @pytest.fixture @@ -461,9 +461,7 @@ def test_errors_on_file_format( with pytest.raises(AnalyticalEngineInternalError, match="End Of File"): g3 = graphscope_session.load_from(edges=empty_file) with pytest.raises(AnalyticalEngineInternalError, match="Object not exists"): - g4 = graphscope_session.load_from( - VineyardObject(object_name="non_exist_vy_name") - ) + g4 = graphscope_session.load_from(vineyard.ObjectName("non_exist_vy_name")) def test_error_on_non_existing_load_strategy( diff --git a/python/tests/test_graph.py b/python/tests/test_graph.py index 6b6dd23817b7..8b161f150953 100644 --- a/python/tests/test_graph.py +++ b/python/tests/test_graph.py @@ -21,6 +21,7 @@ import numpy as np import pytest +import vineyard import graphscope from graphscope import property_sssp @@ -29,7 +30,6 @@ from graphscope.framework.errors import InvalidArgumentError from graphscope.framework.graph import Graph from graphscope.framework.loader import Loader -from graphscope.framework.vineyard_object import VineyardObject from graphscope.proto import types_pb2 logger = logging.getLogger("graphscope") @@ -51,7 +51,7 @@ def test_load_graph_copy(graphscope_session, arrow_property_graph): g2.unload() assert not g2.loaded() # test load from vineyard's graph - g3 = graphscope_session.load_from(VineyardObject(object_id=g.vineyard_id)) + g3 = graphscope_session.load_from(vineyard.ObjectID(g.vineyard_id)) assert g3.loaded()