From 5340bbd32e77429301d6441468496ed7ef1cbe6c Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Mon, 8 Mar 2021 01:33:13 +0800 Subject: [PATCH 01/18] init louvain add louvain context and vertex add louvain test update update fix fix fix update update update compiled update runnable runable update update add louvain to client update adapt to ArrowProjectedFragment sync community after comm ready vd_t be oid_t use SendToFragment runnable of p2p n = 4 fix use parallel worker fix sendp2p and aggregate fix pallell workers update update cleanup parallel processing messages revise parallel process messages clean up code remove halted aggregator edge weight as double format update update fix update revise code style remove unnecessary virtual declare update --- analytical_engine/CMakeLists.txt | 6 +- .../apps/pregel/louvain/auxiliary.h | 172 +++++++ .../apps/pregel/louvain/louvain.h | 432 ++++++++++++++++++ .../apps/pregel/louvain/louvain_app_base.h | 329 +++++++++++++ .../apps/pregel/louvain/louvain_context.h | 154 +++++++ .../apps/pregel/louvain/louvain_vertex.h | 138 ++++++ .../core/app/pregel/pregel_app_base.h | 78 +++- .../core/app/pregel/pregel_compute_context.h | 100 +++- .../core/app/pregel/pregel_vertex.h | 24 +- .../core/fragment/arrow_projected_fragment.h | 2 + .../gscoordinator/builtin/app/.gs_conf.yaml | 8 + python/graphscope/analytical/app/__init__.py | 1 + python/graphscope/analytical/app/louvain.py | 61 +++ python/tests/test_app.py | 4 + 14 files changed, 1473 insertions(+), 36 deletions(-) create mode 100644 analytical_engine/apps/pregel/louvain/auxiliary.h create mode 100644 analytical_engine/apps/pregel/louvain/louvain.h create mode 100644 analytical_engine/apps/pregel/louvain/louvain_app_base.h create mode 100644 analytical_engine/apps/pregel/louvain/louvain_context.h create mode 100644 analytical_engine/apps/pregel/louvain/louvain_vertex.h create mode 100644 python/graphscope/analytical/app/louvain.py diff --git a/analytical_engine/CMakeLists.txt 
b/analytical_engine/CMakeLists.txt index 4e88741f73c0..ff0623414525 100644 --- a/analytical_engine/CMakeLists.txt +++ b/analytical_engine/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.8) +cmake_minimum_required(VERSION 3.1) if ("${GRAPHSCOPE_VERSION}" STREQUAL "") set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION 0) @@ -45,7 +45,9 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) include_directories(${PROJECT_SOURCE_DIR}) # Set flags -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall") +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99") if (APPLE) set(CMAKE_MACOSX_RPATH ON) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h new file mode 100644 index 000000000000..c740a4c4e066 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -0,0 +1,172 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ + +#include +#include +#include +#include +#include + +#include "grape/grape.h" + +namespace gs { + +// aggregators define +constexpr char change_aggregator[] = "change_aggregator"; +constexpr char edge_weight_aggregator[] = + "total_edge_weight_aggregator"; +constexpr char actual_quality_aggregator[] = "actual_quality_aggregator"; + +// major phase of louvain +constexpr int phase_one_start_step = 0; +constexpr int phase_two_start_step = -2; +constexpr int compress_community_step = -1; +constexpr int sync_result_step = -10; +constexpr int terminate_step = -9; + +// minor step of phase 1 +constexpr int phase_one_minor_step_0 = 0; +constexpr int phase_one_minor_step_1 = 1; +constexpr int phase_one_minor_step_2 = 2; + + +template +struct LouvainNodeState { + using vid_t = VID_T; + using edata_t = double; + + vid_t community = 0; + edata_t community_sigma_total; + + // the internal edge weight of a node + edata_t internal_weight; + + // degree of the node + edata_t node_weight; + + // 1 if the node has changed communities this cycle, otherwise 0 + int64_t changed; + bool reset_total_edge_weight; + + bool is_from_louvain_vertex_reader = false; + bool use_fake_edges = false; + bool is_alived_community = true; + std::map fake_edges; + std::vector nodes_in_community; + edata_t total_edge_weight; + + LouvainNodeState() + : community(0), + community_sigma_total(0.0), + internal_weight(0.0), + node_weight(0.0), + changed(0), + reset_total_edge_weight(false), + is_from_louvain_vertex_reader(false), + use_fake_edges(false), + is_alived_community(true) {} + + ~LouvainNodeState() = default; +}; + +template +struct LouvainMessage { + using vid_t = VID_T; + using edata_t = double; + + vid_t community_id; + edata_t community_sigma_total; + edata_t edge_weight; + vid_t source_id; + vid_t dst_id; + + // For reconstruct graph info. 
+ // Each vertex sends its meta info to its community and silences itself; + // the community compresses its members' data and becomes a new vertex for + // the next stage. + edata_t internal_weight = 0; + std::map edges; + std::vector nodes_in_self_community; + + LouvainMessage() + : community_id(0), + community_sigma_total(0.0), + edge_weight(0.0), + source_id(0), + dst_id(0) {} + + LouvainMessage(const vid_t& community_id, edata_t community_sigma_total, + edata_t edge_weight, const vid_t& source_id, + const vid_t& dst_id) + : community_id(community_id), + community_sigma_total(community_sigma_total), + edge_weight(edge_weight), + source_id(source_id), + dst_id(dst_id) {} + + ~LouvainMessage() = default; + + // for message manager to serialize and deserialize LouvainMessage + friend grape::InArchive& operator<<(grape::InArchive& in_archive, + const LouvainMessage& u) { + in_archive << u.community_id; + in_archive << u.community_sigma_total; + in_archive << u.edge_weight; + in_archive << u.source_id; + in_archive << u.dst_id; + in_archive << u.internal_weight; + in_archive << u.edges; + in_archive << u.nodes_in_self_community; + return in_archive; + } + friend grape::OutArchive& operator>>(grape::OutArchive& out_archive, + LouvainMessage& val) { + out_archive >> val.community_id; + out_archive >> val.community_sigma_total; + out_archive >> val.edge_weight; + out_archive >> val.source_id; + out_archive >> val.dst_id; + out_archive >> val.internal_weight; + out_archive >> val.edges; + out_archive >> val.nodes_in_self_community; + return out_archive; + } +}; + +// util function to decide whether we should halt the phase. 
+bool decide_to_halt(const std::vector& history, int tolerance, + int min_progress) { + // halt if the most recent change was 0 + if (0 == history.back()) { + return true; + } + // halt if the change count has increased tolerance times + int64_t previous = history.front(); + int count = 0; + for (const auto& cur : history) { + if (cur >= previous - min_progress) { + count++; + } + previous = cur; + } + return (count > tolerance); +} + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h new file mode 100644 index 000000000000..8194012bdf29 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -0,0 +1,432 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "apps/pregel/louvain/auxiliary.h" +#include "apps/pregel/louvain/louvain_vertex.h" +#include "core/app/pregel/i_vertex_program.h" +#include "core/app/pregel/pregel_compute_context.h" + +namespace gs { + +/* + * Distribute-louvain algorithm description: + * phase-1 + * 0. Each vertex receives community values from its community hub + * and sends its own community to its neighbors. 
+ * 1. Each vertex determines if it should move to a neighboring community or not + * and sends its information to its community hub. + * 2. Each community hub re-calculates community totals and sends the updates + * to each community member. + * repeat stage 1 until a local maximum of the modularity is attained. + * phase-2 + * -2 community hub calls its member to gather the community sigma tot. + * -1 Compress each community such that they are represented by one node. + * + * reapply the Phase-1 process to the new graph + * + * The passes are iterated until there are no more changes and a maximum of + * modularity is attained. + * + * References: + * https://sotera.github.io/distributed-graph-analytics/louvain/ + * https://github.com/Sotera/distributed-graph-analytics + */ + +template +class PregelLouvain + : public IPregelProgram< + LouvainVertex>, + PregelComputeContext>> { + public: + using fragment_t = FRAG_T; + using oid_t = typename fragment_t::oid_t; + using vid_t = typename fragment_t::vid_t; + using edata_t = double; + using vd_t = oid_t; + using md_t = LouvainMessage; + using compute_context_t = PregelComputeContext; + using pregel_vertex_t = LouvainVertex; + using state_t = LouvainNodeState; + + public: + void Init(pregel_vertex_t& v, compute_context_t& context) override { + state_t& state = v.state(); + edata_t sigma_total = 0.0; + for (auto& e : v.outgoing_edges()) { + sigma_total += static_cast(e.get_data()); + } + + state.community = v.get_gid(); + state.community_sigma_total = sigma_total + state.internal_weight; + state.node_weight = sigma_total; + state.is_from_louvain_vertex_reader = true; + state.nodes_in_community.push_back(state.community); + } + + void Compute(grape::IteratorPair messages, pregel_vertex_t& v, + compute_context_t& context) override { + state_t& state = v.state(); + + int current_super_step = context.superstep(); + // the minor step in phase 1 + int current_minor_step = current_super_step % 3; + // the current iteration, 
two iterations make a full pass. + int current_iteration = current_super_step / 3; + + if (current_super_step == phase_two_start_step) { + sendCommunitiesInfo(v); + return; + } else if (current_super_step == compress_community_step) { + compressCommunities(v, messages); + return; + } + + // count the total edge weight of the graph on the phase-1 start only + if (current_super_step == phase_one_start_step) { + if (!state.is_from_louvain_vertex_reader) { + // not from the disk but from the previous round's result + state.community = v.get_gid(); + edata_t edge_weight_aggregation = 0; + // It must use fake edges since we already set them last round. + for (auto& e : v.fake_edges()) { + edge_weight_aggregation += e.second; + } + state.node_weight = edge_weight_aggregation; + } + state.reset_total_edge_weight = true; + v.context()->local_total_edge_weight()[v.tid()] += + state.node_weight + state.internal_weight; + } + + if (current_super_step == phase_one_start_step && v.edge_size() == 0) { + // isolated nodes send themselves a message on the phase_1 start step + md_t message; + v.send_by_gid(v.get_gid(), message); + v.vote_to_halt(); + return; + } else if (current_super_step == 1 && v.edge_size() == 0) { + // isolated node aggregate their quality value and exit computation on + // step 1 + grape::IteratorPair msgs(NULL, NULL); + double q = calculateActualQuality(v, context, msgs); + v.context()->local_actual_quality()[v.tid()] += q; + v.vote_to_halt(); + return; + } + // at the start of each full pass check to see if progress is still being + // made, if not halt + if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && + current_iteration % 2 == 0) { + state.changed = 0; // change count is per pass + if (v.context()->halt()) { + // stage 2 + double q = calculateActualQuality(v, context, messages); + replaceNodeEdgesWithCommunityEdges(v, messages); + v.context()->local_actual_quality()[v.tid()] += q; + return; + } + } + + switch 
(current_minor_step) { + case phase_one_minor_step_0: + getAndSendCommunityInfo(v, context, messages); + + // next step will require a progress check, aggregate the number of + // nodes who have changed community. + if (current_iteration > 0 && current_iteration % 2 == 0) { + v.context()->local_change_num()[v.tid()] += state.changed; + } + break; + case phase_one_minor_step_1: + calculateBestCommunity(v, context, messages, current_iteration); + break; + case phase_one_minor_step_2: + updateCommunities(v, messages); + break; + default: + LOG(ERROR) << "Invalid minor step: " << current_minor_step; + } + v.vote_to_halt(); + } + + private: + void aggregateQuality(compute_context_t& context, double quality) { + context.aggregate(actual_quality_aggregator, quality); + } + + // Get the total edge weight of the graph. + edata_t getTotalEdgeWeight(compute_context_t& context, + pregel_vertex_t& v) { + auto& state = v.state(); + if (state.reset_total_edge_weight) { + // we just aggregate the total edge weight in previous step. + state.total_edge_weight = + context.template get_aggregated_value( + edge_weight_aggregator); + state.reset_total_edge_weight = false; + } + return state.total_edge_weight; + } + + /** + * Each vertex will receive its own communities sigma_total (if updated), + * and then send its current community info to its neighbors. + */ + void getAndSendCommunityInfo(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages) { + state_t& state = vertex.state(); + // set new community information. 
+ if (context.superstep() > 0) { + assert(messages.size() == 1); + state.community = messages.begin()->community_id; + state.community_sigma_total = messages.begin()->community_sigma_total; + } + if (vertex.use_fake_edges()) { + for (const auto& edge : vertex.fake_edges()) { + md_t out_message(state.community, state.community_sigma_total, + edge.second, vertex.get_gid(), edge.first); + vertex.send_by_gid(edge.first, out_message); + } + } else { + for (auto& edge : vertex.outgoing_edges()) { + auto nei_gid = vertex.fragment()->Vertex2Gid(edge.get_neighbor()); + md_t out_message(state.community, state.community_sigma_total, + static_cast(edge.get_data()), + vertex.get_gid(), nei_gid); + vertex.send_by_gid(nei_gid, out_message); + } + } + } + + /** + * Based on community of each of its neighbors, each vertex determines if + * it should retain its current community or switch to a neighboring + * community. + * At the end of this step a message is sent to the nodes community hub so a + * new community sigma_total can be calculated. + */ + void calculateBestCommunity(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages, + int iteration) { + // group messages by communities. 
+ std::map community_map; + for (auto& message : messages) { + vid_t community_id = message.community_id; + edata_t weight = message.edge_weight; + if (community_map.find(community_id) != community_map.end()) { + community_map[community_id].edge_weight = + community_map[community_id].edge_weight + weight; + } else { + community_map[community_id] = message; + } + } + + // calculate change in quality for each potential community + auto& state = vertex.state(); + vid_t best_community_id = state.community; + vid_t starting_community_id = best_community_id; + double max_delta_q = 0.0; + for (auto& entry : community_map) { + double delta_q = calculateQualityDelta( + context, vertex, starting_community_id, entry.second.community_id, + entry.second.community_sigma_total, entry.second.edge_weight, + state.node_weight, state.internal_weight); + if (delta_q > max_delta_q || + (delta_q == max_delta_q && + entry.second.community_id < best_community_id)) { + best_community_id = entry.second.community_id; + max_delta_q = delta_q; + } + } + + // ignore switches based on iteration (prevent certain cycles) + if ((state.community > best_community_id && iteration % 2 == 0) || + (state.community < best_community_id && iteration % 2 != 0)) { + best_community_id = state.community; + } + + // update community + if (state.community != best_community_id) { + md_t c = community_map[best_community_id]; + assert(best_community_id == c.community_id); + state.community = c.community_id; + state.community_sigma_total = c.community_sigma_total; + state.changed = 1; // community changed. + } + // send node weight to the community hub to be summed in next super step + md_t message(state.community, state.node_weight + state.internal_weight, 0, + vertex.get_gid(), state.community); + vertex.send_by_gid(state.community, message); + } + + /** + * determine the change in quality if a node were to move to + * the given community. 
+ */ + double calculateQualityDelta(compute_context_t& context, pregel_vertex_t& v, + const vid_t& curr_community_id, + const vid_t& test_community_id, + edata_t test_sigma_total, + edata_t edge_weight_in_community, + edata_t node_weight, edata_t internal_weight) { + bool is_current_community = (curr_community_id == test_community_id); + edata_t m2 = getTotalEdgeWeight(context, v); + edata_t k_i_in_L = is_current_community + ? edge_weight_in_community + internal_weight + : edge_weight_in_community; + edata_t k_i_in = k_i_in_L; + edata_t k_i = node_weight + internal_weight; + edata_t sigma_tot = test_sigma_total; + if (is_current_community) { + sigma_tot -= k_i; + } + + double delta_q = 0.0; + if (!(is_current_community && sigma_tot == delta_q)) { + double dividend = k_i * sigma_tot; + delta_q = k_i_in - dividend / m2; + } + return delta_q; + } + + /** + * Each community hub aggregates the values from each of its members to + * update the node's sigma total, and then sends this back to each of its + * members. + */ + void updateCommunities(pregel_vertex_t& vertex, + const grape::IteratorPair& messages) { + // sum all community contributions + md_t sum; + sum.community_id = vertex.get_gid(); + for (auto& m : messages) { + sum.community_sigma_total += m.community_sigma_total; + } + + for (auto& m : messages) { + sum.dst_id = m.source_id; + vertex.send_by_gid(m.source_id, sum); + } + } + + /** + * Calculate this nodes contribution for the actual quality value + * of the graph. 
+ */ + double calculateActualQuality(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages) { + auto& state = vertex.state(); + edata_t k_i_in = state.internal_weight; + for (auto& m : messages) { + if (m.community_id == state.community) { + k_i_in += vertex.get_edge_value(m.source_id); + } + } + edata_t sigma_tot = state.community_sigma_total; + edata_t m2 = getTotalEdgeWeight(context, vertex); + edata_t k_i = state.node_weight + state.internal_weight; + + double q = k_i_in / m2 - (sigma_tot * k_i) / pow(m2, 2); + q = q < 0 ? 0 : q; + return q; + } + + /** + * Replace each edge to a neighbor with an edge to that neighbor's community + * instead. Done just before exiting computation. In the next stage of the + * pipeline these edges are aggregated and all communities are represented + * as single nodes. Edges from the community to itself are tracked by the + * node's internal weight. + */ + void replaceNodeEdgesWithCommunityEdges( + pregel_vertex_t& vertex, grape::IteratorPair& messages) { + std::map community_map; + for (auto& message : messages) { + const auto& community_id = message.community_id; + community_map[community_id] += message.edge_weight; + } + + vertex.set_fake_edges(std::move(community_map)); + } + + void sendCommunitiesInfo(pregel_vertex_t& vertex) { + state_t& state = vertex.state(); + md_t message; + message.internal_weight = state.internal_weight; + std::map edges; + assert(vertex.use_fake_edges()); + edges = vertex.fake_edges(); + message.edges = std::move(edges); + if (vertex.get_gid() != state.community) { + message.nodes_in_self_community.swap(vertex.nodes_in_self_community()); + } + message.dst_id = state.community; + vertex.send_by_gid(state.community, message); + + vertex.vote_to_halt(); + } + + void compressCommunities(pregel_vertex_t& vertex, + grape::IteratorPair& messages) { + auto community_id = vertex.get_gid(); + edata_t weight = 0; + std::map edge_map; + auto& nodes_in_self_community = 
vertex.nodes_in_self_community(); + for (auto& m : messages) { + weight += m.internal_weight; + for (auto& entry : m.edges) { + if (entry.first == community_id) { + weight += entry.second; + } else { + edge_map[entry.first] += entry.second; + } + } + nodes_in_self_community.insert(nodes_in_self_community.end(), + m.nodes_in_self_community.begin(), + m.nodes_in_self_community.end()); + } + vertex.state().internal_weight = weight; + vertex.set_fake_edges(std::move(edge_map)); + vertex.state().is_from_louvain_vertex_reader = false; + + // send self fake message to activate next round. + md_t fake_message; + fake_message.dst_id = community_id; + vertex.send_by_gid(community_id, fake_message); + // do not vote to halt since next round those new vertex need to be active. + } +}; + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h new file mode 100644 index 000000000000..2605ea4cf90a --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -0,0 +1,329 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ + +#include +#include +#include +#include +#include + +#include "grape/grape.h" +#include "grape/utils/iterator_pair.h" + +#include "core/app/app_base.h" +#include "core/app/pregel/pregel_compute_context.h" + +#include "apps/pregel/louvain/auxiliary.h" +#include "apps/pregel/louvain/louvain.h" +#include "apps/pregel/louvain/louvain_context.h" +#include "apps/pregel/louvain/louvain_vertex.h" + +namespace gs { + +/** + * @brief This class is a specialized PregelAppBase for louvain. + * @param FRAG_T + * @param VERTEX_PROGRAM_T + */ +template > +class LouvainAppBase + : public grape::ParallelAppBase< + FRAG_T, + LouvainContext>>, + public grape::ParallelEngine, + public grape::Communicator { + public: + using fragment_t = FRAG_T; + using oid_t = typename FRAG_T::oid_t; + using vid_t = typename FRAG_T::vid_t; + using vertex_t = typename FRAG_T::vertex_t; + using app_t = LouvainAppBase; + using vertex_program_t = VERTEX_PROGRAM_T; + using vd_t = typename vertex_program_t::vd_t; + using md_t = typename vertex_program_t::md_t; + using pregel_compute_context_t = PregelComputeContext; + using context_t = LouvainContext; + using message_manager_t = grape::ParallelMessageManager; + using worker_t = grape::ParallelWorker; + + virtual ~LouvainAppBase() {} + + static std::shared_ptr CreateWorker(std::shared_ptr app, + std::shared_ptr frag) { + return std::shared_ptr(new worker_t(app, frag)); + } + + explicit LouvainAppBase(const vertex_program_t& program = vertex_program_t()) + : program_(program) {} + + void PEval(const fragment_t& frag, context_t& ctx, + message_manager_t& messages) { + // superstep is 0 in PEval + uint32_t thrd_num = thread_num(); + messages.InitChannels(thrd_num); + + // register the aggregators + ctx.compute_context().register_aggregator( + change_aggregator, PregelAggregatorType::kInt64SumAggregator); + 
ctx.compute_context().register_aggregator( + edge_weight_aggregator, PregelAggregatorType::kDoubleSumAggregator); + ctx.compute_context().register_aggregator( + actual_quality_aggregator, PregelAggregatorType::kDoubleSumAggregator); + ctx.ClearLocalAggregateValues(thrd_num); + + auto inner_vertices = frag.InnerVertices(); + ForEach(inner_vertices, [&frag, &ctx, this](int tid, vertex_t v) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + this->program_.Init(pregel_vertex, ctx.compute_context()); + }); + + grape::IteratorPair null_messages(nullptr, nullptr); + ForEach(inner_vertices, + [&null_messages, &frag, &ctx, this](int tid, vertex_t v) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + this->program_.Compute(null_messages, pregel_vertex, + ctx.compute_context()); + }); + + { + // Sync Aggregator + ctx.compute_context().aggregate(change_aggregator, + ctx.GetLocalChangeSum()); + ctx.compute_context().aggregate(edge_weight_aggregator, + ctx.GetLocalEdgeWeightSum()); + ctx.compute_context().aggregate(actual_quality_aggregator, + ctx.GetLocalQualitySum()); + for (auto& pair : ctx.compute_context().aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } + ctx.ClearLocalAggregateValues(thrd_num); + } + + ctx.compute_context().clear_for_next_round(); + + if (!ctx.compute_context().all_halted()) { + messages.ForceContinue(); + } + } + + void IncEval(const fragment_t& frag, context_t& ctx, + 
message_manager_t& messages) { + ctx.compute_context().inc_step(); + uint32_t thrd_num = thread_num(); + + auto inner_vertices = frag.InnerVertices(); + auto outer_vertices = frag.OuterVertices(); + + int current_super_step = ctx.compute_context().superstep(); + int current_minor_step = current_super_step % 3; + int current_iteration = current_super_step / 3; + + LOG(INFO) << "current super step: " << current_super_step + << " current minor step: " << current_minor_step + << " current iteration: " << current_iteration; + + if (current_super_step == terminate_step) { + // get result messages and terminate + messages.ParallelProcess>( + thrd_num, [&frag, &ctx](int tid, std::pair const& msg) { + vertex_t v; + frag.InnerVertexGid2Vertex(msg.first, v); + ctx.compute_context().vertex_data()[v] = msg.second; + }); + return; // the whole louvain terminate. + } else { + // get computation messages + std::vector>> buffer( + thrd_num, std::vector>(thrd_num)); + messages.ParallelProcess( + thrd_num, [&thrd_num, &buffer](int tid, md_t const& msg) { + buffer[tid][msg.dst_id % thrd_num].emplace_back(msg); + }); + { + std::vector threads(thrd_num); + for (uint32_t tid = 0; tid < thrd_num; ++tid) { + threads[tid] = std::thread( + [&frag, &ctx, &thrd_num, &buffer](uint32_t tid) { + for (uint32_t index = 0; index < thrd_num; ++index) { + for (auto const& msg : buffer[index][tid]) { + vertex_t v; + frag.InnerVertexGid2Vertex(msg.dst_id, v); + ctx.compute_context().messages_in()[v].emplace_back( + std::move(msg)); + ctx.compute_context().activate(v); + } + } + }, + tid); + } + for (uint32_t tid = 0; tid < thrd_num; ++tid) { + threads[tid].join(); + } + } + } + + if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && + current_iteration % 2 == 0) { + int64_t totalChange = + ctx.compute_context().template get_aggregated_value( + change_aggregator); + ctx.change_history().push_back(totalChange); + bool to_halt = decide_to_halt( + ctx.change_history(), 
ctx.tolerance(), ctx.min_progress()); + ctx.set_halt(to_halt); + if (ctx.halt()) { + LOG(INFO) << "super step " << current_super_step << " decided to halt."; + } + LOG(INFO) << "[INFO]: superstep: " << current_super_step + << " pass: " << current_iteration / 2 + << " totalChange: " << totalChange; + } else if (ctx.halt()) { + double actual_quality = + ctx.compute_context().template get_aggregated_value( + actual_quality_aggregator); + // after one pass if already decided halt, that means the pass yield no + // changes, so we halt computation. + if (current_super_step <= 14 || actual_quality <= ctx.prev_quality()) { + // the louvain computation complete. + LOG(INFO) << "computation complete, ACTUAL QUALITY: " << actual_quality; + ctx.compute_context().set_superstep(sync_result_step); + syncCommunity(frag, ctx, messages); + messages.ForceContinue(); + return; + } else if (ctx.compute_context().superstep() > 0) { + // phase 1 halt + LOG(INFO) << "super step: " << current_super_step + << " decided to halt, ACTUAL QUALITY: " << actual_quality + << " previous QUALITY: " << ctx.prev_quality(); + // start phase 2 to compress community. + ctx.compute_context().set_superstep(phase_two_start_step); + ctx.set_prev_quality(actual_quality); + ctx.change_history().clear(); + ctx.set_halt(false); + } + } + + // At the start of each pass, every alive node sends its node info to + // its community, so we activate the node. 
+ if (ctx.compute_context().superstep() == phase_two_start_step) { + ForEach(inner_vertices, [&ctx](int tid, vertex_t v) { + if (ctx.GetVertexState(v).is_alived_community) { + ctx.compute_context().activate(v); + } + }); + } + + ForEach(inner_vertices, [&frag, &ctx, this](int tid, vertex_t v) { + if (ctx.compute_context().active(v)) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + auto& cur_msgs = (ctx.compute_context().messages_in())[v]; + this->program_.Compute( + grape::IteratorPair( + &cur_msgs[0], + &cur_msgs[0] + static_cast(cur_msgs.size())), + pregel_vertex, ctx.compute_context()); + } else if (ctx.compute_context().superstep() == compress_community_step) { + ctx.GetVertexState(v).is_alived_community = false; + } + }); + + { + // Sync Aggregator + ctx.compute_context().aggregate(change_aggregator, + ctx.GetLocalChangeSum()); + ctx.compute_context().aggregate(edge_weight_aggregator, + ctx.GetLocalEdgeWeightSum()); + ctx.compute_context().aggregate(actual_quality_aggregator, + ctx.GetLocalQualitySum()); + for (auto& pair : ctx.compute_context().aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } + ctx.ClearLocalAggregateValues(thrd_num); + } + + ctx.compute_context().clear_for_next_round(); + if (!ctx.compute_context().all_halted()) { + messages.ForceContinue(); + } + } + + private: + // sync community id from community hub to community members. 
+ void syncCommunity(const fragment_t& frag, context_t& ctx, + message_manager_t& messages) { + auto& vid_parser = ctx.compute_context().vid_parser(); + auto& comm_result = ctx.compute_context().vertex_data(); + auto inner_vertices = frag.InnerVertices(); + ForEach(inner_vertices, [&frag, &ctx, &messages, &comm_result, &vid_parser]( + int tid, vertex_t v) { + const auto& member_list = ctx.vertex_state()[v].nodes_in_community; + if (!member_list.empty()) { + auto community_id = frag.Gid2Oid(member_list.front()); + // send community id to members + for (const auto& member_gid : member_list) { + auto fid = vid_parser.GetFid(member_gid); + vertex_t member_v; + if (fid == frag.fid()) { + frag.InnerVertexGid2Vertex(member_gid, member_v); + comm_result[member_v] = community_id; + } else { + messages.Channels()[tid].SendToFragment( + fid, std::pair(member_gid, community_id)); + } + } + } + }); + } + + private: + vertex_program_t program_; +}; +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h new file mode 100644 index 000000000000..73e5a73e1d9a --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -0,0 +1,154 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ + +#include +#include +#include +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" + +#include "grape/grape.h" + +#include "core/context/vertex_data_context.h" + +namespace gs { + +template +class LouvainContext + : public grape::VertexDataContext { + using fragment_t = FRAG_T; + using oid_t = typename FRAG_T::oid_t; + using vid_t = typename FRAG_T::vid_t; + using edata_t = double; + using vertex_t = typename FRAG_T::vertex_t; + using state_t = LouvainNodeState; + using vertex_state_array_t = + typename fragment_t::template vertex_array_t; + + public: + explicit LouvainContext(const FRAG_T& fragment) + : grape::VertexDataContext( + fragment), + compute_context_(this->data()) {} + + void Init(grape::ParallelMessageManager& messages, int tolerance, + int min_progress) { + auto& frag = this->fragment(); + auto inner_vertices = frag.InnerVertices(); + + compute_context_.init(frag); + compute_context_.set_fragment(&frag); + compute_context_.set_parallel_message_manager(&messages); + + this->tolerance_ = tolerance; + this->min_progress_ = min_progress; + + vertex_state_.Init(inner_vertices); + halt_ = false; + prev_quality_ = 0.0; + } + + void Output(std::ostream& os) override { + auto& frag = this->fragment(); + auto& result = compute_context_.vertex_data(); + auto inner_vertices = frag.InnerVertices(); + for (auto v : inner_vertices) { + os << frag.GetId(v) << " " << result[v] << std::endl; + } + } + + state_t& GetVertexState(const vertex_t& v) { return vertex_state_[v]; } + + void ClearLocalAggregateValues(uint32_t thread_num) { + local_change_num_.clear(); + local_total_edge_weight_.clear(); + local_actual_quality_.clear(); + local_change_num_.resize(thread_num, 0); + local_total_edge_weight_.resize(thread_num, 0.0); + local_actual_quality_.resize(thread_num, 0.0); + } + 
+ int64_t GetLocalChangeSum() { + int64_t sum = 0; + for (const auto& val : local_change_num_) { + sum += val; + } + return sum; + } + + edata_t GetLocalEdgeWeightSum() { + edata_t sum = 0; + for (const auto& val : local_total_edge_weight_) { + sum += val; + } + return sum; + } + + double GetLocalQualitySum() { + double sum = 0; + for (const auto& val : local_actual_quality_) { + sum += val; + } + return sum; + } + + COMPUTE_CONTEXT_T& compute_context() { return compute_context_; } + + std::vector& change_history() { return change_history_; } + + vertex_state_array_t& vertex_state() { return vertex_state_; } + + std::vector& local_change_num() { return local_change_num_; } + + std::vector& local_total_edge_weight() { + return local_total_edge_weight_; + } + std::vector& local_actual_quality() { return local_actual_quality_; } + + bool halt() { return halt_; } + void set_halt(bool halt) { halt_ = halt; } + + double prev_quality() { return prev_quality_; } + void set_prev_quality(double value) { prev_quality_ = value; } + + int tolerance() { return tolerance_; } + int min_progress() { return min_progress_; } + + private: + std::vector change_history_; + COMPUTE_CONTEXT_T compute_context_; + vertex_state_array_t vertex_state_; + + // members to store local aggrated value + std::vector local_change_num_; + std::vector local_total_edge_weight_; + std::vector local_actual_quality_; + + bool halt_; // phase-1 halt + double prev_quality_; + int tolerance_; + int min_progress_; +}; + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h new file mode 100644 index 000000000000..90d9677e0777 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -0,0 +1,138 @@ +/** Copyright 2020 Alibaba Group Holding Limited. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ + +#include +#include +#include +#include +#include + +#include "core/app/pregel/pregel_vertex.h" + +#include "apps/pregel/louvain/louvain_context.h" + +namespace gs { + +template +class LouvainVertex : public PregelVertex { + using fragment_t = FRAG_T; + using oid_t = typename fragment_t::oid_t; + using vid_t = typename fragment_t::vid_t; + using edata_t = double; + using vertex_t = typename fragment_t::vertex_t; + using adj_list_t = typename fragment_t::const_adj_list_t; + using compute_context_t = PregelComputeContext; + using context_t = LouvainContext; + using state_t = LouvainNodeState; + + public: + using vd_t = VD_T; + using md_t = MD_T; + + state_t& state() { return context_->GetVertexState(vertex_); } + + vertex_t vertex() const { return vertex_; } + + adj_list_t outgoing_edges() { return fragment_->GetOutgoingAdjList(vertex_); } + + adj_list_t incoming_edges() { return fragment_->GetIncomingAdjList(vertex_); } + + void send_by_gid(vid_t dst_gid, const md_t& md) { + compute_context_->send_p2p_message(dst_gid, md, tid_); + } + + void vote_to_halt() { compute_context_->vote_to_halt(*this); } + + void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } + + void set_compute_context(compute_context_t* compute_context) { + compute_context_ = compute_context; + } + + void 
set_context(context_t* context) { context_ = context; } + + void set_vertex(vertex_t vertex) { vertex_ = vertex; } + + vid_t get_gid() { return fragment_->Vertex2Gid(vertex_); } + + vid_t get_vertex_gid(const vertex_t& v) { return fragment_->Vertex2Gid(v); } + + size_t edge_size() { + if (!this->use_fake_edges()) { + return this->incoming_edges().Size() + this->outgoing_edges().Size(); + } else { + return this->fake_edges().size(); + } + } + + bool use_fake_edges() const { + return context_->GetVertexState(vertex_).use_fake_edges; + } + + const std::map& fake_edges() const { + return context_->GetVertexState(vertex_).fake_edges; + } + + edata_t get_edge_value(const vid_t& dst_id) { + if (!this->use_fake_edges()) { + for (auto& edge : this->incoming_edges()) { + if (fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + return static_cast(edge.get_data()); + } + } + for (auto& edge : this->outgoing_edges()) { + if (fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + return static_cast(edge.get_data()); + } + } + } else { + return this->fake_edges().at(dst_id); + } + return edata_t(); + } + + void set_fake_edges(std::map&& edges) { + state_t& ref_state = this->state(); + ref_state.fake_edges = std::move(edges); + ref_state.use_fake_edges = true; + } + + std::vector& nodes_in_self_community() { + return context_->GetVertexState(vertex_).nodes_in_community; + } + + int tid() { return tid_; } + void set_tid(int id) { tid_ = id; } + + PregelComputeContext* compute_context() { + return compute_context_; + } + + const fragment_t* fragment() { return fragment_; } + context_t* context() { return context_; } + + private: + int tid_; + const fragment_t* fragment_; + PregelComputeContext* compute_context_; + context_t* context_; + vertex_t vertex_; +}; +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ diff --git a/analytical_engine/core/app/pregel/pregel_app_base.h b/analytical_engine/core/app/pregel/pregel_app_base.h index 
d961047e7153..2b5b748d10c9 100644 --- a/analytical_engine/core/app/pregel/pregel_app_base.h +++ b/analytical_engine/core/app/pregel/pregel_app_base.h @@ -16,7 +16,9 @@ limitations under the License. #ifndef ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_APP_BASE_H_ #define ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_APP_BASE_H_ +#include #include +#include #include "grape/grape.h" #include "grape/utils/iterator_pair.h" @@ -41,7 +43,8 @@ class PregelAppBase FRAG_T, PregelContext>> { + typename VERTEX_PROGRAM_T::md_t>>>, + public grape::Communicator { using vd_t = typename VERTEX_PROGRAM_T::vd_t; using md_t = typename VERTEX_PROGRAM_T::md_t; using pregel_compute_context_t = PregelComputeContext; @@ -94,11 +97,24 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } void IncEval(const fragment_t& frag, pregel_context_t& ctx, @@ -144,11 +160,24 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } private: @@ -166,7 +195,8 @@ class PregelAppBase FRAG_T, PregelContext>> { + 
typename VERTEX_PROGRAM_T::md_t>>>, + public grape::Communicator { using vd_t = typename VERTEX_PROGRAM_T::vd_t; using md_t = typename VERTEX_PROGRAM_T::md_t; using app_t = PregelAppBase; @@ -205,11 +235,25 @@ class PregelAppBase program_.Compute(null_messages, pregel_vertex, ctx.compute_context_); } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } void IncEval(const fragment_t& frag, pregel_context_t& ctx, @@ -242,11 +286,25 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } private: diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index 70e245fd58f8..cea8f89dc8e5 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -16,14 +16,21 @@ limitations under the License. 
#ifndef ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_COMPUTE_CONTEXT_H_ #define ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_COMPUTE_CONTEXT_H_ +#include + +#include #include #include #include #include +#include "grape/utils/atomic_ops.h" #include "grape/utils/iterator_pair.h" +#include "core/app/pregel/aggregators/aggregator.h" +#include "core/app/pregel/aggregators/aggregator_factory.h" #include "core/app/pregel/pregel_vertex.h" +#include "core/config.h" namespace gs { /** @@ -57,18 +64,20 @@ class PregelComputeContext { messages_in_.Init(inner_vertices, {}); halted_.Init(inner_vertices, false); + vid_parser_.Init(frag.fnum(), 1); inner_vertex_num_ = inner_vertices.size(); step_ = 0; voted_to_halt_num_ = 0; enable_combine_ = false; - has_messages_ = false; } void inc_step() { step_++; } int superstep() const { return step_; } + void set_superstep(int step) { step_ = step; } + void set_vertex_value(const pregel_vertex_t& vertex, const VD_T& value) { vertex_data_[vertex.vertex()] = value; } @@ -89,8 +98,7 @@ class PregelComputeContext { message_manager_->SyncStateOnOuterVertex(*fragment_, v, value); } else { - messages_in_[v].emplace_back(value); - has_messages_ = true; + messages_out_[v].emplace_back(value); } } } @@ -103,12 +111,21 @@ class PregelComputeContext { message_manager_->SyncStateOnOuterVertex(*fragment_, v, value); } else { - messages_in_[v].emplace_back(std::move(value)); - has_messages_ = true; + messages_out_[v].emplace_back(std::move(value)); } } } + void send_p2p_message(const vid_t& v_gid, const MD_T& value, int tid = 0) { + auto fid = vid_parser_.GetFid(v_gid); + parallel_message_manager_->Channels()[tid].SendToFragment(fid, value); + } + + void send_p2p_message(const vid_t v_gid, MD_T&& value, int tid = 0) { + auto fid = vid_parser_.GetFid(v_gid); + parallel_message_manager_->Channels()[tid].SendToFragment(fid, value); + } + template void apply_combine(COMBINATOR_T& cb) { auto vertices = fragment_->Vertices(); @@ -129,27 +146,31 @@ class 
PregelComputeContext { messages_in_[v].clear(); messages_in_[v].swap(messages_out_[v]); if (!messages_in_[v].empty()) { - has_messages_ = true; + activate(v); } } } - bool active(const vertex_t& v) { return !messages_in_[v].empty(); } + bool active(const vertex_t& v) { return !halted_[v]; } void activate(const vertex_t& v) { - halted_[v] = false; - --voted_to_halt_num_; + if (halted_[v] == true) { + halted_[v] = false; + size_t one = 1; + __sync_fetch_and_sub(&voted_to_halt_num_, one); + } } void vote_to_halt(const pregel_vertex_t& vertex) { - // halted_[vertex.vertex()] = true; - // ++voted_to_halt_num_; + if (halted_[vertex.vertex()] == false) { + halted_[vertex.vertex()] = true; + size_t one = 1; + grape::atomic_add(voted_to_halt_num_, one); + } } bool all_halted() { return voted_to_halt_num_ == inner_vertex_num_; } - bool has_messages() { return has_messages_; } - typename FRAG_T::template vertex_array_t>& messages_in() { return messages_in_; } @@ -161,7 +182,18 @@ class PregelComputeContext { typename FRAG_T::template vertex_array_t& vertex_data() { return vertex_data_; } - void clear_for_next_round() { has_messages_ = false; } + void clear_for_next_round() { + if (!enable_combine_) { + auto inner_vertices = fragment_->InnerVertices(); + for (auto v : inner_vertices) { + messages_in_[v].clear(); + messages_in_[v].swap(messages_out_[v]); + if (!messages_in_[v].empty()) { + activate(v); + } + } + } + } void enable_combine() { enable_combine_ = true; } void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } @@ -169,6 +201,11 @@ class PregelComputeContext { message_manager_ = message_manager; } + void set_parallel_message_manager( + grape::ParallelMessageManager* message_manager) { + parallel_message_manager_ = message_manager; + } + oid_t GetId(const vertex_t& v) { return fragment_->GetId(v); } void set_config(const std::string& key, const std::string& value) { @@ -184,9 +221,40 @@ class PregelComputeContext { } } + const vineyard::IdParser& 
vid_parser() const { + return vid_parser_; + } + + std::unordered_map>& aggregators() { + return aggregators_; + } + + void register_aggregator(const std::string& name, PregelAggregatorType type) { + if (aggregators_.find(name) == aggregators_.end()) { + aggregators_.emplace(name, AggregatorFactory::CreateAggregator(type)); + aggregators_.at(name)->Init(); + } + } + + template + void aggregate(const std::string& name, AGGR_TYPE value) { + if (aggregators_.find(name) != aggregators_.end()) { + std::dynamic_pointer_cast>(aggregators_.at(name)) + ->Aggregate(value); + } + } + + template + AGGR_TYPE get_aggregated_value(const std::string& name) { + return std::dynamic_pointer_cast>( + aggregators_.at(name)) + ->GetAggregatedValue(); + } + private: const fragment_t* fragment_; grape::DefaultMessageManager* message_manager_; + grape::ParallelMessageManager* parallel_message_manager_; typename FRAG_T::template vertex_array_t& vertex_data_; @@ -196,8 +264,6 @@ class PregelComputeContext { typename FRAG_T::template vertex_array_t> messages_out_; typename FRAG_T::template vertex_array_t> messages_in_; - bool has_messages_; - size_t inner_vertex_num_; size_t total_vertex_num_; @@ -205,6 +271,8 @@ class PregelComputeContext { int step_; std::unordered_map config_; + std::unordered_map> aggregators_; + vineyard::IdParser vid_parser_; }; } // namespace gs diff --git a/analytical_engine/core/app/pregel/pregel_vertex.h b/analytical_engine/core/app/pregel/pregel_vertex.h index bc0090d26d66..64aba9f91c2a 100644 --- a/analytical_engine/core/app/pregel/pregel_vertex.h +++ b/analytical_engine/core/app/pregel/pregel_vertex.h @@ -55,13 +55,19 @@ class PregelVertex { compute_context_->set_vertex_value(*this, std::move(value)); } - const VD_T& value() { return compute_context_->get_vertex_value(*this); } + const VD_T& value() { + return compute_context_->get_vertex_value(*this); + } - vertex_t vertex() const { return vertex_; } + virtual vertex_t vertex() const { return vertex_; } - 
adj_list_t outgoing_edges() { return fragment_->GetOutgoingAdjList(vertex_); } + virtual adj_list_t outgoing_edges() { + return fragment_->GetOutgoingAdjList(vertex_); + } - adj_list_t incoming_edges() { return fragment_->GetIncomingAdjList(vertex_); } + virtual adj_list_t incoming_edges() { + return fragment_->GetIncomingAdjList(vertex_); + } void send(const vertex_t& v, const MD_T& value) { compute_context_->send_message(v, value); @@ -71,16 +77,18 @@ class PregelVertex { compute_context_->send_message(v, std::move(value)); } - void vote_to_halt() { compute_context_->vote_to_halt(*this); } + virtual void vote_to_halt() { compute_context_->vote_to_halt(*this); } - void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } + virtual void set_fragment(const fragment_t* fragment) { + fragment_ = fragment; + } - void set_compute_context( + virtual void set_compute_context( PregelComputeContext* compute_comtext) { compute_context_ = compute_comtext; } - void set_vertex(vertex_t vertex) { vertex_ = vertex; } + virtual void set_vertex(vertex_t vertex) { vertex_ = vertex; } private: const fragment_t* fragment_; diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h index 8e343d6b2fc6..4f872127b813 100644 --- a/analytical_engine/core/fragment/arrow_projected_fragment.h +++ b/analytical_engine/core/fragment/arrow_projected_fragment.h @@ -348,6 +348,8 @@ class ArrowProjectedFragment using nbr_unit_t = vineyard::property_graph_utils::NbrUnit; using adj_list_t = arrow_projected_fragment_impl::AdjList; + using const_adj_list_t = + arrow_projected_fragment_impl::AdjList; using vertex_map_t = ArrowProjectedVertexMap; using label_id_t = vineyard::property_graph_types::LABEL_ID_TYPE; using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE; diff --git a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml index 
ce5bbc6b3ebb..8fb88f1a12a1 100644 --- a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml +++ b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml @@ -154,3 +154,11 @@ app: src: benchmarks/apps/bfs/property_bfs.h compatible_graph: - vineyard::ArrowFragment + - algo: louvain + type: cpp_pie + class_name: gs::LouvainAppBase + src: apps/pregel/louvain/louvain_app_base.h + compatible_graph: + - grape::ImmutableEdgecutFragment + - gs::ArrowProjectedFragment + - gs::DynamicProjectedFragment diff --git a/python/graphscope/analytical/app/__init__.py b/python/graphscope/analytical/app/__init__.py index be2eb6a086bb..aca448f79f14 100644 --- a/python/graphscope/analytical/app/__init__.py +++ b/python/graphscope/analytical/app/__init__.py @@ -27,6 +27,7 @@ from graphscope.analytical.app.k_core import k_core from graphscope.analytical.app.k_shell import k_shell from graphscope.analytical.app.katz_centrality import katz_centrality +from graphscope.analytical.app.louvain import louvain from graphscope.analytical.app.lpa import lpa from graphscope.analytical.app.pagerank import pagerank from graphscope.analytical.app.sssp import property_sssp diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py new file mode 100644 index 000000000000..de72508e037e --- /dev/null +++ b/python/graphscope/analytical/app/louvain.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +from graphscope.framework.app import AppAssets +from graphscope.framework.app import not_compatible_for + +__all__ = [ + "louvain", +] + + +@not_compatible_for("arrow_property", "dynamic_property") +def louvain(graph, tolerance=3, min_progress=100): + """Compute best partition on the `graph`. + + Args: + graph (:class:`Graph`): A projected simple graph. + tolerance: number of times the min_progress setting is not met + before exiting form the current level and compressing the graph + min_progress: The minimum delta X required to be considered progress, where X is the number of nodes + that have changed their community on a particular pass. + Delta X is then the difference in number of nodes that changed communities + on the current pass compared to the previous pass. + + Returns: + :class:`VertexDataContext`: A context with each vertex assigned with id of community it belongs to. + + References: + .. [1] Blondel, V.D. et al. Fast unfolding of communities in + large networks. J. Stat. Mech 10008, 1-12(2008). + .. [2] https://github.com/Sotera/distributed-graph-analytics + .. [3] https://sotera.github.io/distributed-graph-analytics/louvain/ + + Examples: + + .. 
code:: python + + import graphscope as gs + s = gs.session() + g = s.load_from('The parameters for loading a graph...') + pg = g.project_to_simple(v_label='vlabel', e_label='elabel', v_prop=None, e_prop='weight') + r = gs.louvain(pg) + s.close() + + """ + return AppAssets(algo="louvain")(graph, tolerance, min_progress) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 5ecddfa99532..b65c85cd5520 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -32,6 +32,7 @@ from graphscope import k_core from graphscope import k_shell from graphscope import katz_centrality +from graphscope import louvain from graphscope import lpa from graphscope import pagerank from graphscope import property_sssp @@ -340,6 +341,9 @@ def test_app_on_undirected_graph( ) assert np.all(ctx10.to_numpy("r", vertex_range={"begin": 1, "end": 4}) == [0, 0, 0]) + # louvain + ctx10 = louvain(p2p_project_undirected_graph, tolerance=0, min_progress=1) + def test_run_app_on_string_oid_graph(p2p_project_directed_graph_string): ctx = sssp(p2p_project_directed_graph_string, src="6") From 316cd58ff3048c10dc587ffe93076bb2cb8046e1 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Mon, 22 Mar 2021 09:49:13 +0800 Subject: [PATCH 02/18] format --- analytical_engine/apps/pregel/louvain/auxiliary.h | 6 ++---- analytical_engine/apps/pregel/louvain/louvain.h | 10 ++++------ .../apps/pregel/louvain/louvain_app_base.h | 12 ++++++------ .../core/app/pregel/pregel_compute_context.h | 4 +--- analytical_engine/core/app/pregel/pregel_vertex.h | 4 +--- 5 files changed, 14 insertions(+), 22 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h index c740a4c4e066..bc6aaa8db56b 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -27,9 +27,8 @@ namespace gs { // aggregators define -constexpr char change_aggregator[] = "change_aggregator"; -constexpr 
char edge_weight_aggregator[] = - "total_edge_weight_aggregator"; +constexpr char change_aggregator[] = "change_aggregator"; +constexpr char edge_weight_aggregator[] = "total_edge_weight_aggregator"; constexpr char actual_quality_aggregator[] = "actual_quality_aggregator"; // major phase of louvain @@ -44,7 +43,6 @@ constexpr int phase_one_minor_step_0 = 0; constexpr int phase_one_minor_step_1 = 1; constexpr int phase_one_minor_step_2 = 2; - template struct LouvainNodeState { using vid_t = VID_T; diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index 8194012bdf29..f1ce16c419d5 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -182,14 +182,12 @@ class PregelLouvain } // Get the total edge weight of the graph. - edata_t getTotalEdgeWeight(compute_context_t& context, - pregel_vertex_t& v) { + edata_t getTotalEdgeWeight(compute_context_t& context, pregel_vertex_t& v) { auto& state = v.state(); if (state.reset_total_edge_weight) { // we just aggregate the total edge weight in previous step. 
- state.total_edge_weight = - context.template get_aggregated_value( - edge_weight_aggregator); + state.total_edge_weight = context.template get_aggregated_value( + edge_weight_aggregator); state.reset_total_edge_weight = false; } return state.total_edge_weight; @@ -397,7 +395,7 @@ class PregelLouvain } void compressCommunities(pregel_vertex_t& vertex, - grape::IteratorPair& messages) { + grape::IteratorPair& messages) { auto community_id = vertex.get_gid(); edata_t weight = 0; std::map edge_map; diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 2605ea4cf90a..4fb96fc530f8 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -115,11 +115,11 @@ class LouvainAppBase { // Sync Aggregator ctx.compute_context().aggregate(change_aggregator, - ctx.GetLocalChangeSum()); + ctx.GetLocalChangeSum()); ctx.compute_context().aggregate(edge_weight_aggregator, - ctx.GetLocalEdgeWeightSum()); + ctx.GetLocalEdgeWeightSum()); ctx.compute_context().aggregate(actual_quality_aggregator, - ctx.GetLocalQualitySum()); + ctx.GetLocalQualitySum()); for (auto& pair : ctx.compute_context().aggregators()) { grape::InArchive iarc; std::vector oarcs; @@ -202,8 +202,8 @@ class LouvainAppBase ctx.compute_context().template get_aggregated_value( change_aggregator); ctx.change_history().push_back(totalChange); - bool to_halt = decide_to_halt( - ctx.change_history(), ctx.tolerance(), ctx.min_progress()); + bool to_halt = decide_to_halt(ctx.change_history(), ctx.tolerance(), + ctx.min_progress()); ctx.set_halt(to_halt); if (ctx.halt()) { LOG(INFO) << "super step " << current_super_step << " decided to halt."; @@ -296,7 +296,7 @@ class LouvainAppBase private: // sync community id from community hub to community members. 
void syncCommunity(const fragment_t& frag, context_t& ctx, - message_manager_t& messages) { + message_manager_t& messages) { auto& vid_parser = ctx.compute_context().vid_parser(); auto& comm_result = ctx.compute_context().vertex_data(); auto inner_vertices = frag.InnerVertices(); diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index cea8f89dc8e5..d16978350ef4 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -221,9 +221,7 @@ class PregelComputeContext { } } - const vineyard::IdParser& vid_parser() const { - return vid_parser_; - } + const vineyard::IdParser& vid_parser() const { return vid_parser_; } std::unordered_map>& aggregators() { return aggregators_; diff --git a/analytical_engine/core/app/pregel/pregel_vertex.h b/analytical_engine/core/app/pregel/pregel_vertex.h index 64aba9f91c2a..197346756c09 100644 --- a/analytical_engine/core/app/pregel/pregel_vertex.h +++ b/analytical_engine/core/app/pregel/pregel_vertex.h @@ -55,9 +55,7 @@ class PregelVertex { compute_context_->set_vertex_value(*this, std::move(value)); } - const VD_T& value() { - return compute_context_->get_vertex_value(*this); - } + const VD_T& value() { return compute_context_->get_vertex_value(*this); } virtual vertex_t vertex() const { return vertex_; } From e63b578f4a98af1de760e290367f4f366f9978b5 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Mon, 22 Mar 2021 14:41:31 +0800 Subject: [PATCH 03/18] just for perf test --- .../apps/pregel/louvain/auxiliary.h | 11 ++++++++-- .../apps/pregel/louvain/louvain.h | 6 +++--- .../apps/pregel/louvain/louvain_context.h | 6 ++++++ .../apps/pregel/louvain/louvain_vertex.h | 8 ++++++++ .../core/app/pregel/pregel_compute_context.h | 20 ++++++++++++++----- 5 files changed, 41 insertions(+), 10 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h 
b/analytical_engine/apps/pregel/louvain/auxiliary.h index bc6aaa8db56b..077c4d01bb1e 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -43,6 +43,9 @@ constexpr int phase_one_minor_step_0 = 0; constexpr int phase_one_minor_step_1 = 1; constexpr int phase_one_minor_step_2 = 2; +/** + * The state of a vertex. + */ template struct LouvainNodeState { using vid_t = VID_T; @@ -59,11 +62,12 @@ struct LouvainNodeState { // 1 if the node has changed communities this cycle, otherwise 0 int64_t changed; - bool reset_total_edge_weight; + bool reset_total_edge_weight; bool is_from_louvain_vertex_reader = false; bool use_fake_edges = false; bool is_alived_community = true; + std::map fake_edges; std::vector nodes_in_community; edata_t total_edge_weight; @@ -82,6 +86,9 @@ struct LouvainNodeState { ~LouvainNodeState() = default; }; +/** + * Message type of louvain. + */ template struct LouvainMessage { using vid_t = VID_T; @@ -96,7 +103,7 @@ struct LouvainMessage { // For reconstruct graph info. // Each vertex send self's meta info to its community and silence itself, // the community compress its member's data and make self a new vertex for - // next stage. + // next phase. edata_t internal_weight = 0; std::map edges; std::vector nodes_in_self_community; diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index f1ce16c419d5..52c1bcf6706b 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -42,12 +42,12 @@ namespace gs { * and sends its information to its community hub. * 2. Each community hub re-calculates community totals and sends the updates * to each community member. - * repeate stage 1 process until a loval maxima of the modularity attained. + * repeate phase 1 process until a local maxima of the modularity attained. 
* phase-2 - * -2 community hub calls its member to gather the community sigma tot. + * -2 Community hub calls its member to gather the community sigma tot. * -1 Compress each community such that they are represented by one node. * - * reapply the Phase-1 process to the new graph + * reapply the phase-1 process to the new graph. * * The passes are iterated until there are no more changes and a maximum of * modularity is attained. diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h index 73e5a73e1d9a..7effa02d190c 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_context.h +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -31,6 +31,12 @@ limitations under the License. namespace gs { +/** + * @brief PregelContext holds the computation result with + * grape::VertexDataContext. + * @tparam FRAG_T + * @tparam COMPUTE_CONTEXT_T + */ template class LouvainContext : public grape::VertexDataContext class LouvainVertex : public PregelVertex { using fragment_t = FRAG_T; diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index d16978350ef4..f7e7bc0c2b44 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -156,20 +156,30 @@ class PregelComputeContext { void activate(const vertex_t& v) { if (halted_[v] == true) { halted_[v] = false; - size_t one = 1; - __sync_fetch_and_sub(&voted_to_halt_num_, one); + // size_t one = 1; + // __sync_fetch_and_sub(&voted_to_halt_num_, one); } } void vote_to_halt(const pregel_vertex_t& vertex) { if (halted_[vertex.vertex()] == false) { halted_[vertex.vertex()] = true; - size_t one = 1; - grape::atomic_add(voted_to_halt_num_, one); + // size_t one = 1; + // grape::atomic_add(voted_to_halt_num_, one); } } - bool all_halted() { return voted_to_halt_num_ == inner_vertex_num_; } + bool 
all_halted() { + auto inner_vertices = fragment_->InnerVertices(); + size_t cnt = 0; + for (auto& v : inner_vertices) { + if (halted_[v]) { + ++cnt; + } + } + return cnt == inner_vertex_num_; + // return voted_to_halt_num_ == inner_vertex_num_; + } typename FRAG_T::template vertex_array_t>& messages_in() { return messages_in_; From cd8e6c73b6361a7755c267bde00fa91aa3d2f48a Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 11:13:37 +0800 Subject: [PATCH 04/18] add comments --- .../apps/pregel/louvain/louvain.h | 38 +++++++------- .../apps/pregel/louvain/louvain_app_base.h | 50 +++++++++++-------- .../core/app/pregel/pregel_compute_context.h | 7 ++- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index 52c1bcf6706b..032c23a7e6d1 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -113,12 +113,11 @@ class PregelLouvain if (!state.is_from_louvain_vertex_reader) { // not from the disk but from the previous round's result state.community = v.get_gid(); - edata_t edge_weight_aggregation = 0; + state.node_weight = 0; // It must use fake edges since we already set them last round. for (auto& e : v.fake_edges()) { - edge_weight_aggregation += e.second; + state.node_weight += e.second; } - state.node_weight = edge_weight_aggregation; } state.reset_total_edge_weight = true; v.context()->local_total_edge_weight()[v.tid()] += @@ -140,13 +139,13 @@ class PregelLouvain v.vote_to_halt(); return; } - // at the start of each full pass check to see if progress is still being - // made, if not halt + // at the start of each full pass check to see wether progress is still + // being made, if not halt if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && current_iteration % 2 == 0) { - state.changed = 0; // change count is per pass + state.changed = 0; // reset changed. 
if (v.context()->halt()) { - // stage 2 + // phase-1 halt, calculate current actual quality and return. double q = calculateActualQuality(v, context, messages); replaceNodeEdgesWithCommunityEdges(v, messages); v.context()->local_actual_quality()[v.tid()] += q; @@ -181,7 +180,6 @@ class PregelLouvain context.aggregate(actual_quality_aggregator, quality); } - // Get the total edge weight of the graph. edata_t getTotalEdgeWeight(compute_context_t& context, pregel_vertex_t& v) { auto& state = v.state(); if (state.reset_total_edge_weight) { @@ -207,19 +205,20 @@ class PregelLouvain state.community = messages.begin()->community_id; state.community_sigma_total = messages.begin()->community_sigma_total; } + md_t out_message(state.community, state.community_sigma_total, 0.0, + vertex.get_gid(), 0); if (vertex.use_fake_edges()) { for (const auto& edge : vertex.fake_edges()) { - md_t out_message(state.community, state.community_sigma_total, - edge.second, vertex.get_gid(), edge.first); + out_message.edge_weight = edge.second; + out_message.dst_id = edge.first; vertex.send_by_gid(edge.first, out_message); } } else { for (auto& edge : vertex.outgoing_edges()) { - auto nei_gid = vertex.fragment()->Vertex2Gid(edge.get_neighbor()); - md_t out_message(state.community, state.community_sigma_total, - static_cast(edge.get_data()), - vertex.get_gid(), nei_gid); - vertex.send_by_gid(nei_gid, out_message); + auto neighbor_gid = vertex.fragment()->Vertex2Gid(edge.get_neighbor()); + out_message.edge_weight = static_cast(edge.get_data()); + out_message.dst_id = neighbor_gid; + vertex.send_by_gid(neighbor_gid, out_message); } } } @@ -239,10 +238,8 @@ class PregelLouvain std::map community_map; for (auto& message : messages) { vid_t community_id = message.community_id; - edata_t weight = message.edge_weight; if (community_map.find(community_id) != community_map.end()) { - community_map[community_id].edge_weight = - community_map[community_id].edge_weight + weight; + 
community_map[community_id].edge_weight += message.edge_weight; } else { community_map[community_id] = message; } @@ -278,9 +275,9 @@ class PregelLouvain assert(best_community_id == c.community_id); state.community = c.community_id; state.community_sigma_total = c.community_sigma_total; - state.changed = 1; // commuity changed. + state.changed = 1; // community changed. } - // send node weight to the community hub to be summed in next super step + // send node weight to the community hub to sum in next super step md_t message(state.community, state.node_weight + state.internal_weight, 0, vertex.get_gid(), state.community); vertex.send_by_gid(state.community, message); @@ -390,7 +387,6 @@ class PregelLouvain } message.dst_id = state.community; vertex.send_by_gid(state.community, message); - vertex.vote_to_halt(); } diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 4fb96fc530f8..6383cf1312a7 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -113,7 +113,7 @@ class LouvainAppBase }); { - // Sync Aggregator + // sync aggregator ctx.compute_context().aggregate(change_aggregator, ctx.GetLocalChangeSum()); ctx.compute_context().aggregate(edge_weight_aggregator, @@ -149,12 +149,14 @@ class LouvainAppBase auto outer_vertices = frag.OuterVertices(); int current_super_step = ctx.compute_context().superstep(); + // the minor step in phase 1 int current_minor_step = current_super_step % 3; + // the current iteration, two iterations make a full pass. 
int current_iteration = current_super_step / 3; - LOG(INFO) << "current super step: " << current_super_step - << " current minor step: " << current_minor_step - << " current iteration: " << current_iteration; + VLOG(1) << "current super step: " << current_super_step + << " current minor step: " << current_minor_step + << " current iteration: " << current_iteration; if (current_super_step == terminate_step) { // get result messages and terminate @@ -171,11 +173,11 @@ class LouvainAppBase thrd_num, std::vector>(thrd_num)); messages.ParallelProcess( thrd_num, [&thrd_num, &buffer](int tid, md_t const& msg) { - buffer[tid][msg.dst_id % thrd_num].emplace_back(msg); + buffer[tid][msg.dst_id % thrd_num].emplace_back(std::move(msg)); }); { std::vector threads(thrd_num); - for (uint32_t tid = 0; tid < thrd_num; ++tid) { + for (uint32_t tid = 0; tid < thrd_num_msg; ++tid) { threads[tid] = std::thread( [&frag, &ctx, &thrd_num, &buffer](uint32_t tid) { for (uint32_t index = 0; index < thrd_num; ++index) { @@ -198,37 +200,43 @@ class LouvainAppBase if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && current_iteration % 2 == 0) { - int64_t totalChange = + // aggreate total change + int64_t total_change = ctx.compute_context().template get_aggregated_value( change_aggregator); - ctx.change_history().push_back(totalChange); + ctx.change_history().push_back(total_change); + // check whether to halt phase-1 bool to_halt = decide_to_halt(ctx.change_history(), ctx.tolerance(), ctx.min_progress()); ctx.set_halt(to_halt); if (ctx.halt()) { LOG(INFO) << "super step " << current_super_step << " decided to halt."; + messages.ForceContinue(); } - LOG(INFO) << "[INFO]: superstep: " << current_super_step - << " pass: " << current_iteration / 2 - << " totalChange: " << totalChange; + VLOG(1) << "[INFO]: superstep: " << current_super_step + << " pass: " << current_iteration / 2 + << " total change: " << totalChange; } else if (ctx.halt()) { + // aggregate actual quality 
produce in previous step. double actual_quality = ctx.compute_context().template get_aggregated_value( actual_quality_aggregator); // after one pass if already decided halt, that means the pass yield no // changes, so we halt computation. if (current_super_step <= 14 || actual_quality <= ctx.prev_quality()) { - // the louvain computation complete. - LOG(INFO) << "computation complete, ACTUAL QUALITY: " << actual_quality; + // turn to sync community result ctx.compute_context().set_superstep(sync_result_step); syncCommunity(frag, ctx, messages); messages.ForceContinue(); + + LOG(INFO) << "computation complete, ACTUAL QUALITY: " << actual_quality; return; } else if (ctx.compute_context().superstep() > 0) { - // phase 1 halt - LOG(INFO) << "super step: " << current_super_step - << " decided to halt, ACTUAL QUALITY: " << actual_quality - << " previous QUALITY: " << ctx.prev_quality(); + // just halt phase 1 + VLOG(1) << "super step: " << current_super_step + << " decided to halt, ACTUAL QUALITY: " << actual_quality + << " previous QUALITY: " << ctx.prev_quality(); + // start phase 2 to compress community. ctx.compute_context().set_superstep(phase_two_start_step); ctx.set_prev_quality(actual_quality); @@ -237,8 +245,8 @@ class LouvainAppBase } } - // At the start of each pass, every alive node send to their - // communities node info, we active the node. + // At the start of each pass, every alive node need to their + // communities node info to neighbors, so we active the node. 
if (ctx.compute_context().superstep() == phase_two_start_step) { ForEach(inner_vertices, [&ctx](int tid, vertex_t v) { if (ctx.GetVertexState(v).is_alived_community) { @@ -267,7 +275,7 @@ class LouvainAppBase }); { - // Sync Aggregator + // sync aggregator ctx.compute_context().aggregate(change_aggregator, ctx.GetLocalChangeSum()); ctx.compute_context().aggregate(edge_weight_aggregator, @@ -288,12 +296,14 @@ class LouvainAppBase } ctx.compute_context().clear_for_next_round(); + if (!ctx.compute_context().all_halted()) { messages.ForceContinue(); } } private: + // sync community id from community hub to community members. void syncCommunity(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index f7e7bc0c2b44..d0b525d8efa7 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -171,13 +171,12 @@ class PregelComputeContext { bool all_halted() { auto inner_vertices = fragment_->InnerVertices(); - size_t cnt = 0; for (auto& v : inner_vertices) { - if (halted_[v]) { - ++cnt; + if (!halted_[v]) { + return false; } } - return cnt == inner_vertex_num_; + return true; // return voted_to_halt_num_ == inner_vertex_num_; } From d13ad71dd533ce4170c642bc4850884a68185e96 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 11:34:07 +0800 Subject: [PATCH 05/18] revise --- .../apps/pregel/louvain/louvain_app_base.h | 1 - .../apps/pregel/louvain/louvain_vertex.h | 19 ------------------- .../core/app/pregel/pregel_compute_context.h | 1 - .../core/app/pregel/pregel_vertex.h | 16 ++++++++-------- 4 files changed, 8 insertions(+), 29 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 6383cf1312a7..85aed10fd76f 100644 --- 
a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -303,7 +303,6 @@ class LouvainAppBase } private: - // sync community id from community hub to community members. void syncCommunity(const fragment_t& frag, context_t& ctx, message_manager_t& messages) { diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h index 2720334ae695..e0cc61749b73 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_vertex.h +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -54,28 +54,12 @@ class LouvainVertex : public PregelVertex { state_t& state() { return context_->GetVertexState(vertex_); } - vertex_t vertex() const { return vertex_; } - - adj_list_t outgoing_edges() { return fragment_->GetOutgoingAdjList(vertex_); } - - adj_list_t incoming_edges() { return fragment_->GetIncomingAdjList(vertex_); } - void send_by_gid(vid_t dst_gid, const md_t& md) { compute_context_->send_p2p_message(dst_gid, md, tid_); } - void vote_to_halt() { compute_context_->vote_to_halt(*this); } - - void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } - - void set_compute_context(compute_context_t* compute_context) { - compute_context_ = compute_context; - } - void set_context(context_t* context) { context_ = context; } - void set_vertex(vertex_t vertex) { vertex_ = vertex; } - vid_t get_gid() { return fragment_->Vertex2Gid(vertex_); } vid_t get_vertex_gid(const vertex_t& v) { return fragment_->Vertex2Gid(v); } @@ -136,10 +120,7 @@ class LouvainVertex : public PregelVertex { private: int tid_; - const fragment_t* fragment_; - PregelComputeContext* compute_context_; context_t* context_; - vertex_t vertex_; }; } // namespace gs diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index d0b525d8efa7..793ae2340558 100644 --- 
a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -177,7 +177,6 @@ class PregelComputeContext { } } return true; - // return voted_to_halt_num_ == inner_vertex_num_; } typename FRAG_T::template vertex_array_t>& messages_in() { diff --git a/analytical_engine/core/app/pregel/pregel_vertex.h b/analytical_engine/core/app/pregel/pregel_vertex.h index 197346756c09..ed134be53c86 100644 --- a/analytical_engine/core/app/pregel/pregel_vertex.h +++ b/analytical_engine/core/app/pregel/pregel_vertex.h @@ -57,13 +57,13 @@ class PregelVertex { const VD_T& value() { return compute_context_->get_vertex_value(*this); } - virtual vertex_t vertex() const { return vertex_; } + vertex_t vertex() const { return vertex_; } - virtual adj_list_t outgoing_edges() { + adj_list_t outgoing_edges() { return fragment_->GetOutgoingAdjList(vertex_); } - virtual adj_list_t incoming_edges() { + adj_list_t incoming_edges() { return fragment_->GetIncomingAdjList(vertex_); } @@ -75,20 +75,20 @@ class PregelVertex { compute_context_->send_message(v, std::move(value)); } - virtual void vote_to_halt() { compute_context_->vote_to_halt(*this); } + void vote_to_halt() { compute_context_->vote_to_halt(*this); } - virtual void set_fragment(const fragment_t* fragment) { + void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } - virtual void set_compute_context( + void set_compute_context( PregelComputeContext* compute_comtext) { compute_context_ = compute_comtext; } - virtual void set_vertex(vertex_t vertex) { vertex_ = vertex; } + void set_vertex(vertex_t vertex) { vertex_ = vertex; } - private: + protected: const fragment_t* fragment_; PregelComputeContext* compute_context_; From 6e62769b6aeb2d0a96cb81517b594f14942a2866 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 11:54:06 +0800 Subject: [PATCH 06/18] update --- .../apps/pregel/louvain/louvain_app_base.h | 6 ++--- 
.../apps/pregel/louvain/louvain_vertex.h | 26 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 85aed10fd76f..4b3138481c67 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -177,7 +177,7 @@ class LouvainAppBase }); { std::vector threads(thrd_num); - for (uint32_t tid = 0; tid < thrd_num_msg; ++tid) { + for (uint32_t tid = 0; tid < thrd_num; ++tid) { threads[tid] = std::thread( [&frag, &ctx, &thrd_num, &buffer](uint32_t tid) { for (uint32_t index = 0; index < thrd_num; ++index) { @@ -210,12 +210,12 @@ class LouvainAppBase ctx.min_progress()); ctx.set_halt(to_halt); if (ctx.halt()) { - LOG(INFO) << "super step " << current_super_step << " decided to halt."; + VLOG(1) << "super step " << current_super_step << " decided to halt."; messages.ForceContinue(); } VLOG(1) << "[INFO]: superstep: " << current_super_step << " pass: " << current_iteration / 2 - << " total change: " << totalChange; + << " total change: " << total_change; } else if (ctx.halt()) { // aggregate actual quality produce in previous step. double actual_quality = diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h index e0cc61749b73..7d88c43f7434 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_vertex.h +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -31,7 +31,7 @@ namespace gs { /** * @brief LouvainVertex is a specific PregelVertex for louvain alorithm. * LouvainVertex provides communication-related method to send messages to - * certain fragment and also can access to context of louvain. + * certain fragment and also access to context of louvain. 
* @tparam FRAG_T * @tparam VD_T * @tparam MD_T @@ -52,17 +52,19 @@ class LouvainVertex : public PregelVertex { using vd_t = VD_T; using md_t = MD_T; - state_t& state() { return context_->GetVertexState(vertex_); } + state_t& state() { return context_->GetVertexState(this->vertex_); } void send_by_gid(vid_t dst_gid, const md_t& md) { - compute_context_->send_p2p_message(dst_gid, md, tid_); + this->compute_context_->send_p2p_message(dst_gid, md, tid_); } void set_context(context_t* context) { context_ = context; } - vid_t get_gid() { return fragment_->Vertex2Gid(vertex_); } + vid_t get_gid() { return this->fragment_->Vertex2Gid(this->vertex_); } - vid_t get_vertex_gid(const vertex_t& v) { return fragment_->Vertex2Gid(v); } + vid_t get_vertex_gid(const vertex_t& v) { + return this->fragment_->Vertex2Gid(v); + } size_t edge_size() { if (!this->use_fake_edges()) { @@ -73,22 +75,22 @@ class LouvainVertex : public PregelVertex { } bool use_fake_edges() const { - return context_->GetVertexState(vertex_).use_fake_edges; + return context_->GetVertexState(this->vertex_).use_fake_edges; } const std::map& fake_edges() const { - return context_->GetVertexState(vertex_).fake_edges; + return context_->GetVertexState(this->vertex_).fake_edges; } edata_t get_edge_value(const vid_t& dst_id) { if (!this->use_fake_edges()) { for (auto& edge : this->incoming_edges()) { - if (fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + if (this->fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { return static_cast(edge.get_data()); } } for (auto& edge : this->outgoing_edges()) { - if (fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + if (this->fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { return static_cast(edge.get_data()); } } @@ -105,17 +107,17 @@ class LouvainVertex : public PregelVertex { } std::vector& nodes_in_self_community() { - return context_->GetVertexState(vertex_).nodes_in_community; + return context_->GetVertexState(this->vertex_).nodes_in_community; } 
int tid() { return tid_; } void set_tid(int id) { tid_ = id; } PregelComputeContext* compute_context() { - return compute_context_; + return this->compute_context_; } - const fragment_t* fragment() { return fragment_; } + const fragment_t* fragment() { return this->fragment_; } context_t* context() { return context_; } private: From 64dcfaa4773b90f0d6847e54a9f84ec3e04b5f56 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 11:59:58 +0800 Subject: [PATCH 07/18] update --- .../apps/pregel/louvain/louvain_context.h | 4 ++-- .../core/app/pregel/pregel_compute_context.h | 2 -- analytical_engine/core/app/pregel/pregel_vertex.h | 12 +++--------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h index 7effa02d190c..c4abdc57be20 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_context.h +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -32,8 +32,8 @@ limitations under the License. namespace gs { /** - * @brief PregelContext holds the computation result with - * grape::VertexDataContext. + * @brief Context of louvain that holds the computation result with + * grape::VertexDataContext and some state of louvain process. 
* @tparam FRAG_T * @tparam COMPUTE_CONTEXT_T */ diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index 793ae2340558..553c5ecdeb96 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -164,8 +164,6 @@ class PregelComputeContext { void vote_to_halt(const pregel_vertex_t& vertex) { if (halted_[vertex.vertex()] == false) { halted_[vertex.vertex()] = true; - // size_t one = 1; - // grape::atomic_add(voted_to_halt_num_, one); } } diff --git a/analytical_engine/core/app/pregel/pregel_vertex.h b/analytical_engine/core/app/pregel/pregel_vertex.h index ed134be53c86..1855e749fa60 100644 --- a/analytical_engine/core/app/pregel/pregel_vertex.h +++ b/analytical_engine/core/app/pregel/pregel_vertex.h @@ -59,13 +59,9 @@ class PregelVertex { vertex_t vertex() const { return vertex_; } - adj_list_t outgoing_edges() { - return fragment_->GetOutgoingAdjList(vertex_); - } + adj_list_t outgoing_edges() { return fragment_->GetOutgoingAdjList(vertex_); } - adj_list_t incoming_edges() { - return fragment_->GetIncomingAdjList(vertex_); - } + adj_list_t incoming_edges() { return fragment_->GetIncomingAdjList(vertex_); } void send(const vertex_t& v, const MD_T& value) { compute_context_->send_message(v, value); @@ -77,9 +73,7 @@ class PregelVertex { void vote_to_halt() { compute_context_->vote_to_halt(*this); } - void set_fragment(const fragment_t* fragment) { - fragment_ = fragment; - } + void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } void set_compute_context( PregelComputeContext* compute_comtext) { From 20f288905ea8b6c4e401e563c14976792e1de619 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 12:33:47 +0800 Subject: [PATCH 08/18] update --- analytical_engine/apps/pregel/louvain/auxiliary.h | 6 +++--- .../apps/pregel/louvain/louvain_app_base.h | 4 ++-- 
.../apps/pregel/louvain/louvain_context.h | 10 +++++----- python/graphscope/analytical/app/louvain.py | 13 +++++++------ python/tests/test_app.py | 3 ++- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h index 077c4d01bb1e..ec2f0b584519 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -154,8 +154,8 @@ struct LouvainMessage { }; // util function to decide wether we should halt the phase. -bool decide_to_halt(const std::vector& history, int tolerance, - int min_progress) { +bool decide_to_halt(const std::vector& history, int min_progress, + int progress_tries) { // halt if the most recent change was 0 if (0 == history.back()) { return true; @@ -169,7 +169,7 @@ bool decide_to_halt(const std::vector& history, int tolerance, } previous = cur; } - return (count > tolerance); + return (count > progress_tries); } } // namespace gs diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 4b3138481c67..6fe9c117f4bc 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -206,8 +206,8 @@ class LouvainAppBase change_aggregator); ctx.change_history().push_back(total_change); // check whether to halt phase-1 - bool to_halt = decide_to_halt(ctx.change_history(), ctx.tolerance(), - ctx.min_progress()); + bool to_halt = decide_to_halt(ctx.change_history(), ctx.min_progress(), + ctx.progress_tries()); ctx.set_halt(to_halt); if (ctx.halt()) { VLOG(1) << "super step " << current_super_step << " decided to halt."; diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h index c4abdc57be20..82a5c10bd5d5 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_context.h +++ 
b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -56,8 +56,8 @@ class LouvainContext fragment), compute_context_(this->data()) {} - void Init(grape::ParallelMessageManager& messages, int tolerance, - int min_progress) { + void Init(grape::ParallelMessageManager& messages, int min_progress, + int progress_tries) { auto& frag = this->fragment(); auto inner_vertices = frag.InnerVertices(); @@ -65,8 +65,8 @@ class LouvainContext compute_context_.set_fragment(&frag); compute_context_.set_parallel_message_manager(&messages); - this->tolerance_ = tolerance; this->min_progress_ = min_progress; + this->progress_tries_ = progress_tries; vertex_state_.Init(inner_vertices); halt_ = false; @@ -136,8 +136,8 @@ class LouvainContext double prev_quality() { return prev_quality_; } void set_prev_quality(double value) { prev_quality_ = value; } - int tolerance() { return tolerance_; } int min_progress() { return min_progress_; } + int progress_tries() { return progress_tries_; } private: std::vector change_history_; @@ -151,8 +151,8 @@ class LouvainContext bool halt_; // phase-1 halt double prev_quality_; - int tolerance_; int min_progress_; + int progress_tries_; }; } // namespace gs diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py index de72508e037e..39302244d9a3 100644 --- a/python/graphscope/analytical/app/louvain.py +++ b/python/graphscope/analytical/app/louvain.py @@ -25,17 +25,18 @@ @not_compatible_for("arrow_property", "dynamic_property") -def louvain(graph, tolerance=3, min_progress=100): - """Compute best partition on the `graph`. +def louvain(graph, min_progress=50, progress_tries=2): + """Compute best partition on the `graph` by louvain. Args: graph (:class:`Graph`): A projected simple graph. 
- tolerance: number of times the min_progress setting is not met - before exiting form the current level and compressing the graph min_progress: The minimum delta X required to be considered progress, where X is the number of nodes that have changed their community on a particular pass. - Delta X is then the difference in number of nodes that changed communities - on the current pass compared to the previous pass. + Delta X is then the difference in number of nodes that changed communities + on the current pass compared to the previous pass. + progress_tries: number of times the min_progress setting is not met + before exiting form the current level and compressing the graph. + Returns: :class:`VertexDataContext`: A context with each vertex assigned with id of community it belongs to. diff --git a/python/tests/test_app.py b/python/tests/test_app.py index b65c85cd5520..87f2ce686405 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -342,7 +342,8 @@ def test_app_on_undirected_graph( assert np.all(ctx10.to_numpy("r", vertex_range={"begin": 1, "end": 4}) == [0, 0, 0]) # louvain - ctx10 = louvain(p2p_project_undirected_graph, tolerance=0, min_progress=1) + ctx10 = louvain(p2p_project_undirected_graph, min_progress=50, + progress_tries=2) def test_run_app_on_string_oid_graph(p2p_project_directed_graph_string): From 06bd01e16813ff97f7e935a414c8fff465fd1932 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 13:22:19 +0800 Subject: [PATCH 09/18] update --- python/tests/test_app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 87f2ce686405..6671ef320f90 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -342,8 +342,7 @@ def test_app_on_undirected_graph( assert np.all(ctx10.to_numpy("r", vertex_range={"begin": 1, "end": 4}) == [0, 0, 0]) # louvain - ctx10 = louvain(p2p_project_undirected_graph, min_progress=50, - progress_tries=2) + ctx10 = 
louvain(p2p_project_undirected_graph, min_progress=50, progress_tries=2) def test_run_app_on_string_oid_graph(p2p_project_directed_graph_string): From c1d07c4c4ec6201d9f34558bf2ec3212047d2bc4 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 14:06:45 +0800 Subject: [PATCH 10/18] update --- python/graphscope/analytical/app/louvain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py index 39302244d9a3..44ae2eeb0156 100644 --- a/python/graphscope/analytical/app/louvain.py +++ b/python/graphscope/analytical/app/louvain.py @@ -59,4 +59,4 @@ def louvain(graph, min_progress=50, progress_tries=2): s.close() """ - return AppAssets(algo="louvain")(graph, tolerance, min_progress) + return AppAssets(algo="louvain")(graph, min_progress, progress_tries) From 105f42b682d4b88b682d588692e04f1395f4c740 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 14:25:22 +0800 Subject: [PATCH 11/18] revise comments --- analytical_engine/apps/pregel/louvain/louvain_app_base.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 6fe9c117f4bc..c5d758606bce 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -210,6 +210,8 @@ class LouvainAppBase ctx.progress_tries()); ctx.set_halt(to_halt); if (ctx.halt()) { + // if halt, first aggregate actual quality in Compute of vertices and + // then start phase 2 in next super step. VLOG(1) << "super step " << current_super_step << " decided to halt."; messages.ForceContinue(); } @@ -217,7 +219,8 @@ class LouvainAppBase << " pass: " << current_iteration / 2 << " total change: " << total_change; } else if (ctx.halt()) { - // aggregate actual quality produce in previous step. 
+ // after decide_to_halt and aggregate actual quality in previous super + // step, here we check terminate computation or start phase 2. double actual_quality = ctx.compute_context().template get_aggregated_value( actual_quality_aggregator); @@ -232,7 +235,7 @@ class LouvainAppBase LOG(INFO) << "computation complete, ACTUAL QUALITY: " << actual_quality; return; } else if (ctx.compute_context().superstep() > 0) { - // just halt phase 1 + // just halt phase 1 and start phase 2. VLOG(1) << "super step: " << current_super_step << " decided to halt, ACTUAL QUALITY: " << actual_quality << " previous QUALITY: " << ctx.prev_quality(); From 0c08d46265df0844af0b8df83dd6a8a8a5873905 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 16:03:22 +0800 Subject: [PATCH 12/18] add directed check before run app --- python/graphscope/analytical/app/louvain.py | 5 ++++- python/tests/test_app.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py index 44ae2eeb0156..dcd581125faf 100644 --- a/python/graphscope/analytical/app/louvain.py +++ b/python/graphscope/analytical/app/louvain.py @@ -18,6 +18,7 @@ from graphscope.framework.app import AppAssets from graphscope.framework.app import not_compatible_for +from graphscope.framework.errors import InvalidArgumentError __all__ = [ "louvain", @@ -25,7 +26,7 @@ @not_compatible_for("arrow_property", "dynamic_property") -def louvain(graph, min_progress=50, progress_tries=2): +def louvain(graph, min_progress=100, progress_tries=2): """Compute best partition on the `graph` by louvain.
Args: @@ -59,4 +60,6 @@ def louvain(graph, min_progress=50, progress_tries=2): s.close() """ + if graph.is_directed(): + raise InvalidArgumentError("Louvain not support directed graph.") return AppAssets(algo="louvain")(graph, min_progress, progress_tries) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 6671ef320f90..71e29bc0cd4e 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -182,6 +182,9 @@ def test_run_app_on_directed_graph( sorted(ctx5.to_numpy("r", vertex_range={"begin": 1, "end": 4})) == [5, 5, 6] ) + with pytest.raises(InvalidArgumentError, match="Louvain not support directed graph."): + louvain(p2p_project_directed_graphg) + def test_app_on_undirected_graph( p2p_project_undirected_graph, From 4ee2f82f3d4cdc6c6755b0907057c5f42534f5ee Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 16:06:42 +0800 Subject: [PATCH 13/18] fix --- python/tests/test_app.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 71e29bc0cd4e..03ab5b9be227 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -182,7 +182,9 @@ def test_run_app_on_directed_graph( sorted(ctx5.to_numpy("r", vertex_range={"begin": 1, "end": 4})) == [5, 5, 6] ) - with pytest.raises(InvalidArgumentError, match="Louvain not support directed graph."): + with pytest.raises( + InvalidArgumentError, match="Louvain not support directed graph." 
+ ): louvain(p2p_project_directed_graphg) From 6ff23144f5b16f0f561273d6d8585cbff248d70b Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 16:20:59 +0800 Subject: [PATCH 14/18] remove unnessary header --- analytical_engine/apps/pregel/louvain/auxiliary.h | 3 --- analytical_engine/apps/pregel/louvain/louvain.h | 6 ------ analytical_engine/apps/pregel/louvain/louvain_app_base.h | 2 -- analytical_engine/apps/pregel/louvain/louvain_context.h | 7 ------- analytical_engine/apps/pregel/louvain/louvain_vertex.h | 1 - 5 files changed, 19 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h index ec2f0b584519..5e61bad4f9db 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -16,10 +16,7 @@ #ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ -#include #include -#include -#include #include #include "grape/grape.h" diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index 032c23a7e6d1..eb9393cd0c5a 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -16,14 +16,8 @@ #ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ -#include -#include #include -#include -#include -#include #include -#include #include #include "apps/pregel/louvain/auxiliary.h" diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index c5d758606bce..6c179f3ef47b 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -16,8 +16,6 @@ limitations under the License. 
#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ -#include -#include #include #include #include diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h index 82a5c10bd5d5..445a30054cc1 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_context.h +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -16,15 +16,8 @@ limitations under the License. #ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ -#include -#include -#include -#include #include -#include "boost/property_tree/json_parser.hpp" -#include "boost/property_tree/ptree.hpp" - #include "grape/grape.h" #include "core/context/vertex_data_context.h" diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h index 7d88c43f7434..414cc17be5dd 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_vertex.h +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -16,7 +16,6 @@ limitations under the License. 
#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ -#include #include #include #include From df5d10846c18a4d8d4746dbf1505808a9ba2e17a Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 16:26:40 +0800 Subject: [PATCH 15/18] fix --- analytical_engine/apps/pregel/louvain/louvain.h | 2 +- analytical_engine/apps/pregel/louvain/louvain_app_base.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h index eb9393cd0c5a..8df3e85efa01 100644 --- a/analytical_engine/apps/pregel/louvain/louvain.h +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -17,7 +17,7 @@ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ #include -#include +#include #include #include "apps/pregel/louvain/auxiliary.h" diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h index 6c179f3ef47b..b2e5a44f476e 100644 --- a/analytical_engine/apps/pregel/louvain/louvain_app_base.h +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -16,6 +16,7 @@ limitations under the License. #ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ #define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ +#include #include #include #include From f75e836e3b11d5efa771a214842fd5e9855e8f61 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 16:59:06 +0800 Subject: [PATCH 16/18] fix --- python/tests/test_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 03ab5b9be227..a44d2181079f 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -185,7 +185,7 @@ def test_run_app_on_directed_graph( with pytest.raises( InvalidArgumentError, match="Louvain not support directed graph." 
+ ): - louvain(p2p_project_directed_graphg) + louvain(p2p_project_directed_graph) def test_app_on_undirected_graph( From 20d151ed86b27f8caad457ea44369be5fa3e0653 Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Wed, 24 Mar 2021 21:03:00 +0800 Subject: [PATCH 17/18] update --- .../apps/pregel/louvain/auxiliary.h | 17 ++++++++++++++--- python/graphscope/analytical/app/louvain.py | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h index 5e61bad4f9db..4cc0fdeb0d52 100644 --- a/analytical_engine/apps/pregel/louvain/auxiliary.h +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -150,18 +150,29 @@ struct LouvainMessage { } }; -// util function to decide wether we should halt the phase. +/** + * Determine if progress is still being made or if the computation should halt. + * + * @param history change history of the pass. + * @param min_progress The minimum delta X required to be considered progress, + * where X is the number of nodes that have changed their community on a + * particular pass.
+ * @param progress_tries Number of times the min_progress setting is not met + * before exiting from the current level and compressing the graph + * @return true if the computation should halt + * @return false if progress is still being made + */ bool decide_to_halt(const std::vector& history, int min_progress, int progress_tries) { // halt if the most recent change was 0 if (0 == history.back()) { return true; } - // halt if the change count has increased tolerance times + // halt if the change count has increased progress_tries times int64_t previous = history.front(); int count = 0; for (const auto& cur : history) { - if (cur >= previous - min_progress) { + if (previous - cur <= min_progress) { count++; } previous = cur; diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py index dcd581125faf..00b45b04ec21 100644 --- a/python/graphscope/analytical/app/louvain.py +++ b/python/graphscope/analytical/app/louvain.py @@ -26,7 +26,7 @@ @not_compatible_for("arrow_property", "dynamic_property") -def louvain(graph, min_progress=100, progress_tries=2): +def louvain(graph, min_progress=1000, progress_tries=1): """Compute best partition on the `graph` by louvain. Args: From 5a19fcca734a45ea5063578477d098b745cb39db Mon Sep 17 00:00:00 2001 From: "qiaozi.zwb" Date: Thu, 25 Mar 2021 10:24:59 +0800 Subject: [PATCH 18/18] delete not use code --- analytical_engine/core/app/pregel/pregel_compute_context.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index 553c5ecdeb96..c4e4e16921a7 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -156,8 +156,6 @@ class PregelComputeContext { void activate(const vertex_t& v) { if (halted_[v] == true) { halted_[v] = false; - // size_t one = 1; - // __sync_fetch_and_sub(&voted_to_halt_num_, one); } }