diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 838a0e972e..2aeb4a5f70 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -34,6 +34,28 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) { return "Unknown error in ChainSpecImpl"; } +namespace { + + template + bool openOutgoing(std::shared_ptr &se, + std::shared_ptr

const &protocol, + kagome::network::PeerManager::PeerInfo const &pi, + F &&func) { + BOOST_ASSERT(se); + BOOST_ASSERT(protocol); + + if (se->reserveOutgoing(pi.id, protocol)) { + protocol->newOutgoingStream( + pi, [func{std::forward(func)}](auto &&stream_res) mutable { + return std::forward(func)(std::move(stream_res)); + }); + return true; + } + return false; + } + +} // namespace + namespace kagome::network { PeerManagerImpl::PeerManagerImpl( std::shared_ptr app_state_manager, @@ -75,6 +97,7 @@ namespace kagome::network { BOOST_ASSERT(router_ != nullptr); BOOST_ASSERT(storage_ != nullptr); BOOST_ASSERT(hasher_ != nullptr); + BOOST_ASSERT(peer_view_); BOOST_ASSERT(reputation_repository_ != nullptr); BOOST_ASSERT(peer_view_ != nullptr); @@ -249,12 +272,15 @@ namespace kagome::network { outcome::result< std::pair> PeerManagerImpl::insert_advertisement(PeerState &peer_state, - ParachainState ¶chain_state, primitives::BlockHash para_hash) { if (!peer_state.collator_state) return Error::UNDECLARED_COLLATOR; - if (parachain_state.our_view.count(para_hash) == 0) + auto my_view = peer_view_->getMyView(); + BOOST_ASSERT(my_view); + + if (!my_view->get().contains(para_hash)) { return Error::OUT_OF_VIEW; + } if (peer_state.collator_state.value().advertisements.count(para_hash) != 0) return Error::DUPLICATE; @@ -508,10 +534,6 @@ namespace kagome::network { it->second.best_block = status.best_block; } - ParachainState &PeerManagerImpl::parachainState() { - return parachain_state_; - } - void PeerManagerImpl::updatePeerState(const PeerId &peer_id, const BlockAnnounce &announce) { auto hash = hasher_->blake2b_256(scale::encode(announce.header).value()); @@ -566,6 +588,104 @@ namespace kagome::network { queue_to_connect_.size()); } + template + void PeerManagerImpl::openBlockAnnounceProtocol( + PeerInfo const &peer_info, + libp2p::network::ConnectionManager::ConnectionSPtr const &connection, + F &&opened_callback) { + auto block_announce_protocol = router_->getBlockAnnounceProtocol(); + BOOST_ASSERT_MSG(block_announce_protocol, + "Router did not provide block announce protocol"); + + if (!openOutgoing( + stream_engine_, + block_announce_protocol, + peer_info, + [wp = weak_from_this(), + peer_info, + protocol = block_announce_protocol, + connection, + opened_callback{std::forward(opened_callback)}]( + auto &&stream_res) mutable { + auto self = wp.lock(); + if (not self) { + return; + } + + auto &peer_id = peer_info.id; + + self->stream_engine_->dropReserveOutgoing(peer_id, protocol); + self->connecting_peers_.erase(peer_id); + + if (not stream_res.has_value()) { + self->log_->warn("Unable to create stream {} with {}: {}", + protocol->protocolName(), + peer_id, + stream_res.error().message()); + self->disconnectFromPeer(peer_id); + return; + } + PeerType peer_type = connection->isInitiator() + ? PeerType::PEER_TYPE_OUT + : PeerType::PEER_TYPE_IN; + + // Add to active peer list + if (auto [ap_it, added] = self->active_peers_.emplace( + peer_id, PeerDescriptor{peer_type, self->clock_->now()}); + added) { + self->recently_active_peers_.insert(peer_id); + + // And remove from queue + if (auto piq_it = self->peers_in_queue_.find(peer_id); + piq_it != self->peers_in_queue_.end()) { + auto qtc_it = + std::find_if(self->queue_to_connect_.cbegin(), + self->queue_to_connect_.cend(), + [&peer_id = peer_id](const auto &item) { + return peer_id == item.get(); + }); + self->queue_to_connect_.erase(qtc_it); + self->peers_in_queue_.erase(piq_it); + BOOST_ASSERT(self->queue_to_connect_.size() + == self->peers_in_queue_.size()); + + SL_DEBUG(self->log_, + "Remained peers in queue for connect: {}", + self->peers_in_queue_.size()); + } + self->sync_peer_num_->set(self->active_peers_.size()); + } + + self->reserveStreams(peer_id); + self->startPingingPeer(peer_id); + + /// Process callback when opened successfully + std::forward(opened_callback)( + self, peer_info, self->getPeerState(peer_id)); + })) { + SL_DEBUG(log_, + "Stream {} with {} is alive or connecting", + block_announce_protocol->protocolName(), + peer_info.id); + } + } + + void PeerManagerImpl::tryOpenGrandpaProtocol(PeerInfo const &peer_info, + PeerState &r_info) { + if (auto o_info_opt = getPeerState(own_peer_info_.id); + o_info_opt.has_value()) { + auto &o_info = o_info_opt.value(); + + // Establish outgoing grandpa stream if node synced + if (r_info.best_block.number <= o_info.get().best_block.number) { + auto grandpa_protocol = router_->getGrandpaProtocol(); + BOOST_ASSERT_MSG(grandpa_protocol, + "Router did not provide grandpa protocol"); + grandpa_protocol->newOutgoingStream(peer_info, [](const auto &...) {}); + } + } + } + void PeerManagerImpl::processFullyConnectedPeer(const PeerId &peer_id) { // Skip connection to itself if (isSelfPeer(peer_id)) { @@ -621,96 +741,16 @@ namespace kagome::network { } PeerInfo peer_info{.id = peer_id, .addresses = {}}; - - auto block_announce_protocol = router_->getBlockAnnounceProtocol(); - BOOST_ASSERT_MSG(block_announce_protocol, - "Router did not provide block announce protocol"); - - if (stream_engine_->reserveOutgoing(peer_info.id, - block_announce_protocol)) { - block_announce_protocol->newOutgoingStream( - peer_info, - [wp = weak_from_this(), - peer_info, - protocol = block_announce_protocol, - connection](auto &&stream_res) { - auto self = wp.lock(); - if (not self) { - return; - } - - auto &peer_id = peer_info.id; - - self->stream_engine_->dropReserveOutgoing(peer_id, protocol); - if (not stream_res.has_value()) { - self->log_->warn("Unable to create stream {} with {}: {}", - protocol->protocolName(), - peer_id, - stream_res.error().message()); - self->connecting_peers_.erase(peer_id); - self->disconnectFromPeer(peer_id); - return; - } - PeerType peer_type = connection->isInitiator() - ? PeerType::PEER_TYPE_OUT - : PeerType::PEER_TYPE_IN; - - // Add to active peer list - if (auto [ap_it, added] = self->active_peers_.emplace( - peer_id, PeerDescriptor{peer_type, self->clock_->now()}); - added) { - self->recently_active_peers_.insert(peer_id); - - // And remove from queue - if (auto piq_it = self->peers_in_queue_.find(peer_id); - piq_it != self->peers_in_queue_.end()) { - auto qtc_it = - std::find_if(self->queue_to_connect_.cbegin(), - self->queue_to_connect_.cend(), - [&peer_id = peer_id](const auto &item) { - return peer_id == item.get(); - }); - self->queue_to_connect_.erase(qtc_it); - self->peers_in_queue_.erase(piq_it); - BOOST_ASSERT(self->queue_to_connect_.size() - == self->peers_in_queue_.size()); - - SL_DEBUG(self->log_, - "Remained peers in queue for connect: {}", - self->peers_in_queue_.size()); - } - self->sync_peer_num_->set(self->active_peers_.size()); - } - - self->connecting_peers_.erase(peer_id); - - self->reserveStreams(peer_id); - self->startPingingPeer(peer_id); - - // Establish outgoing grandpa stream if node synced - auto r_info_opt = self->getPeerState(peer_id); - auto o_info_opt = self->getPeerState(self->own_peer_info_.id); - if (r_info_opt.has_value() and o_info_opt.has_value()) { - auto &r_info = r_info_opt.value(); - auto &o_info = o_info_opt.value(); - - if (r_info.get().best_block.number - <= o_info.get().best_block.number) { - auto grandpa_protocol = self->router_->getGrandpaProtocol(); - BOOST_ASSERT_MSG(grandpa_protocol, - "Router did not provide grandpa protocol"); - grandpa_protocol->newOutgoingStream(peer_info, - [](const auto &...) {}); - } - } - }); - } else { - SL_DEBUG(log_, - "Stream {} with {} is alive", - block_announce_protocol->protocolName(), - peer_id); - connecting_peers_.erase(peer_id); - } + openBlockAnnounceProtocol( + peer_info, + connection, + [](std::shared_ptr &self, + PeerInfo const &peer_info, + std::optional> peer_state) { + if (peer_state.has_value()) { + self->tryOpenGrandpaProtocol(peer_info, peer_state.value().get()); + } + }); auto addresses_res = host_.getPeerRepository().getAddressRepository().getAddresses(peer_id); diff --git a/core/network/impl/peer_manager_impl.hpp b/core/network/impl/peer_manager_impl.hpp index 3ef6cbbff0..16063d81fb 100644 --- a/core/network/impl/peer_manager_impl.hpp +++ b/core/network/impl/peer_manager_impl.hpp @@ -100,13 +100,9 @@ namespace kagome::network { network::CollatorPublicKey const &collator_id, network::ParachainId para_id) override; - /** @see PeerManager::parachainState */ - ParachainState ¶chainState() override; - outcome::result< std::pair> insert_advertisement(PeerState &peer_state, - ParachainState ¶chain_state, primitives::BlockHash para_hash) override; /** @see PeerManager::forEachPeer */ @@ -149,6 +145,14 @@ namespace kagome::network { void processFullyConnectedPeer(const PeerId &peer_id); + template + void openBlockAnnounceProtocol( + PeerInfo const &peer_info, + libp2p::network::ConnectionManager::ConnectionSPtr const &connection, + F &&opened_callback); + void tryOpenGrandpaProtocol(PeerInfo const &peer_info, + PeerState &peer_state); + /// Opens streams set for special peer (i.e. new-discovered) void connectToPeer(const PeerId &peer_id); @@ -196,8 +200,7 @@ namespace kagome::network { metrics::Gauge *sync_peer_num_; // parachain - ParachainState parachain_state_; - std::shared_ptr peer_view_; + std::shared_ptr peer_view_; log::Logger log_; }; diff --git a/core/network/peer_manager.hpp b/core/network/peer_manager.hpp index b641922eb6..3145e8ae84 100644 --- a/core/network/peer_manager.hpp +++ b/core/network/peer_manager.hpp @@ -34,12 +34,7 @@ namespace kagome::network { libp2p::peer::PeerId const &peer_id; }; - /* - * Parachain state view. - */ - struct ParachainState { - std::unordered_map our_view; - }; + using OurView = network::View; struct PeerState { clock::SteadyClock::TimePoint time; @@ -49,6 +44,7 @@ namespace kagome::network { std::optional set_id = std::nullopt; BlockNumber last_finalized = 0; std::optional collator_state = std::nullopt; + std::optional view; }; struct StreamEngine; @@ -113,14 +109,8 @@ namespace kagome::network { virtual outcome::result< std::pair> insert_advertisement(PeerState &peer_state, - ParachainState ¶chain_state, primitives::BlockHash para_hash) = 0; - /** - * Allows to update parachains states. - */ - virtual ParachainState ¶chainState() = 0; - /** * Updates collation state and stores parachain id. Should be called once * for each peer per connection. If else -> reduce reputation. diff --git a/core/parachain/validator/impl/parachain_observer.cpp b/core/parachain/validator/impl/parachain_observer.cpp index a3817f4fd5..cc50cc6ca6 100644 --- a/core/parachain/validator/impl/parachain_observer.cpp +++ b/core/parachain/validator/impl/parachain_observer.cpp @@ -33,35 +33,7 @@ namespace kagome::observers { void onAdvertise(libp2p::peer::PeerId const &peer_id, primitives::BlockHash relay_parent) override { - auto ¶chain_state = pm_->parachainState(); - bool const contains_para_hash = - (parachain_state.our_view.count(relay_parent) != 0); - - if (!contains_para_hash) { - logger_->warn("Advertise collation out of view from peer {}", peer_id); - return; - } - - auto const peer_state = pm_->getPeerState(peer_id); - if (!peer_state) { - logger_->warn("Received collation advertise from unknown peer {}", - peer_id); - return; - } - - auto result = pm_->insert_advertisement( - peer_state->get(), parachain_state, std::move(relay_parent)); - if (!result) { - logger_->warn("Insert advertisement from {} failed: {}", - peer_id, - result.error().message()); - return; - } - - processor_->requestCollations( - network::PendingCollation{.para_id = result.value().second, - .relay_parent = relay_parent, - .peer_id = peer_id}); + /// TODO(iceseer): removed because of merge } void onDeclare(libp2p::peer::PeerId const &peer_id, diff --git a/test/mock/core/network/peer_manager_mock.hpp b/test/mock/core/network/peer_manager_mock.hpp index 1750688c43..e2072a753e 100644 --- a/test/mock/core/network/peer_manager_mock.hpp +++ b/test/mock/core/network/peer_manager_mock.hpp @@ -26,7 +26,7 @@ namespace kagome::network { MOCK_METHOD(AdvResult, insert_advertisement, - (PeerState &, ParachainState &, primitives::BlockHash), + (PeerState &, primitives::BlockHash), (override)); MOCK_METHOD(ParachainState &, parachainState, (), (override));