Skip to content

Commit

Permalink
PeerManagerImpl fixes
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <[email protected]>

# Conflicts:
#	core/blockchain/impl/block_tree_impl.cpp
#	core/injector/application_injector.cpp
#	core/network/impl/peer_manager_impl.cpp
#	core/network/impl/peer_manager_impl.hpp
#	core/network/impl/peer_view.cpp
#	core/network/peer_view.hpp
#	core/parachain/validator/impl/parachain_observer.cpp

# Conflicts:
#	core/injector/application_injector.cpp
#	core/network/impl/peer_manager_impl.cpp
#	core/network/impl/peer_manager_impl.hpp
  • Loading branch information
iceseer committed Nov 22, 2022
1 parent bcd9218 commit 32d28d9
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 144 deletions.
232 changes: 136 additions & 96 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) {
return "Unknown error in ChainSpecImpl";
}

namespace {

template <typename P, typename F>
bool openOutgoing(std::shared_ptr<kagome::network::StreamEngine> &se,
std::shared_ptr<P> const &protocol,
kagome::network::PeerManager::PeerInfo const &pi,
F &&func) {
BOOST_ASSERT(se);
BOOST_ASSERT(protocol);

if (se->reserveOutgoing(pi.id, protocol)) {
protocol->newOutgoingStream(
pi, [func{std::forward<F>(func)}](auto &&stream_res) mutable {
return std::forward<F>(func)(std::move(stream_res));
});
return true;
}
return false;
}

} // namespace

namespace kagome::network {
PeerManagerImpl::PeerManagerImpl(
std::shared_ptr<application::AppStateManager> app_state_manager,
Expand Down Expand Up @@ -75,6 +97,7 @@ namespace kagome::network {
BOOST_ASSERT(router_ != nullptr);
BOOST_ASSERT(storage_ != nullptr);
BOOST_ASSERT(hasher_ != nullptr);
BOOST_ASSERT(peer_view_);
BOOST_ASSERT(reputation_repository_ != nullptr);
BOOST_ASSERT(peer_view_ != nullptr);

Expand Down Expand Up @@ -249,12 +272,15 @@ namespace kagome::network {
outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
PeerManagerImpl::insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) {
if (!peer_state.collator_state) return Error::UNDECLARED_COLLATOR;

if (parachain_state.our_view.count(para_hash) == 0)
auto my_view = peer_view_->getMyView();
BOOST_ASSERT(my_view);

if (!my_view->get().contains(para_hash)) {
return Error::OUT_OF_VIEW;
}

if (peer_state.collator_state.value().advertisements.count(para_hash) != 0)
return Error::DUPLICATE;
Expand Down Expand Up @@ -508,10 +534,6 @@ namespace kagome::network {
it->second.best_block = status.best_block;
}

ParachainState &PeerManagerImpl::parachainState() {
return parachain_state_;
}

void PeerManagerImpl::updatePeerState(const PeerId &peer_id,
const BlockAnnounce &announce) {
auto hash = hasher_->blake2b_256(scale::encode(announce.header).value());
Expand Down Expand Up @@ -566,6 +588,104 @@ namespace kagome::network {
queue_to_connect_.size());
}

template <typename F>
void PeerManagerImpl::openBlockAnnounceProtocol(
PeerInfo const &peer_info,
libp2p::network::ConnectionManager::ConnectionSPtr const &connection,
F &&opened_callback) {
auto block_announce_protocol = router_->getBlockAnnounceProtocol();
BOOST_ASSERT_MSG(block_announce_protocol,
"Router did not provide block announce protocol");

if (!openOutgoing(
stream_engine_,
block_announce_protocol,
peer_info,
[wp = weak_from_this(),
peer_info,
protocol = block_announce_protocol,
connection,
opened_callback{std::forward<F>(opened_callback)}](
auto &&stream_res) mutable {
auto self = wp.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;

self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
self->connecting_peers_.erase(peer_id);

if (not stream_res.has_value()) {
self->log_->warn("Unable to create stream {} with {}: {}",
protocol->protocolName(),
peer_id,
stream_res.error().message());
self->disconnectFromPeer(peer_id);
return;
}
PeerType peer_type = connection->isInitiator()
? PeerType::PEER_TYPE_OUT
: PeerType::PEER_TYPE_IN;

// Add to active peer list
if (auto [ap_it, added] = self->active_peers_.emplace(
peer_id, PeerDescriptor{peer_type, self->clock_->now()});
added) {
self->recently_active_peers_.insert(peer_id);

// And remove from queue
if (auto piq_it = self->peers_in_queue_.find(peer_id);
piq_it != self->peers_in_queue_.end()) {
auto qtc_it =
std::find_if(self->queue_to_connect_.cbegin(),
self->queue_to_connect_.cend(),
[&peer_id = peer_id](const auto &item) {
return peer_id == item.get();
});
self->queue_to_connect_.erase(qtc_it);
self->peers_in_queue_.erase(piq_it);
BOOST_ASSERT(self->queue_to_connect_.size()
== self->peers_in_queue_.size());

SL_DEBUG(self->log_,
"Remained peers in queue for connect: {}",
self->peers_in_queue_.size());
}
self->sync_peer_num_->set(self->active_peers_.size());
}

self->reserveStreams(peer_id);
self->startPingingPeer(peer_id);

/// Process callback when opened successfully
std::forward<F>(opened_callback)(
self, peer_info, self->getPeerState(peer_id));
})) {
SL_DEBUG(log_,
"Stream {} with {} is alive or connecting",
block_announce_protocol->protocolName(),
peer_info.id);
}
}

void PeerManagerImpl::tryOpenGrandpaProtocol(PeerInfo const &peer_info,
PeerState &r_info) {
if (auto o_info_opt = getPeerState(own_peer_info_.id);
o_info_opt.has_value()) {
auto &o_info = o_info_opt.value();

// Establish outgoing grandpa stream if node synced
if (r_info.best_block.number <= o_info.get().best_block.number) {
auto grandpa_protocol = router_->getGrandpaProtocol();
BOOST_ASSERT_MSG(grandpa_protocol,
"Router did not provide grandpa protocol");
grandpa_protocol->newOutgoingStream(peer_info, [](const auto &...) {});
}
}
}

void PeerManagerImpl::processFullyConnectedPeer(const PeerId &peer_id) {
// Skip connection to itself
if (isSelfPeer(peer_id)) {
Expand Down Expand Up @@ -621,96 +741,16 @@ namespace kagome::network {
}

PeerInfo peer_info{.id = peer_id, .addresses = {}};

auto block_announce_protocol = router_->getBlockAnnounceProtocol();
BOOST_ASSERT_MSG(block_announce_protocol,
"Router did not provide block announce protocol");

if (stream_engine_->reserveOutgoing(peer_info.id,
block_announce_protocol)) {
block_announce_protocol->newOutgoingStream(
peer_info,
[wp = weak_from_this(),
peer_info,
protocol = block_announce_protocol,
connection](auto &&stream_res) {
auto self = wp.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;

self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
if (not stream_res.has_value()) {
self->log_->warn("Unable to create stream {} with {}: {}",
protocol->protocolName(),
peer_id,
stream_res.error().message());
self->connecting_peers_.erase(peer_id);
self->disconnectFromPeer(peer_id);
return;
}
PeerType peer_type = connection->isInitiator()
? PeerType::PEER_TYPE_OUT
: PeerType::PEER_TYPE_IN;

// Add to active peer list
if (auto [ap_it, added] = self->active_peers_.emplace(
peer_id, PeerDescriptor{peer_type, self->clock_->now()});
added) {
self->recently_active_peers_.insert(peer_id);

// And remove from queue
if (auto piq_it = self->peers_in_queue_.find(peer_id);
piq_it != self->peers_in_queue_.end()) {
auto qtc_it =
std::find_if(self->queue_to_connect_.cbegin(),
self->queue_to_connect_.cend(),
[&peer_id = peer_id](const auto &item) {
return peer_id == item.get();
});
self->queue_to_connect_.erase(qtc_it);
self->peers_in_queue_.erase(piq_it);
BOOST_ASSERT(self->queue_to_connect_.size()
== self->peers_in_queue_.size());

SL_DEBUG(self->log_,
"Remained peers in queue for connect: {}",
self->peers_in_queue_.size());
}
self->sync_peer_num_->set(self->active_peers_.size());
}

self->connecting_peers_.erase(peer_id);

self->reserveStreams(peer_id);
self->startPingingPeer(peer_id);

// Establish outgoing grandpa stream if node synced
auto r_info_opt = self->getPeerState(peer_id);
auto o_info_opt = self->getPeerState(self->own_peer_info_.id);
if (r_info_opt.has_value() and o_info_opt.has_value()) {
auto &r_info = r_info_opt.value();
auto &o_info = o_info_opt.value();

if (r_info.get().best_block.number
<= o_info.get().best_block.number) {
auto grandpa_protocol = self->router_->getGrandpaProtocol();
BOOST_ASSERT_MSG(grandpa_protocol,
"Router did not provide grandpa protocol");
grandpa_protocol->newOutgoingStream(peer_info,
[](const auto &...) {});
}
}
});
} else {
SL_DEBUG(log_,
"Stream {} with {} is alive",
block_announce_protocol->protocolName(),
peer_id);
connecting_peers_.erase(peer_id);
}
openBlockAnnounceProtocol(
peer_info,
connection,
[](std::shared_ptr<PeerManagerImpl> &self,
PeerInfo const &peer_info,
std::optional<std::reference_wrapper<PeerState>> peer_state) {
if (peer_state.has_value()) {
self->tryOpenGrandpaProtocol(peer_info, peer_state.value().get());
}
});

auto addresses_res =
host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
Expand Down
15 changes: 9 additions & 6 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,9 @@ namespace kagome::network {
network::CollatorPublicKey const &collator_id,
network::ParachainId para_id) override;

/** @see PeerManager::parachainState */
ParachainState &parachainState() override;

outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) override;

/** @see PeerManager::forEachPeer */
Expand Down Expand Up @@ -149,6 +145,14 @@ namespace kagome::network {

void processFullyConnectedPeer(const PeerId &peer_id);

template <typename F>
void openBlockAnnounceProtocol(
PeerInfo const &peer_info,
libp2p::network::ConnectionManager::ConnectionSPtr const &connection,
F &&opened_callback);
void tryOpenGrandpaProtocol(PeerInfo const &peer_info,
PeerState &peer_state);

/// Opens streams set for special peer (i.e. new-discovered)
void connectToPeer(const PeerId &peer_id);

Expand Down Expand Up @@ -196,8 +200,7 @@ namespace kagome::network {
metrics::Gauge *sync_peer_num_;

// parachain
ParachainState parachain_state_;
std::shared_ptr<PeerView> peer_view_;
std::shared_ptr<network::PeerView> peer_view_;

log::Logger log_;
};
Expand Down
14 changes: 2 additions & 12 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ namespace kagome::network {
libp2p::peer::PeerId const &peer_id;
};

/*
* Parachain state view.
*/
struct ParachainState {
std::unordered_map<BlockHash, bool> our_view;
};
using OurView = network::View;

struct PeerState {
clock::SteadyClock::TimePoint time;
Expand All @@ -49,6 +44,7 @@ namespace kagome::network {
std::optional<VoterSetId> set_id = std::nullopt;
BlockNumber last_finalized = 0;
std::optional<CollatorState> collator_state = std::nullopt;
std::optional<View> view;
};

struct StreamEngine;
Expand Down Expand Up @@ -113,14 +109,8 @@ namespace kagome::network {
virtual outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) = 0;

/**
* Allows to update parachains states.
*/
virtual ParachainState &parachainState() = 0;

/**
* Updates collation state and stores parachain id. Should be called once
* for each peer per connection. If else -> reduce reputation.
Expand Down
30 changes: 1 addition & 29 deletions core/parachain/validator/impl/parachain_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,7 @@ namespace kagome::observers {

void onAdvertise(libp2p::peer::PeerId const &peer_id,
primitives::BlockHash relay_parent) override {
auto &parachain_state = pm_->parachainState();
bool const contains_para_hash =
(parachain_state.our_view.count(relay_parent) != 0);

if (!contains_para_hash) {
logger_->warn("Advertise collation out of view from peer {}", peer_id);
return;
}

auto const peer_state = pm_->getPeerState(peer_id);
if (!peer_state) {
logger_->warn("Received collation advertise from unknown peer {}",
peer_id);
return;
}

auto result = pm_->insert_advertisement(
peer_state->get(), parachain_state, std::move(relay_parent));
if (!result) {
logger_->warn("Insert advertisement from {} failed: {}",
peer_id,
result.error().message());
return;
}

processor_->requestCollations(
network::PendingCollation{.para_id = result.value().second,
.relay_parent = relay_parent,
.peer_id = peer_id});
/// TODO(iceseer): removed because of merge
}

void onDeclare(libp2p::peer::PeerId const &peer_id,
Expand Down
2 changes: 1 addition & 1 deletion test/mock/core/network/peer_manager_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace kagome::network {

MOCK_METHOD(AdvResult,
insert_advertisement,
(PeerState &, ParachainState &, primitives::BlockHash),
(PeerState &, primitives::BlockHash),
(override));

MOCK_METHOD(ParachainState &, parachainState, (), (override));
Expand Down

0 comments on commit 32d28d9

Please sign in to comment.