Skip to content

Commit

Permalink
OnPeerDisconnected handler. (#1369)
Browse files Browse the repository at this point in the history
* OnPeerDisconnected handler.

Signed-off-by: iceseer <[email protected]>

* disconnect by reputation

Signed-off-by: iceseer <[email protected]>

* remove from connecting_peers_

Signed-off-by: iceseer <[email protected]>

Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer authored Oct 18, 2022
1 parent ebd9715 commit 58b2272
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 81 deletions.
143 changes: 63 additions & 80 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "network/impl/peer_manager_impl.hpp"

#include <execinfo.h>
#include <limits>
#include <memory>

#include <libp2p/protocol/kademlia/impl/peer_routing_table.hpp>
Expand All @@ -16,7 +17,9 @@

namespace {
constexpr const char *syncPeerMetricName = "kagome_sync_peers";
}
/// Reputation change for a node when we get disconnected from it.
static constexpr int32_t kDisconnectReputation = -256;
} // namespace

OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) {
using E = kagome::network::PeerManagerImpl::Error;
Expand Down Expand Up @@ -61,8 +64,7 @@ namespace kagome::network {
storage_{std::move(storage)},
hasher_{std::move(hasher)},
reputation_repository_{std::move(reputation_repository)},
log_(log::createLogger("PeerManager", "network")),
entry_counter_{0ull} {
log_(log::createLogger("PeerManager", "network")) {
BOOST_ASSERT(app_state_manager_ != nullptr);
BOOST_ASSERT(identify_ != nullptr);
BOOST_ASSERT(kademlia_ != nullptr);
Expand Down Expand Up @@ -126,6 +128,25 @@ namespace kagome::network {
}
});

peer_disconnected_handler_ =
host_.getBus()
.getChannel<libp2p::event::network::OnPeerDisconnectedChannel>()
.subscribe([wp = weak_from_this()](const PeerId &peer_id) {
if (auto self = wp.lock()) {
SL_DEBUG(self->log_,
"OnPeerDisconnectedChannel handler from peer {}",
peer_id);
self->stream_engine_->del(peer_id);
self->peer_states_.erase(peer_id);
self->active_peers_.erase(peer_id);
self->connecting_peers_.erase(peer_id);
self->sync_peer_num_->set(self->active_peers_.size());
SL_DEBUG(self->log_,
"Remained {} active peers",
self->active_peers_.size());
}
});

identify_->onIdentifyReceived([wp = weak_from_this()](
const PeerId &peer_id) {
if (auto self = wp.lock()) {
Expand Down Expand Up @@ -173,6 +194,7 @@ namespace kagome::network {
void PeerManagerImpl::stop() {
storeActivePeers();
add_peer_handle_.unsubscribe();
peer_disconnected_handler_.unsubscribe();
}

void PeerManagerImpl::connectToPeer(const PeerInfo &peer_info) {
Expand Down Expand Up @@ -243,54 +265,56 @@ namespace kagome::network {
SL_TRACE(log_, "Try to align peers number");

const auto target_count = app_config_.peeringConfig().targetPeerAmount;
const auto soft_limit = app_config_.peeringConfig().softLimit;
const auto hard_limit = app_config_.peeringConfig().hardLimit;
const auto peer_ttl = app_config_.peeringConfig().peerTtl;

align_timer_.cancel();

// disconnect from peers with negative reputation
std::vector<PeerId> peers_to_disconnect;
for (const auto &[peer_id, _] : active_peers_) {
if (reputation_repository_->reputation(peer_id) < 0) {
peers_to_disconnect.emplace_back(peer_id);
using PriorityType = int32_t;
using ItemType = std::pair<PriorityType, PeerId>;

std::vector<ItemType> peers_list;
peers_list.reserve(active_peers_.size());

uint64_t const now_ms =
std::chrono::time_point_cast<std::chrono::milliseconds>(clock_->now())
.time_since_epoch()
.count();
uint64_t const idle_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(peer_ttl).count();

for (const auto &[peer_id, desc] : active_peers_) {
uint64_t const last_activity_ms =
std::chrono::time_point_cast<std::chrono::milliseconds>(
desc.time_point)
.time_since_epoch()
.count();

const auto peer_reputation = reputation_repository_->reputation(peer_id);
if (peer_reputation < kDisconnectReputation
|| last_activity_ms + idle_ms < now_ms) {
peers_list.push_back(
std::make_pair(std::numeric_limits<PriorityType>::min(), peer_id));
// we have to store peers somewhere first due to inability to iterate
// over active_peers_ and do disconnectFromPeers (which modifies
// active_peers_) at the same time
} else {
peers_list.push_back(std::make_pair(peer_reputation, peer_id));
}
}
for (const auto &peer_id : peers_to_disconnect) {
SL_DEBUG(log_,
"Disconnecting from peer {} due to its negative reputation",
peer_id);
disconnectFromPeer(peer_id);
}

// Soft limit is exceeded
if (active_peers_.size() > soft_limit) {
// Get oldest peer
auto it = std::min_element(active_peers_.begin(),
active_peers_.end(),
[](const auto &item1, const auto &item2) {
return item1.second.time_point
< item2.second.time_point;
});
auto &[oldest_peer_id, oldest_descr] = *it;

if (active_peers_.size() > hard_limit) {
// Hard limit is exceeded
SL_DEBUG(log_, "Hard limit of of active peers is exceeded");
disconnectFromPeer(oldest_peer_id);
std::sort(peers_list.begin(),
peers_list.end(),
[](auto const &l, auto const &r) { return r.first < l.first; });

} else if (oldest_descr.time_point + peer_ttl < clock_->now()) {
// Peer is inactive long time
auto &oldest_peer_id_ref = oldest_peer_id;
SL_DEBUG(log_, "Found inactive peer: {}", oldest_peer_id_ref);
disconnectFromPeer(oldest_peer_id);

} else {
SL_TRACE(log_, "No peer to disconnect at soft limit");
}
for (; !peers_list.empty()
&& (peers_list.size() > hard_limit
|| peers_list.back().first
== std::numeric_limits<PriorityType>::min());
peers_list.pop_back()) {
const auto &peer_id = peers_list.back().second;
disconnectFromPeer(peer_id);
}

// Not enough active peers
Expand Down Expand Up @@ -404,54 +428,13 @@ namespace kagome::network {
kTimeoutForConnecting);
}

inline std::string createBT() {
static constexpr size_t kStackSize = 30ull;
void *a[kStackSize];
auto const count = backtrace(a, kStackSize);

/// TODO (iceseer): it's a test code and we doesnt care about allocations
char **names = backtrace_symbols(a, count);
auto cleaner = gsl::finally([names] { free(names); });

std::string data;
data.reserve(64 * kStackSize);
for (auto ix = 0; ix < count; ++ix) {
data += names[ix];
data += '\n';
}

return data;
}

void PeerManagerImpl::disconnectFromPeer(const PeerId &peer_id) {
if (peer_id == own_peer_info_.id) {
return;
}

++entry_counter_;
auto locker = gsl::finally([&] { --entry_counter_; });
auto entry_counter = entry_counter_.load();
SL_INFO(log_,
"Disconnect from peer {}, entry {}, thread {}",
peer_id,
entry_counter,
std::this_thread::get_id());
if (entry_counter > 1) {
SL_INFO(log_,
"Found double {} entry: the second one came from\n{}",
__func__,
createBT());
}

auto it = active_peers_.find(peer_id);
SL_INFO(log_, "Disconnect from peer {}", peer_id);
host_.disconnect(peer_id);
if (it != active_peers_.end()) {
stream_engine_->del(peer_id);
peer_states_.erase(peer_id);
active_peers_.erase(it);
sync_peer_num_->set(active_peers_.size());
SL_DEBUG(log_, "Remained {} active peers", active_peers_.size());
}
}

void PeerManagerImpl::keepAlive(const PeerId &peer_id) {
Expand Down
2 changes: 1 addition & 1 deletion core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ namespace kagome::network {
std::shared_ptr<ReputationRepository> reputation_repository_;

libp2p::event::Handle add_peer_handle_;
libp2p::event::Handle peer_disconnected_handler_;
std::unordered_set<PeerId> peers_in_queue_;
std::deque<std::reference_wrapper<const PeerId>> queue_to_connect_;
std::unordered_set<PeerId> connecting_peers_;
Expand All @@ -194,7 +195,6 @@ namespace kagome::network {
ParachainState parachain_state_;

log::Logger log_;
std::atomic<uint64_t> entry_counter_;
};

} // namespace kagome::network
Expand Down

0 comments on commit 58b2272

Please sign in to comment.