From 58b2272e5aa346df67111d63e60ea719e59d2549 Mon Sep 17 00:00:00 2001 From: Alexander Lednev <57529355+iceseer@users.noreply.github.com> Date: Tue, 18 Oct 2022 12:47:45 +0300 Subject: [PATCH] OnPeerDisconnected handler. (#1369) * OnPeerDisconnected handler. Signed-off-by: iceseer * disconnect by reputation Signed-off-by: iceseer * remove from connecting_peers_ Signed-off-by: iceseer Signed-off-by: iceseer --- core/network/impl/peer_manager_impl.cpp | 143 +++++++++++------------- core/network/impl/peer_manager_impl.hpp | 2 +- 2 files changed, 64 insertions(+), 81 deletions(-) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 9eb0fa168d..8c7b81208f 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -6,6 +6,7 @@ #include "network/impl/peer_manager_impl.hpp" #include +#include #include #include @@ -16,7 +17,9 @@ namespace { constexpr const char *syncPeerMetricName = "kagome_sync_peers"; -} + /// Reputation change for a node when we get disconnected from it. + static constexpr int32_t kDisconnectReputation = -256; +} // namespace OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) { using E = kagome::network::PeerManagerImpl::Error; @@ -61,8 +64,7 @@ namespace kagome::network { storage_{std::move(storage)}, hasher_{std::move(hasher)}, reputation_repository_{std::move(reputation_repository)}, - log_(log::createLogger("PeerManager", "network")), - entry_counter_{0ull} { + log_(log::createLogger("PeerManager", "network")) { BOOST_ASSERT(app_state_manager_ != nullptr); BOOST_ASSERT(identify_ != nullptr); BOOST_ASSERT(kademlia_ != nullptr); @@ -126,6 +128,25 @@ namespace kagome::network { } }); + peer_disconnected_handler_ = + host_.getBus() + .getChannel() + .subscribe([wp = weak_from_this()](const PeerId &peer_id) { + if (auto self = wp.lock()) { + SL_DEBUG(self->log_, + "OnPeerDisconnectedChannel handler from peer {}", + peer_id); + self->stream_engine_->del(peer_id); + self->peer_states_.erase(peer_id); + self->active_peers_.erase(peer_id); + self->connecting_peers_.erase(peer_id); + self->sync_peer_num_->set(self->active_peers_.size()); + SL_DEBUG(self->log_, + "Remained {} active peers", + self->active_peers_.size()); + } + }); + identify_->onIdentifyReceived([wp = weak_from_this()]( const PeerId &peer_id) { if (auto self = wp.lock()) { @@ -173,6 +194,7 @@ namespace kagome::network { void PeerManagerImpl::stop() { storeActivePeers(); add_peer_handle_.unsubscribe(); + peer_disconnected_handler_.unsubscribe(); } void PeerManagerImpl::connectToPeer(const PeerInfo &peer_info) { @@ -243,54 +265,56 @@ namespace kagome::network { SL_TRACE(log_, "Try to align peers number"); const auto target_count = app_config_.peeringConfig().targetPeerAmount; - const auto soft_limit = app_config_.peeringConfig().softLimit; const auto hard_limit = app_config_.peeringConfig().hardLimit; const auto peer_ttl = app_config_.peeringConfig().peerTtl; align_timer_.cancel(); // disconnect from peers with negative reputation - std::vector peers_to_disconnect; - for (const auto &[peer_id, _] : active_peers_) { - if (reputation_repository_->reputation(peer_id) < 0) { - peers_to_disconnect.emplace_back(peer_id); + using PriorityType = int32_t; + using ItemType = std::pair; + + std::vector peers_list; + peers_list.reserve(active_peers_.size()); + + uint64_t const now_ms = + std::chrono::time_point_cast(clock_->now()) + .time_since_epoch() + .count(); + uint64_t const idle_ms = + std::chrono::duration_cast(peer_ttl).count(); + + for (const auto &[peer_id, desc] : active_peers_) { + uint64_t const last_activity_ms = + std::chrono::time_point_cast( + desc.time_point) + .time_since_epoch() + .count(); + + const auto peer_reputation = reputation_repository_->reputation(peer_id); + if (peer_reputation < kDisconnectReputation + || last_activity_ms + idle_ms < now_ms) { + peers_list.push_back( + std::make_pair(std::numeric_limits::min(), peer_id)); // we have to store peers somewhere first due to inability to iterate // over active_peers_ and do disconnectFromPeers (which modifies // active_peers_) at the same time + } else { + peers_list.push_back(std::make_pair(peer_reputation, peer_id)); } } - for (const auto &peer_id : peers_to_disconnect) { - SL_DEBUG(log_, - "Disconnecting from peer {} due to its negative reputation", - peer_id); - disconnectFromPeer(peer_id); - } - - // Soft limit is exceeded - if (active_peers_.size() > soft_limit) { - // Get oldest peer - auto it = std::min_element(active_peers_.begin(), - active_peers_.end(), - [](const auto &item1, const auto &item2) { - return item1.second.time_point - < item2.second.time_point; - }); - auto &[oldest_peer_id, oldest_descr] = *it; - if (active_peers_.size() > hard_limit) { - // Hard limit is exceeded - SL_DEBUG(log_, "Hard limit of of active peers is exceeded"); - disconnectFromPeer(oldest_peer_id); + std::sort(peers_list.begin(), + peers_list.end(), + [](auto const &l, auto const &r) { return r.first < l.first; }); - } else if (oldest_descr.time_point + peer_ttl < clock_->now()) { - // Peer is inactive long time - auto &oldest_peer_id_ref = oldest_peer_id; - SL_DEBUG(log_, "Found inactive peer: {}", oldest_peer_id_ref); - disconnectFromPeer(oldest_peer_id); - - } else { - SL_TRACE(log_, "No peer to disconnect at soft limit"); - } + for (; !peers_list.empty() + && (peers_list.size() > hard_limit + || peers_list.back().first + == std::numeric_limits::min()); + peers_list.pop_back()) { + const auto &peer_id = peers_list.back().second; + disconnectFromPeer(peer_id); } // Not enough active peers @@ -404,54 +428,13 @@ namespace kagome::network { kTimeoutForConnecting); } - inline std::string createBT() { - static constexpr size_t kStackSize = 30ull; - void *a[kStackSize]; - auto const count = backtrace(a, kStackSize); - - /// TODO (iceseer): it's a test code and we doesnt care about allocations - char **names = backtrace_symbols(a, count); - auto cleaner = gsl::finally([names] { free(names); }); - - std::string data; - data.reserve(64 * kStackSize); - for (auto ix = 0; ix < count; ++ix) { - data += names[ix]; - data += '\n'; - } - - return data; - } - void PeerManagerImpl::disconnectFromPeer(const PeerId &peer_id) { if (peer_id == own_peer_info_.id) { return; } - ++entry_counter_; - auto locker = gsl::finally([&] { --entry_counter_; }); - auto entry_counter = entry_counter_.load(); - SL_INFO(log_, - "Disconnect from peer {}, entry {}, thread {}", - peer_id, - entry_counter, - std::this_thread::get_id()); - if (entry_counter > 1) { - SL_INFO(log_, - "Found double {} entry: the second one came from\n{}", - __func__, - createBT()); - } - - auto it = active_peers_.find(peer_id); + SL_INFO(log_, "Disconnect from peer {}", peer_id); host_.disconnect(peer_id); - if (it != active_peers_.end()) { - stream_engine_->del(peer_id); - peer_states_.erase(peer_id); - active_peers_.erase(it); - sync_peer_num_->set(active_peers_.size()); - SL_DEBUG(log_, "Remained {} active peers", active_peers_.size()); - } } void PeerManagerImpl::keepAlive(const PeerId &peer_id) { diff --git a/core/network/impl/peer_manager_impl.hpp b/core/network/impl/peer_manager_impl.hpp index a428c528e5..d5f2f5a637 100644 --- a/core/network/impl/peer_manager_impl.hpp +++ b/core/network/impl/peer_manager_impl.hpp @@ -175,6 +175,7 @@ namespace kagome::network { std::shared_ptr reputation_repository_; libp2p::event::Handle add_peer_handle_; + libp2p::event::Handle peer_disconnected_handler_; std::unordered_set peers_in_queue_; std::deque> queue_to_connect_; std::unordered_set connecting_peers_; @@ -194,7 +195,6 @@ namespace kagome::network { ParachainState parachain_state_; log::Logger log_; - std::atomic entry_counter_; }; } // namespace kagome::network