From 9e9d37e740537aa09495c5c1b767b618611dba1d Mon Sep 17 00:00:00 2001
From: Yongmin Hu <840428827@qq.com>
Date: Tue, 12 Jan 2021 11:27:27 +0800
Subject: [PATCH] add app kshell (#66)

* add app kshell

* providing kshell algorithm in python interface

* fix clang-format error

* python format

* update k_shell.py

* trigger GitHub actions
---
 analytical_engine/apps/kshell/kshell.h       | 138 ++++++++++++++++++
 .../apps/kshell/kshell_context.h             |  84 +++++++++++
 analytical_engine/test/run_app.cc            |   2 +
 analytical_engine/test/run_app.h             |  11 ++
 .../gscoordinator/builtin/app/.gs_conf.yaml  |   6 +
 python/graphscope/analytical/app/__init__.py |   1 +
 python/graphscope/analytical/app/k_shell.py  |  52 +++++++
 python/tests/conftest.py                     |   6 +
 python/tests/test_app.py                     |  20 +++
 9 files changed, 320 insertions(+)
 create mode 100644 analytical_engine/apps/kshell/kshell.h
 create mode 100644 analytical_engine/apps/kshell/kshell_context.h
 create mode 100644 python/graphscope/analytical/app/k_shell.py

diff --git a/analytical_engine/apps/kshell/kshell.h b/analytical_engine/apps/kshell/kshell.h
new file mode 100644
index 000000000000..3b83913a2ae2
--- /dev/null
+++ b/analytical_engine/apps/kshell/kshell.h
@@ -0,0 +1,138 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * 	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_H_
+#define ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_H_
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+
+#include "kshell/kshell_context.h"
+
+namespace gs {
+/**
+ * @brief Get a subgraph induced by nodes with core number k.
+ * That is, nodes in the k-core that are not in the (k+1)-core.
+ * @tparam FRAG_T
+ */
+template <typename FRAG_T>
+class KShell : public grape::ParallelAppBase<FRAG_T, KShellContext<FRAG_T>>,
+               public grape::ParallelEngine,
+               public grape::Communicator {
+ public:
+  INSTALL_PARALLEL_WORKER(KShell<FRAG_T>, KShellContext<FRAG_T>, FRAG_T)
+  static constexpr grape::MessageStrategy message_strategy =
+      grape::MessageStrategy::kSyncOnOuterVertex;
+  static constexpr grape::LoadStrategy load_strategy =
+      grape::LoadStrategy::kBothOutIn;
+  using vertex_t = typename fragment_t::vertex_t;
+  using vid_t = typename FRAG_T::vid_t;
+
+  void UpdateDegree(const fragment_t& frag,
+                    const grape::DenseVertexSet<vid_t>& frontier,
+                    typename FRAG_T::template vertex_array_t<
+                        std::shared_ptr<std::atomic_int>>& degrees) {
+    ForEach(frontier, [&degrees, &frag](int tid, vertex_t u) {
+      for (auto& e : frag.GetOutgoingAdjList(u)) {
+        auto v = e.get_neighbor();
+        degrees[v]->operator--();
+      }
+
+      degrees[u]->store(0);
+    });
+  }
+
+  void PEval(const fragment_t& frag, context_t& ctx,
+             message_manager_t& messages) {
+    messages.InitChannels(thread_num());
+    // we put all computing logic in IncEval
+    messages.ForceContinue();
+  }
+
+  void IncEval(const fragment_t& frag, context_t& ctx,
+               message_manager_t& messages) {
+    int thrd_num = thread_num();
+    auto outer_vertices = frag.OuterVertices();
+    auto& remaining_vertices = ctx.remaining_vertices;
+    auto& next_remaining_vertices = ctx.next_remaining_vertices;
+    auto& to_remove_vertices_k = ctx.to_remove_vertices_k;
+    auto& to_remove_vertices_inc = ctx.to_remove_vertices_inc;
+    auto& curr_k = ctx.curr_k;
+    auto& degrees = ctx.degrees;
+
+    messages.ParallelProcess<fragment_t, int32_t>(
+        thrd_num, frag,
+        [&degrees](int tid, vertex_t v, int32_t msg) { *degrees[v] += msg; });
+
+    // remove vertices which degree less or equal than curr_k
+    ForEach(remaining_vertices, [&to_remove_vertices_k, &to_remove_vertices_inc,
+                                 &degrees, curr_k](int tid, vertex_t v) {
+      if (degrees[v]->load() <= curr_k) {
+        to_remove_vertices_k.Insert(v);
+        to_remove_vertices_inc.Insert(v);
+      }
+    });
+
+    // keep vertices which degree greater than currk
+    ForEach(remaining_vertices,
+            [&next_remaining_vertices, &degrees, curr_k](int tid, vertex_t v) {
+              if (degrees[v]->load() > curr_k) {
+                next_remaining_vertices.Insert(v);
+              }
+            });
+
+    UpdateDegree(frag, to_remove_vertices_inc, degrees);
+
+    ForEach(outer_vertices, [&frag, &degrees, &messages](int tid, vertex_t v) {
+      int degree = degrees[v]->load();
+      if (degree != 0) {
+        messages.Channels()[tid].SyncStateOnOuterVertex<fragment_t, int32_t>(
+            frag, v, degree);
+        degrees[v]->store(0);
+      }
+    });
+
+    bool curr_k_changed = false;
+    size_t global_removed_inc_count = 0;
+    Sum(to_remove_vertices_inc.Count(), global_removed_inc_count);
+    if (global_removed_inc_count == 0) {
+      curr_k++;
+      curr_k_changed = true;
+    }
+
+    to_remove_vertices_inc.Clear();
+    remaining_vertices.Clear();
+    remaining_vertices.Swap(next_remaining_vertices);
+
+    if (curr_k > ctx.k) {
+      auto inner_vertices = frag.InnerVertices();
+
+      for (auto v : inner_vertices) {
+        ctx.data()[v] = to_remove_vertices_k.Exist(v) ? 1 : 0;
+      }
+      return;
+    }
+
+    if (curr_k_changed) {
+      to_remove_vertices_k.Clear();
+    }
+
+    messages.ForceContinue();
+  }
+};
+};  // namespace gs
+
+#endif  // ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_H_
diff --git a/analytical_engine/apps/kshell/kshell_context.h b/analytical_engine/apps/kshell/kshell_context.h
new file mode 100644
index 000000000000..120754195e93
--- /dev/null
+++ b/analytical_engine/apps/kshell/kshell_context.h
@@ -0,0 +1,84 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_CONTEXT_H_ +#define ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_CONTEXT_H_ + +#include +#include +#include +#include +#include +#include + +#include "grape/grape.h" + +namespace gs { + +template +class KShellContext + : public grape::VertexDataContext { + public: + using oid_t = typename FRAG_T::oid_t; + using vid_t = typename FRAG_T::vid_t; + + explicit KShellContext(const FRAG_T& fragment) + : grape::VertexDataContext(fragment) {} + + typename FRAG_T::template vertex_array_t> + degrees; + grape::DenseVertexSet to_remove_vertices_k, to_remove_vertices_inc, + remaining_vertices, next_remaining_vertices; + int k; + int curr_k; + + void Init(grape::ParallelMessageManager& messages, int k) { + auto& frag = this->fragment(); + auto vertices = frag.Vertices(); + auto inner_vertices = frag.InnerVertices(); + + degrees.Init(vertices); + to_remove_vertices_k.Init(inner_vertices); + to_remove_vertices_inc.Init(inner_vertices); + remaining_vertices.Init(inner_vertices); + next_remaining_vertices.Init(inner_vertices); + this->k = k; + curr_k = 0; + + for (auto& v : vertices) { + degrees[v] = std::make_shared(0); + if (frag.IsInnerVertex(v)) { + remaining_vertices.Insert(v); + degrees[v]->store(frag.GetLocalOutDegree(v)); + } else { + degrees[v]->store(0); + } + } + } + + void Output(std::ostream& os) override { + auto& frag = this->fragment(); + auto inner_vertices = frag.InnerVertices(); + + for (auto& v : inner_vertices) { + if (to_remove_vertices_k.Exist(v)) { + os << frag.GetId(v) << '\n'; + } + } + } +}; +} // namespace gs + +#endif 
// ANALYTICAL_ENGINE_APPS_KSHELL_KSHELL_CONTEXT_H_ diff --git a/analytical_engine/test/run_app.cc b/analytical_engine/test/run_app.cc index e74e35b0004f..4db08055ab73 100644 --- a/analytical_engine/test/run_app.cc +++ b/analytical_engine/test/run_app.cc @@ -44,6 +44,8 @@ DEFINE_bool(hits_normalized, true, DEFINE_int32(kcore_k, 3, "The order of the core"); +DEFINE_int32(kshell_k, 3, "The order of the shell"); + DEFINE_double(katz_centrality_alpha, 0.1, "Attenuation factor"); DEFINE_double(katz_centrality_beta, 1.0, "Weight attributed to the immediate neighborhood."); diff --git a/analytical_engine/test/run_app.h b/analytical_engine/test/run_app.h index ecf15e33cf5d..e7b81b7c7336 100644 --- a/analytical_engine/test/run_app.h +++ b/analytical_engine/test/run_app.h @@ -60,6 +60,7 @@ limitations under the License. #include "apps/dfs/dfs.h" #include "apps/hits/hits.h" #include "apps/kcore/kcore.h" +#include "apps/kshell/kshell.h" #include "apps/sssp/sssp_average_length.h" #include "apps/sssp/sssp_has_path.h" #include "apps/sssp/sssp_path.h" @@ -86,6 +87,8 @@ DECLARE_bool(hits_normalized); DECLARE_int32(kcore_k); +DECLARE_int32(kshell_k); + DECLARE_double(katz_centrality_alpha); DECLARE_double(katz_centrality_beta); DECLARE_double(katz_centrality_tolerance); @@ -324,6 +327,14 @@ void Run() { CreateAndQuery(comm_spec, efile, vfile, out_prefix, FLAGS_datasource, fnum, spec, FLAGS_kcore_k); + } else if (name == "kshell") { + using GraphType = + grape::ImmutableEdgecutFragment; + using AppType = KShell; + CreateAndQuery(comm_spec, efile, vfile, out_prefix, + FLAGS_datasource, fnum, spec, + FLAGS_kshell_k); } else if (name == "hits") { using GraphType = grape::ImmutableEdgecutFragment