Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a general project operator. #211

Merged
merged 3 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.12
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.14
options:
--shm-size 4096m
strategy:
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.12
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.14
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -175,7 +175,7 @@ jobs:
defaults:
run:
shell: scl enable devtoolset-7 -- bash --noprofile --norc -eo pipefail {0}
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.12
container: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.1.14
steps:
- name: Install Dependencies
run: |
Expand Down
2 changes: 1 addition & 1 deletion README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ papers = interactive.execute("g.V().has('author', 'id', 2).out('writes').where(_
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# project the projected graph to simple graph.
simple_g = sub_graph.project_to_simple(v_label="paper", e_label="cites")
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})

ret1 = graphscope.k_core(simple_g, k=5)
ret2 = graphscope.triangles(simple_g)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ Please note that many algorithms may only work on *homogeneous* graphs, and ther
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# project the projected graph to simple graph.
simple_g = sub_graph.project_to_simple(v_label="paper", e_label="cites")
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})

ret1 = graphscope.k_core(simple_g, k=5)
ret2 = graphscope.triangles(simple_g)
Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.1.12 REQUIRED)
find_package(vineyard 0.1.14 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
28 changes: 28 additions & 0 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "core/fragment/dynamic_fragment.h"
#include "core/fragment/dynamic_fragment_reporter.h"
#include "core/grape_instance.h"
#include "core/io/property_parser.h"
#include "core/launcher.h"
#include "core/object/app_entry.h"
#include "core/object/graph_utils.h"
Expand Down Expand Up @@ -154,6 +155,28 @@ bl::result<void> GrapeInstance::unloadApp(const rpc::GSParams& params) {

bl::result<rpc::GraphDef> GrapeInstance::projectGraph(
const rpc::GSParams& params) {
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(project_infos, gs::ParseProjectPropertyGraph(params));
BOOST_LEAF_AUTO(
frag_wrapper,
object_manager_.GetObject<ILabeledFragmentWrapper>(graph_name));

if (frag_wrapper->graph_def().graph_type() != rpc::ARROW_PROPERTY) {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"projectGraph is only available for ArrowFragment");
}

std::string dst_graph_name = "graph_" + generateId();

BOOST_LEAF_AUTO(new_frag_wrapper,
frag_wrapper->Project(comm_spec_, dst_graph_name,
project_infos[0], project_infos[1]));
BOOST_LEAF_CHECK(object_manager_.PutObject(new_frag_wrapper));
return new_frag_wrapper->graph_def();
}

bl::result<rpc::GraphDef> GrapeInstance::projectToSimple(
const rpc::GSParams& params) {
std::string projected_id = "graph_projected_" + generateId();
BOOST_LEAF_AUTO(graph_name, params.Get<std::string>(rpc::GRAPH_NAME));
BOOST_LEAF_AUTO(type_sig, params.Get<std::string>(rpc::TYPE_SIGNATURE));
Expand Down Expand Up @@ -746,6 +769,11 @@ bl::result<std::shared_ptr<DispatchResult>> GrapeInstance::OnReceive(
r->set_graph_def(graph_def);
break;
}
case rpc::PROJECT_TO_SIMPLE: {
BOOST_LEAF_AUTO(graph_def, projectToSimple(params));
r->set_graph_def(graph_def);
break;
}
case rpc::MODIFY_VERTICES: {
#ifdef EXPERIMENTAL_ON
std::vector<std::string> vertices_to_modify;
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/grape_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class GrapeInstance : public Subscriber {

bl::result<rpc::GraphDef> projectGraph(const rpc::GSParams& params);

bl::result<rpc::GraphDef> projectToSimple(const rpc::GSParams& params);

bl::result<void> modifyVertices(const rpc::GSParams& params,
const std::vector<std::string>& vertices);

Expand Down
27 changes: 27 additions & 0 deletions analytical_engine/core/io/property_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,33 @@ inline bl::result<std::shared_ptr<detail::Graph>> ParseCreatePropertyGraph(
return graph;
}

inline bl::result<std::vector<std::map<int, std::vector<int>>>>
ParseProjectPropertyGraph(const gs::rpc::GSParams& params) {
BOOST_LEAF_AUTO(list, params.Get<rpc::AttrValue_ListValue>(
rpc::ARROW_PROPERTY_DEFINITION));
auto& items = list.func();
std::map<int, std::vector<int>> vertices, edges;
CHECK_EQ(items.size(), 2);
{
auto item = items[0];
for (auto& pair : item.attr()) {
auto props = pair.second.list().i();
vertices[pair.first] = {props.begin(), props.end()};
}
}
{
auto item = items[1];
for (auto& pair : item.attr()) {
auto props = pair.second.list().i();
edges[pair.first] = {props.begin(), props.end()};
}
}
std::vector<std::map<int, std::vector<int>>> res;
res.push_back(vertices);
res.push_back(edges);
return res;
}

} // namespace gs

#endif // ANALYTICAL_ENGINE_CORE_IO_PROPERTY_PARSER_H_
74 changes: 66 additions & 8 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
return std::dynamic_pointer_cast<IFragmentWrapper>(wrapper);
}

bl::result<std::shared_ptr<ILabeledFragmentWrapper>> Project(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::map<int, std::vector<int>>& vertices,
const std::map<int, std::vector<int>>& edges) override {
auto& meta = fragment_->meta();
auto* client = dynamic_cast<vineyard::Client*>(meta.GetClient());
BOOST_LEAF_AUTO(new_frag_id, fragment_->Project(*client, vertices, edges));
VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto new_frag = client->GetObject<fragment_t>(new_frag_id);

rpc::GraphDef new_graph_def;

new_graph_def.set_key(dst_graph_name);
new_graph_def.set_vineyard_id(frag_group_id);
new_graph_def.set_generate_eid(graph_def_.generate_eid());

set_graph_def(new_frag, new_graph_def);

auto wrapper = std::make_shared<FragmentWrapper<fragment_t>>(
dst_graph_name, new_graph_def, new_frag);
return std::dynamic_pointer_cast<ILabeledFragmentWrapper>(wrapper);
}

bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down Expand Up @@ -143,14 +168,6 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
->vertex_map_id();
}

if (vm_id_from_ctx != fragment_->vertex_map_id()) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kIllegalStateError,
"ctx holds a vertex map id = " + std::to_string(vm_id_from_ctx) +
", but the vertex map id of fragment is " +
std::to_string(fragment_->vertex_map_id()));
}

std::map<label_id_t,
std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
columns;
Expand Down Expand Up @@ -197,6 +214,47 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
BOOST_LEAF_ASSIGN(columns,
vp_ctx_wrapper->ToArrowArrays(comm_spec, selectors));
}
vineyard::ObjectMeta ctx_meta, cur_meta;
VINEYARD_CHECK_OK(client->GetMetaData(vm_id_from_ctx, ctx_meta));
VINEYARD_CHECK_OK(
client->GetMetaData(fragment_->vertex_map_id(), cur_meta));
auto ctx_fnum = ctx_meta.GetKeyValue<fid_t>("fnum");
auto cur_fnum = cur_meta.GetKeyValue<fid_t>("fnum");
if (ctx_fnum != cur_fnum) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kIllegalStateError,
"Fragment number of context differ from the destination fragment");
}

for (const auto& pair : columns) {
if (fragment_->schema().GetVertexLabelName(pair.first).empty()) {
RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
"Label id " + std::to_string(pair.first) +
" is invalid in the destination fragment");
}
for (fid_t i = 0; i < cur_fnum; ++i) {
auto name =
"o2g_" + std::to_string(i) + "_" + std::to_string(pair.first);
auto id_in_ctx = ctx_meta.GetMemberMeta(name).GetId();
auto id_in_cur = cur_meta.GetMemberMeta(name).GetId();
if (id_in_ctx != id_in_cur) {
RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
"Vertex datastructure " + name +
"in context differ from vertex map of the "
"destination fragment");
}
name = "oid_arrays_" + std::to_string(i) + "_" +
std::to_string(pair.first);
id_in_ctx = ctx_meta.GetMemberMeta(name).GetId();
id_in_cur = cur_meta.GetMemberMeta(name).GetId();
if (id_in_ctx != id_in_cur) {
RETURN_GS_ERROR(vineyard::ErrorCode::kIllegalStateError,
"Vertex datastructure " + name +
"in context differ from vertex map of the "
"destionation fragment");
}
}
}

auto new_frag_id = fragment_->AddVertexColumns(*client, columns);

Expand Down
5 changes: 5 additions & 0 deletions analytical_engine/core/object/i_fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class ILabeledFragmentWrapper : public IFragmentWrapper {
explicit ILabeledFragmentWrapper(std::string id)
: IFragmentWrapper(std::move(id), ObjectType::kLabeledFragmentWrapper) {}

virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>> Project(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::map<int, std::vector<int>>& vertices,
const std::map<int, std::vector<int>>& edges) = 0;

virtual bl::result<std::shared_ptr<ILabeledFragmentWrapper>> AddColumn(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
std::shared_ptr<IContextWrapper>& ctx_wrapper,
Expand Down
11 changes: 6 additions & 5 deletions analytical_engine/frame/project_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* project_frame.cc serves as a frame to be compiled with
* ArrowProjectedFragment/DynamicProjectedFragment. The frame will be compiled
* when the client issues a PROJECT_GRAPH request. Then, a library will be
* when the client issues a PROJECT_TO_SIMPLE request. Then, a library will be
* produced based on the frame. The reason we need the frame is the template
* parameters are unknown before the project request has arrived at the
* analytical engine. A dynamic library is necessary to prevent hardcode data
Expand All @@ -42,10 +42,11 @@
namespace gs {

template <typename FRAG_T>
class ProjectFrame {};
class ProjectSimpleFrame {};

template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class ProjectFrame<gs::ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {
class ProjectSimpleFrame<
gs::ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {
using fragment_t = vineyard::ArrowFragment<OID_T, VID_T>;
using projected_fragment_t =
gs::ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>;
Expand Down Expand Up @@ -123,7 +124,7 @@ class ProjectFrame<gs::ArrowProjectedFragment<OID_T, VID_T, VDATA_T, EDATA_T>> {

#ifdef EXPERIMENTAL_ON
template <typename VDATA_T, typename EDATA_T>
class ProjectFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
class ProjectSimpleFrame<gs::DynamicProjectedFragment<VDATA_T, EDATA_T>> {
using fragment_t = gs::DynamicFragment;
using projected_fragment_t = gs::DynamicProjectedFragment<VDATA_T, EDATA_T>;

Expand Down Expand Up @@ -171,7 +172,7 @@ void Project(
std::shared_ptr<gs::IFragmentWrapper>& wrapper_in,
const std::string& projected_graph_name, const gs::rpc::GSParams& params,
gs::bl::result<std::shared_ptr<gs::IFragmentWrapper>>& wrapper_out) {
wrapper_out = gs::ProjectFrame<_PROJECTED_GRAPH_TYPE>::Project(
wrapper_out = gs::ProjectSimpleFrame<_PROJECTED_GRAPH_TYPE>::Project(
wrapper_in, projected_graph_name, params);
}

Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def RunStep(self, request, context): # noqa: C901
and op.attr[types_pb2.GRAPH_TYPE].graph_type == types_pb2.ARROW_PROPERTY
)
or op.op == types_pb2.TRANSFORM_GRAPH
or op.op == types_pb2.PROJECT_GRAPH
or op.op == types_pb2.PROJECT_TO_SIMPLE
or op.op == types_pb2.ADD_EDGES
or op.op == types_pb2.ADD_VERTICES
):
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD ON)
find_package(Threads REQUIRED)

# find vineyard-----------------------------------------------------------------
find_package(vineyard 0.1.12 REQUIRED)
find_package(vineyard 0.1.14 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
2 changes: 1 addition & 1 deletion demo/node_classification_on_citation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
")\n",
"\n",
"# project the subgraph to simple graph by selecting papers and their citations.\n",
"simple_g = sub_graph.project_to_simple(v_label=\"paper\", e_label=\"cites\")\n",
"simple_g = sub_graph.project(vertices={\"paper\": []}, edges={\"cites\": []})\n",
"\n",
"# compute the kcore and triangle-counting.\n",
"kc_result = graphscope.k_core(simple_g, k=5)\n",
Expand Down
4 changes: 2 additions & 2 deletions docs/analytics_engine.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Built-in algorithms can be easily invoked over loaded graphs. For example,

# some other algorithms may only support evaluate on simple graph
# hence we need to generate one by selecting a kind of vertices and edges.
simple_g = g.project_to_simple(v_label="users", e_label="follows")
simple_g = g.project(vertices={"users": []}, edges={"follows": []})
result_pr = pagerank(simple_g)

A full-list of builtin algorithms is shown as below. Whether an algorithm supports
Expand Down Expand Up @@ -91,7 +91,7 @@ the graph as a new property (column) of the vertices(edges).

.. code:: python

simple_g = sub_graph.project_to_simple(vlabel="paper", elabel="cites")
simple_g = g.project(vertices={"paper": []}, edges={"sites": []})
ret = graphscope.kcore(simple_g, k=5)

# add the results as new columns to the citation graph
Expand Down
2 changes: 1 addition & 1 deletion docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ to generate the structural features of each paper node.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# project the projected graph to simple graph.
simple_g = sub_graph.project_to_simple(vlabel="paper", elabel="cites")
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})

ret1 = graphscope.kcore(simple_g, k=5)
ret2 = graphscope.triangle(simple_g)
Expand Down
Loading