Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,8 @@ where
/// valid.
pub import_queue: Box<dyn ImportQueue<B>>,

/// Factory function that creates a new instance of chain sync.
pub create_chain_sync: Box<
dyn FnOnce(
sc_network_common::sync::SyncMode,
Arc<Client>,
Option<Arc<dyn WarpSyncProvider<B>>>,
) -> crate::error::Result<Box<dyn ChainSync<B>>>,
>,
/// Instance of chain sync implementation.
pub chain_sync: Box<dyn ChainSync<B>>,

/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,
Expand Down Expand Up @@ -138,8 +132,8 @@ where
/// both outgoing and incoming requests.
pub state_request_protocol_config: RequestResponseConfig,

/// Optional warp sync protocol support. Include protocol config and sync provider.
pub warp_sync: Option<(Arc<dyn WarpSyncProvider<B>>, RequestResponseConfig)>,
/// Optional warp sync protocol config.
pub warp_sync_protocol_config: Option<RequestResponseConfig>,
}

/// Role of the local node.
Expand Down Expand Up @@ -352,7 +346,7 @@ impl From<multiaddr::Error> for ParseErr {
}

/// Sync operation mode.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
/// Full block download and verification.
Full,
Expand Down
26 changes: 5 additions & 21 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use crate::{
behaviour::{self, Behaviour, BehaviourOut},
bitswap::Bitswap,
config::{self, parse_str_addr, Params, TransportConfig},
config::{parse_str_addr, Params, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
network_state::{
Expand Down Expand Up @@ -60,7 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
use sc_network_common::sync::{SyncMode, SyncState, SyncStatus};
use sc_network_common::sync::{SyncState, SyncStatus};
use sc_peerset::PeersetHandle;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
Expand Down Expand Up @@ -239,21 +239,6 @@ where

let default_notif_handshake_message = Roles::from(&params.role).encode();

let (warp_sync_provider, warp_sync_protocol_config) = match params.warp_sync {
Some((p, c)) => (Some(p), Some(c)),
None => (None, None),
};

let chain_sync = (params.create_chain_sync)(
match params.network_config.sync_mode {
config::SyncMode::Full => SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
SyncMode::LightState { skip_proofs, storage_chain_mode },
config::SyncMode::Warp => SyncMode::Warp,
},
params.chain.clone(),
warp_sync_provider,
)?;
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
From::from(&params.role),
params.chain.clone(),
Expand All @@ -266,7 +251,7 @@ where
)
.collect(),
params.metrics_registry.as_ref(),
chain_sync,
params.chain_sync,
)?;

// List of multiaddresses that we know in the network.
Expand Down Expand Up @@ -303,7 +288,6 @@ where
let is_major_syncing = Arc::new(AtomicBool::new(false));

// Build the swarm.
let client = params.chain.clone();
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
let user_agent = format!(
"{} ({})",
Expand Down Expand Up @@ -389,15 +373,15 @@ where
};

let behaviour = {
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client));
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(params.chain));
let result = Behaviour::new(
protocol,
user_agent,
local_public,
discovery_config,
params.block_request_protocol_config,
params.state_request_protocol_config,
warp_sync_protocol_config,
params.warp_sync_protocol_config,
bitswap,
params.light_client_request_protocol_config,
params.network_config.request_response_protocols,
Expand Down
35 changes: 18 additions & 17 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type TestNetworkService = NetworkService<
/// > **Note**: We return the events stream in order to not possibly lose events between the
/// > construction of the service and the moment the events stream is grabbed.
fn build_test_full_node(
config: config::NetworkConfiguration,
network_config: config::NetworkConfiguration,
) -> (Arc<TestNetworkService>, impl Stream<Item = Event>) {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

Expand Down Expand Up @@ -111,35 +111,36 @@ fn build_test_full_node(
protocol_config
};

let max_parallel_downloads = config.max_parallel_downloads;
let chain_sync = ChainSync::new(
match network_config.sync_mode {
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
},
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
network_config.max_parallel_downloads,
None,
)
.unwrap();
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
transactions_handler_executor: Box::new(|task| {
async_std::task::spawn(task);
}),
network_config: config,
network_config,
chain: client.clone(),
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
transaction_pool: Arc::new(config::EmptyTransactionPool),
protocol_id,
import_queue,
create_chain_sync: Box::new(
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
sync_mode,
chain,
Box::new(DefaultBlockAnnounceValidator),
max_parallel_downloads,
warp_sync_provider,
) {
Ok(chain_sync) => Ok(Box::new(chain_sync)),
Err(error) => Err(Box::new(error).into()),
},
),
chain_sync: Box::new(chain_sync),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_sync: None,
warp_sync_protocol_config: None,
})
.unwrap();

Expand Down
32 changes: 18 additions & 14 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,10 +835,25 @@ where
protocol_config
};

let max_parallel_downloads = network_config.max_parallel_downloads;
let block_announce_validator = config
.block_announce_validator
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
let chain_sync = ChainSync::new(
match network_config.sync_mode {
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
sc_network_common::sync::SyncMode::LightState {
skip_proofs,
storage_chain_mode,
},
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
},
client.clone(),
block_announce_validator,
network_config.max_parallel_downloads,
Some(warp_sync),
)
.unwrap();
let network = NetworkWorker::new(sc_network::config::Params {
role: if config.is_authority { Role::Authority } else { Role::Full },
executor: None,
Expand All @@ -850,23 +865,12 @@ where
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id,
import_queue,
create_chain_sync: Box::new(move |sync_mode, chain, warp_sync_provider| {
match ChainSync::new(
sync_mode,
chain,
block_announce_validator,
max_parallel_downloads,
warp_sync_provider,
) {
Ok(chain_sync) => Ok(Box::new(chain_sync)),
Err(error) => Err(Box::new(error).into()),
}
}),
chain_sync: Box::new(chain_sync),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_sync: Some((warp_sync, warp_protocol_config)),
warp_sync_protocol_config: Some(warp_protocol_config),
})
.unwrap();

Expand Down
44 changes: 23 additions & 21 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,15 @@ where
protocol_config
};

let warp_sync_params = warp_sync.map(|provider| {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
(provider, protocol_config)
});
let (warp_sync_provider, warp_sync_protocol_config) = warp_sync
.map(|provider| {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
(Some(provider), Some(protocol_config))
})
.unwrap_or_default();

let light_client_request_protocol_config = {
// Allow both outgoing and incoming requests.
Expand All @@ -776,7 +778,18 @@ where
protocol_config
};

let max_parallel_downloads = config.network.max_parallel_downloads;
let chain_sync = ChainSync::new(
match config.network.sync_mode {
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
},
client.clone(),
block_announce_validator,
config.network.max_parallel_downloads,
warp_sync_provider,
)?;
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
Expand All @@ -796,22 +809,11 @@ where
transaction_pool: transaction_pool_adapter as _,
protocol_id,
import_queue: Box::new(import_queue),
create_chain_sync: Box::new(
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
sync_mode,
chain,
block_announce_validator,
max_parallel_downloads,
warp_sync_provider,
) {
Ok(chain_sync) => Ok(Box::new(chain_sync)),
Err(error) => Err(Box::new(error).into()),
},
),
chain_sync: Box::new(chain_sync),
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_request_protocol_config,
state_request_protocol_config,
warp_sync: warp_sync_params,
warp_sync_protocol_config,
light_client_request_protocol_config,
};

Expand Down