From 71b5ee76ef0b20f5325c299fd0bff2ffddf0ab07 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 27 Jun 2018 17:10:56 +0300 Subject: [PATCH 1/2] fix light sync deadlock --- substrate/network/src/chain.rs | 5 +- substrate/network/src/error.rs | 2 + substrate/network/src/import_queue.rs | 535 ++++++++++++++++++++++++++ substrate/network/src/lib.rs | 1 + substrate/network/src/protocol.rs | 8 +- substrate/network/src/service.rs | 10 +- substrate/network/src/sync.rs | 127 +++--- substrate/network/src/test/mod.rs | 4 +- 8 files changed, 602 insertions(+), 90 deletions(-) create mode 100644 substrate/network/src/import_queue.rs diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index a2eff9b54d8a5..9505e96e93388 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -25,7 +25,7 @@ use runtime_primitives::bft::Justification; pub trait Client: Send + Sync { /// Import a new block. Parent is supposed to be existing in the blockchain. - fn import(&self, is_best: bool, header: Block::Header, justification: Justification, body: Option>) -> Result; + fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification, body: Option>) -> Result; /// Get blockchain info. fn info(&self) -> Result, Error>; @@ -55,10 +55,9 @@ impl Client for PolkadotClient where Block: BlockT, Error: From<<>::State as state_machine::backend::Backend>::Error>, { - fn import(&self, is_best: bool, header: Block::Header, justification: Justification, body: Option>) -> Result { + fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification, body: Option>) -> Result { // TODO: defer justification check. let justified_header = self.check_justification(header, justification.into())?; - let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; (self as &PolkadotClient).import_block(origin, justified_header, body) } diff --git a/substrate/network/src/error.rs b/substrate/network/src/error.rs index 120cfe0b4f358..790cb8ea47af1 100644 --- a/substrate/network/src/error.rs +++ b/substrate/network/src/error.rs @@ -16,12 +16,14 @@ //! Polkadot service possible errors. +use std::io::Error as IoError; use network::Error as NetworkError; use client; error_chain! { foreign_links { Network(NetworkError) #[doc = "Devp2p error."]; + Io(IoError) #[doc = "IO error."]; } links { diff --git a/substrate/network/src/import_queue.rs b/substrate/network/src/import_queue.rs new file mode 100644 index 0000000000000..637257bfd0cee --- /dev/null +++ b/substrate/network/src/import_queue.rs @@ -0,0 +1,535 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! Blocks import queue. + +use std::collections::{HashSet, VecDeque}; +use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicBool, Ordering}; +use parking_lot::{Condvar, Mutex, RwLock}; + +use client::{BlockOrigin, BlockStatus, ImportResult}; +use network::PeerId; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero}; + +use blocks::BlockData; +use chain::Client; +use error::{ErrorKind, Error}; +use io::SyncIo; +use protocol::Protocol; +use service::{ExecuteInContext, Service}; +use sync::ChainSync; + +/// Blocks import queue API. +pub trait ImportQueue: Send + Sync where B::Header: HeaderT { + fn start(&self, service: Weak>, chain: Weak>) -> Result<(), Error>; + fn clear(&self); + fn status(&self) -> ImportQueueStatus; + fn is_importing(&self, hash: &B::Hash) -> bool; + fn import_blocks(&self, sync: &mut ChainSync, io: &mut SyncIo, protocol: &Protocol, blocks: (BlockOrigin, Vec>)); +} + +/// Import queue status. It isn't completely accurate. +pub struct ImportQueueStatus { + /// Number of blocks that are currently in the queue. + pub importing_count: usize, + /// The number of the best block that was ever in the queue since start/last failure. + pub best_importing_number: <::Header as HeaderT>::Number, +} + +/// Blocks import queue that is importing blocks in the separate thread. +pub struct AsyncImportQueue { + handle: Mutex>>, + data: Arc>, +} + +/// Locks order: queue, queue_blocks, best_importing_number +struct AsyncImportQueueData { + signal: Condvar, + queue: Mutex>)>>, + queue_blocks: RwLock>, + best_importing_number: RwLock<<::Header as HeaderT>::Number>, + is_stopping: AtomicBool, +} + +impl AsyncImportQueue where B::Header: HeaderT { + pub fn new() -> Self { + Self { + handle: Mutex::new(None), + data: Arc::new(AsyncImportQueueData::new()), + } + } +} + +impl AsyncImportQueueData where B::Header: HeaderT { + pub fn new() -> Self { + Self { + signal: Default::default(), + queue: Mutex::new(VecDeque::new()), + queue_blocks: RwLock::new(HashSet::new()), + best_importing_number: RwLock::new(Zero::zero()), + is_stopping: Default::default(), + } + } +} + +impl ImportQueue for AsyncImportQueue where B::Header: HeaderT { + fn start(&self, service: Weak>, chain: Weak>) -> Result<(), Error> { + debug_assert!(self.handle.lock().is_none()); + + let qdata = self.data.clone(); + *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { + import_thread(service, chain, qdata) + }).map_err(|err| Error::from(ErrorKind::Io(err)))?); + Ok(()) + } + + fn clear(&self) { + let mut queue = self.data.queue.lock(); + let mut queue_blocks = self.data.queue_blocks.write(); + let mut best_importing_number = self.data.best_importing_number.write(); + queue_blocks.clear(); + queue.clear(); + *best_importing_number = Zero::zero(); + } + + fn status(&self) -> ImportQueueStatus { + ImportQueueStatus { + importing_count: self.data.queue_blocks.read().len(), + best_importing_number: *self.data.best_importing_number.read(), + } + } + + fn is_importing(&self, hash: &B::Hash) -> bool { + self.data.queue_blocks.read().contains(hash) + } + + fn import_blocks(&self, _sync: &mut ChainSync, _io: &mut SyncIo, _protocol: &Protocol, blocks: (BlockOrigin, Vec>)) { + let mut queue = self.data.queue.lock(); + let mut queue_blocks = self.data.queue_blocks.write(); + let mut best_importing_number = self.data.best_importing_number.write(); + let new_best_importing_number = blocks.1.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_default(); + queue_blocks.extend(blocks.1.iter().map(|b| b.block.hash.clone())); + if new_best_importing_number > *best_importing_number { + *best_importing_number = new_best_importing_number; + } + queue.push_back(blocks); + self.data.signal.notify_one(); + } +} + +impl Drop for AsyncImportQueue { + fn drop(&mut self) { + if let Some(handle) = self.handle.lock().take() { + self.data.is_stopping.store(true, Ordering::SeqCst); + let _ = handle.join(); + } + } +} + +/// Blocks import thread. +fn import_thread(service: Weak>, chain: Weak>, qdata: Arc>) where B::Header: HeaderT { + loop { + let new_blocks = { + let mut queue_lock = qdata.queue.lock(); + if queue_lock.is_empty() { + qdata.signal.wait(&mut queue_lock); + } + + match queue_lock.pop_front() { + Some(new_blocks) => new_blocks, + None => break, + } + }; + + if qdata.is_stopping.load(Ordering::SeqCst) { + break; + } + + match (service.upgrade(), chain.upgrade()) { + (Some(service), Some(chain)) => { + let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); + if !import_many_blocks(&mut SyncLink::Indirect(&*service), &*chain, Some(&*qdata), new_blocks) { + break; + } + + let mut queue_blocks = qdata.queue_blocks.write(); + for blocks_hash in blocks_hashes { + queue_blocks.remove(&blocks_hash); + } + }, + _ => break, + } + } + + trace!(target: "sync", "Stopping import thread"); +} + +/// ChainSync link trait. +trait SyncLinkApi { + /// Block imported. + fn block_imported(&mut self, hash: &B::Hash, number: u64); + /// Maintain sync. + fn maintain_sync(&mut self); + /// Disconnect from peer. + fn disconnect(&mut self, peer_id: PeerId); + /// Disconnect from peer and restart sync. + fn disconnect_and_restart(&mut self, peer_id: PeerId); + /// Restart sync. + fn restart(&mut self); +} + +/// Link with the ChainSync service. +enum SyncLink<'a, B: 'static + BlockT> where B::Header: HeaderT { + /// Indirect link (through service). + Indirect(&'a Service), + /// Direct references are given. + #[cfg(test)] + Direct(&'a mut ChainSync, &'a mut SyncIo, &'a Protocol), +} + +/// Block import successful result. +#[derive(Debug, PartialEq)] +enum BlockImportResult { + /// Block is not imported. + NotImported(H, N), + /// Imported known block. + ImportedKnown(H, N), + /// Imported unknown block. + ImportedUnknown(H, N), +} + +/// Block import error. +#[derive(Debug, PartialEq)] +enum BlockImportError { + /// Disconnect from peer and continue import of next bunch of blocks. + Disconnect(PeerId), + /// Disconnect from peer and restart sync. + DisconnectAndRestart(PeerId), + /// Restart sync. + Restart, +} + +/// Import a bunch of blocks. +fn import_many_blocks<'a, B: BlockT>( + link: &mut SyncLinkApi, + chain: &Client, + qdata: Option<&AsyncImportQueueData>, + blocks: (BlockOrigin, Vec>) +) -> bool where B::Header: HeaderT { + let (blocks_origin, blocks) = blocks; + let count = blocks.len(); + let mut imported = 0; + // Blocks in the response/drain should be in ascending order. + for block in blocks { + let import_result = import_single_block(chain, blocks_origin.clone(), block); + let is_import_failed = import_result.is_err(); + imported += process_import_result(link, import_result); + if is_import_failed { + qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero()); + return true; + } + + if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() { + return false; + } + } + + trace!(target: "sync", "Imported {} of {}", imported, count); + link.maintain_sync(); + true +} + +/// Single block import function. +fn import_single_block( + chain: &Client, + block_origin: BlockOrigin, + block: BlockData +) -> Result::Header as HeaderT>::Number>, BlockImportError> { + let origin = block.origin; + let block = block.block; + match (block.header, block.justification) { + (Some(header), Some(justification)) => { + let number = header.number().clone(); + let hash = header.hash(); + let parent = header.parent_hash().clone(); + + // check whether the block is known before importing. + match chain.block_status(&BlockId::Hash(hash)) { + Ok(BlockStatus::InChain) => return Ok(BlockImportResult::NotImported(hash, number)), + Ok(_) => {}, + Err(e) => { + debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); + return Err(BlockImportError::Restart); + } + } + + let result = chain.import( + block_origin, + header, + justification, + block.body.map(|b| b.to_extrinsics()), + ); + match result { + Ok(ImportResult::AlreadyInChain) => { + trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedKnown(hash, number)) + }, + Ok(ImportResult::AlreadyQueued) => { + trace!(target: "sync", "Block already queued {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedKnown(hash, number)) + }, + Ok(ImportResult::Queued) => { + trace!(target: "sync", "Block queued {}: {:?}", number, hash); + Ok(BlockImportResult::ImportedUnknown(hash, number)) + }, + Ok(ImportResult::UnknownParent) => { + debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); + Err(BlockImportError::Restart) + }, + Ok(ImportResult::KnownBad) => { + debug!(target: "sync", "Bad block {}: {:?}", number, hash); + Err(BlockImportError::DisconnectAndRestart(origin)) //TODO: use persistent ID + } + Err(e) => { + debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); + Err(BlockImportError::Restart) + } + } + }, + (None, _) => { + debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin); + Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID + }, + (_, None) => { + debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin); + Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID + } + } +} + +/// Process single block import result. +fn process_import_result<'a, B: BlockT>( + link: &mut SyncLinkApi, + result: Result::Header as HeaderT>::Number>, BlockImportError> +) -> usize where B::Header: HeaderT { + match result { + Ok(BlockImportResult::NotImported(_, _)) => 0, + Ok(BlockImportResult::ImportedKnown(hash, number)) => { + link.block_imported(&hash, number); + 0 + }, + Ok(BlockImportResult::ImportedUnknown(hash, number)) => { + link.block_imported(&hash, number); + 1 + }, + Err(BlockImportError::Disconnect(peer_id)) => { + link.disconnect(peer_id); + 0 + }, + Err(BlockImportError::DisconnectAndRestart(peer_id)) => { + link.disconnect_and_restart(peer_id); + 0 + }, + Err(BlockImportError::Restart) => { + link.restart(); + 0 + }, + } +} + +impl<'a, B: 'static + BlockT> SyncLink<'a, B> where B::Header: HeaderT { + /// Execute closure with locked ChainSync. + fn with_sync, &mut SyncIo, &Protocol)>(&mut self, closure: F) { + match *self { + #[cfg(test)] + SyncLink::Direct(ref mut sync, ref mut io, ref protocol) => + closure(*sync, *io, *protocol), + SyncLink::Indirect(ref service) => + service.execute_in_context(move |io, protocol| { + let mut sync = protocol.sync().write(); + closure(&mut *sync, io, protocol) + }), + } + } +} + +impl<'a, B: 'static + BlockT> SyncLinkApi for SyncLink<'a, B> where B::Header: HeaderT { + fn block_imported(&mut self, hash: &B::Hash, number: u64) { + self.with_sync(|sync, _, _| sync.block_imported(&hash, number)) + } + + fn maintain_sync(&mut self) { + self.with_sync(|sync, io, protocol| sync.maintain_sync(io, protocol)) + } + + fn disconnect(&mut self, peer_id: PeerId) { + self.with_sync(|_, io, _| io.disconnect_peer(peer_id)) + } + + fn disconnect_and_restart(&mut self, peer_id: PeerId) { + self.with_sync(|sync, io, protocol| { + io.disconnect_peer(peer_id); + sync.restart(io, protocol); + }) + } + + fn restart(&mut self) { + self.with_sync(|sync, io, protocol| sync.restart(io, protocol)) + } +} + +#[cfg(test)] +pub mod tests { + use client; + use message; + use test_client::{self, TestClient}; + use test_client::runtime::{Block, Hash}; + use super::*; + + /// Blocks import queue that is importing blocks in the same thread. + pub struct SyncImportQueue; + + impl ImportQueue for SyncImportQueue where B::Header: HeaderT { + fn start(&self, _service: Weak>, _chain: Weak>) -> Result<(), Error> { Ok(()) } + + fn clear(&self) { } + + fn status(&self) -> ImportQueueStatus { + ImportQueueStatus { + importing_count: 0, + best_importing_number: Zero::zero(), + } + } + + fn is_importing(&self, _hash: &B::Hash) -> bool { + false + } + + fn import_blocks(&self, sync: &mut ChainSync, io: &mut SyncIo, protocol: &Protocol, blocks: (BlockOrigin, Vec>)) { + let chain = protocol.chain(); + import_many_blocks(&mut SyncLink::Direct(sync, io, protocol), chain, None, blocks); + } + } + + #[derive(Default)] + struct TestLink { + imported: usize, + maintains: usize, + disconnects: usize, + restarts: usize, + } + + impl TestLink { + fn total(&self) -> usize { + self.imported + self.maintains + self.disconnects + self.restarts + } + } + + impl SyncLinkApi for TestLink { + fn block_imported(&mut self, _hash: &B::Hash, _number: u64) { self.imported += 1; } + fn maintain_sync(&mut self) { self.maintains += 1; } + fn disconnect(&mut self, _peer_id: PeerId) { self.disconnects += 1; } + fn disconnect_and_restart(&mut self, _peer_id: PeerId) { self.disconnects += 1; self.restarts += 1; } + fn restart(&mut self) { self.restarts += 1; } + } + + fn prepare_good_block() -> (client::Client, Hash, u64, BlockData) { + let client = test_client::new(); + let block = client.new_block().unwrap().bake().unwrap(); + client.justify_and_import(BlockOrigin::File, block).unwrap(); + + let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1); + let block = message::BlockData:: { + hash: client.block_hash(1).unwrap().unwrap(), + header: client.header(&BlockId::Number(1)).unwrap(), + body: None, + receipt: None, + message_queue: None, + justification: client.justification(&BlockId::Number(1)).unwrap(), + }; + + (client, hash, number, BlockData { block, origin: 0 }) + } + + #[test] + fn import_single_good_block_works() { + let (_, hash, number, block) = prepare_good_block(); + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Ok(BlockImportResult::ImportedUnknown(hash, number))); + } + + #[test] + fn import_single_good_known_block_is_ignored() { + let (client, hash, number, block) = prepare_good_block(); + assert_eq!(import_single_block(&client, BlockOrigin::File, block), Ok(BlockImportResult::NotImported(hash, number))); + } + + #[test] + fn import_single_good_block_without_header_fails() { + let (_, _, _, mut block) = prepare_good_block(); + block.block.header = None; + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0))); + } + + #[test] + fn import_single_good_block_without_justification_fails() { + let (_, _, _, mut block) = prepare_good_block(); + block.block.justification = None; + assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0))); + } + + #[test] + fn process_import_result_works() { + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::NotImported(Default::default(), 0))), 0); + assert_eq!(link.total(), 0); + + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.imported, 1); + + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); + assert_eq!(link.total(), 1); + assert_eq!(link.imported, 1); + + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::Disconnect(0))), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.disconnects, 1); + + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::DisconnectAndRestart(0))), 0); + assert_eq!(link.total(), 2); + assert_eq!(link.disconnects, 1); + assert_eq!(link.restarts, 1); + + let mut link = TestLink::default(); + assert_eq!(process_import_result::(&mut link, Err(BlockImportError::Restart)), 0); + assert_eq!(link.total(), 1); + assert_eq!(link.restarts, 1); + } + + #[test] + fn import_many_blocks_stops_when_stopping() { + let (_, _, _, block) = prepare_good_block(); + let qdata = AsyncImportQueueData::new(); + qdata.is_stopping.store(true, Ordering::SeqCst); + assert!(!import_many_blocks(&mut TestLink::default(), &test_client::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block]))); + } +} diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 15779ea6522cd..0685d9318e1a5 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -56,6 +56,7 @@ mod chain; mod blocks; mod consensus; mod on_demand; +mod import_queue; pub mod error; #[cfg(test)] mod test; diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 6113ab735730e..a5852359d008c 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -29,6 +29,7 @@ use message::generic::Message as GenericMessage; use sync::{ChainSync, Status as SyncStatus, SyncState}; use consensus::Consensus; use service::{Role, TransactionPool, BftMessageStream}; +use import_queue::ImportQueue; use config::ProtocolConfig; use chain::Client; use on_demand::OnDemandService; @@ -108,11 +109,12 @@ impl Protocol where pub fn new( config: ProtocolConfig, chain: Arc>, + import_queue: Arc>, on_demand: Option>, transaction_pool: Arc> ) -> error::Result { let info = chain.info()?; - let sync = ChainSync::new(config.roles, &info); + let sync = ChainSync::new(config.roles, &info, import_queue); let protocol = Protocol { config: config, chain: chain, @@ -535,4 +537,8 @@ impl Protocol where pub fn chain(&self) -> &Client { &*self.chain } + + pub fn sync(&self) -> &RwLock> { + &self.sync + } } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index f26e31337da74..5828451dbda6f 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -30,6 +30,7 @@ use error::Error; use chain::Client; use message::LocalizedBftMessage; use on_demand::OnDemandService; +use import_queue::AsyncImportQueue; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; /// Polkadot devp2p protocol id @@ -149,14 +150,21 @@ pub struct Service where B::Header: HeaderT { impl Service where B::Header: HeaderT { /// Creates and register protocol with the network service pub fn new(params: Params) -> Result>, Error> { + let chain = params.chain.clone(); let service = NetworkService::new(params.network_config.clone(), None)?; + let import_queue = Arc::new(AsyncImportQueue::new()); let sync = Arc::new(Service { network: service, handler: Arc::new(ProtocolHandler { - protocol: Protocol::new(params.config, params.chain, params.on_demand, params.transaction_pool)?, + protocol: Protocol::new(params.config, params.chain, import_queue, params.on_demand, params.transaction_pool)?, }), }); + sync.handler.protocol.sync().read().queue().start( + Arc::downgrade(&sync), + Arc::downgrade(&chain) + )?; + Ok(sync) } diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index 8a296d4534f4f..633c2a7933c8c 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -15,18 +15,24 @@ // along with Polkadot. If not, see .? use std::collections::HashMap; +use std::sync::Arc; use io::SyncIo; use protocol::Protocol; use network::PeerId; -use client::{ImportResult, BlockStatus, ClientInfo}; +use client::{BlockOrigin, BlockStatus, ClientInfo}; +use client::error::Error as ClientError; use blocks::{self, BlockCollection}; +use chain::Client; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; use service::Role; +use import_queue::ImportQueue; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; +// Maximum blocks to store in the import queue. +const MAX_IMPORING_BLOCKS: usize = 2048; struct PeerSync { pub common_hash: B::Hash, @@ -52,6 +58,7 @@ pub struct ChainSync { best_queued_number: u64, best_queued_hash: B::Hash, required_block_attributes: Vec, + import_queue: Arc>, } /// Reported sync state. @@ -76,7 +83,7 @@ impl ChainSync where B::Header: HeaderT, { /// Create a new instance. - pub fn new(role: Role, info: &ClientInfo) -> Self { + pub fn new(role: Role, info: &ClientInfo, import_queue: Arc>) -> Self { let mut required_block_attributes = vec![ message::BlockAttribute::Header, message::BlockAttribute::Justification @@ -92,6 +99,7 @@ impl ChainSync where best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), required_block_attributes: required_block_attributes, + import_queue, } } @@ -99,6 +107,11 @@ impl ChainSync where self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) } + /// Returns import queue reference. + pub fn queue(&self) -> &Arc> { + &self.import_queue + } + /// Returns sync status pub fn status(&self) -> Status { let best_seen = self.best_seen_block(); @@ -115,7 +128,7 @@ impl ChainSync where /// Handle new connected peer. pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { if let Some(info) = protocol.peer_info(peer_id) { - match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) { + match (block_status(&*protocol.chain(), &*self.import_queue, info.best_hash), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); io.disconnect_peer(peer_id); @@ -168,8 +181,6 @@ impl ChainSync where } pub fn on_block_data(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, _request: message::BlockRequest, response: message::BlockResponse) { - let count = response.blocks.len(); - let mut imported: usize = 0; let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { match peer.state { PeerSyncState::DownloadingNew(start_block) => { @@ -233,90 +244,20 @@ impl ChainSync where }; let best_seen = self.best_seen_block(); - // Blocks in the response/drain should be in ascending order. - for block in new_blocks { - let origin = block.origin; - let block = block.block; - match (block.header, block.justification) { - (Some(header), Some(justification)) => { - let number = header.number().clone(); - let hash = header.hash(); - let parent = header.parent_hash().clone(); - let is_best = best_seen.as_ref().map_or(false, |n| number >= *n); - - // check whether the block is known before importing. - match protocol.chain().block_status(&BlockId::Hash(hash)) { - Ok(BlockStatus::InChain) => continue, - Ok(_) => {}, - Err(e) => { - debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - self.restart(io, protocol); - return; - } - } - - let result = protocol.chain().import( - is_best, - header, - justification, - block.body.map(|b| b.to_extrinsics()), - ); - match result { - Ok(ImportResult::AlreadyInChain) => { - trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::Queued) => { - trace!(target: "sync", "Block queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - imported = imported + 1; - }, - Ok(ImportResult::UnknownParent) => { - debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); - self.restart(io, protocol); - return; - }, - Ok(ImportResult::KnownBad) => { - debug!(target: "sync", "Bad block {}: {:?}", number, hash); - io.disable_peer(origin); //TODO: use persistent ID - self.restart(io, protocol); - return; - } - Err(e) => { - debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - self.restart(io, protocol); - return; - } - } - }, - (None, _) => { - debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin); - io.disable_peer(origin); //TODO: use persistent ID - return; - }, - (_, None) => { - debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin); - io.disable_peer(origin); //TODO: use persistent ID - return; - } - } - } - trace!(target: "sync", "Imported {} of {}", imported, count); - self.maintain_sync(io, protocol); + let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n)); + let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; + let import_queue = self.import_queue.clone(); + import_queue.import_blocks(self, io, protocol, (origin, new_blocks)) } - fn maintain_sync(&mut self, io: &mut SyncIo, protocol: &Protocol) { + pub fn maintain_sync(&mut self, io: &mut SyncIo, protocol: &Protocol) { let peers: Vec = self.peers.keys().map(|p| *p).collect(); for peer in peers { self.download_new(io, protocol, peer); } } - fn block_imported(&mut self, hash: &B::Hash, number: u64) { + pub fn block_imported(&mut self, hash: &B::Hash, number: u64) { if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; @@ -370,7 +311,7 @@ impl ChainSync where fn is_known_or_already_downloading(&self, protocol: &Protocol, hash: &B::Hash) -> bool { self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - || protocol.chain().block_status(&BlockId::Hash(*hash)).ok().map_or(false, |s| s != BlockStatus::Unknown) + || block_status(&*protocol.chain(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } pub fn peer_disconnected(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { @@ -380,6 +321,7 @@ impl ChainSync where } pub fn restart(&mut self, io: &mut SyncIo, protocol: &Protocol) { + self.import_queue.clear(); self.blocks.clear(); let ids: Vec = self.peers.keys().map(|p| *p).collect(); for id in ids { @@ -427,10 +369,18 @@ impl ChainSync where // Issue a request for a peer to download new blocks, if any are available fn download_new(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, peer.common_number, peer.best_number); + let import_status = self.import_queue.status(); + // when there are too many blocks in the queue => do not try to download new blocks + if import_status.importing_count > MAX_IMPORING_BLOCKS { + return; + } + // we should not download already queued blocks + let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number); + + trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, common_number, peer.best_number); match peer.state { PeerSyncState::Available => { - if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) { trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end); let request = message::generic::BlockRequest { id: 0, @@ -464,3 +414,12 @@ impl ChainSync where protocol.send_message(io, peer_id, GenericMessage::BlockRequest(request)); } } + +/// Get block status, taking into account import queue. +fn block_status(chain: &Client, queue: &ImportQueue, hash: B::Hash) -> Result where B::Header: HeaderT { + if queue.is_importing(&hash) { + return Ok(BlockStatus::Queued); + } + + chain.block_status(&BlockId::Hash(hash)) +} diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index 104bb8b9814cf..1f7d3ee94d4da 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -32,6 +32,7 @@ use service::TransactionPool; use network::{PeerId, SessionInfo, Error as NetworkError}; use keyring::Keyring; use codec::Slicable; +use import_queue::tests::SyncImportQueue; use test_client::{self, TestClient}; use test_client::runtime::{Block, Hash, Transfer, Extrinsic}; @@ -234,7 +235,8 @@ impl TestNet { pub fn add_peer(&mut self, config: &ProtocolConfig) { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); - let sync = Protocol::new(config.clone(), client.clone(), None, tx_pool).unwrap(); + let import_queue = Arc::new(SyncImportQueue); + let sync = Protocol::new(config.clone(), client.clone(), import_queue, None, tx_pool).unwrap(); self.peers.push(Arc::new(Peer { sync: sync, client: client, From b5cc58e4a4bdfcb9cdfc67c78c180c28dbb6616d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 29 Jun 2018 09:08:52 +0300 Subject: [PATCH 2/2] some more docs --- substrate/network/src/import_queue.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/substrate/network/src/import_queue.rs b/substrate/network/src/import_queue.rs index 637257bfd0cee..02d1055dcf835 100644 --- a/substrate/network/src/import_queue.rs +++ b/substrate/network/src/import_queue.rs @@ -36,10 +36,15 @@ use sync::ChainSync; /// Blocks import queue API. pub trait ImportQueue: Send + Sync where B::Header: HeaderT { + /// Start operating (if required). fn start(&self, service: Weak>, chain: Weak>) -> Result<(), Error>; + /// Clear the queue when sync is restarting. fn clear(&self); + /// Get queue status. fn status(&self) -> ImportQueueStatus; + /// Is block with given hash is currently in the queue. fn is_importing(&self, hash: &B::Hash) -> bool; + /// Import bunch of blocks. fn import_blocks(&self, sync: &mut ChainSync, io: &mut SyncIo, protocol: &Protocol, blocks: (BlockOrigin, Vec>)); }