Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ fn fill_network_configuration(
wasm_external_transport: None,
};

config.max_parallel_downloads = cli.max_parallel_downloads;

Ok(())
}

Expand Down
11 changes: 9 additions & 2 deletions core/cli/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ pub struct NetworkConfigurationParams {
pub port: Option<u16>,

/// Specify the number of outgoing connections we're trying to maintain.
#[structopt(long = "out-peers", value_name = "OUT_PEERS", default_value = "25")]
#[structopt(long = "out-peers", value_name = "COUNT", default_value = "25")]
pub out_peers: u32,

/// Specify the maximum number of incoming connections we're accepting.
#[structopt(long = "in-peers", value_name = "IN_PEERS", default_value = "25")]
#[structopt(long = "in-peers", value_name = "COUNT", default_value = "25")]
pub in_peers: u32,

/// Disable mDNS discovery.
Expand All @@ -160,6 +160,13 @@ pub struct NetworkConfigurationParams {
#[structopt(long = "no-mdns")]
pub no_mdns: bool,

/// Maximum number of peers to ask the same blocks in parallel.
///
/// This allows downlading announced blocks from multiple peers. Decrease to save
/// traffic and risk increased latency.
#[structopt(long = "max-parallel-downloads", value_name = "COUNT", default_value = "5")]
pub max_parallel_downloads: u32,

#[allow(missing_docs)]
#[structopt(flatten)]
pub node_key_params: NodeKeyParams
Expand Down
6 changes: 3 additions & 3 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ impl TestNetFactory for GrandpaTestNet {

fn default_config() -> ProtocolConfig {
// the authority role ensures gossip hits all nodes here.
ProtocolConfig {
roles: Roles::AUTHORITY,
}
let mut config = ProtocolConfig::default();
config.roles = Roles::AUTHORITY;
config
}

fn make_verifier(
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
pub specialization: S,

/// Type to check incoming block announcements.
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
}

bitflags! {
Expand Down Expand Up @@ -261,6 +261,8 @@ pub struct NetworkConfiguration {
pub node_name: String,
/// Configuration for the transport layer.
pub transport: TransportConfig,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
}

impl Default for NetworkConfiguration {
Expand All @@ -282,6 +284,7 @@ impl Default for NetworkConfiguration {
enable_mdns: false,
wasm_external_transport: None,
},
max_parallel_downloads: 5,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,15 @@ struct ContextData<B: BlockT, H: ExHashT> {
pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Roles,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
}

impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
ProtocolConfig {
roles: Roles::FULL,
max_parallel_downloads: 5,
}
}
}
Expand All @@ -393,6 +396,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&info,
finality_proof_request_builder,
block_announce_validator,
config.max_parallel_downloads,
);
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
Expand Down
28 changes: 19 additions & 9 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ pub struct ChainSync<B: BlockT> {
/// A flag that caches idle state with no pending requests.
is_idle: bool,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
max_parallel_downloads: u32,
}

/// All the data we have about a Peer that we are trying to sync with
Expand Down Expand Up @@ -282,7 +284,8 @@ impl<B: BlockT> ChainSync<B> {
client: Arc<dyn crate::chain::Client<B>>,
info: &ClientInfo<B>,
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
) -> Self {
let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION;

Expand All @@ -306,6 +309,7 @@ impl<B: BlockT> ChainSync<B> {
fork_targets: Default::default(),
is_idle: false,
block_announce_validator,
max_parallel_downloads,
}
}

Expand Down Expand Up @@ -571,6 +575,7 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", id);
Expand All @@ -592,13 +597,19 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) {
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, max_parallel) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(target: "sync", "New block request for {}", id);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
have_requests = true;
Some((id.clone(), req))
} else {
trace!(target: "sync", "No new block request for {}", id);
None
}
});
Expand Down Expand Up @@ -1006,7 +1017,7 @@ impl<B: BlockT> ChainSync<B> {
{
let header = &announce.header;
let number = *header.number();
debug!(target: "sync", "Received block announcement with number {:?}", number);
debug!(target: "sync", "Received block announcement {:?} with number {:?} from {}", hash, number, who);
if number.is_zero() {
warn!(target: "sync", "Ignored genesis block (#0) announcement from {}: {}", who, hash);
return OnBlockAnnounce::Nothing
Expand Down Expand Up @@ -1226,15 +1237,14 @@ fn peer_block_request<B: BlockT>(
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: &message::BlockAttributes,
major_sync: bool,
max_parallel_downloads: u32,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
let max_parallel = if major_sync { 1 } else { 3 };
if let Some(range) = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
peer.best_number,
peer.common_number,
max_parallel,
max_parallel_downloads,
) {
let request = message::generic::BlockRequest {
id: 0,
Expand Down
4 changes: 4 additions & 0 deletions core/network/src/protocol/sync/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ impl<B: BlockT> BlockCollection<B> {
max_parallel: u32,
) -> Option<Range<NumberFor<B>>>
{
if peer_best <= common {
// Bail out early
return None;
}
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
let count = (count as u32).into();
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let (protocol, peerset_handle) = Protocol::new(
protocol::ProtocolConfig { roles: params.roles },
protocol::ProtocolConfig {
roles: params.roles,
max_parallel_downloads: params.network_config.max_parallel_downloads,
},
params.chain,
params.on_demand.as_ref().map(|od| od.checker().clone())
.unwrap_or(Arc::new(AlwaysBadChecker)),
Expand Down
1 change: 1 addition & 0 deletions core/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fn node_config<G, E: Clone> (
enable_mdns: false,
wasm_external_transport: None,
},
max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads,
};

Configuration {
Expand Down