Skip to content

Commit

Permalink
Rework the event system of sc-network (paritytech#1370)
Browse files Browse the repository at this point in the history
This commit introduces a new concept called `NotificationService` which
allows Polkadot protocols to communicate with the underlying
notification protocol implementation directly, without routing events
through `NetworkWorker`. This implies that each protocol has its own
service which it uses to communicate with remote peers and that each
`NotificationService` is unique with respect to the underlying
notification protocol, meaning `NotificationService` for the transaction
protocol can only be used to send and receive transaction-related
notifications.

The `NotificationService` concept introduces two additional benefits:
  * allow protocols to start using custom handshakes
  * allow protocols to accept/reject inbound peers

Previously the validation of inbound connections was solely the
responsibility of `ProtocolController`. This caused issues with light
peers and `SyncingEngine` as `ProtocolController` would accept more
peers than `SyncingEngine` could accept which caused peers to have
differing views of their own states. `SyncingEngine` would reject excess
peers but these rejections were not properly communicated to those peers
causing them to assume that they were accepted.

With `NotificationService`, the local handshake is not sent to remote
peer if peer is rejected which allows it to detect that it was rejected.

This commit also deprecates the use of `NetworkEventStream` for all
notification-related events and going forward only DHT events are
provided through `NetworkEventStream`. If protocols wish to follow each
other's events, they must introduce additional abtractions, as is done
for GRANDPA and transactions protocols by following the syncing protocol
through `SyncEventStream`.

Fixes paritytech#512
Fixes paritytech#514
Fixes paritytech#515
Fixes paritytech#554
Fixes paritytech#556

---
These changes are transferred from
paritytech/substrate#14197 but there are no
functional changes compared to that PR

---------

Co-authored-by: Dmitry Markin <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
  • Loading branch information
3 people authored Nov 28, 2023
1 parent cd9ee20 commit 62d6f29
Show file tree
Hide file tree
Showing 86 changed files with 4,545 additions and 1,842 deletions.
7 changes: 4 additions & 3 deletions substrate/bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
&config.chain_spec,
);
net_config.add_notification_protocol(sc_consensus_grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));
let (grandpa_protocol_config, grandpa_notification_service) =
sc_consensus_grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
net_config.add_notification_protocol(grandpa_protocol_config);

let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
Expand Down Expand Up @@ -316,6 +316,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
link: grandpa_link,
network,
sync: Arc::new(sync_service),
notification_service: grandpa_notification_service,
voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
Expand Down
33 changes: 18 additions & 15 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,28 +370,28 @@ pub fn new_full_base(
let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);

let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");

let grandpa_protocol_name = grandpa::protocol_standard_name(&genesis_hash, &config.chain_spec);
net_config.add_notification_protocol(grandpa::grandpa_peers_set_config(
grandpa_protocol_name.clone(),
));
let (grandpa_protocol_config, grandpa_notification_service) =
grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone());
net_config.add_notification_protocol(grandpa_protocol_config);

let statement_handler_proto = sc_network_statement::StatementHandlerPrototype::new(
genesis_hash,
config.chain_spec.fork_id(),
);
net_config.add_notification_protocol(statement_handler_proto.set_config());
let (statement_handler_proto, statement_config) =
sc_network_statement::StatementHandlerPrototype::new(
genesis_hash,
config.chain_spec.fork_id(),
);
net_config.add_notification_protocol(statement_config);

let mixnet_protocol_name =
sc_mixnet::protocol_name(genesis_hash.as_ref(), config.chain_spec.fork_id());
if let Some(mixnet_config) = &mixnet_config {
net_config.add_notification_protocol(sc_mixnet::peers_set_config(
mixnet_protocol_name.clone(),
mixnet_config,
));
}
let mixnet_notification_service = mixnet_config.as_ref().map(|mixnet_config| {
let (config, notification_service) =
sc_mixnet::peers_set_config(mixnet_protocol_name.clone(), mixnet_config);
net_config.add_notification_protocol(config);
notification_service
});

let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
Expand Down Expand Up @@ -422,6 +422,8 @@ pub fn new_full_base(
mixnet_protocol_name,
transaction_pool.clone(),
Some(keystore_container.keystore()),
mixnet_notification_service
.expect("`NotificationService` exists since mixnet was enabled; qed"),
);
task_manager.spawn_handle().spawn("mixnet", None, mixnet);
}
Expand Down Expand Up @@ -590,6 +592,7 @@ pub fn new_full_base(
link: grandpa_link,
network: network.clone(),
sync: Arc::new(sync_service.clone()),
notification_service: grandpa_notification_service,
telemetry: telemetry.as_ref().map(|x| x.handle()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
prometheus_registry: prometheus_registry.clone(),
Expand Down
12 changes: 9 additions & 3 deletions substrate/client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ pub(crate) mod beefy_protocol_name {
/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`].
pub fn beefy_peers_set_config(
gossip_protocol_name: sc_network::ProtocolName,
) -> sc_network::config::NonDefaultSetConfig {
let mut cfg = sc_network::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024);
) -> (sc_network::config::NonDefaultSetConfig, Box<dyn sc_network::NotificationService>) {
let (mut cfg, notification_service) = sc_network::config::NonDefaultSetConfig::new(
gossip_protocol_name,
Vec::new(),
1024 * 1024,
None,
Default::default(),
);
cfg.allow_non_reserved(25, 25);
cfg
(cfg, notification_service)
}

// cost scalars for reporting peers.
Expand Down
6 changes: 5 additions & 1 deletion substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, ProtocolName};
use sc_network::{NetworkRequest, NotificationService, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{
Expand Down Expand Up @@ -178,6 +178,8 @@ pub struct BeefyNetworkParams<B: Block, N, S> {
pub network: Arc<N>,
/// Syncing service implementing a sync oracle and an event stream for peers.
pub sync: Arc<S>,
/// Handle for receiving notification events.
pub notification_service: Box<dyn NotificationService>,
/// Chain specific BEEFY gossip protocol name. See
/// [`communication::beefy_protocol_name::gossip_protocol_name`].
pub gossip_protocol_name: ProtocolName,
Expand Down Expand Up @@ -243,6 +245,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let BeefyNetworkParams {
network,
sync,
notification_service,
gossip_protocol_name,
justifications_protocol_name,
..
Expand All @@ -264,6 +267,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
notification_service,
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
Expand Down
41 changes: 37 additions & 4 deletions substrate/client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use substrate_test_runtime_client::{BlockBuilderExt, ClientExt};
use tokio::time::Duration;

const GENESIS_HASH: H256 = H256::zero();
fn beefy_gossip_proto_name() -> ProtocolName {
pub(crate) fn beefy_gossip_proto_name() -> ProtocolName {
gossip_protocol_name(GENESIS_HASH, None)
}

Expand Down Expand Up @@ -371,6 +371,7 @@ async fn voter_init_setup(
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
"/beefy/whatever",
gossip_validator,
None,
Expand All @@ -392,6 +393,14 @@ where
{
let tasks = FuturesUnordered::new();

let mut notification_services = peers
.iter()
.map(|(peer_id, _, _)| {
let peer = &mut net.peers[*peer_id];
(*peer_id, peer.take_notification_service(&beefy_gossip_proto_name()).unwrap())
})
.collect::<std::collections::HashMap<_, _>>();

for (peer_id, key, api) in peers.into_iter() {
let peer = &net.peers[peer_id];

Expand All @@ -409,6 +418,7 @@ where
let network_params = crate::BeefyNetworkParams {
network: peer.network_service().clone(),
sync: peer.sync_service().clone(),
notification_service: notification_services.remove(&peer_id).unwrap(),
gossip_protocol_name: beefy_gossip_proto_name(),
justifications_protocol_name: on_demand_justif_handler.protocol_name(),
_phantom: PhantomData,
Expand Down Expand Up @@ -1045,7 +1055,25 @@ async fn should_initialize_voter_at_custom_genesis() {
net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
//
// NOTE: code from `voter_init_setup()` is moved here because the new network event system
// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
// first `GossipEngine`
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, _) = GossipValidator::new(known_peers);
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
"/beefy/whatever",
gossip_validator,
None,
);
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1075,7 +1103,11 @@ async fn should_initialize_voter_at_custom_genesis() {

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let new_persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1).unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down Expand Up @@ -1371,7 +1403,7 @@ async fn gossipped_finality_proofs() {
let api = Arc::new(TestApi::with_validator_set(&validator_set));
let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect();

let charlie = &net.peers[2];
let charlie = &mut net.peers[2];
let known_peers = Arc::new(Mutex::new(KnownPeers::<Block>::new()));
// Charlie will run just the gossip engine and not the full voter.
let (gossip_validator, _) = GossipValidator::new(known_peers);
Expand All @@ -1384,6 +1416,7 @@ async fn gossipped_finality_proofs() {
let mut charlie_gossip_engine = sc_network_gossip::GossipEngine::new(
charlie.network_service().clone(),
charlie.sync_service().clone(),
charlie.take_notification_service(&beefy_gossip_proto_name()).unwrap(),
beefy_gossip_proto_name(),
charlie_gossip_validator.clone(),
None,
Expand Down
4 changes: 4 additions & 0 deletions substrate/client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1145,12 +1145,16 @@ pub(crate) mod tests {
let api = Arc::new(TestApi::with_validator_set(&genesis_validator_set));
let network = peer.network_service().clone();
let sync = peer.sync_service().clone();
let notification_service = peer
.take_notification_service(&crate::tests::beefy_gossip_proto_name())
.unwrap();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
notification_service,
"/beefy/1",
gossip_validator.clone(),
None,
Expand Down
4 changes: 3 additions & 1 deletion substrate/client/consensus/grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use finality_grandpa::{
Message::{Precommit, Prevote, PrimaryPropose},
};
use parity_scale_codec::{Decode, DecodeAll, Encode};
use sc_network::{NetworkBlock, NetworkSyncForkRequest, ReputationChange};
use sc_network::{NetworkBlock, NetworkSyncForkRequest, NotificationService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sp_keystore::KeystorePtr;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
pub(crate) fn new(
service: N,
sync: S,
notification_service: Box<dyn NotificationService>,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
prometheus_registry: Option<&Registry>,
Expand All @@ -260,6 +261,7 @@ impl<B: BlockT, N: Network<B>, S: Syncing<B>> NetworkBridge<B, N, S> {
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
sync.clone(),
notification_service,
protocol,
validator.clone(),
prometheus_registry,
Expand Down
Loading

0 comments on commit 62d6f29

Please sign in to comment.