Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,14 @@ pub trait ChainSync<Block: BlockT>: Send {

/// Decode implementation-specific state response.
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;

/// Advance the state of `ChainSync`
///
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
}
4 changes: 4 additions & 0 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use sc_network_common::{

pub use libp2p::{build_multiaddr, core::PublicKey, identity};

use crate::ChainSyncInterface;
use core::{fmt, iter};
use libp2p::{
identity::{ed25519, Keypair},
Expand Down Expand Up @@ -91,6 +92,9 @@ where
/// Instance of chain sync implementation.
pub chain_sync: Box<dyn ChainSync<B>>,

/// Interface that can be used to delegate syncing-related function calls to `ChainSync`
pub chain_sync_service: Box<dyn ChainSyncInterface<B>>,

/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,

Expand Down
12 changes: 12 additions & 0 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ pub use service::{
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender,
NotificationSenderReady, OutboundFailure, PublicKey,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

pub use sc_peerset::ReputationChange;

Expand All @@ -293,3 +294,14 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2;

/// The maximum number of concurrent established connections that were incoming.
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;

/// Abstraction over syncing-related services
pub trait ChainSyncInterface<B: BlockT>:
NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
{
}

impl<T, B: BlockT> ChainSyncInterface<B> for T where
T: NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
{
}
19 changes: 5 additions & 14 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,6 @@ where
self.chain_sync.clear_justification_requests();
}

/// Request syncing for the given block from given set of peers.
/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
/// requests.
pub fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &B::Hash,
number: NumberFor<B>,
) {
self.chain_sync.set_sync_fork_request(peers, hash, number)
}

/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the importqueue, with or without
/// errors.
Expand Down Expand Up @@ -1461,8 +1449,11 @@ where
self.pending_messages.push_back(event);
}

// Check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) {
// Advance the state of `ChainSync`
//
// Process any received requests received from `NetworkService` and
// check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
match self.process_block_announce_validation_result(result) {
CustomMessageOutcome::None => {},
outcome => self.pending_messages.push_back(outcome),
Expand Down
13 changes: 5 additions & 8 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready},
transport, ReputationChange,
transport, ChainSyncInterface, ReputationChange,
};

use futures::{channel::oneshot, prelude::*};
Expand Down Expand Up @@ -121,6 +121,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B>>,
/// Interface that can be used to delegate calls to `ChainSync`
chain_sync_service: Box<dyn ChainSyncInterface<B>>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
Expand Down Expand Up @@ -433,6 +435,7 @@ where
local_peer_id,
local_identity,
to_worker,
chain_sync_service: params.chain_sync_service,
peers_notifications_sinks: peers_notifications_sinks.clone(),
notifications_sizes_metric: metrics
.as_ref()
Expand Down Expand Up @@ -814,7 +817,7 @@ where
/// a stale fork missing.
/// Passing empty `peers` set effectively removes the sync request.
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number));
self.chain_sync_service.set_sync_fork_request(peers, hash, number);
}
}

Expand Down Expand Up @@ -1219,7 +1222,6 @@ enum ServiceToWorkerMsg<B: BlockT> {
RemoveSetReserved(ProtocolName, PeerId),
AddToPeersSet(ProtocolName, PeerId),
RemoveFromPeersSet(ProtocolName, PeerId),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(out_events::Sender),
Request {
target: PeerId,
Expand Down Expand Up @@ -1380,11 +1382,6 @@ where
.behaviour_mut()
.user_protocol_mut()
.remove_from_peers_set(protocol, peer_id),
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) => this
.network_service
.behaviour_mut()
.user_protocol_mut()
.set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender),
ServiceToWorkerMsg::Request {
target,
Expand Down
56 changes: 46 additions & 10 deletions client/network/src/service/chainsync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{config, NetworkWorker};
use crate::{config, ChainSyncInterface, NetworkWorker};

use futures::prelude::*;
use libp2p::PeerId;
Expand All @@ -35,7 +35,7 @@ use sc_network_common::{
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler, mock::MockChainSync,
state_request_handler::StateRequestHandler,
service::mock::MockChainSyncInterface, state_request_handler::StateRequestHandler,
};
use sp_core::H256;
use sp_runtime::{
Expand All @@ -56,6 +56,7 @@ const PROTOCOL_NAME: &str = "/foo";

fn make_network(
chain_sync: Box<dyn ChainSyncT<substrate_test_runtime_client::runtime::Block>>,
chain_sync_service: Box<dyn ChainSyncInterface<substrate_test_runtime_client::runtime::Block>>,
client: Arc<substrate_test_runtime_client::TestClient>,
) -> (TestNetworkWorker, Arc<substrate_test_runtime_client::TestClient>) {
let network_config = config::NetworkConfiguration {
Expand Down Expand Up @@ -174,6 +175,7 @@ fn make_network(
fork_id,
import_queue,
chain_sync,
chain_sync_service,
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
Expand All @@ -193,7 +195,7 @@ fn set_default_expecations_no_peers(
chain_sync.expect_state_request().returning(|| None);
chain_sync.expect_justification_requests().returning(|| Box::new(iter::empty()));
chain_sync.expect_warp_sync_request().returning(|| None);
chain_sync.expect_poll_block_announce_validation().returning(|_| Poll::Pending);
chain_sync.expect_poll().returning(|_| Poll::Pending);
chain_sync.expect_status().returning(|| SyncStatus {
state: SyncState::Idle,
best_seen_block: None,
Expand All @@ -207,11 +209,18 @@ fn set_default_expecations_no_peers(
#[async_std::test]
async fn normal_network_poll_no_peers() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSync` and set default expectations for it
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
set_default_expecations_no_peers(&mut chain_sync);

let (mut network, _) = make_network(chain_sync, client);
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// poll the network once
futures::future::poll_fn(|cx| {
Expand All @@ -224,6 +233,13 @@ async fn normal_network_poll_no_peers() {
#[async_std::test]
async fn request_justification() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `request_justification()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

Expand All @@ -237,7 +253,7 @@ async fn request_justification() {
.returning(|_, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "request justifiction" message and poll the network
network.service().request_justification(&hash, number);
Expand All @@ -252,13 +268,20 @@ async fn request_justification() {
#[async_std::test]
async fn clear_justification_requests(&mut self) {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `clear_justification_requests()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

chain_sync.expect_clear_justification_requests().once().returning(|| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "request justifiction" message and poll the network
network.service().clear_justification_requests();
Expand All @@ -273,24 +296,31 @@ async fn clear_justification_requests(&mut self) {
#[async_std::test]
async fn set_sync_fork_request() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSync` and set default expectations for it
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
set_default_expecations_no_peers(&mut chain_sync);

// build `ChainSyncInterface` provider and verify that the `set_sync_fork_request()`
// call is delegated to `ChainSyncInterface` (which eventually forwards it to `ChainSync`)
let mut chain_sync_service =
MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new();

let hash = H256::random();
let number = 1337u64;
let peers = (0..3).map(|_| PeerId::random()).collect::<Vec<_>>();
let copy_peers = peers.clone();

chain_sync
chain_sync_service
.expect_set_sync_fork_request()
.withf(move |in_peers, in_hash, in_number| {
&peers == in_peers && &hash == in_hash && &number == in_number
})
.once()
.returning(|_, _, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, Box::new(chain_sync_service), client);

// send "set sync fork request" message and poll the network
network.service().set_sync_fork_request(copy_peers, hash, number);
Expand All @@ -305,6 +335,12 @@ async fn set_sync_fork_request() {
#[async_std::test]
async fn on_block_finalized() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `on_block_finalized()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

Expand All @@ -326,7 +362,7 @@ async fn on_block_finalized() {
.returning(|_, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "set sync fork request" message and poll the network
network.on_block_finalized(hash, header);
Expand Down
3 changes: 2 additions & 1 deletion client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn build_test_full_node(
protocol_config
};

let chain_sync = ChainSync::new(
let (chain_sync, chain_sync_service) = ChainSync::new(
match network_config.sync_mode {
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
Expand Down Expand Up @@ -172,6 +172,7 @@ fn build_test_full_node(
fork_id,
import_queue,
chain_sync: Box::new(chain_sync),
chain_sync_service,
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
Expand Down
2 changes: 2 additions & 0 deletions client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-arithmetic = { version = "5.0.0", path = "../../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
Expand All @@ -42,6 +43,7 @@ sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/final
sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" }

[dev-dependencies]
async-std = { version = "1.11.0", features = ["attributes"] }
quickcheck = { version = "1.0.3", default-features = false }
sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" }
sp-test-primitives = { version = "2.0.0", path = "../../../primitives/test-primitives" }
Expand Down
Loading