Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions prdoc/pr_11004.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
title: 'Statementstore: Forward statements to light clients'
doc:
- audience: Node Dev
description: Forward statements to light clients. For now only done for testing purposes. Later on this needs to be improved to not overwhelm light clients.
crates:
- name: sc-network-statement
bump: patch
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::{stream, Stream, StreamExt};
use sc_network::{
service::traits::{NotificationEvent, NotificationService},
utils::LruHashSet,
NetworkPeers, ObservedRole,
NetworkPeers,
};
use sc_network_statement::{
config::{MAX_KNOWN_STATEMENTS, MAX_PENDING_STATEMENTS},
Expand Down Expand Up @@ -219,10 +219,7 @@ fn build_handler(
let mut peers = HashMap::new();
peers.insert(
peer_id,
Peer::new_for_testing(
LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_STATEMENTS).unwrap()),
ObservedRole::Full,
),
Peer::new_for_testing(LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_STATEMENTS).unwrap())),
);

for _ in 0..num_threads {
Expand Down
55 changes: 13 additions & 42 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use sc_network::{
},
types::ProtocolName,
utils::{interval, LruHashSet},
NetworkBackend, NetworkEventStream, NetworkPeers, ObservedRole,
NetworkBackend, NetworkEventStream, NetworkPeers,
};
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
Expand Down Expand Up @@ -318,7 +318,6 @@ pub struct StatementHandler<
pub struct Peer {
/// Holds a set of statements known to this peer.
known_statements: LruHashSet<Hash>,
role: ObservedRole,
}

/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
Expand Down Expand Up @@ -397,8 +396,8 @@ fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
impl Peer {
/// Create a new peer for testing/benchmarking purposes.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_for_testing(known_statements: LruHashSet<Hash>, role: ObservedRole) -> Self {
Self { known_statements, role }
pub fn new_for_testing(known_statements: LruHashSet<Hash>) -> Self {
Self { known_statements }
}
}

Expand Down Expand Up @@ -557,31 +556,25 @@ where
async fn handle_notification_event(&mut self, event: NotificationEvent) {
match event {
NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
// only accept peers whose role can be determined
// Only accept peers whose role can be determined
let result = self
.network
.peer_role(peer, handshake)
.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
let _ = result_tx.send(result);
},
NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
let Some(role) = self.network.peer_role(peer, handshake) else {
log::debug!(target: LOG_TARGET, "role for {peer} couldn't be determined");
return;
};

NotificationEvent::NotificationStreamOpened { peer, .. } => {
let _was_in = self.peers.insert(
peer,
Peer {
known_statements: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
),
role,
},
);
debug_assert!(_was_in.is_none());

if !self.sync.is_major_syncing() && !role.is_light() {
if !self.sync.is_major_syncing() {
let hashes = self.statement_store.statement_hashes();
if !hashes.is_empty() {
self.pending_initial_syncs.insert(peer, PendingInitialSync { hashes });
Expand Down Expand Up @@ -729,12 +722,6 @@ where
return;
};

// Never send statements to light nodes
if peer.role.is_light() {
log::trace!(target: LOG_TARGET, "{who} is a light node, skipping propagation");
return;
}

let to_send: Vec<_> = statements
.iter()
.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
Expand Down Expand Up @@ -870,24 +857,16 @@ mod tests {
#[derive(Clone)]
struct TestNetwork {
reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
peer_roles: Arc<Mutex<HashMap<PeerId, ObservedRole>>>,
}

impl TestNetwork {
fn new() -> Self {
Self {
reported_peers: Arc::new(Mutex::new(Vec::new())),
peer_roles: Arc::new(Mutex::new(HashMap::new())),
}
Self { reported_peers: Arc::new(Mutex::new(Vec::new())) }
}

fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
self.reported_peers.lock().unwrap().clone()
}

fn set_peer_role(&self, peer: PeerId, role: ObservedRole) {
self.peer_roles.lock().unwrap().insert(peer, role);
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -963,8 +942,8 @@ mod tests {
unimplemented!()
}

fn peer_role(&self, peer: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
self.peer_roles.lock().unwrap().get(&peer).copied()
fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
unimplemented!()
}

async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
Expand Down Expand Up @@ -1226,10 +1205,7 @@ mod tests {
let mut peers = HashMap::new();
peers.insert(
peer_id,
Peer {
known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
role: ObservedRole::Full,
},
Peer { known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()) },
);

let handler = StatementHandler {
Expand Down Expand Up @@ -1463,7 +1439,7 @@ mod tests {

#[tokio::test]
async fn test_initial_sync_burst_single_peer() {
let (mut handler, statement_store, network, notification_service) =
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers();

// Create 20MB of statements (200 statements x 100KB each)
Expand All @@ -1485,7 +1461,6 @@ mod tests {

// Setup peer and simulate connection
let peer_id = PeerId::random();
network.set_peer_role(peer_id, ObservedRole::Full);

handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
Expand Down Expand Up @@ -1547,7 +1522,7 @@ mod tests {

#[tokio::test]
async fn test_initial_sync_burst_multiple_peers_round_robin() {
let (mut handler, statement_store, network, notification_service) =
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers();

// Create 20MB of statements (200 statements x 100KB each)
Expand All @@ -1569,9 +1544,6 @@ mod tests {
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let peer3 = PeerId::random();
network.set_peer_role(peer1, ObservedRole::Full);
network.set_peer_role(peer2, ObservedRole::Full);
network.set_peer_role(peer3, ObservedRole::Full);

// Connect peers
for peer in [peer1, peer2, peer3] {
Expand Down Expand Up @@ -1767,7 +1739,7 @@ mod tests {
//
// With the fix, both use max_statement_payload_size(), so the filter will reject
// statements that wouldn't fit in find_sendable_chunk.
let (mut handler, statement_store, network, notification_service) =
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers();

let payload_limit = max_statement_payload_size();
Expand Down Expand Up @@ -1804,7 +1776,6 @@ mod tests {

// Setup peer and simulate connection
let peer_id = PeerId::random();
network.set_peer_role(peer_id, ObservedRole::Full);

handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
Expand Down
Loading