diff --git a/analytical_engine/CMakeLists.txt b/analytical_engine/CMakeLists.txt index 4e88741f73c0..ff0623414525 100644 --- a/analytical_engine/CMakeLists.txt +++ b/analytical_engine/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.8) +cmake_minimum_required(VERSION 3.1) if ("${GRAPHSCOPE_VERSION}" STREQUAL "") set(GRAPHSCOPE_ANALYTICAL_MAJOR_VERSION 0) @@ -45,7 +45,9 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) include_directories(${PROJECT_SOURCE_DIR}) # Set flags -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -Wall") +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99") if (APPLE) set(CMAKE_MACOSX_RPATH ON) diff --git a/analytical_engine/apps/pregel/louvain/auxiliary.h b/analytical_engine/apps/pregel/louvain/auxiliary.h new file mode 100644 index 000000000000..4cc0fdeb0d52 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/auxiliary.h @@ -0,0 +1,185 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ + +#include +#include + +#include "grape/grape.h" + +namespace gs { + +// aggregators define +constexpr char change_aggregator[] = "change_aggregator"; +constexpr char edge_weight_aggregator[] = "total_edge_weight_aggregator"; +constexpr char actual_quality_aggregator[] = "actual_quality_aggregator"; + +// major phase of louvain +constexpr int phase_one_start_step = 0; +constexpr int phase_two_start_step = -2; +constexpr int compress_community_step = -1; +constexpr int sync_result_step = -10; +constexpr int terminate_step = -9; + +// minor step of phase 1 +constexpr int phase_one_minor_step_0 = 0; +constexpr int phase_one_minor_step_1 = 1; +constexpr int phase_one_minor_step_2 = 2; + +/** + * The state of a vertex. + */ +template +struct LouvainNodeState { + using vid_t = VID_T; + using edata_t = double; + + vid_t community = 0; + edata_t community_sigma_total; + + // the internal edge weight of a node + edata_t internal_weight; + + // degree of the node + edata_t node_weight; + + // 1 if the node has changed communities this cycle, otherwise 0 + int64_t changed; + + bool reset_total_edge_weight; + bool is_from_louvain_vertex_reader = false; + bool use_fake_edges = false; + bool is_alived_community = true; + + std::map fake_edges; + std::vector nodes_in_community; + edata_t total_edge_weight; + + LouvainNodeState() + : community(0), + community_sigma_total(0.0), + internal_weight(0.0), + node_weight(0.0), + changed(0), + reset_total_edge_weight(false), + is_from_louvain_vertex_reader(false), + use_fake_edges(false), + is_alived_community(true) {} + + ~LouvainNodeState() = default; +}; + +/** + * Message type of louvain. + */ +template +struct LouvainMessage { + using vid_t = VID_T; + using edata_t = double; + + vid_t community_id; + edata_t community_sigma_total; + edata_t edge_weight; + vid_t source_id; + vid_t dst_id; + + // For reconstruct graph info. + // Each vertex send self's meta info to its community and silence itself, + // the community compress its member's data and make self a new vertex for + // next phase. + edata_t internal_weight = 0; + std::map edges; + std::vector nodes_in_self_community; + + LouvainMessage() + : community_id(0), + community_sigma_total(0.0), + edge_weight(0.0), + source_id(0), + dst_id(0) {} + + LouvainMessage(const vid_t& community_id, edata_t community_sigma_total, + edata_t edge_weight, const vid_t& source_id, + const vid_t& dst_id) + : community_id(community_id), + community_sigma_total(community_sigma_total), + edge_weight(edge_weight), + source_id(source_id), + dst_id(dst_id) {} + + ~LouvainMessage() = default; + + // for message manager to serialize and diserialize LouvainMessage + friend grape::InArchive& operator<<(grape::InArchive& in_archive, + const LouvainMessage& u) { + in_archive << u.community_id; + in_archive << u.community_sigma_total; + in_archive << u.edge_weight; + in_archive << u.source_id; + in_archive << u.dst_id; + in_archive << u.internal_weight; + in_archive << u.edges; + in_archive << u.nodes_in_self_community; + return in_archive; + } + friend grape::OutArchive& operator>>(grape::OutArchive& out_archive, + LouvainMessage& val) { + out_archive >> val.community_id; + out_archive >> val.community_sigma_total; + out_archive >> val.edge_weight; + out_archive >> val.source_id; + out_archive >> val.dst_id; + out_archive >> val.internal_weight; + out_archive >> val.edges; + out_archive >> val.nodes_in_self_community; + return out_archive; + } +}; + +/** + * Determine if progress is still being made or if the computaion should halt. + * + * @param history change history of the pass. + * @param min_progress The minimum delta X required to be considered progress. + * where X is the number of nodes that have changed their community on a + * particular pass. + * @param progress_tries Number of times the minimum.progress setting is not met + * before exiting form the current level and compressing the graph + * @return true + * @return false + */ +bool decide_to_halt(const std::vector& history, int min_progress, + int progress_tries) { + // halt if the most recent change was 0 + if (0 == history.back()) { + return true; + } + // halt if the change count has increased progress_tries times + int64_t previous = history.front(); + int count = 0; + for (const auto& cur : history) { + if (previous - cur <= min_progress) { + count++; + } + previous = cur; + } + return (count > progress_tries); +} + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_AUXILIARY_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain.h b/analytical_engine/apps/pregel/louvain/louvain.h new file mode 100644 index 000000000000..8df3e85efa01 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain.h @@ -0,0 +1,420 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ + +#include +#include +#include + +#include "apps/pregel/louvain/auxiliary.h" +#include "apps/pregel/louvain/louvain_vertex.h" +#include "core/app/pregel/i_vertex_program.h" +#include "core/app/pregel/pregel_compute_context.h" + +namespace gs { + +/* + * Distribute-louvain algorithm description: + * phase-1 + * 0. Each vertex receives community values from its community hub + * and sends its own community to its neighbors. + * 1. Each vertex determines if it should move to a neighboring community or not + * and sends its information to its community hub. + * 2. Each community hub re-calculates community totals and sends the updates + * to each community member. + * repeate phase 1 process until a local maxima of the modularity attained. + * phase-2 + * -2 Community hub calls its member to gather the community sigma tot. + * -1 Compress each community such that they are represented by one node. + * + * reapply the phase-1 process to the new graph. + * + * The passes are iterated until there are no more changes and a maximum of + * modularity is attained. + * + * References: + * https://sotera.github.io/distributed-graph-analytics/louvain/ + * https://github.com/Sotera/distributed-graph-analytics + */ + +template +class PregelLouvain + : public IPregelProgram< + LouvainVertex>, + PregelComputeContext>> { + public: + using fragment_t = FRAG_T; + using oid_t = typename fragment_t::oid_t; + using vid_t = typename fragment_t::vid_t; + using edata_t = double; + using vd_t = oid_t; + using md_t = LouvainMessage; + using compute_context_t = PregelComputeContext; + using pregel_vertex_t = LouvainVertex; + using state_t = LouvainNodeState; + + public: + void Init(pregel_vertex_t& v, compute_context_t& context) override { + state_t& state = v.state(); + edata_t sigma_total = 0.0; + for (auto& e : v.outgoing_edges()) { + sigma_total += static_cast(e.get_data()); + } + + state.community = v.get_gid(); + state.community_sigma_total = sigma_total + state.internal_weight; + state.node_weight = sigma_total; + state.is_from_louvain_vertex_reader = true; + state.nodes_in_community.push_back(state.community); + } + + void Compute(grape::IteratorPair messages, pregel_vertex_t& v, + compute_context_t& context) override { + state_t& state = v.state(); + + int current_super_step = context.superstep(); + // the minor step in phase 1 + int current_minor_step = current_super_step % 3; + // the current iteration, two iterations make a full pass. + int current_iteration = current_super_step / 3; + + if (current_super_step == phase_two_start_step) { + sendCommunitiesInfo(v); + return; + } else if (current_super_step == compress_community_step) { + compressCommunities(v, messages); + return; + } + + // count the total edge weight of the graph on the phase-1 start only + if (current_super_step == phase_one_start_step) { + if (!state.is_from_louvain_vertex_reader) { + // not from the disk but from the previous round's result + state.community = v.get_gid(); + state.node_weight = 0; + // It must use fake edges since we already set them last round. + for (auto& e : v.fake_edges()) { + state.node_weight += e.second; + } + } + state.reset_total_edge_weight = true; + v.context()->local_total_edge_weight()[v.tid()] += + state.node_weight + state.internal_weight; + } + + if (current_super_step == phase_one_start_step && v.edge_size() == 0) { + // isolated nodes send themselves a message on the phase_1 start step + md_t message; + v.send_by_gid(v.get_gid(), message); + v.vote_to_halt(); + return; + } else if (current_super_step == 1 && v.edge_size() == 0) { + // isolated node aggregate their quality value and exit computation on + // step 1 + grape::IteratorPair msgs(NULL, NULL); + double q = calculateActualQuality(v, context, msgs); + v.context()->local_actual_quality()[v.tid()] += q; + v.vote_to_halt(); + return; + } + // at the start of each full pass check to see wether progress is still + // being made, if not halt + if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && + current_iteration % 2 == 0) { + state.changed = 0; // reset changed. + if (v.context()->halt()) { + // phase-1 halt, calculate current actual quality and return. + double q = calculateActualQuality(v, context, messages); + replaceNodeEdgesWithCommunityEdges(v, messages); + v.context()->local_actual_quality()[v.tid()] += q; + return; + } + } + + switch (current_minor_step) { + case phase_one_minor_step_0: + getAndSendCommunityInfo(v, context, messages); + + // next step will require a progress check, aggregate the number of + // nodes who have changed community. + if (current_iteration > 0 && current_iteration % 2 == 0) { + v.context()->local_change_num()[v.tid()] += state.changed; + } + break; + case phase_one_minor_step_1: + calculateBestCommunity(v, context, messages, current_iteration); + break; + case phase_one_minor_step_2: + updateCommunities(v, messages); + break; + default: + LOG(ERROR) << "Invalid minor step: " << current_minor_step; + } + v.vote_to_halt(); + } + + private: + void aggregateQuality(compute_context_t& context, double quality) { + context.aggregate(actual_quality_aggregator, quality); + } + + edata_t getTotalEdgeWeight(compute_context_t& context, pregel_vertex_t& v) { + auto& state = v.state(); + if (state.reset_total_edge_weight) { + // we just aggregate the total edge weight in previous step. + state.total_edge_weight = context.template get_aggregated_value( + edge_weight_aggregator); + state.reset_total_edge_weight = false; + } + return state.total_edge_weight; + } + + /** + * Each vertex will receive its own communities sigma_total (if updated), + * and then send its current community info to its neighbors. + */ + void getAndSendCommunityInfo(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages) { + state_t& state = vertex.state(); + // set new community information. + if (context.superstep() > 0) { + assert(messages.size() == 1); + state.community = messages.begin()->community_id; + state.community_sigma_total = messages.begin()->community_sigma_total; + } + md_t out_message(state.community, state.community_sigma_total, 0.0, + vertex.get_gid(), 0); + if (vertex.use_fake_edges()) { + for (const auto& edge : vertex.fake_edges()) { + out_message.edge_weight = edge.second; + out_message.dst_id = edge.first; + vertex.send_by_gid(edge.first, out_message); + } + } else { + for (auto& edge : vertex.outgoing_edges()) { + auto neighbor_gid = vertex.fragment()->Vertex2Gid(edge.get_neighbor()); + out_message.edge_weight = static_cast(edge.get_data()); + out_message.dst_id = neighbor_gid; + vertex.send_by_gid(neighbor_gid, out_message); + } + } + } + + /** + * Based on community of each of its neighbors, each vertex determines if + * it should retain its current community or switch to a neighboring + * community. + * At the end of this step a message is sent to the nodes community hub so a + * new community sigma_total can be calculated. + */ + void calculateBestCommunity(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages, + int iteration) { + // group messages by communities. + std::map community_map; + for (auto& message : messages) { + vid_t community_id = message.community_id; + if (community_map.find(community_id) != community_map.end()) { + community_map[community_id].edge_weight += message.edge_weight; + } else { + community_map[community_id] = message; + } + } + + // calculate change in qulity for each potential community + auto& state = vertex.state(); + vid_t best_community_id = state.community; + vid_t starting_community_id = best_community_id; + double max_delta_q = 0.0; + for (auto& entry : community_map) { + double delta_q = calculateQualityDelta( + context, vertex, starting_community_id, entry.second.community_id, + entry.second.community_sigma_total, entry.second.edge_weight, + state.node_weight, state.internal_weight); + if (delta_q > max_delta_q || + (delta_q == max_delta_q && + entry.second.community_id < best_community_id)) { + best_community_id = entry.second.community_id; + max_delta_q = delta_q; + } + } + + // ignore switches based on iteration (prevent certain cycles) + if ((state.community > best_community_id && iteration % 2 == 0) || + (state.community < best_community_id && iteration % 2 != 0)) { + best_community_id = state.community; + } + + // update community + if (state.community != best_community_id) { + md_t c = community_map[best_community_id]; + assert(best_community_id == c.community_id); + state.community = c.community_id; + state.community_sigma_total = c.community_sigma_total; + state.changed = 1; // community changed. + } + // send node weight to the community hub to sum in next super step + md_t message(state.community, state.node_weight + state.internal_weight, 0, + vertex.get_gid(), state.community); + vertex.send_by_gid(state.community, message); + } + + /** + * determine the change in quality if a node were to move to + * the given community. + */ + double calculateQualityDelta(compute_context_t& context, pregel_vertex_t& v, + const vid_t& curr_community_id, + const vid_t& test_community_id, + edata_t test_sigma_total, + edata_t edge_weight_in_community, + edata_t node_weight, edata_t internal_weight) { + bool is_current_community = (curr_community_id == test_community_id); + edata_t m2 = getTotalEdgeWeight(context, v); + edata_t k_i_in_L = is_current_community + ? edge_weight_in_community + internal_weight + : edge_weight_in_community; + edata_t k_i_in = k_i_in_L; + edata_t k_i = node_weight + internal_weight; + edata_t sigma_tot = test_sigma_total; + if (is_current_community) { + sigma_tot -= k_i; + } + + double delta_q = 0.0; + if (!(is_current_community && sigma_tot == delta_q)) { + double dividend = k_i * sigma_tot; + delta_q = k_i_in - dividend / m2; + } + return delta_q; + } + + /** + * Each community hub aggregates the values from each of its members to + * update the node's sigma total, and then sends this back to each of its + * members. + */ + void updateCommunities(pregel_vertex_t& vertex, + const grape::IteratorPair& messages) { + // sum all community contributions + md_t sum; + sum.community_id = vertex.get_gid(); + for (auto& m : messages) { + sum.community_sigma_total += m.community_sigma_total; + } + + for (auto& m : messages) { + sum.dst_id = m.source_id; + vertex.send_by_gid(m.source_id, sum); + } + } + + /** + * Calculate this nodes contribution for the actual quality value + * of the graph. + */ + double calculateActualQuality(pregel_vertex_t& vertex, + compute_context_t& context, + const grape::IteratorPair& messages) { + auto& state = vertex.state(); + edata_t k_i_in = state.internal_weight; + for (auto& m : messages) { + if (m.community_id == state.community) { + k_i_in += vertex.get_edge_value(m.source_id); + } + } + edata_t sigma_tot = state.community_sigma_total; + edata_t m2 = getTotalEdgeWeight(context, vertex); + edata_t k_i = state.node_weight + state.internal_weight; + + double q = k_i_in / m2 - (sigma_tot * k_i) / pow(m2, 2); + q = q < 0 ? 0 : q; + return q; + } + + /** + * Replace each edge to a neighbor with an edge to that neighbors community + * instead. Done just before exiting computation. In the next state of the + * pipe line this edges are aggregated and all communities are represented + * as single nodes. Edges from the community to itself are tracked be the + * nodes internal weight. + */ + void replaceNodeEdgesWithCommunityEdges( + pregel_vertex_t& vertex, grape::IteratorPair& messages) { + std::map community_map; + for (auto& message : messages) { + const auto& community_id = message.community_id; + community_map[community_id] += message.edge_weight; + } + + vertex.set_fake_edges(std::move(community_map)); + } + + void sendCommunitiesInfo(pregel_vertex_t& vertex) { + state_t& state = vertex.state(); + md_t message; + message.internal_weight = state.internal_weight; + std::map edges; + assert(vertex.use_fake_edges()); + edges = vertex.fake_edges(); + message.edges = std::move(edges); + if (vertex.get_gid() != state.community) { + message.nodes_in_self_community.swap(vertex.nodes_in_self_community()); + } + message.dst_id = state.community; + vertex.send_by_gid(state.community, message); + vertex.vote_to_halt(); + } + + void compressCommunities(pregel_vertex_t& vertex, + grape::IteratorPair& messages) { + auto community_id = vertex.get_gid(); + edata_t weight = 0; + std::map edge_map; + auto& nodes_in_self_community = vertex.nodes_in_self_community(); + for (auto& m : messages) { + weight += m.internal_weight; + for (auto& entry : m.edges) { + if (entry.first == community_id) { + weight += entry.second; + } else { + edge_map[entry.first] += entry.second; + } + } + nodes_in_self_community.insert(nodes_in_self_community.end(), + m.nodes_in_self_community.begin(), + m.nodes_in_self_community.end()); + } + vertex.state().internal_weight = weight; + vertex.set_fake_edges(std::move(edge_map)); + vertex.state().is_from_louvain_vertex_reader = false; + + // send self fake message to activate next round. + md_t fake_message; + fake_message.dst_id = community_id; + vertex.send_by_gid(community_id, fake_message); + // do not vote to halt since next round those new vertex need to be active. + } +}; + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_app_base.h b/analytical_engine/apps/pregel/louvain/louvain_app_base.h new file mode 100644 index 000000000000..b2e5a44f476e --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_app_base.h @@ -0,0 +1,340 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ + +#include +#include +#include +#include + +#include "grape/grape.h" +#include "grape/utils/iterator_pair.h" + +#include "core/app/app_base.h" +#include "core/app/pregel/pregel_compute_context.h" + +#include "apps/pregel/louvain/auxiliary.h" +#include "apps/pregel/louvain/louvain.h" +#include "apps/pregel/louvain/louvain_context.h" +#include "apps/pregel/louvain/louvain_vertex.h" + +namespace gs { + +/** + * @brief This class is a specialized PregelAppBase for louvain. + * @param FRAG_T + * @param VERTEX_PROGRAM_T + */ +template > +class LouvainAppBase + : public grape::ParallelAppBase< + FRAG_T, + LouvainContext>>, + public grape::ParallelEngine, + public grape::Communicator { + public: + using fragment_t = FRAG_T; + using oid_t = typename FRAG_T::oid_t; + using vid_t = typename FRAG_T::vid_t; + using vertex_t = typename FRAG_T::vertex_t; + using app_t = LouvainAppBase; + using vertex_program_t = VERTEX_PROGRAM_T; + using vd_t = typename vertex_program_t::vd_t; + using md_t = typename vertex_program_t::md_t; + using pregel_compute_context_t = PregelComputeContext; + using context_t = LouvainContext; + using message_manager_t = grape::ParallelMessageManager; + using worker_t = grape::ParallelWorker; + + virtual ~LouvainAppBase() {} + + static std::shared_ptr CreateWorker(std::shared_ptr app, + std::shared_ptr frag) { + return std::shared_ptr(new worker_t(app, frag)); + } + + explicit LouvainAppBase(const vertex_program_t& program = vertex_program_t()) + : program_(program) {} + + void PEval(const fragment_t& frag, context_t& ctx, + message_manager_t& messages) { + // superstep is 0 in PEval + uint32_t thrd_num = thread_num(); + messages.InitChannels(thrd_num); + + // register the aggregators + ctx.compute_context().register_aggregator( + change_aggregator, PregelAggregatorType::kInt64SumAggregator); + ctx.compute_context().register_aggregator( + edge_weight_aggregator, PregelAggregatorType::kDoubleSumAggregator); + ctx.compute_context().register_aggregator( + actual_quality_aggregator, PregelAggregatorType::kDoubleSumAggregator); + ctx.ClearLocalAggregateValues(thrd_num); + + auto inner_vertices = frag.InnerVertices(); + ForEach(inner_vertices, [&frag, &ctx, this](int tid, vertex_t v) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + this->program_.Init(pregel_vertex, ctx.compute_context()); + }); + + grape::IteratorPair null_messages(nullptr, nullptr); + ForEach(inner_vertices, + [&null_messages, &frag, &ctx, this](int tid, vertex_t v) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + this->program_.Compute(null_messages, pregel_vertex, + ctx.compute_context()); + }); + + { + // sync aggregator + ctx.compute_context().aggregate(change_aggregator, + ctx.GetLocalChangeSum()); + ctx.compute_context().aggregate(edge_weight_aggregator, + ctx.GetLocalEdgeWeightSum()); + ctx.compute_context().aggregate(actual_quality_aggregator, + ctx.GetLocalQualitySum()); + for (auto& pair : ctx.compute_context().aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } + ctx.ClearLocalAggregateValues(thrd_num); + } + + ctx.compute_context().clear_for_next_round(); + + if (!ctx.compute_context().all_halted()) { + messages.ForceContinue(); + } + } + + void IncEval(const fragment_t& frag, context_t& ctx, + message_manager_t& messages) { + ctx.compute_context().inc_step(); + uint32_t thrd_num = thread_num(); + + auto inner_vertices = frag.InnerVertices(); + auto outer_vertices = frag.OuterVertices(); + + int current_super_step = ctx.compute_context().superstep(); + // the minor step in phase 1 + int current_minor_step = current_super_step % 3; + // the current iteration, two iterations make a full pass. + int current_iteration = current_super_step / 3; + + VLOG(1) << "current super step: " << current_super_step + << " current minor step: " << current_minor_step + << " current iteration: " << current_iteration; + + if (current_super_step == terminate_step) { + // get result messages and terminate + messages.ParallelProcess>( + thrd_num, [&frag, &ctx](int tid, std::pair const& msg) { + vertex_t v; + frag.InnerVertexGid2Vertex(msg.first, v); + ctx.compute_context().vertex_data()[v] = msg.second; + }); + return; // the whole louvain terminate. + } else { + // get computation messages + std::vector>> buffer( + thrd_num, std::vector>(thrd_num)); + messages.ParallelProcess( + thrd_num, [&thrd_num, &buffer](int tid, md_t const& msg) { + buffer[tid][msg.dst_id % thrd_num].emplace_back(std::move(msg)); + }); + { + std::vector threads(thrd_num); + for (uint32_t tid = 0; tid < thrd_num; ++tid) { + threads[tid] = std::thread( + [&frag, &ctx, &thrd_num, &buffer](uint32_t tid) { + for (uint32_t index = 0; index < thrd_num; ++index) { + for (auto const& msg : buffer[index][tid]) { + vertex_t v; + frag.InnerVertexGid2Vertex(msg.dst_id, v); + ctx.compute_context().messages_in()[v].emplace_back( + std::move(msg)); + ctx.compute_context().activate(v); + } + } + }, + tid); + } + for (uint32_t tid = 0; tid < thrd_num; ++tid) { + threads[tid].join(); + } + } + } + + if (current_minor_step == phase_one_minor_step_1 && current_iteration > 0 && + current_iteration % 2 == 0) { + // aggreate total change + int64_t total_change = + ctx.compute_context().template get_aggregated_value( + change_aggregator); + ctx.change_history().push_back(total_change); + // check whether to halt phase-1 + bool to_halt = decide_to_halt(ctx.change_history(), ctx.min_progress(), + ctx.progress_tries()); + ctx.set_halt(to_halt); + if (ctx.halt()) { + // if halt, first aggregate actual quality in Compute of vertices and + // then start phase 2 in next super step. + VLOG(1) << "super step " << current_super_step << " decided to halt."; + messages.ForceContinue(); + } + VLOG(1) << "[INFO]: superstep: " << current_super_step + << " pass: " << current_iteration / 2 + << " total change: " << total_change; + } else if (ctx.halt()) { + // after decide_to_halt and aggregate actual quality in previous super + // step, here we check terminate computaion or start phase 2. + double actual_quality = + ctx.compute_context().template get_aggregated_value( + actual_quality_aggregator); + // after one pass if already decided halt, that means the pass yield no + // changes, so we halt computation. + if (current_super_step <= 14 || actual_quality <= ctx.prev_quality()) { + // turn to sync community result + ctx.compute_context().set_superstep(sync_result_step); + syncCommunity(frag, ctx, messages); + messages.ForceContinue(); + + LOG(INFO) << "computation complete, ACTUAL QUALITY: " << actual_quality; + return; + } else if (ctx.compute_context().superstep() > 0) { + // just halt phase 1 start phase 2. + VLOG(1) << "super step: " << current_super_step + << " decided to halt, ACTUAL QUALITY: " << actual_quality + << " previous QUALITY: " << ctx.prev_quality(); + + // start phase 2 to compress community. + ctx.compute_context().set_superstep(phase_two_start_step); + ctx.set_prev_quality(actual_quality); + ctx.change_history().clear(); + ctx.set_halt(false); + } + } + + // At the start of each pass, every alive node need to their + // communities node info to neighbors, so we active the node. + if (ctx.compute_context().superstep() == phase_two_start_step) { + ForEach(inner_vertices, [&ctx](int tid, vertex_t v) { + if (ctx.GetVertexState(v).is_alived_community) { + ctx.compute_context().activate(v); + } + }); + } + + ForEach(inner_vertices, [&frag, &ctx, this](int tid, vertex_t v) { + if (ctx.compute_context().active(v)) { + LouvainVertex pregel_vertex; + pregel_vertex.set_context(&ctx); + pregel_vertex.set_fragment(&frag); + pregel_vertex.set_compute_context(&ctx.compute_context()); + pregel_vertex.set_vertex(v); + pregel_vertex.set_tid(tid); + auto& cur_msgs = (ctx.compute_context().messages_in())[v]; + this->program_.Compute( + grape::IteratorPair( + &cur_msgs[0], + &cur_msgs[0] + static_cast(cur_msgs.size())), + pregel_vertex, ctx.compute_context()); + } else if (ctx.compute_context().superstep() == compress_community_step) { + ctx.GetVertexState(v).is_alived_community = false; + } + }); + + { + // sync aggregator + ctx.compute_context().aggregate(change_aggregator, + ctx.GetLocalChangeSum()); + ctx.compute_context().aggregate(edge_weight_aggregator, + ctx.GetLocalEdgeWeightSum()); + ctx.compute_context().aggregate(actual_quality_aggregator, + ctx.GetLocalQualitySum()); + for (auto& pair : ctx.compute_context().aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } + ctx.ClearLocalAggregateValues(thrd_num); + } + + ctx.compute_context().clear_for_next_round(); + + if (!ctx.compute_context().all_halted()) { + messages.ForceContinue(); + } + } + + private: + // sync community id from community hub to community members. + void syncCommunity(const fragment_t& frag, context_t& ctx, + message_manager_t& messages) { + auto& vid_parser = ctx.compute_context().vid_parser(); + auto& comm_result = ctx.compute_context().vertex_data(); + auto inner_vertices = frag.InnerVertices(); + ForEach(inner_vertices, [&frag, &ctx, &messages, &comm_result, &vid_parser]( + int tid, vertex_t v) { + const auto& member_list = ctx.vertex_state()[v].nodes_in_community; + if (!member_list.empty()) { + auto community_id = frag.Gid2Oid(member_list.front()); + // send community id to members + for (const auto& member_gid : member_list) { + auto fid = vid_parser.GetFid(member_gid); + vertex_t member_v; + if (fid == frag.fid()) { + frag.InnerVertexGid2Vertex(member_gid, member_v); + comm_result[member_v] = community_id; + } else { + messages.Channels()[tid].SendToFragment( + fid, std::pair(member_gid, community_id)); + } + } + } + }); + } + + private: + vertex_program_t program_; +}; +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_APP_BASE_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_context.h b/analytical_engine/apps/pregel/louvain/louvain_context.h new file mode 100644 index 000000000000..445a30054cc1 --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_context.h @@ -0,0 +1,153 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ + +#include + +#include "grape/grape.h" + +#include "core/context/vertex_data_context.h" + +namespace gs { + +/** + * @brief Context of louvain that holds the computation result with + * grape::VertexDataContext and some state of louvain process. + * @tparam FRAG_T + * @tparam COMPUTE_CONTEXT_T + */ +template +class LouvainContext + : public grape::VertexDataContext { + using fragment_t = FRAG_T; + using oid_t = typename FRAG_T::oid_t; + using vid_t = typename FRAG_T::vid_t; + using edata_t = double; + using vertex_t = typename FRAG_T::vertex_t; + using state_t = LouvainNodeState; + using vertex_state_array_t = + typename fragment_t::template vertex_array_t; + + public: + explicit LouvainContext(const FRAG_T& fragment) + : grape::VertexDataContext( + fragment), + compute_context_(this->data()) {} + + void Init(grape::ParallelMessageManager& messages, int min_progress, + int progress_tries) { + auto& frag = this->fragment(); + auto inner_vertices = frag.InnerVertices(); + + compute_context_.init(frag); + compute_context_.set_fragment(&frag); + compute_context_.set_parallel_message_manager(&messages); + + this->min_progress_ = min_progress; + this->progress_tries_ = progress_tries; + + vertex_state_.Init(inner_vertices); + halt_ = false; + prev_quality_ = 0.0; + } + + void Output(std::ostream& os) override { + auto& frag = this->fragment(); + auto& result = compute_context_.vertex_data(); + auto inner_vertices = frag.InnerVertices(); + for (auto v : inner_vertices) { + os << frag.GetId(v) << " " << result[v] << std::endl; + } + } + + state_t& GetVertexState(const vertex_t& v) { return vertex_state_[v]; } + + void ClearLocalAggregateValues(uint32_t thread_num) { + local_change_num_.clear(); + local_total_edge_weight_.clear(); + local_actual_quality_.clear(); + local_change_num_.resize(thread_num, 0); + local_total_edge_weight_.resize(thread_num, 0.0); + local_actual_quality_.resize(thread_num, 0.0); + } + + int64_t GetLocalChangeSum() { + int64_t sum = 0; + for (const auto& val : local_change_num_) { + sum += val; + } + return sum; + } + + edata_t GetLocalEdgeWeightSum() { + edata_t sum = 0; + for (const auto& val : local_total_edge_weight_) { + sum += val; + } + return sum; + } + + double GetLocalQualitySum() { + double sum = 0; + for (const auto& val : local_actual_quality_) { + sum += val; + } + return sum; + } + + COMPUTE_CONTEXT_T& compute_context() { return compute_context_; } + + std::vector& change_history() { return change_history_; } + + vertex_state_array_t& vertex_state() { return vertex_state_; } + + std::vector& local_change_num() { return local_change_num_; } + + std::vector& local_total_edge_weight() { + return local_total_edge_weight_; + } + std::vector& local_actual_quality() { return local_actual_quality_; } + + bool halt() { return halt_; } + void set_halt(bool halt) { halt_ = halt; } + + double prev_quality() { return prev_quality_; } + void set_prev_quality(double value) { prev_quality_ = value; } + + int min_progress() { return min_progress_; } + int progress_tries() { return progress_tries_; } + + private: + std::vector change_history_; + COMPUTE_CONTEXT_T compute_context_; + vertex_state_array_t vertex_state_; + + // members to store local aggrated value + std::vector local_change_num_; + std::vector local_total_edge_weight_; + std::vector local_actual_quality_; + + bool halt_; // phase-1 halt + double prev_quality_; + int min_progress_; + int progress_tries_; +}; + +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_CONTEXT_H_ diff --git a/analytical_engine/apps/pregel/louvain/louvain_vertex.h b/analytical_engine/apps/pregel/louvain/louvain_vertex.h new file mode 100644 index 000000000000..414cc17be5dd --- /dev/null +++ b/analytical_engine/apps/pregel/louvain/louvain_vertex.h @@ -0,0 +1,128 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ +#define ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ + +#include +#include +#include +#include + +#include "core/app/pregel/pregel_vertex.h" + +#include "apps/pregel/louvain/louvain_context.h" + +namespace gs { + +/** + * @brief LouvainVertex is a specific PregelVertex for louvain alorithm. + * LouvainVertex provides communication-related method to send messages to + * certain fragment and also access to context of louvain. + * @tparam FRAG_T + * @tparam VD_T + * @tparam MD_T + */ +template +class LouvainVertex : public PregelVertex { + using fragment_t = FRAG_T; + using oid_t = typename fragment_t::oid_t; + using vid_t = typename fragment_t::vid_t; + using edata_t = double; + using vertex_t = typename fragment_t::vertex_t; + using adj_list_t = typename fragment_t::const_adj_list_t; + using compute_context_t = PregelComputeContext; + using context_t = LouvainContext; + using state_t = LouvainNodeState; + + public: + using vd_t = VD_T; + using md_t = MD_T; + + state_t& state() { return context_->GetVertexState(this->vertex_); } + + void send_by_gid(vid_t dst_gid, const md_t& md) { + this->compute_context_->send_p2p_message(dst_gid, md, tid_); + } + + void set_context(context_t* context) { context_ = context; } + + vid_t get_gid() { return this->fragment_->Vertex2Gid(this->vertex_); } + + vid_t get_vertex_gid(const vertex_t& v) { + return this->fragment_->Vertex2Gid(v); + } + + size_t edge_size() { + if (!this->use_fake_edges()) { + return this->incoming_edges().Size() + this->outgoing_edges().Size(); + } else { + return this->fake_edges().size(); + } + } + + bool use_fake_edges() const { + return context_->GetVertexState(this->vertex_).use_fake_edges; + } + + const std::map& fake_edges() const { + return context_->GetVertexState(this->vertex_).fake_edges; + } + + edata_t get_edge_value(const vid_t& dst_id) { + if (!this->use_fake_edges()) { + for (auto& edge : this->incoming_edges()) { + if (this->fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + return static_cast(edge.get_data()); + } + } + for (auto& edge : this->outgoing_edges()) { + if (this->fragment_->Vertex2Gid(edge.get_neighbor()) == dst_id) { + return static_cast(edge.get_data()); + } + } + } else { + return this->fake_edges().at(dst_id); + } + return edata_t(); + } + + void set_fake_edges(std::map&& edges) { + state_t& ref_state = this->state(); + ref_state.fake_edges = std::move(edges); + ref_state.use_fake_edges = true; + } + + std::vector& nodes_in_self_community() { + return context_->GetVertexState(this->vertex_).nodes_in_community; + } + + int tid() { return tid_; } + void set_tid(int id) { tid_ = id; } + + PregelComputeContext* compute_context() { + return this->compute_context_; + } + + const fragment_t* fragment() { return this->fragment_; } + context_t* context() { return context_; } + + private: + int tid_; + context_t* context_; +}; +} // namespace gs + +#endif // ANALYTICAL_ENGINE_APPS_PREGEL_LOUVAIN_LOUVAIN_VERTEX_H_ diff --git a/analytical_engine/core/app/pregel/pregel_app_base.h b/analytical_engine/core/app/pregel/pregel_app_base.h index d961047e7153..2b5b748d10c9 100644 --- a/analytical_engine/core/app/pregel/pregel_app_base.h +++ b/analytical_engine/core/app/pregel/pregel_app_base.h @@ -16,7 +16,9 @@ limitations under the License. #ifndef ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_APP_BASE_H_ #define ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_APP_BASE_H_ +#include #include +#include #include "grape/grape.h" #include "grape/utils/iterator_pair.h" @@ -41,7 +43,8 @@ class PregelAppBase FRAG_T, PregelContext>> { + typename VERTEX_PROGRAM_T::md_t>>>, + public grape::Communicator { using vd_t = typename VERTEX_PROGRAM_T::vd_t; using md_t = typename VERTEX_PROGRAM_T::md_t; using pregel_compute_context_t = PregelComputeContext; @@ -94,11 +97,24 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } void IncEval(const fragment_t& frag, pregel_context_t& ctx, @@ -144,11 +160,24 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } private: @@ -166,7 +195,8 @@ class PregelAppBase FRAG_T, PregelContext>> { + typename VERTEX_PROGRAM_T::md_t>>>, + public grape::Communicator { using vd_t = typename VERTEX_PROGRAM_T::vd_t; using md_t = typename VERTEX_PROGRAM_T::md_t; using app_t = PregelAppBase; @@ -205,11 +235,25 @@ class PregelAppBase program_.Compute(null_messages, pregel_vertex, ctx.compute_context_); } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } void IncEval(const fragment_t& frag, pregel_context_t& ctx, @@ -242,11 +286,25 @@ class PregelAppBase } } - if (ctx.compute_context_.has_messages()) { - messages.ForceContinue(); + { + // Sync Aggregator + for (auto& pair : ctx.compute_context_.aggregators()) { + grape::InArchive iarc; + std::vector oarcs; + std::string name = pair.first; + pair.second->Serialize(iarc); + pair.second->Reset(); + AllGather(std::move(iarc), oarcs); + pair.second->DeserializeAndAggregate(oarcs); + pair.second->StartNewRound(); + } } ctx.compute_context_.clear_for_next_round(); + + if (!ctx.compute_context_.all_halted()) { + messages.ForceContinue(); + } } private: diff --git a/analytical_engine/core/app/pregel/pregel_compute_context.h b/analytical_engine/core/app/pregel/pregel_compute_context.h index 70e245fd58f8..c4e4e16921a7 100644 --- a/analytical_engine/core/app/pregel/pregel_compute_context.h +++ b/analytical_engine/core/app/pregel/pregel_compute_context.h @@ -16,14 +16,21 @@ limitations under the License. #ifndef ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_COMPUTE_CONTEXT_H_ #define ANALYTICAL_ENGINE_CORE_APP_PREGEL_PREGEL_COMPUTE_CONTEXT_H_ +#include + +#include #include #include #include #include +#include "grape/utils/atomic_ops.h" #include "grape/utils/iterator_pair.h" +#include "core/app/pregel/aggregators/aggregator.h" +#include "core/app/pregel/aggregators/aggregator_factory.h" #include "core/app/pregel/pregel_vertex.h" +#include "core/config.h" namespace gs { /** @@ -57,18 +64,20 @@ class PregelComputeContext { messages_in_.Init(inner_vertices, {}); halted_.Init(inner_vertices, false); + vid_parser_.Init(frag.fnum(), 1); inner_vertex_num_ = inner_vertices.size(); step_ = 0; voted_to_halt_num_ = 0; enable_combine_ = false; - has_messages_ = false; } void inc_step() { step_++; } int superstep() const { return step_; } + void set_superstep(int step) { step_ = step; } + void set_vertex_value(const pregel_vertex_t& vertex, const VD_T& value) { vertex_data_[vertex.vertex()] = value; } @@ -89,8 +98,7 @@ class PregelComputeContext { message_manager_->SyncStateOnOuterVertex(*fragment_, v, value); } else { - messages_in_[v].emplace_back(value); - has_messages_ = true; + messages_out_[v].emplace_back(value); } } } @@ -103,12 +111,21 @@ class PregelComputeContext { message_manager_->SyncStateOnOuterVertex(*fragment_, v, value); } else { - messages_in_[v].emplace_back(std::move(value)); - has_messages_ = true; + messages_out_[v].emplace_back(std::move(value)); } } } + void send_p2p_message(const vid_t& v_gid, const MD_T& value, int tid = 0) { + auto fid = vid_parser_.GetFid(v_gid); + parallel_message_manager_->Channels()[tid].SendToFragment(fid, value); + } + + void send_p2p_message(const vid_t v_gid, MD_T&& value, int tid = 0) { + auto fid = vid_parser_.GetFid(v_gid); + parallel_message_manager_->Channels()[tid].SendToFragment(fid, value); + } + template void apply_combine(COMBINATOR_T& cb) { auto vertices = fragment_->Vertices(); @@ -129,26 +146,34 @@ class PregelComputeContext { messages_in_[v].clear(); messages_in_[v].swap(messages_out_[v]); if (!messages_in_[v].empty()) { - has_messages_ = true; + activate(v); } } } - bool active(const vertex_t& v) { return !messages_in_[v].empty(); } + bool active(const vertex_t& v) { return !halted_[v]; } void activate(const vertex_t& v) { - halted_[v] = false; - --voted_to_halt_num_; + if (halted_[v] == true) { + halted_[v] = false; + } } void vote_to_halt(const pregel_vertex_t& vertex) { - // halted_[vertex.vertex()] = true; - // ++voted_to_halt_num_; + if (halted_[vertex.vertex()] == false) { + halted_[vertex.vertex()] = true; + } } - bool all_halted() { return voted_to_halt_num_ == inner_vertex_num_; } - - bool has_messages() { return has_messages_; } + bool all_halted() { + auto inner_vertices = fragment_->InnerVertices(); + for (auto& v : inner_vertices) { + if (!halted_[v]) { + return false; + } + } + return true; + } typename FRAG_T::template vertex_array_t>& messages_in() { return messages_in_; @@ -161,7 +186,18 @@ class PregelComputeContext { typename FRAG_T::template vertex_array_t& vertex_data() { return vertex_data_; } - void clear_for_next_round() { has_messages_ = false; } + void clear_for_next_round() { + if (!enable_combine_) { + auto inner_vertices = fragment_->InnerVertices(); + for (auto v : inner_vertices) { + messages_in_[v].clear(); + messages_in_[v].swap(messages_out_[v]); + if (!messages_in_[v].empty()) { + activate(v); + } + } + } + } void enable_combine() { enable_combine_ = true; } void set_fragment(const fragment_t* fragment) { fragment_ = fragment; } @@ -169,6 +205,11 @@ class PregelComputeContext { message_manager_ = message_manager; } + void set_parallel_message_manager( + grape::ParallelMessageManager* message_manager) { + parallel_message_manager_ = message_manager; + } + oid_t GetId(const vertex_t& v) { return fragment_->GetId(v); } void set_config(const std::string& key, const std::string& value) { @@ -184,9 +225,38 @@ class PregelComputeContext { } } + const vineyard::IdParser& vid_parser() const { return vid_parser_; } + + std::unordered_map>& aggregators() { + return aggregators_; + } + + void register_aggregator(const std::string& name, PregelAggregatorType type) { + if (aggregators_.find(name) == aggregators_.end()) { + aggregators_.emplace(name, AggregatorFactory::CreateAggregator(type)); + aggregators_.at(name)->Init(); + } + } + + template + void aggregate(const std::string& name, AGGR_TYPE value) { + if (aggregators_.find(name) != aggregators_.end()) { + std::dynamic_pointer_cast>(aggregators_.at(name)) + ->Aggregate(value); + } + } + + template + AGGR_TYPE get_aggregated_value(const std::string& name) { + return std::dynamic_pointer_cast>( + aggregators_.at(name)) + ->GetAggregatedValue(); + } + private: const fragment_t* fragment_; grape::DefaultMessageManager* message_manager_; + grape::ParallelMessageManager* parallel_message_manager_; typename FRAG_T::template vertex_array_t& vertex_data_; @@ -196,8 +266,6 @@ class PregelComputeContext { typename FRAG_T::template vertex_array_t> messages_out_; typename FRAG_T::template vertex_array_t> messages_in_; - bool has_messages_; - size_t inner_vertex_num_; size_t total_vertex_num_; @@ -205,6 +273,8 @@ class PregelComputeContext { int step_; std::unordered_map config_; + std::unordered_map> aggregators_; + vineyard::IdParser vid_parser_; }; } // namespace gs diff --git a/analytical_engine/core/app/pregel/pregel_vertex.h b/analytical_engine/core/app/pregel/pregel_vertex.h index bc0090d26d66..1855e749fa60 100644 --- a/analytical_engine/core/app/pregel/pregel_vertex.h +++ b/analytical_engine/core/app/pregel/pregel_vertex.h @@ -82,7 +82,7 @@ class PregelVertex { void set_vertex(vertex_t vertex) { vertex_ = vertex; } - private: + protected: const fragment_t* fragment_; PregelComputeContext* compute_context_; diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h index 8e343d6b2fc6..4f872127b813 100644 --- a/analytical_engine/core/fragment/arrow_projected_fragment.h +++ b/analytical_engine/core/fragment/arrow_projected_fragment.h @@ -348,6 +348,8 @@ class ArrowProjectedFragment using nbr_unit_t = vineyard::property_graph_utils::NbrUnit; using adj_list_t = arrow_projected_fragment_impl::AdjList; + using const_adj_list_t = + arrow_projected_fragment_impl::AdjList; using vertex_map_t = ArrowProjectedVertexMap; using label_id_t = vineyard::property_graph_types::LABEL_ID_TYPE; using prop_id_t = vineyard::property_graph_types::PROP_ID_TYPE; diff --git a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml index ce5bbc6b3ebb..8fb88f1a12a1 100644 --- a/coordinator/gscoordinator/builtin/app/.gs_conf.yaml +++ b/coordinator/gscoordinator/builtin/app/.gs_conf.yaml @@ -154,3 +154,11 @@ app: src: benchmarks/apps/bfs/property_bfs.h compatible_graph: - vineyard::ArrowFragment + - algo: louvain + type: cpp_pie + class_name: gs::LouvainAppBase + src: apps/pregel/louvain/louvain_app_base.h + compatible_graph: + - grape::ImmutableEdgecutFragment + - gs::ArrowProjectedFragment + - gs::DynamicProjectedFragment diff --git a/python/graphscope/analytical/app/__init__.py b/python/graphscope/analytical/app/__init__.py index be2eb6a086bb..aca448f79f14 100644 --- a/python/graphscope/analytical/app/__init__.py +++ b/python/graphscope/analytical/app/__init__.py @@ -27,6 +27,7 @@ from graphscope.analytical.app.k_core import k_core from graphscope.analytical.app.k_shell import k_shell from graphscope.analytical.app.katz_centrality import katz_centrality +from graphscope.analytical.app.louvain import louvain from graphscope.analytical.app.lpa import lpa from graphscope.analytical.app.pagerank import pagerank from graphscope.analytical.app.sssp import property_sssp diff --git a/python/graphscope/analytical/app/louvain.py b/python/graphscope/analytical/app/louvain.py new file mode 100644 index 000000000000..00b45b04ec21 --- /dev/null +++ b/python/graphscope/analytical/app/louvain.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from graphscope.framework.app import AppAssets +from graphscope.framework.app import not_compatible_for +from graphscope.framework.errors import InvalidArgumentError + +__all__ = [ + "louvain", +] + + +@not_compatible_for("arrow_property", "dynamic_property") +def louvain(graph, min_progress=1000, progress_tries=1): + """Compute best partition on the `graph` by louvain. + + Args: + graph (:class:`Graph`): A projected simple graph. + min_progress: The minimum delta X required to be considered progress, where X is the number of nodes + that have changed their community on a particular pass. + Delta X is then the difference in number of nodes that changed communities + on the current pass compared to the previous pass. + progress_tries: number of times the min_progress setting is not met + before exiting form the current level and compressing the graph. + + + Returns: + :class:`VertexDataContext`: A context with each vertex assigned with id of community it belongs to. + + References: + .. [1] Blondel, V.D. et al. Fast unfolding of communities in + large networks. J. Stat. Mech 10008, 1-12(2008). + .. [2] https://github.com/Sotera/distributed-graph-analytics + .. [3] https://sotera.github.io/distributed-graph-analytics/louvain/ + + Examples: + + .. code:: python + + import graphscope as gs + s = gs.session() + g = s.load_from('The parameters for loading a graph...') + pg = g.project_to_simple(v_label='vlabel', e_label='elabel', v_prop=None, e_prop='weight') + r = gs.louvain(pg) + s.close() + + """ + if graph.is_directed(): + raise InvalidArgumentError("Louvain not support directed graph.") + return AppAssets(algo="louvain")(graph, min_progress, progress_tries) diff --git a/python/tests/test_app.py b/python/tests/test_app.py index 0da22d4e2cd4..a65a25be1b8c 100644 --- a/python/tests/test_app.py +++ b/python/tests/test_app.py @@ -32,6 +32,7 @@ from graphscope import k_core from graphscope import k_shell from graphscope import katz_centrality +from graphscope import louvain from graphscope import lpa from graphscope import pagerank from graphscope import property_sssp @@ -181,6 +182,11 @@ def test_run_app_on_directed_graph( sorted(ctx5.to_numpy("r", vertex_range={"begin": 1, "end": 4})) == [5, 5, 6] ) + with pytest.raises( + InvalidArgumentError, match="Louvain not support directed graph." + ): + louvain(p2p_project_directed_graph) + def test_app_on_undirected_graph( p2p_project_undirected_graph, @@ -340,6 +346,9 @@ def test_app_on_undirected_graph( ) assert np.all(ctx10.to_numpy("r", vertex_range={"begin": 1, "end": 4}) == [0, 0, 0]) + # louvain + ctx10 = louvain(p2p_project_undirected_graph, min_progress=50, progress_tries=2) + def test_run_app_on_string_oid_graph(p2p_project_directed_graph_string): ctx = sssp(p2p_project_directed_graph_string, src="6")