From 5f38aa885b22ebb0e3a1d60120cea69f9f322628 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 7 Sep 2018 17:44:35 +0100 Subject: [PATCH 01/42] Log block set in block_sync for easier debugging --- ethcore/sync/src/block_sync.rs | 46 +++++++++++++++++----------------- ethcore/sync/src/chain/mod.rs | 6 ++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 55b23cba4d1..e2d9a1477e2 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -28,6 +28,7 @@ use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKi use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError}; use sync_io::SyncIo; use blocks::{BlockCollection, SyncBody, SyncHeader}; +use chain::BlockSet; const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 32; @@ -36,6 +37,17 @@ const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; +macro_rules! trace_sync { + ($self:ident, target: $target:expr, $($arg:tt)*) => { + trace!(target: $target, $($arg)+, $self.block_set); + } +} +macro_rules! debug_sync { + ($self:ident, target: $target:expr, $($arg:tt)*) => { + debug!(target: $target, $($arg)+, $self.block_set); + } +} + #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state pub enum State { @@ -89,6 +101,8 @@ impl From for BlockDownloaderImportError { /// Block downloader strategy. /// Manages state and block data for a block download process. pub struct BlockDownloader { + /// Which set of blocks to download + block_set: BlockSet, /// Downloader state state: State, /// Highest block number seen @@ -117,29 +131,15 @@ pub struct BlockDownloader { } impl BlockDownloader { - /// Create a new instance of syncing strategy. This won't reorganize to before the - /// last kept state. - pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { - BlockDownloader { - state: State::Idle, - highest_block: None, - last_imported_block: start_number, - last_imported_hash: start_hash.clone(), - last_round_start: start_number, - last_round_start_hash: start_hash.clone(), - blocks: BlockCollection::new(sync_receipts), - imported_this_round: None, - round_parents: VecDeque::new(), - download_receipts: sync_receipts, - target_hash: None, - retract_step: 1, - limit_reorg: true, - } - } - - /// Create a new instance of sync with unlimited reorg allowed. - pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { + /// Create a new instance of syncing strategy. + /// For BlockSet::NewBlocks this won't reorganize to before the last kept state. + pub fn new(block_set: BlockSet, start_hash: &H256, start_number: BlockNumber) -> Self { + let (limit_reorg, sync_receipts) = match block_set { + BlockSet::NewBlocks => (true, false), + BlockSet::OldBlocks => (false, true) + }; BlockDownloader { + block_set: block_set, state: State::Idle, highest_block: None, last_imported_block: start_number, @@ -152,7 +152,7 @@ impl BlockDownloader { download_receipts: sync_receipts, target_hash: None, retract_step: 1, - limit_reorg: false, + limit_reorg: limit_reorg, } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 06bd3febab6..de152ec1f0c 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -429,7 +429,7 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), - new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), + new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number), old_blocks: None, last_sent_block_number: 0, network_id: config.network_id, @@ -637,13 +637,13 @@ impl ChainSync { pub fn update_targets(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block let chain = chain.chain_info(); - self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); + self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number); self.old_blocks = None; if self.download_old_blocks { if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); - let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number); + let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &ancient_block_hash, ancient_block_number); if let Some(hash) = chain.first_block_hash { trace!(target: "sync", "Downloader target set to {:?}", hash); downloader.set_target(&hash); From 5f219a489ff0498cfc8cfe36018495e2a61a608c Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 10 Sep 2018 15:20:08 +0100 Subject: [PATCH 02/42] logging macros --- ethcore/sync/src/block_sync.rs | 71 +++++++++++++++++----------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index e2d9a1477e2..d5dfdd7e7fe 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -37,14 +37,15 @@ const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; +// logging macros append BlockSet context for log filtering macro_rules! trace_sync { - ($self:ident, target: $target:expr, $($arg:tt)*) => { - trace!(target: $target, $($arg)+, $self.block_set); + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)*) => { + trace!(self, target: $target, concat!($fmt, ", set = {:?}"), $($arg)*, $self.block_set); } } macro_rules! debug_sync { - ($self:ident, target: $target:expr, $($arg:tt)*) => { - debug!(target: $target, $($arg)+, $self.block_set); + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)*) => { + debug!(self, target: $target, concat!($fmt, ", set = {:?}"), $($arg)*, $self.block_set); } } @@ -223,7 +224,7 @@ impl BlockDownloader { pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option) -> Result { let item_count = r.item_count().unwrap_or(0); if self.state == State::Idle { - trace!(target: "sync", "Ignored unexpected block headers"); + trace_sync!(self, target: "sync", "Ignored unexpected block headers"); return Ok(DownloadAction::None) } if item_count == 0 && (self.state == State::Blocks) { @@ -246,7 +247,7 @@ impl BlockDownloader { } any_known = any_known || self.blocks.contains_head(&hash); if self.blocks.contains(&hash) { - trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash); + trace_sync!(self, target: "sync", "Skipping existing block header {} ({:?})", number, hash); continue; } @@ -257,8 +258,8 @@ impl BlockDownloader { match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { - State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), - _ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), + State::Blocks => trace_sync!(self, target: "sync", "Header already in chain {} ({})", number, hash), + _ => trace_sync!(self, target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), } headers.push(info); hashes.push(hash); @@ -275,7 +276,7 @@ impl BlockDownloader { // Disable the peer for this syncing round if it gives invalid chain if !valid_response { - trace!(target: "sync", "Invalid headers response"); + trace_sync!(self, target: "sync", "Invalid headers response"); return Err(BlockDownloaderImportError::Invalid); } @@ -283,7 +284,7 @@ impl BlockDownloader { State::ChainHead => { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); @@ -292,7 +293,7 @@ impl BlockDownloader { let oldest_reorg = io.chain().pruning_info().earliest_state; let last = self.last_imported_block; if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) { - trace!(target: "sync", "No common block, disabling peer"); + trace_sync!(self, target: "sync", "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } } @@ -301,13 +302,13 @@ impl BlockDownloader { let count = headers.len(); // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { - trace!(target: "sync", "No useful headers"); + trace_sync!(self, target: "sync", "No useful headers"); return Err(BlockDownloaderImportError::Useless); } self.blocks.insert_headers(headers); - trace!(target: "sync", "Inserted {} headers", count); + trace_sync!(self, target: "sync", "Inserted {} headers", count); }, - _ => trace!(target: "sync", "Unexpected headers({})", headers.len()), + _ => trace_sync!(self, target: "sync", "Unexpected headers({})", headers.len()), } Ok(DownloadAction::None) @@ -319,7 +320,7 @@ impl BlockDownloader { if item_count == 0 { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block bodies"); + trace_sync!(self, target: "sync", "Ignored unexpected block bodies"); } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { @@ -328,7 +329,7 @@ impl BlockDownloader { } if self.blocks.insert_bodies(bodies) != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); + trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -342,19 +343,19 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block receipts"); + trace_sync!(self, target: "sync", "Ignored unexpected block receipts"); } else { let mut receipts = Vec::with_capacity(item_count); for i in 0..item_count { let receipt = r.at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e); + trace_sync!(self, target: "sync", "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; receipts.push(receipt.as_raw().to_vec()); } if self.blocks.insert_receipts(receipts) != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block receipts"); + trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -363,7 +364,7 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); + trace_sync!(self, target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. let start = self.last_round_start; @@ -375,12 +376,12 @@ impl BlockDownloader { if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); - trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); + trace_sync!(self, target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; if self.limit_reorg && best > start && start < oldest_reorg { - debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); + debug_sync!(self, target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { let n = start - cmp::min(self.retract_step, start); @@ -389,10 +390,10 @@ impl BlockDownloader { Some(h) => { self.last_imported_block = n; self.last_imported_hash = h; - trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); + trace_sync!(self, target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); + debug_sync!(self, target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } @@ -420,7 +421,7 @@ impl BlockDownloader { State::ChainHead => { if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD { // Request subchain headers - trace!(target: "sync", "Starting sync with better chain"); + trace_sync!(self, target: "sync", "Starting sync with better chain"); // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains return Some(BlockRequest::Headers { @@ -478,7 +479,7 @@ impl BlockDownloader { if self.target_hash.as_ref().map_or(false, |t| t == &h) { self.state = State::Complete; - trace!(target: "sync", "Sync target reached"); + trace_sync!(self, target: "sync", "Sync target reached"); return Ok(()); } @@ -490,15 +491,15 @@ impl BlockDownloader { match result { Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => { - trace!(target: "sync", "Block already in chain {:?}", h); + trace_sync!(self, target: "sync", "Block already in chain {:?}", h); self.block_imported(&h, number, &parent); }, Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => { - trace!(target: "sync", "Block already queued {:?}", h); + trace_sync!(self, target: "sync", "Block already queued {:?}", h); self.block_imported(&h, number, &parent); }, Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); + trace_sync!(self, target: "sync", "Block queued {:?}", h); imported.insert(h.clone()); self.block_imported(&h, number, &parent); }, @@ -506,25 +507,25 @@ impl BlockDownloader { break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(_)), _)) => { - trace!(target: "sync", "Unknown new block parent, restarting sync"); + trace_sync!(self, target: "sync", "Unknown new block parent, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => { - debug!(target: "sync", "Block temporarily invalid, restarting sync"); + debug_sync!(self, target: "sync", "Block temporarily invalid, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { - debug!(target: "sync", "Block import queue full ({}), restarting sync", limit); + debug_sync!(self, target: "sync", "Block import queue full ({}), restarting sync", limit); break; }, Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + debug_sync!(self, target: "sync", "Bad block {:?} : {:?}", h, e); bad = true; break; } } } - trace!(target: "sync", "Imported {} of {}", imported.len(), count); + trace_sync!(self, target: "sync", "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); if bad { @@ -533,7 +534,7 @@ impl BlockDownloader { if self.blocks.is_empty() { // complete sync round - trace!(target: "sync", "Sync round complete"); + trace_sync!(self, target: "sync", "Sync round complete"); self.reset(); } Ok(()) From 8d7f742fc686fb1743fb3a4be1c0c4cac32d53d0 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 10 Sep 2018 18:25:05 +0100 Subject: [PATCH 03/42] Match no args in sync logging macros --- ethcore/sync/src/block_sync.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index d5dfdd7e7fe..ea991128276 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -37,16 +37,23 @@ const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; -// logging macros append BlockSet context for log filtering +// logging macros prepend BlockSet context for log filtering macro_rules! trace_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)*) => { - trace!(self, target: $target, concat!($fmt, ", set = {:?}"), $($arg)*, $self.block_set); - } + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { + trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + }; + ($self:ident, target: $target:expr, $fmt:expr) => { + trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + }; } + macro_rules! debug_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)*) => { - debug!(self, target: $target, concat!($fmt, ", set = {:?}"), $($arg)*, $self.block_set); - } + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { + debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + }; + ($self:ident, target: $target:expr, $fmt:expr) => { + debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + }; } #[derive(Copy, Clone, Eq, PartialEq, Debug)] @@ -511,7 +518,7 @@ impl BlockDownloader { break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => { - debug_sync!(self, target: "sync", "Block temporarily invalid, restarting sync"); + debug_sync!(self, target: "sync", "Block temporarily invalid: {:?}, restarting sync", h); break; }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { From 8f379ee69e1bda55f5c7c0c809f92c6ebec8c2de Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 10 Sep 2018 18:41:10 +0100 Subject: [PATCH 04/42] Add QueueFull error --- ethcore/sync/src/block_sync.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index ea991128276..5f38ecffec8 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -98,6 +98,8 @@ pub enum BlockDownloaderImportError { Invalid, /// Imported data is valid but rejected cause the downloader does not need it. Useless, + /// Block import queue is full, and some blocks were rejected + QueueFull } impl From for BlockDownloaderImportError { @@ -472,7 +474,7 @@ impl BlockDownloader { /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> { - let mut bad = false; + let mut import_err: Option = None; let mut imported = HashSet::new(); let blocks = self.blocks.drain(); let count = blocks.len(); @@ -523,11 +525,13 @@ impl BlockDownloader { }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { debug_sync!(self, target: "sync", "Block import queue full ({}), restarting sync", limit); + import_err = Some(BlockDownloaderImportError::Invalid); // todo: change handler in mod // Some(BlockDownloaderImportError::QueueFull); + // need to prevent race condition of peer requests coming back after reset -> NewBlockParent break; }, Err(e) => { debug_sync!(self, target: "sync", "Bad block {:?} : {:?}", h, e); - bad = true; + import_err = Some(BlockDownloaderImportError::Invalid); break; } } @@ -535,8 +539,8 @@ impl BlockDownloader { trace_sync!(self, target: "sync", "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); - if bad { - return Err(BlockDownloaderImportError::Invalid); + if let Some(err) = import_err { + return Err(err); } if self.blocks.is_empty() { From f5d244e287a417d7ebc65785850e20e30217f8ec Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 10 Sep 2018 20:31:26 +0100 Subject: [PATCH 05/42] Only allow importing headers if the first matches requested --- ethcore/sync/src/block_sync.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 5f38ecffec8..1cb5cb72917 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -248,11 +248,9 @@ impl BlockDownloader { let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?; let number = BlockNumber::from(info.header.number()); let hash = info.header.hash(); - // Check if any of the headers matches the hash we requested - if !valid_response { - if let Some(expected) = expected_hash { - valid_response = expected == hash; - } + // Check if the first of the headers matches the hash we requested + if i == 0 { + valid_response = expected_hash.map_or(false, |eh| eh == hash); } any_known = any_known || self.blocks.contains_head(&hash); if self.blocks.contains(&hash) { From f77c503933ffa3b014614f01498c7b767daacf09 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 10 Sep 2018 08:57:28 +0100 Subject: [PATCH 06/42] WIP --- ethcore/sync/src/block_sync.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 1cb5cb72917..f1378eafd2f 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -97,9 +97,7 @@ pub enum BlockDownloaderImportError { /// Imported data is rejected as invalid. Peer should be dropped. Invalid, /// Imported data is valid but rejected cause the downloader does not need it. - Useless, - /// Block import queue is full, and some blocks were rejected - QueueFull + Useless } impl From for BlockDownloaderImportError { From 0f3adb3697fae4dd2427feca6237adc6cd0244b7 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 13:28:46 +0100 Subject: [PATCH 07/42] Test for chain head gaps and log --- ethcore/sync/src/block_sync.rs | 32 ++++++++++++++++++++++++++++---- ethcore/sync/src/chain/mod.rs | 3 ++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index f1378eafd2f..ebe401878b8 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -289,10 +289,34 @@ impl BlockDownloader { State::ChainHead => { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - self.blocks.reset_to(hashes); - self.state = State::Blocks; - return Ok(DownloadAction::Reset); + let mut distances = Vec::new(); + let mut last_distance = 0; + + if headers.len() > 1 { + for i in 0..headers.len() - 2 { + let n1 = BlockNumber::from(headers[i].header.number()); + let n2 = BlockNumber::from(headers[i+1].header.number()); + let d = n2 - n1; + if d != last_distance { + distances.push(n2 - n1); + } + last_distance = d; + } + } + trace_sync!(self, target: "sync", "subchain heads starting #{:?}, {:?}, distances {:?}", + BlockNumber::from(headers[0].header.number()), + headers[0].header.hash(), + distances + ); + + if distances.len() == 0 || distances == [127] { + trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + self.blocks.reset_to(hashes); + self.state = State::Blocks; + return Ok(DownloadAction::Reset); + } else { + trace_sync!(self, target: "sync", "not subchain heads, distances {:?}", distances); + } } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index de152ec1f0c..72412bbb8b7 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -511,9 +511,10 @@ impl ChainSync { fn reset(&mut self, io: &mut SyncIo) { self.new_blocks.reset(); let chain_info = io.chain().chain_info(); - for (_, ref mut p) in &mut self.peers { + for (pid, ref mut p) in &mut self.peers { if p.block_set != Some(BlockSet::OldBlocks) { p.reset_asking(); + trace!(target: "sync", "OldBlocks: Peer {:?} reset", pid); if p.difficulty.is_none() { // assume peer has up to date difficulty p.difficulty = Some(chain_info.pending_total_difficulty); From ad7bb2ec07714c01020d4b3d1363be1fb12436fc Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 14:32:55 +0100 Subject: [PATCH 08/42] Calc distance even with 2 heads --- ethcore/sync/src/block_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index ebe401878b8..0dac3d0d9dd 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -293,7 +293,7 @@ impl BlockDownloader { let mut last_distance = 0; if headers.len() > 1 { - for i in 0..headers.len() - 2 { + for i in 0..headers.len() - 1 { let n1 = BlockNumber::from(headers[i].header.number()); let n2 = BlockNumber::from(headers[i+1].header.number()); let d = n2 - n1; From 56747ef74d32593482617166608378d2b5281744 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 14:41:53 +0100 Subject: [PATCH 09/42] Revert previous commits, preparing simple fix This reverts commit 5f38aa885b22ebb0e3a1d60120cea69f9f322628. --- ethcore/sync/src/block_sync.rs | 166 +++++++++++++-------------------- ethcore/sync/src/chain/mod.rs | 9 +- 2 files changed, 71 insertions(+), 104 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 0dac3d0d9dd..55b23cba4d1 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -28,7 +28,6 @@ use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKi use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError}; use sync_io::SyncIo; use blocks::{BlockCollection, SyncBody, SyncHeader}; -use chain::BlockSet; const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 32; @@ -37,25 +36,6 @@ const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; -// logging macros prepend BlockSet context for log filtering -macro_rules! trace_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { - trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); - }; - ($self:ident, target: $target:expr, $fmt:expr) => { - trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); - }; -} - -macro_rules! debug_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { - debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); - }; - ($self:ident, target: $target:expr, $fmt:expr) => { - debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); - }; -} - #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state pub enum State { @@ -97,7 +77,7 @@ pub enum BlockDownloaderImportError { /// Imported data is rejected as invalid. Peer should be dropped. Invalid, /// Imported data is valid but rejected cause the downloader does not need it. - Useless + Useless, } impl From for BlockDownloaderImportError { @@ -109,8 +89,6 @@ impl From for BlockDownloaderImportError { /// Block downloader strategy. /// Manages state and block data for a block download process. pub struct BlockDownloader { - /// Which set of blocks to download - block_set: BlockSet, /// Downloader state state: State, /// Highest block number seen @@ -139,15 +117,10 @@ pub struct BlockDownloader { } impl BlockDownloader { - /// Create a new instance of syncing strategy. - /// For BlockSet::NewBlocks this won't reorganize to before the last kept state. - pub fn new(block_set: BlockSet, start_hash: &H256, start_number: BlockNumber) -> Self { - let (limit_reorg, sync_receipts) = match block_set { - BlockSet::NewBlocks => (true, false), - BlockSet::OldBlocks => (false, true) - }; + /// Create a new instance of syncing strategy. This won't reorganize to before the + /// last kept state. + pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { BlockDownloader { - block_set: block_set, state: State::Idle, highest_block: None, last_imported_block: start_number, @@ -160,7 +133,26 @@ impl BlockDownloader { download_receipts: sync_receipts, target_hash: None, retract_step: 1, - limit_reorg: limit_reorg, + limit_reorg: true, + } + } + + /// Create a new instance of sync with unlimited reorg allowed. + pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { + BlockDownloader { + state: State::Idle, + highest_block: None, + last_imported_block: start_number, + last_imported_hash: start_hash.clone(), + last_round_start: start_number, + last_round_start_hash: start_hash.clone(), + blocks: BlockCollection::new(sync_receipts), + imported_this_round: None, + round_parents: VecDeque::new(), + download_receipts: sync_receipts, + target_hash: None, + retract_step: 1, + limit_reorg: false, } } @@ -231,7 +223,7 @@ impl BlockDownloader { pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option) -> Result { let item_count = r.item_count().unwrap_or(0); if self.state == State::Idle { - trace_sync!(self, target: "sync", "Ignored unexpected block headers"); + trace!(target: "sync", "Ignored unexpected block headers"); return Ok(DownloadAction::None) } if item_count == 0 && (self.state == State::Blocks) { @@ -246,13 +238,15 @@ impl BlockDownloader { let info = SyncHeader::from_rlp(r.at(i)?.as_raw().to_vec())?; let number = BlockNumber::from(info.header.number()); let hash = info.header.hash(); - // Check if the first of the headers matches the hash we requested - if i == 0 { - valid_response = expected_hash.map_or(false, |eh| eh == hash); + // Check if any of the headers matches the hash we requested + if !valid_response { + if let Some(expected) = expected_hash { + valid_response = expected == hash; + } } any_known = any_known || self.blocks.contains_head(&hash); if self.blocks.contains(&hash) { - trace_sync!(self, target: "sync", "Skipping existing block header {} ({:?})", number, hash); + trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash); continue; } @@ -263,8 +257,8 @@ impl BlockDownloader { match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { - State::Blocks => trace_sync!(self, target: "sync", "Header already in chain {} ({})", number, hash), - _ => trace_sync!(self, target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), + State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), + _ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), } headers.push(info); hashes.push(hash); @@ -281,7 +275,7 @@ impl BlockDownloader { // Disable the peer for this syncing round if it gives invalid chain if !valid_response { - trace_sync!(self, target: "sync", "Invalid headers response"); + trace!(target: "sync", "Invalid headers response"); return Err(BlockDownloaderImportError::Invalid); } @@ -289,40 +283,16 @@ impl BlockDownloader { State::ChainHead => { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. - let mut distances = Vec::new(); - let mut last_distance = 0; - - if headers.len() > 1 { - for i in 0..headers.len() - 1 { - let n1 = BlockNumber::from(headers[i].header.number()); - let n2 = BlockNumber::from(headers[i+1].header.number()); - let d = n2 - n1; - if d != last_distance { - distances.push(n2 - n1); - } - last_distance = d; - } - } - trace_sync!(self, target: "sync", "subchain heads starting #{:?}, {:?}, distances {:?}", - BlockNumber::from(headers[0].header.number()), - headers[0].header.hash(), - distances - ); - - if distances.len() == 0 || distances == [127] { - trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - self.blocks.reset_to(hashes); - self.state = State::Blocks; - return Ok(DownloadAction::Reset); - } else { - trace_sync!(self, target: "sync", "not subchain heads, distances {:?}", distances); - } + trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + self.blocks.reset_to(hashes); + self.state = State::Blocks; + return Ok(DownloadAction::Reset); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; let last = self.last_imported_block; if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) { - trace_sync!(self, target: "sync", "No common block, disabling peer"); + trace!(target: "sync", "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } } @@ -331,13 +301,13 @@ impl BlockDownloader { let count = headers.len(); // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { - trace_sync!(self, target: "sync", "No useful headers"); + trace!(target: "sync", "No useful headers"); return Err(BlockDownloaderImportError::Useless); } self.blocks.insert_headers(headers); - trace_sync!(self, target: "sync", "Inserted {} headers", count); + trace!(target: "sync", "Inserted {} headers", count); }, - _ => trace_sync!(self, target: "sync", "Unexpected headers({})", headers.len()), + _ => trace!(target: "sync", "Unexpected headers({})", headers.len()), } Ok(DownloadAction::None) @@ -349,7 +319,7 @@ impl BlockDownloader { if item_count == 0 { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace_sync!(self, target: "sync", "Ignored unexpected block bodies"); + trace!(target: "sync", "Ignored unexpected block bodies"); } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { @@ -358,7 +328,7 @@ impl BlockDownloader { } if self.blocks.insert_bodies(bodies) != item_count { - trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block bodies"); + trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -372,19 +342,19 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace_sync!(self, target: "sync", "Ignored unexpected block receipts"); + trace!(target: "sync", "Ignored unexpected block receipts"); } else { let mut receipts = Vec::with_capacity(item_count); for i in 0..item_count { let receipt = r.at(i).map_err(|e| { - trace_sync!(self, target: "sync", "Error decoding block receipts RLP: {:?}", e); + trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; receipts.push(receipt.as_raw().to_vec()); } if self.blocks.insert_receipts(receipts) != item_count { - trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block receipts"); + trace!(target: "sync", "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -393,7 +363,7 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace_sync!(self, target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); + trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. let start = self.last_round_start; @@ -405,12 +375,12 @@ impl BlockDownloader { if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); - trace_sync!(self, target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); + trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; if self.limit_reorg && best > start && start < oldest_reorg { - debug_sync!(self, target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); + debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { let n = start - cmp::min(self.retract_step, start); @@ -419,10 +389,10 @@ impl BlockDownloader { Some(h) => { self.last_imported_block = n; self.last_imported_hash = h; - trace_sync!(self, target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); + trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug_sync!(self, target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); + debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } @@ -450,7 +420,7 @@ impl BlockDownloader { State::ChainHead => { if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD { // Request subchain headers - trace_sync!(self, target: "sync", "Starting sync with better chain"); + trace!(target: "sync", "Starting sync with better chain"); // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains return Some(BlockRequest::Headers { @@ -494,7 +464,7 @@ impl BlockDownloader { /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> { - let mut import_err: Option = None; + let mut bad = false; let mut imported = HashSet::new(); let blocks = self.blocks.drain(); let count = blocks.len(); @@ -508,7 +478,7 @@ impl BlockDownloader { if self.target_hash.as_ref().map_or(false, |t| t == &h) { self.state = State::Complete; - trace_sync!(self, target: "sync", "Sync target reached"); + trace!(target: "sync", "Sync target reached"); return Ok(()); } @@ -520,15 +490,15 @@ impl BlockDownloader { match result { Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => { - trace_sync!(self, target: "sync", "Block already in chain {:?}", h); + trace!(target: "sync", "Block already in chain {:?}", h); self.block_imported(&h, number, &parent); }, Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => { - trace_sync!(self, target: "sync", "Block already queued {:?}", h); + trace!(target: "sync", "Block already queued {:?}", h); self.block_imported(&h, number, &parent); }, Ok(_) => { - trace_sync!(self, target: "sync", "Block queued {:?}", h); + trace!(target: "sync", "Block queued {:?}", h); imported.insert(h.clone()); self.block_imported(&h, number, &parent); }, @@ -536,36 +506,34 @@ impl BlockDownloader { break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(_)), _)) => { - trace_sync!(self, target: "sync", "Unknown new block parent, restarting sync"); + trace!(target: "sync", "Unknown new block parent, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => { - debug_sync!(self, target: "sync", "Block temporarily invalid: {:?}, restarting sync", h); + debug!(target: "sync", "Block temporarily invalid, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { - debug_sync!(self, target: "sync", "Block import queue full ({}), restarting sync", limit); - import_err = Some(BlockDownloaderImportError::Invalid); // todo: change handler in mod // Some(BlockDownloaderImportError::QueueFull); - // need to prevent race condition of peer requests coming back after reset -> NewBlockParent + debug!(target: "sync", "Block import queue full ({}), restarting sync", limit); break; }, Err(e) => { - debug_sync!(self, target: "sync", "Bad block {:?} : {:?}", h, e); - import_err = Some(BlockDownloaderImportError::Invalid); + debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + bad = true; break; } } } - trace_sync!(self, target: "sync", "Imported {} of {}", imported.len(), count); + trace!(target: "sync", "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); - if let Some(err) = import_err { - return Err(err); + if bad { + return Err(BlockDownloaderImportError::Invalid); } if self.blocks.is_empty() { // complete sync round - trace_sync!(self, target: "sync", "Sync round complete"); + trace!(target: "sync", "Sync round complete"); self.reset(); } Ok(()) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 72412bbb8b7..06bd3febab6 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -429,7 +429,7 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), - new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number), + new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), old_blocks: None, last_sent_block_number: 0, network_id: config.network_id, @@ -511,10 +511,9 @@ impl ChainSync { fn reset(&mut self, io: &mut SyncIo) { self.new_blocks.reset(); let chain_info = io.chain().chain_info(); - for (pid, ref mut p) in &mut self.peers { + for (_, ref mut p) in &mut self.peers { if p.block_set != Some(BlockSet::OldBlocks) { p.reset_asking(); - trace!(target: "sync", "OldBlocks: Peer {:?} reset", pid); if p.difficulty.is_none() { // assume peer has up to date difficulty p.difficulty = Some(chain_info.pending_total_difficulty); @@ -638,13 +637,13 @@ impl ChainSync { pub fn update_targets(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block let chain = chain.chain_info(); - self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number); + self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); self.old_blocks = None; if self.download_old_blocks { if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); - let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &ancient_block_hash, ancient_block_number); + let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number); if let Some(hash) = chain.first_block_hash { trace!(target: "sync", "Downloader target set to {:?}", hash); downloader.set_target(&hash); From 0eb865539e5dee37ab34f168f5fb643300de5ace Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 17:03:17 +0100 Subject: [PATCH 10/42] Reject headers with no gaps when ChainHead --- ethcore/sync/src/block_sync.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 55b23cba4d1..1b6a0067531 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -282,11 +282,21 @@ impl BlockDownloader { match self.state { State::ChainHead => { if !headers.is_empty() { - // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - self.blocks.reset_to(hashes); - self.state = State::Blocks; - return Ok(DownloadAction::Reset); + let mut is_subchain_heads = headers.len() == 1; + if headers.len() > 1 { + let n0 = BlockNumber::from(headers[0].header.number()); + let n1 = BlockNumber::from(headers[1].header.number()); + is_subchain_heads = n1 - n0 > 1 + } + + if is_subchain_heads { + trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + self.blocks.reset_to(hashes); + self.state = State::Blocks; + return Ok(DownloadAction::Reset); + } else { + debug!(target: "sync", "Ignoring consecutive headers. Expected subchains with gap."); + } } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; From 72783a2c3e44cb90f97a5089283a184ebdbccb43 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 17:05:00 +0100 Subject: [PATCH 11/42] Reset block sync download when queue full --- ethcore/sync/src/block_sync.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 1b6a0067531..f211138aecf 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -525,6 +525,7 @@ impl BlockDownloader { }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { debug!(target: "sync", "Block import queue full ({}), restarting sync", limit); + bad = true; break; }, Err(e) => { From 6f7c3c23b6814ef598c863b68a936e05e78f7469 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 17:06:35 +0100 Subject: [PATCH 12/42] Simplify check for subchain heads --- ethcore/sync/src/block_sync.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index f211138aecf..4b12e30a711 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -282,12 +282,8 @@ impl BlockDownloader { match self.state { State::ChainHead => { if !headers.is_empty() { - let mut is_subchain_heads = headers.len() == 1; - if headers.len() > 1 { - let n0 = BlockNumber::from(headers[0].header.number()); - let n1 = BlockNumber::from(headers[1].header.number()); - is_subchain_heads = n1 - n0 > 1 - } + let is_subchain_heads = headers.len() == 1 || + headers.len() > 1 && headers[0].header.number() - headers[1].header.number() > 1; if is_subchain_heads { trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); From b4033f3861f5696e47fdfd9b55e5a4ddca664bbb Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 17:22:11 +0100 Subject: [PATCH 13/42] Add comment to explain subchain heads filter --- ethcore/sync/src/block_sync.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 4b12e30a711..dde19168e44 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -282,8 +282,14 @@ impl BlockDownloader { match self.state { State::ChainHead => { if !headers.is_empty() { - let is_subchain_heads = headers.len() == 1 || - headers.len() > 1 && headers[0].header.number() - headers[1].header.number() > 1; + // When the round starts, subchain headers are requested + // with gaps to be filled. However if the round is reset + // while in State::Blocks then its possible to receive + // stale responses for the subchain heads in the gap. In + // this case the headers will have consecutive numbers so can be ignored here. + let is_subchain_heads = headers.len() == 1 + || headers.len() > 1 + && headers[0].header.number() - headers[1].header.number() > 1; if is_subchain_heads { trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); From 7d1bc76759b7294e726c63c30af7e646e7557709 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 11 Sep 2018 17:24:55 +0100 Subject: [PATCH 14/42] Fix is_subchain_heads check and comment --- ethcore/sync/src/block_sync.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index dde19168e44..da79d3c19bc 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -286,10 +286,11 @@ impl BlockDownloader { // with gaps to be filled. However if the round is reset // while in State::Blocks then its possible to receive // stale responses for the subchain heads in the gap. In - // this case the headers will have consecutive numbers so can be ignored here. + // this case the headers will have consecutive numbers + // so can be ignored here. let is_subchain_heads = headers.len() == 1 || headers.len() > 1 - && headers[0].header.number() - headers[1].header.number() > 1; + && headers[1].header.number() - headers[0].header.number() > 1; if is_subchain_heads { trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); @@ -297,7 +298,7 @@ impl BlockDownloader { self.state = State::Blocks; return Ok(DownloadAction::Reset); } else { - debug!(target: "sync", "Ignoring consecutive headers. Expected subchains with gap."); + debug!(target: "sync", "Ignoring consecutive headers: expected subchain headers with gap"); } } else { let best = io.chain().chain_info().best_block_number; From 9588e5a2b8b42e5b1cc4fd8717517c8f30f9584c Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 12 Sep 2018 17:22:29 +0100 Subject: [PATCH 15/42] Prevent premature round completion after restart This is a problem on mainnet where multiple stale peer requests will force many rounds to complete quickly, forcing the retraction. --- ethcore/sync/src/block_sync.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index da79d3c19bc..4f7e7aa0aa5 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -299,6 +299,7 @@ impl BlockDownloader { return Ok(DownloadAction::Reset); } else { debug!(target: "sync", "Ignoring consecutive headers: expected subchain headers with gap"); + return Err(BlockDownloaderImportError::Useless); } } else { let best = io.chain().chain_info().best_block_number; From 9b317559c21ac2c916e4ca8893bbd509f96ee138 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 13 Sep 2018 11:00:02 +0100 Subject: [PATCH 16/42] Reset stale old blocks request after queue full --- ethcore/sync/src/chain/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 06bd3febab6..858bbd05252 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -861,6 +861,12 @@ impl ChainSync { }, BlockSet::OldBlocks => { if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) { + // reset stale requests from last sync round + for (_, ref mut p) in &mut self.peers { + if p.block_set == Some(BlockSet::OldBlocks) { + p.reset_asking(); + } + } self.restart(io); } else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) { trace!(target: "sync", "Background block download is complete"); From 8895cc63d9db209ac9b1968ad3228d578c09eaf0 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 13 Sep 2018 15:37:47 +0100 Subject: [PATCH 17/42] Revert "Reject headers with no gaps when ChainHead" This reverts commit 0eb865539e5dee37ab34f168f5fb643300de5ace. --- ethcore/sync/src/block_sync.rs | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 4f7e7aa0aa5..137367036af 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -282,25 +282,11 @@ impl BlockDownloader { match self.state { State::ChainHead => { if !headers.is_empty() { - // When the round starts, subchain headers are requested - // with gaps to be filled. However if the round is reset - // while in State::Blocks then its possible to receive - // stale responses for the subchain heads in the gap. In - // this case the headers will have consecutive numbers - // so can be ignored here. - let is_subchain_heads = headers.len() == 1 - || headers.len() > 1 - && headers[1].header.number() - headers[0].header.number() > 1; - - if is_subchain_heads { - trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - self.blocks.reset_to(hashes); - self.state = State::Blocks; - return Ok(DownloadAction::Reset); - } else { - debug!(target: "sync", "Ignoring consecutive headers: expected subchain headers with gap"); - return Err(BlockDownloaderImportError::Useless); - } + // TODO: validate heads better. E.g. check that there is enough distance between blocks. + trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + self.blocks.reset_to(hashes); + self.state = State::Blocks; + return Ok(DownloadAction::Reset); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; From 6e4a560deb06b09b1a8067d4340f677b2e911697 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 13 Sep 2018 15:51:41 +0100 Subject: [PATCH 18/42] Add BlockSet to BlockDownloader logging Currently it is difficult to debug this because there are two instances, one for OldBlocks and one for NewBlocks. This adds the BlockSet to all log messages for easy log filtering. --- ethcore/sync/src/block_sync.rs | 116 ++++++++++++++++++--------------- ethcore/sync/src/chain/mod.rs | 6 +- 2 files changed, 65 insertions(+), 57 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 137367036af..a51b192dbde 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -28,6 +28,7 @@ use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKi use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError}; use sync_io::SyncIo; use blocks::{BlockCollection, SyncBody, SyncHeader}; +use chain::BlockSet; const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 32; @@ -36,6 +37,25 @@ const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; +// logging macros prepend BlockSet context for log filtering +macro_rules! trace_sync { + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { + trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + }; + ($self:ident, target: $target:expr, $fmt:expr) => { + trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + }; +} + +macro_rules! debug_sync { + ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { + debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + }; + ($self:ident, target: $target:expr, $fmt:expr) => { + debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + }; +} + #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state pub enum State { @@ -89,6 +109,8 @@ impl From for BlockDownloaderImportError { /// Block downloader strategy. /// Manages state and block data for a block download process. pub struct BlockDownloader { + /// Which set of blocks to download + block_set: BlockSet, /// Downloader state state: State, /// Highest block number seen @@ -117,29 +139,15 @@ pub struct BlockDownloader { } impl BlockDownloader { - /// Create a new instance of syncing strategy. This won't reorganize to before the - /// last kept state. - pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { - BlockDownloader { - state: State::Idle, - highest_block: None, - last_imported_block: start_number, - last_imported_hash: start_hash.clone(), - last_round_start: start_number, - last_round_start_hash: start_hash.clone(), - blocks: BlockCollection::new(sync_receipts), - imported_this_round: None, - round_parents: VecDeque::new(), - download_receipts: sync_receipts, - target_hash: None, - retract_step: 1, - limit_reorg: true, - } - } - - /// Create a new instance of sync with unlimited reorg allowed. - pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self { + /// Create a new instance of syncing strategy. + /// For BlockSet::NewBlocks this won't reorganize to before the last kept state. + pub fn new(block_set: BlockSet, start_hash: &H256, start_number: BlockNumber) -> Self { + let (limit_reorg, sync_receipts) = match block_set { + BlockSet::NewBlocks => (true, false), + BlockSet::OldBlocks => (false, true) + }; BlockDownloader { + block_set: block_set, state: State::Idle, highest_block: None, last_imported_block: start_number, @@ -152,7 +160,7 @@ impl BlockDownloader { download_receipts: sync_receipts, target_hash: None, retract_step: 1, - limit_reorg: false, + limit_reorg: limit_reorg, } } @@ -223,7 +231,7 @@ impl BlockDownloader { pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option) -> Result { let item_count = r.item_count().unwrap_or(0); if self.state == State::Idle { - trace!(target: "sync", "Ignored unexpected block headers"); + trace_sync!(self, target: "sync", "Ignored unexpected block headers"); return Ok(DownloadAction::None) } if item_count == 0 && (self.state == State::Blocks) { @@ -246,7 +254,7 @@ impl BlockDownloader { } any_known = any_known || self.blocks.contains_head(&hash); if self.blocks.contains(&hash) { - trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash); + trace_sync!(self, target: "sync", "Skipping existing block header {} ({:?})", number, hash); continue; } @@ -257,8 +265,8 @@ impl BlockDownloader { match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { - State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), - _ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), + State::Blocks => trace_sync!(self, target: "sync", "Header already in chain {} ({})", number, hash), + _ => trace_sync!(self, target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), } headers.push(info); hashes.push(hash); @@ -275,7 +283,7 @@ impl BlockDownloader { // Disable the peer for this syncing round if it gives invalid chain if !valid_response { - trace!(target: "sync", "Invalid headers response"); + trace_sync!(self, target: "sync", "Invalid headers response"); return Err(BlockDownloaderImportError::Invalid); } @@ -283,7 +291,7 @@ impl BlockDownloader { State::ChainHead => { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); @@ -292,7 +300,7 @@ impl BlockDownloader { let oldest_reorg = io.chain().pruning_info().earliest_state; let last = self.last_imported_block; if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) { - trace!(target: "sync", "No common block, disabling peer"); + trace_sync!(self, target: "sync", "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } } @@ -301,13 +309,13 @@ impl BlockDownloader { let count = headers.len(); // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { - trace!(target: "sync", "No useful headers"); + trace_sync!(self, target: "sync", "No useful headers"); return Err(BlockDownloaderImportError::Useless); } self.blocks.insert_headers(headers); - trace!(target: "sync", "Inserted {} headers", count); + trace_sync!(self, target: "sync", "Inserted {} headers", count); }, - _ => trace!(target: "sync", "Unexpected headers({})", headers.len()), + _ => trace_sync!(self, target: "sync", "Unexpected headers({})", headers.len()), } Ok(DownloadAction::None) @@ -319,7 +327,7 @@ impl BlockDownloader { if item_count == 0 { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block bodies"); + trace_sync!(self, target: "sync", "Ignored unexpected block bodies"); } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { @@ -328,7 +336,7 @@ impl BlockDownloader { } if self.blocks.insert_bodies(bodies) != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); + trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -342,19 +350,19 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block receipts"); + trace_sync!(self, target: "sync", "Ignored unexpected block receipts"); } else { let mut receipts = Vec::with_capacity(item_count); for i in 0..item_count { let receipt = r.at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e); + trace_sync!(self, target: "sync", "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; receipts.push(receipt.as_raw().to_vec()); } if self.blocks.insert_receipts(receipts) != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block receipts"); + trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -363,7 +371,7 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); + trace_sync!(self, target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. let start = self.last_round_start; @@ -375,12 +383,12 @@ impl BlockDownloader { if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); - trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); + trace_sync!(self, target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; if self.limit_reorg && best > start && start < oldest_reorg { - debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); + debug_sync!(self, target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { let n = start - cmp::min(self.retract_step, start); @@ -389,10 +397,10 @@ impl BlockDownloader { Some(h) => { self.last_imported_block = n; self.last_imported_hash = h; - trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); + trace_sync!(self, target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); + debug_sync!(self, target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } @@ -420,7 +428,7 @@ impl BlockDownloader { State::ChainHead => { if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD { // Request subchain headers - trace!(target: "sync", "Starting sync with better chain"); + trace_sync!(self, target: "sync", "Starting sync with better chain"); // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains return Some(BlockRequest::Headers { @@ -478,7 +486,7 @@ impl BlockDownloader { if self.target_hash.as_ref().map_or(false, |t| t == &h) { self.state = State::Complete; - trace!(target: "sync", "Sync target reached"); + trace_sync!(self, target: "sync", "Sync target reached"); return Ok(()); } @@ -490,15 +498,15 @@ impl BlockDownloader { match result { Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => { - trace!(target: "sync", "Block already in chain {:?}", h); + trace_sync!(self, target: "sync", "Block already in chain {:?}", h); self.block_imported(&h, number, &parent); }, Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => { - trace!(target: "sync", "Block already queued {:?}", h); + trace_sync!(self, target: "sync", "Block already queued {:?}", h); self.block_imported(&h, number, &parent); }, Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); + trace_sync!(self, target: "sync", "Block queued {:?}", h); imported.insert(h.clone()); self.block_imported(&h, number, &parent); }, @@ -506,26 +514,26 @@ impl BlockDownloader { break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(_)), _)) => { - trace!(target: "sync", "Unknown new block parent, restarting sync"); + trace_sync!(self, target: "sync", "Unknown new block parent, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => { - debug!(target: "sync", "Block temporarily invalid, restarting sync"); + debug_sync!(self, target: "sync", "Block temporarily invalid: {:?}, restarting sync", h); break; }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { - debug!(target: "sync", "Block import queue full ({}), restarting sync", limit); + debug_sync!(self, target: "sync", "Block import queue full ({}), restarting sync", limit); bad = true; break; }, Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); + debug_sync!(self, target: "sync", "Bad block {:?} : {:?}", h, e); bad = true; break; } } } - trace!(target: "sync", "Imported {} of {}", imported.len(), count); + trace_sync!(self, target: "sync", "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); if bad { @@ -534,7 +542,7 @@ impl BlockDownloader { if self.blocks.is_empty() { // complete sync round - trace!(target: "sync", "Sync round complete"); + trace_sync!(self, target: "sync", "Sync round complete"); self.reset(); } Ok(()) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 858bbd05252..9444ea2956b 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -429,7 +429,7 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), - new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), + new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number), old_blocks: None, last_sent_block_number: 0, network_id: config.network_id, @@ -637,13 +637,13 @@ impl ChainSync { pub fn update_targets(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block let chain = chain.chain_info(); - self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); + self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number); self.old_blocks = None; if self.download_old_blocks { if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); - let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number); + let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &ancient_block_hash, ancient_block_number); if let Some(hash) = chain.first_block_hash { trace!(target: "sync", "Downloader target set to {:?}", hash); downloader.set_target(&hash); From 6612f9626be4466552b75f65f0039264bd6b84e1 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 14 Sep 2018 12:23:36 +0100 Subject: [PATCH 19/42] Reset OldBlocks download from last enqueued Previously when the ancient block queue was full it would restart the download from the last imported block, so the ones still in the queue would be redownloaded. Keeping the existing downloader instance and just resetting it will start again from the last enqueued block.:wq --- ethcore/sync/src/chain/mod.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 9444ea2956b..77622fe456f 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -860,15 +860,21 @@ impl ChainSync { } }, BlockSet::OldBlocks => { - if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) { - // reset stale requests from last sync round - for (_, ref mut p) in &mut self.peers { - if p.block_set == Some(BlockSet::OldBlocks) { - p.reset_asking(); + let mut is_complete = false; + if let Some(downloader) = self.old_blocks.as_mut() { + if downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) { + // reset in flight requests in order to prevent them being handled in the next round + for (_, ref mut p) in &mut self.peers { + if p.block_set == Some(BlockSet::OldBlocks) { + p.reset_asking(); + } } + downloader.reset(); } - self.restart(io); - } else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) { + is_complete = downloader.is_complete(); + } + + if is_complete { trace!(target: "sync", "Background block download is complete"); self.old_blocks = None; } From e72ad1418a3e7d70f7b5b37c52097aa659c72d75 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 14 Sep 2018 14:07:36 +0100 Subject: [PATCH 20/42] Ignore expired Body and Receipt requests --- ethcore/sync/src/chain/handler.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 15f234fbbfe..3db60c070db 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -292,7 +292,9 @@ impl SyncHandler { let block_set = sync.peers.get(&peer_id) .and_then(|p| p.block_set) .unwrap_or(BlockSet::NewBlocks); - if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) { + let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false); + + if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) || !allowed { trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id); return Ok(()); } @@ -415,7 +417,8 @@ impl SyncHandler { fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { sync.clear_peer_download(peer_id); let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); - if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) { + let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false); + if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) || !allowed { trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id); return Ok(()); } From 3d34ed57e6b46d9c0603d1c4fcdd3efec626eb25 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 14 Sep 2018 14:42:37 +0100 Subject: [PATCH 21/42] Log when ancient block download being restarted --- ethcore/sync/src/chain/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 77622fe456f..2a7aaa326b8 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -863,10 +863,12 @@ impl ChainSync { let mut is_complete = false; if let Some(downloader) = self.old_blocks.as_mut() { if downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) { + trace!(target: "sync", "Restarting OldBlocks download"); // reset in flight requests in order to prevent them being handled in the next round - for (_, ref mut p) in &mut self.peers { + for (pid, ref mut p) in &mut self.peers { if p.block_set == Some(BlockSet::OldBlocks) { p.reset_asking(); + debug!(target: "sync", "Reset peer asking OldBlocks {:?}", pid); } } downloader.reset(); From 3fe9c92e17d3bfd3ce0b60875f4f173ea908ae91 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 17 Sep 2018 12:02:59 +0100 Subject: [PATCH 22/42] Only request old blocks from peers with >= difficulty https://github.com/paritytech/parity-ethereum/pull/9226 might be too permissive and causing the behaviour of the retraction soon after the fork block. With this change the peer difficulty has to be greater than or euqal to our syncing difficulty, so should still fix https://github.com/paritytech/parity-ethereum/issues/9225 --- ethcore/sync/src/chain/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 2a7aaa326b8..432022a0fef 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -762,12 +762,10 @@ impl ChainSync { } } - // Only ask for old blocks if the peer has a higher difficulty than the last imported old block - let last_imported_old_block_difficulty = self.old_blocks.as_mut().and_then(|d| { - io.chain().block_total_difficulty(BlockId::Number(d.last_imported_block_number())) - }); + // Only ask for old blocks if the peer has an equal or higher difficulty + let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty); - if force || last_imported_old_block_difficulty.map_or(true, |ld| peer_difficulty.map_or(true, |pd| pd > ld)) { + if force || equal_or_higher_difficulty { if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) { SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks); return; @@ -775,9 +773,9 @@ impl ChainSync { } else { trace!( target: "sync", - "peer {:?} is not suitable for requesting old blocks, last_imported_old_block_difficulty={:?}, peer_difficulty={:?}", + "peer {:?} is not suitable for requesting old blocks, syncing_difficulty={:?}, peer_difficulty={:?}", peer_id, - last_imported_old_block_difficulty, + syncing_difficulty, peer_difficulty ); self.deactivate_peer(io, peer_id); From 757641d9b817ae8b63fec684759b0815af9c4d0e Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 18 Sep 2018 17:58:39 +0100 Subject: [PATCH 23/42] Some logging and clear stalled blocks head --- ethcore/sync/src/block_sync.rs | 4 ++++ ethcore/sync/src/blocks.rs | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index a51b192dbde..ad4b9dc7001 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -292,6 +292,10 @@ impl BlockDownloader { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + match self.block_set { + BlockSet::NewBlocks => (), + BlockSet::OldBlocks => trace_sync!(self, target: "sync", "ChainHead: reset_to {:?}", hashes), + } self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 3815084f8f1..0361098a181 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -387,6 +387,15 @@ impl BlockCollection { } trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); + + // reset if head stuck, not downloading any block bodies or receipts + // for now a crude heuristic that may not cover all cases + // assumes that at least some bodies will be downloaded before all headers + // 256 = subchain heads length and 128 = subchain length. This limit could be an argument. + if drained.len() == 0 && self.blocks.len() > 128 * 256 { + info!(target: "sync", "Resetting blocks. Current head {:?}, blocks length {:?}, heads {:?}", self.head, self.blocks.len(), self.heads); + } + drained } @@ -569,6 +578,9 @@ impl BlockCollection { } } } + if self.need_receipts { + trace!(target: "sync", "update_heads: {:?}", new_heads); + } self.heads = new_heads; } } From 54614c8ac5f252337c9a2651cd680f49cc557ad8 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 19 Sep 2018 11:08:02 +0100 Subject: [PATCH 24/42] Revert "Some logging and clear stalled blocks head" This reverts commit 757641d9b817ae8b63fec684759b0815af9c4d0e. --- ethcore/sync/src/block_sync.rs | 4 ---- ethcore/sync/src/blocks.rs | 12 ------------ 2 files changed, 16 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index ad4b9dc7001..a51b192dbde 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -292,10 +292,6 @@ impl BlockDownloader { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - match self.block_set { - BlockSet::NewBlocks => (), - BlockSet::OldBlocks => trace_sync!(self, target: "sync", "ChainHead: reset_to {:?}", hashes), - } self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 0361098a181..3815084f8f1 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -387,15 +387,6 @@ impl BlockCollection { } trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); - - // reset if head stuck, not downloading any block bodies or receipts - // for now a crude heuristic that may not cover all cases - // assumes that at least some bodies will be downloaded before all headers - // 256 = subchain heads length and 128 = subchain length. This limit could be an argument. - if drained.len() == 0 && self.blocks.len() > 128 * 256 { - info!(target: "sync", "Resetting blocks. Current head {:?}, blocks length {:?}, heads {:?}", self.head, self.blocks.len(), self.heads); - } - drained } @@ -578,9 +569,6 @@ impl BlockCollection { } } } - if self.need_receipts { - trace!(target: "sync", "update_heads: {:?}", new_heads); - } self.heads = new_heads; } } From 39166d0834a03310bb110d08b493f3a3b653c219 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 19 Sep 2018 18:09:45 +0100 Subject: [PATCH 25/42] Reset stalled header if useless more than once --- ethcore/sync/src/block_sync.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index a51b192dbde..135c23096b3 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -117,9 +117,9 @@ pub struct BlockDownloader { highest_block: Option, /// Downloaded blocks, holds `H`, `B` and `S` blocks: BlockCollection, - /// Last impoted block number + /// Last imported block number last_imported_block: BlockNumber, - /// Last impoted block hash + /// Last imported block hash last_imported_hash: H256, /// Number of blocks imported this round imported_this_round: Option, @@ -136,6 +136,8 @@ pub struct BlockDownloader { retract_step: u64, /// Whether reorg should be limited. limit_reorg: bool, + /// TODO + useless_requested_hashes: HashSet, } impl BlockDownloader { @@ -161,6 +163,7 @@ impl BlockDownloader { target_hash: None, retract_step: 1, limit_reorg: limit_reorg, + useless_requested_hashes: HashSet::new(), } } @@ -307,11 +310,26 @@ impl BlockDownloader { }, State::Blocks => { let count = headers.len(); + // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, target: "sync", "No useful headers"); + if let Some(eh) = expected_hash { + if self.useless_requested_hashes.contains(&eh) { + trace_sync!(self, target: "sync", + "Received consecutive sets of useless headers from requested header {:?}. Resetting sync and disabling peer", eh); + self.reset(); + self.useless_requested_hashes.clear(); + return Err(BlockDownloaderImportError::Invalid); + } else { + self.useless_requested_hashes.insert(eh); + } + } return Err(BlockDownloaderImportError::Useless); } + if let Some(eh) = expected_hash { + self.useless_requested_hashes.remove(&eh); + } self.blocks.insert_headers(headers); trace_sync!(self, target: "sync", "Inserted {} headers", count); }, From eea2f8d1dae8348625de26be467fb1c3c8ee7452 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 20 Sep 2018 11:21:59 +0100 Subject: [PATCH 26/42] Store useless headers in HashSet --- ethcore/sync/src/block_sync.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 135c23096b3..1493f64dc2c 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -313,16 +313,14 @@ impl BlockDownloader { // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { - trace_sync!(self, target: "sync", "No useful headers"); + trace_sync!(self, target: "sync", "No useful headers, expected hash {:?}", expected_hash); if let Some(eh) = expected_hash { - if self.useless_requested_hashes.contains(&eh) { + if !self.useless_requested_hashes.insert(eh) { trace_sync!(self, target: "sync", - "Received consecutive sets of useless headers from requested header {:?}. Resetting sync and disabling peer", eh); + "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); self.reset(); self.useless_requested_hashes.clear(); return Err(BlockDownloaderImportError::Invalid); - } else { - self.useless_requested_hashes.insert(eh); } } return Err(BlockDownloaderImportError::Useless); From 535dc63618519a8618e4e029a53e133e11ddf428 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 20 Sep 2018 12:00:46 +0100 Subject: [PATCH 27/42] Add sync target to logging macro --- ethcore/sync/src/block_sync.rs | 70 +++++++++++++++++----------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index a51b192dbde..ee0a1b8a3b5 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -39,11 +39,11 @@ const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; // logging macros prepend BlockSet context for log filtering macro_rules! trace_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { - trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + ($self:ident, $fmt:expr, $($arg:tt)+) => { + trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); }; - ($self:ident, target: $target:expr, $fmt:expr) => { - trace!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + ($self:ident, $fmt:expr) => { + trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set); }; } @@ -231,7 +231,7 @@ impl BlockDownloader { pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: Option) -> Result { let item_count = r.item_count().unwrap_or(0); if self.state == State::Idle { - trace_sync!(self, target: "sync", "Ignored unexpected block headers"); + trace_sync!(self, "Ignored unexpected block headers"); return Ok(DownloadAction::None) } if item_count == 0 && (self.state == State::Blocks) { @@ -254,7 +254,7 @@ impl BlockDownloader { } any_known = any_known || self.blocks.contains_head(&hash); if self.blocks.contains(&hash) { - trace_sync!(self, target: "sync", "Skipping existing block header {} ({:?})", number, hash); + trace_sync!(self, "Skipping existing block header {} ({:?})", number, hash); continue; } @@ -265,8 +265,8 @@ impl BlockDownloader { match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain | BlockStatus::Queued => { match self.state { - State::Blocks => trace_sync!(self, target: "sync", "Header already in chain {} ({})", number, hash), - _ => trace_sync!(self, target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state), + State::Blocks => trace_sync!(self, "Header already in chain {} ({})", number, hash), + _ => trace_sync!(self, "Header already in chain {} ({}), state = {:?}", number, hash, self.state), } headers.push(info); hashes.push(hash); @@ -283,7 +283,7 @@ impl BlockDownloader { // Disable the peer for this syncing round if it gives invalid chain if !valid_response { - trace_sync!(self, target: "sync", "Invalid headers response"); + trace_sync!(self, "Invalid headers response"); return Err(BlockDownloaderImportError::Invalid); } @@ -291,7 +291,7 @@ impl BlockDownloader { State::ChainHead => { if !headers.is_empty() { // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace_sync!(self, target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); + trace_sync!(self, "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); @@ -300,7 +300,7 @@ impl BlockDownloader { let oldest_reorg = io.chain().pruning_info().earliest_state; let last = self.last_imported_block; if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) { - trace_sync!(self, target: "sync", "No common block, disabling peer"); + trace_sync!(self, "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } } @@ -309,13 +309,13 @@ impl BlockDownloader { let count = headers.len(); // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { - trace_sync!(self, target: "sync", "No useful headers"); + trace_sync!(self, "No useful headers"); return Err(BlockDownloaderImportError::Useless); } self.blocks.insert_headers(headers); - trace_sync!(self, target: "sync", "Inserted {} headers", count); + trace_sync!(self, "Inserted {} headers", count); }, - _ => trace_sync!(self, target: "sync", "Unexpected headers({})", headers.len()), + _ => trace_sync!(self, "Unexpected headers({})", headers.len()), } Ok(DownloadAction::None) @@ -327,7 +327,7 @@ impl BlockDownloader { if item_count == 0 { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace_sync!(self, target: "sync", "Ignored unexpected block bodies"); + trace_sync!(self, "Ignored unexpected block bodies"); } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { @@ -336,7 +336,7 @@ impl BlockDownloader { } if self.blocks.insert_bodies(bodies) != item_count { - trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block bodies"); + trace_sync!(self, "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -350,19 +350,19 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace_sync!(self, target: "sync", "Ignored unexpected block receipts"); + trace_sync!(self, "Ignored unexpected block receipts"); } else { let mut receipts = Vec::with_capacity(item_count); for i in 0..item_count { let receipt = r.at(i).map_err(|e| { - trace_sync!(self, target: "sync", "Error decoding block receipts RLP: {:?}", e); + trace_sync!(self, "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; receipts.push(receipt.as_raw().to_vec()); } if self.blocks.insert_receipts(receipts) != item_count { - trace_sync!(self, target: "sync", "Deactivating peer for giving invalid block receipts"); + trace_sync!(self, "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -371,7 +371,7 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace_sync!(self, target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); + trace_sync!(self, "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. let start = self.last_round_start; @@ -383,12 +383,12 @@ impl BlockDownloader { if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); - trace_sync!(self, target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); + trace_sync!(self, "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; let oldest_reorg = io.chain().pruning_info().earliest_state; if self.limit_reorg && best > start && start < oldest_reorg { - debug_sync!(self, target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); + debug_sync!(self, "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { let n = start - cmp::min(self.retract_step, start); @@ -397,10 +397,10 @@ impl BlockDownloader { Some(h) => { self.last_imported_block = n; self.last_imported_hash = h; - trace_sync!(self, target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); + trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug_sync!(self, target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); + debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } @@ -428,7 +428,7 @@ impl BlockDownloader { State::ChainHead => { if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD { // Request subchain headers - trace_sync!(self, target: "sync", "Starting sync with better chain"); + trace_sync!(self, "Starting sync with better chain"); // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains return Some(BlockRequest::Headers { @@ -486,7 +486,7 @@ impl BlockDownloader { if self.target_hash.as_ref().map_or(false, |t| t == &h) { self.state = State::Complete; - trace_sync!(self, target: "sync", "Sync target reached"); + trace_sync!(self, "Sync target reached"); return Ok(()); } @@ -498,15 +498,15 @@ impl BlockDownloader { match result { Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => { - trace_sync!(self, target: "sync", "Block already in chain {:?}", h); + trace_sync!(self, "Block already in chain {:?}", h); self.block_imported(&h, number, &parent); }, Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => { - trace_sync!(self, target: "sync", "Block already queued {:?}", h); + trace_sync!(self, "Block already queued {:?}", h); self.block_imported(&h, number, &parent); }, Ok(_) => { - trace_sync!(self, target: "sync", "Block queued {:?}", h); + trace_sync!(self, "Block queued {:?}", h); imported.insert(h.clone()); self.block_imported(&h, number, &parent); }, @@ -514,26 +514,26 @@ impl BlockDownloader { break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(_)), _)) => { - trace_sync!(self, target: "sync", "Unknown new block parent, restarting sync"); + trace_sync!(self, "Unknown new block parent, restarting sync"); break; }, Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => { - debug_sync!(self, target: "sync", "Block temporarily invalid: {:?}, restarting sync", h); + debug_sync!(self, "Block temporarily invalid: {:?}, restarting sync", h); break; }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { - debug_sync!(self, target: "sync", "Block import queue full ({}), restarting sync", limit); + debug_sync!(self, "Block import queue full ({}), restarting sync", limit); bad = true; break; }, Err(e) => { - debug_sync!(self, target: "sync", "Bad block {:?} : {:?}", h, e); + debug_sync!(self, "Bad block {:?} : {:?}", h, e); bad = true; break; } } } - trace_sync!(self, target: "sync", "Imported {} of {}", imported.len(), count); + trace_sync!(self, "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); if bad { @@ -542,7 +542,7 @@ impl BlockDownloader { if self.blocks.is_empty() { // complete sync round - trace_sync!(self, target: "sync", "Sync round complete"); + trace_sync!(self, "Sync round complete"); self.reset(); } Ok(()) From 9e18c90c5bb95da3eaad0e8a80f1fd6fb34d6993 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 20 Sep 2018 13:31:54 +0100 Subject: [PATCH 28/42] Don't disable useless peer and fix log macro --- ethcore/sync/src/block_sync.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 717fdf01e56..f62619e45b9 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -48,11 +48,11 @@ macro_rules! trace_sync { } macro_rules! debug_sync { - ($self:ident, target: $target:expr, $fmt:expr, $($arg:tt)+) => { - debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); + ($self:ident, $fmt:expr, $($arg:tt)+) => { + debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+); }; - ($self:ident, target: $target:expr, $fmt:expr) => { - debug!(target: $target, concat!("{:?}: ", $fmt), $self.block_set); + ($self:ident, $fmt:expr) => { + debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set); }; } @@ -311,6 +311,7 @@ impl BlockDownloader { State::Blocks => { let count = headers.len(); + let reset_consecutive_useless = // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); @@ -319,7 +320,6 @@ impl BlockDownloader { trace_sync!(self, "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); self.reset(); self.useless_requested_hashes.clear(); - return Err(BlockDownloaderImportError::Invalid); } } return Err(BlockDownloaderImportError::Useless); From 409f3e8b38acfb661dbaa8b8d1edd3e8bba81b26 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Thu, 20 Sep 2018 15:42:02 +0100 Subject: [PATCH 29/42] Clear useless headers on reset and comments --- ethcore/sync/src/block_sync.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index f62619e45b9..92a00a71627 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -136,8 +136,8 @@ pub struct BlockDownloader { retract_step: u64, /// Whether reorg should be limited. limit_reorg: bool, - /// TODO - useless_requested_hashes: HashSet, + /// Hashes of header requests which return no useful headers + useless_expected_hashes: HashSet, } impl BlockDownloader { @@ -163,13 +163,14 @@ impl BlockDownloader { target_hash: None, retract_step: 1, limit_reorg: limit_reorg, - useless_requested_hashes: HashSet::new(), + useless_expected_hashes: HashSet::new(), } } /// Reset sync. Clear all local downloaded data. pub fn reset(&mut self) { self.blocks.clear(); + self.useless_expected_hashes.clear(); self.state = State::Idle; } @@ -311,21 +312,19 @@ impl BlockDownloader { State::Blocks => { let count = headers.len(); - let reset_consecutive_useless = // At least one of the heades must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); if let Some(eh) = expected_hash { - if !self.useless_requested_hashes.insert(eh) { + if !self.useless_expected_hashes.insert(eh) { trace_sync!(self, "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); self.reset(); - self.useless_requested_hashes.clear(); } } return Err(BlockDownloaderImportError::Useless); } if let Some(eh) = expected_hash { - self.useless_requested_hashes.remove(&eh); + self.useless_expected_hashes.remove(&eh); } self.blocks.insert_headers(headers); trace_sync!(self, "Inserted {} headers", count); From df9f9529276ce256ce48d4244aa13eb8b9f2c1ff Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 21 Sep 2018 12:21:40 +0100 Subject: [PATCH 30/42] Use custom error for collecting blocks Previously we resued BlockImportError, however only the Invalid case and this made little sense with the QueueFull error. --- ethcore/sync/src/block_sync.rs | 22 +++++++++++++++------- ethcore/sync/src/chain/mod.rs | 9 +++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 92a00a71627..cd3adabb4b6 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -100,6 +100,14 @@ pub enum BlockDownloaderImportError { Useless, } +#[derive(Eq, PartialEq, Debug)] +pub enum CollectBlocksError { + /// One of the downloaded blocks was bad. + BadBlock, + /// The import queue is full so not all blocks were imported. + QueueFull, +} + impl From for BlockDownloaderImportError { fn from(_: rlp::DecoderError) -> BlockDownloaderImportError { BlockDownloaderImportError::Invalid @@ -312,7 +320,7 @@ impl BlockDownloader { State::Blocks => { let count = headers.len(); - // At least one of the heades must advance the subchain. Otherwise they are all useless. + // At least one of the headers must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); if let Some(eh) = expected_hash { @@ -485,8 +493,8 @@ impl BlockDownloader { } /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. - pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> { - let mut bad = false; + pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), CollectBlocksError> { + let mut err: Option = None; let mut imported = HashSet::new(); let blocks = self.blocks.drain(); let count = blocks.len(); @@ -537,12 +545,12 @@ impl BlockDownloader { }, Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { debug_sync!(self, "Block import queue full ({}), restarting sync", limit); - bad = true; + err = Some(CollectBlocksError::QueueFull); break; }, Err(e) => { debug_sync!(self, "Bad block {:?} : {:?}", h, e); - bad = true; + err = Some(CollectBlocksError::BadBlock); break; } } @@ -550,8 +558,8 @@ impl BlockDownloader { trace_sync!(self, "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); - if bad { - return Err(BlockDownloaderImportError::Invalid); + if let Some(e) = err { + return Err(e); } if self.blocks.is_empty() { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 432022a0fef..b5db8f1ad09 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -109,7 +109,7 @@ use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, Bl use ethcore::snapshot::{RestorationStatus}; use sync_io::SyncIo; use super::{WarpSync, SyncConfig}; -use block_sync::{BlockDownloader, BlockDownloaderImportError as DownloaderImportError}; +use block_sync::BlockDownloader; use rand::Rng; use snapshot::{Snapshot}; use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; @@ -853,15 +853,16 @@ impl ChainSync { fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) { match block_set { BlockSet::NewBlocks => { - if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == Err(DownloaderImportError::Invalid) { + if let Err(err) = self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) { + trace!(target: "sync", "Error collecting blocks {:?}. Restarting NewBlocks download", err); self.restart(io); } }, BlockSet::OldBlocks => { let mut is_complete = false; if let Some(downloader) = self.old_blocks.as_mut() { - if downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) { - trace!(target: "sync", "Restarting OldBlocks download"); + if let Err(err) = downloader.collect_blocks(io, false) { + trace!(target: "sync", "Error collecting blocks {:?}. Restarting OldBlocks download", err); // reset in flight requests in order to prevent them being handled in the next round for (pid, ref mut p) in &mut self.peers { if p.block_set == Some(BlockSet::OldBlocks) { From 10fc545e23f355dd4e87dcd1c106611b322798d5 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Fri, 21 Sep 2018 13:03:33 +0100 Subject: [PATCH 31/42] Remove blank line --- ethcore/sync/src/block_sync.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index cd3adabb4b6..94cc0f21a92 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -319,7 +319,6 @@ impl BlockDownloader { }, State::Blocks => { let count = headers.len(); - // At least one of the headers must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); From f0bb06d529c79cc18cc1ee39ac457a0c2d3a04b5 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 24 Sep 2018 17:35:08 +0100 Subject: [PATCH 32/42] Test for reset sync after consecutive useless headers --- ethcore/sync/src/block_sync.rs | 89 +++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 7aaaafafaf0..8c4df52f177 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -579,4 +579,91 @@ impl BlockDownloader { } } -//TODO: module tests +#[cfg(test)] +mod tests { + use ethcore::header::*; + use ethcore::client::{TestBlockChainClient}; + use parking_lot::RwLock; + use rlp::*; + use std::collections::{VecDeque}; + use tests::helpers::{TestIo}; + use tests::snapshot::TestSnapshotService; + + use super::*; + // use chain::tests::{ + // dummy_sync_with_peer, + // get_dummy_block, + // get_dummy_blocks, + // get_dummy_hashes, + // }; + + fn get_dummy_headers(count: u32) -> Vec
{ + let mut parent_hash = H256::new(); + let mut tx_root = H256::new(); + let mut headers = Vec::new(); + for i in 0..count { + let mut header = Header::new(); + header.set_difficulty((i * 100).into()); + header.set_timestamp((i * 10) as u64); + header.set_number(i as u64); + header.set_parent_hash(parent_hash); + header.set_transactions_root(tx_root); + parent_hash = header.hash(); + tx_root = H256::random(); + headers.push(header); + } + headers + } + + fn import_headers(headers: &[Header], downloader: &mut BlockDownloader, io: &mut SyncIo) -> Result { + let mut stream = RlpStream::new(); + stream.append_list(headers); + let bytes = stream.out(); + let rlp = Rlp::new(&bytes); + let expected_hash = headers.first().map(|h| h.hash()); + downloader.import_headers(io, &rlp, expected_hash) + } + + fn import_headers_ok(headers: &[Header], downloader: &mut BlockDownloader, io: &mut SyncIo) { + let res = import_headers(headers, downloader, io); + assert!(res.is_ok()); + } + + #[test] + fn reset_after_consecutive_sets_of_useless_headers() { + ::env_logger::try_init().ok(); + let headers1 = get_dummy_headers(20); + let short_subchain = get_dummy_headers(5); + + let mut client = TestBlockChainClient::new(); + let queue = RwLock::new(VecDeque::new()); + let ss = TestSnapshotService::new(); + let mut io = TestIo::new(&mut client, &ss, &queue, None); + + let heads : Vec<_> = headers1.iter() + .enumerate().filter_map(|(i, h)| if i % 10 == 0 { Some(h.clone()) } else { None }).collect(); + let start_hash = heads[0].hash(); + + let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &start_hash, 0); + + downloader.request_blocks(&mut io, 1); + + import_headers_ok(&heads, &mut downloader, &mut io); + import_headers_ok(&short_subchain, &mut downloader, &mut io); + + assert_eq!(downloader.state, State::Blocks); + assert!(!downloader.blocks.is_empty()); + + // simulate receiving useless headers + let head = vec![short_subchain.last().unwrap().clone()]; + let res1 = import_headers(&head, &mut downloader, &mut io); + let res2 = import_headers(&head, &mut downloader, &mut io); + + assert!(res1.is_err()); + assert!(res2.is_err()); + assert_eq!(downloader.state, State::Idle); + assert!(downloader.blocks.is_empty()); + } + + // todo: test that it doesnt reset if there is only a single header +} From ec4adc014c9f279adec0e07817076d0d933c33ca Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 25 Sep 2018 18:12:18 +0100 Subject: [PATCH 33/42] Don't reset after consecutive headers when chain head --- ethcore/sync/src/block_sync.rs | 40 ++++++++++++++++++++++++++++++++-- ethcore/sync/src/blocks.rs | 5 +++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 8c4df52f177..6c3eaa0a656 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -323,7 +323,7 @@ impl BlockDownloader { if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); if let Some(eh) = expected_hash { - if !self.useless_expected_hashes.insert(eh) { + if self.blocks.heads_len() > 1 && !self.useless_expected_hashes.insert(eh) { trace_sync!(self, "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); self.reset(); } @@ -665,5 +665,41 @@ mod tests { assert!(downloader.blocks.is_empty()); } - // todo: test that it doesnt reset if there is only a single header + #[test] + fn dont_reset_after_consecutive_sets_of_useless_headers_for_chain_head() { + ::env_logger::try_init().ok(); + let headers = get_dummy_headers(10); + let short_subchain = get_dummy_headers(5); + + let mut client = TestBlockChainClient::new(); + let queue = RwLock::new(VecDeque::new()); + let ss = TestSnapshotService::new(); + let mut io = TestIo::new(&mut client, &ss, &queue, None); + + let heads : Vec<_> = headers.iter() + .enumerate().filter_map(|(i, h)| if i % 10 == 0 { Some(h.clone()) } else { None }).collect(); + let start_hash = heads[0].hash(); + + let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &start_hash, 0); + + downloader.request_blocks(&mut io, 1); + + import_headers_ok(&heads, &mut downloader, &mut io); + import_headers_ok(&short_subchain, &mut downloader, &mut io); + + assert_eq!(downloader.state, State::Blocks); + assert!(!downloader.blocks.is_empty()); + + // simulate receiving useless headers + let head = vec![short_subchain.last().unwrap().clone()]; + let res1 = import_headers(&head, &mut downloader, &mut io); + let res2 = import_headers(&head, &mut downloader, &mut io); + + assert!(res1.is_err()); + assert!(res2.is_err()); + // download shouldn't be reset since this is the chain head for a single subchain. + // this state usually occurs for NewBlocks when it has reached the chain head. + assert_eq!(downloader.state, State::Blocks); + assert!(!downloader.blocks.is_empty()); + } } diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 3815084f8f1..6b0a5f3f617 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -406,6 +406,11 @@ impl BlockCollection { self.heads.contains(hash) } + /// Check the number of heads + pub fn heads_len(&self) -> usize { + self.heads.len() + } + /// Return used heap size. pub fn heap_size(&self) -> usize { self.heads.heap_size_of_children() From 9b5f21f7cd8e1739f30612f58e4e9577bbc4088b Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 25 Sep 2018 22:23:17 +0100 Subject: [PATCH 34/42] Delete commented out imports --- ethcore/sync/src/block_sync.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 6c3eaa0a656..3e4b7e75247 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -590,12 +590,6 @@ mod tests { use tests::snapshot::TestSnapshotService; use super::*; - // use chain::tests::{ - // dummy_sync_with_peer, - // get_dummy_block, - // get_dummy_blocks, - // get_dummy_hashes, - // }; fn get_dummy_headers(count: u32) -> Vec
{ let mut parent_hash = H256::new(); From 48e548359504bed3c8d863263e8b608e5a190826 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 26 Sep 2018 11:58:27 +0100 Subject: [PATCH 35/42] Return DownloadAction from collect_blocks instead of error --- ethcore/sync/src/block_sync.rs | 27 +++++++++------------------ ethcore/sync/src/chain/mod.rs | 10 +++++----- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 3e4b7e75247..2a68f00fdca 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -85,6 +85,7 @@ pub enum BlockRequest { } /// Indicates sync action +#[derive(Eq, PartialEq, Debug)] pub enum DownloadAction { /// Do nothing None, @@ -100,14 +101,6 @@ pub enum BlockDownloaderImportError { Useless, } -#[derive(Eq, PartialEq, Debug)] -pub enum CollectBlocksError { - /// One of the downloaded blocks was bad. - BadBlock, - /// The import queue is full so not all blocks were imported. - QueueFull, -} - impl From for BlockDownloaderImportError { fn from(_: rlp::DecoderError) -> BlockDownloaderImportError { BlockDownloaderImportError::Invalid @@ -492,8 +485,9 @@ impl BlockDownloader { } /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. - pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), CollectBlocksError> { - let mut err: Option = None; + /// Returns DownloadAction::Reset if it is imported all the the blocks it can and all downloading peers should be reset + pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> DownloadAction { + let mut download_action = DownloadAction::None; let mut imported = HashSet::new(); let blocks = self.blocks.drain(); let count = blocks.len(); @@ -508,7 +502,7 @@ impl BlockDownloader { if self.target_hash.as_ref().map_or(false, |t| t == &h) { self.state = State::Complete; trace_sync!(self, "Sync target reached"); - return Ok(()); + return download_action; } let result = if let Some(receipts) = receipts { @@ -544,12 +538,12 @@ impl BlockDownloader { }, Err(EthcoreError(EthcoreErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { debug_sync!(self, "Block import queue full ({}), restarting sync", limit); - err = Some(CollectBlocksError::QueueFull); + download_action = DownloadAction::Reset; break; }, Err(e) => { debug_sync!(self, "Bad block {:?} : {:?}", h, e); - err = Some(CollectBlocksError::BadBlock); + download_action = DownloadAction::Reset; break; } } @@ -557,16 +551,13 @@ impl BlockDownloader { trace_sync!(self, "Imported {} of {}", imported.len(), count); self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); - if let Some(e) = err { - return Err(e); - } - if self.blocks.is_empty() { // complete sync round trace_sync!(self, "Sync round complete"); self.reset(); + download_action = DownloadAction::Reset; } - Ok(()) + download_action } fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index b5db8f1ad09..aab89db38fd 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -109,7 +109,7 @@ use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, Bl use ethcore::snapshot::{RestorationStatus}; use sync_io::SyncIo; use super::{WarpSync, SyncConfig}; -use block_sync::BlockDownloader; +use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; @@ -853,16 +853,16 @@ impl ChainSync { fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) { match block_set { BlockSet::NewBlocks => { - if let Err(err) = self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) { - trace!(target: "sync", "Error collecting blocks {:?}. Restarting NewBlocks download", err); + if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == DownloadAction::Reset { + trace!(target: "sync", "Restarting NewBlocks download"); self.restart(io); } }, BlockSet::OldBlocks => { let mut is_complete = false; if let Some(downloader) = self.old_blocks.as_mut() { - if let Err(err) = downloader.collect_blocks(io, false) { - trace!(target: "sync", "Error collecting blocks {:?}. Restarting OldBlocks download", err); + if downloader.collect_blocks(io, false) == DownloadAction::Reset { + trace!(target: "sync", "Restarting OldBlocks download"); // reset in flight requests in order to prevent them being handled in the next round for (pid, ref mut p) in &mut self.peers { if p.block_set == Some(BlockSet::OldBlocks) { From a3409e3fe980cbe1ffe551fa88b11e1d13eff658 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 26 Sep 2018 15:19:04 +0100 Subject: [PATCH 36/42] Don't reset after round complete, was causing test hangs --- ethcore/sync/src/block_sync.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 2a68f00fdca..7983ac9a1bc 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -555,7 +555,6 @@ impl BlockDownloader { // complete sync round trace_sync!(self, "Sync round complete"); self.reset(); - download_action = DownloadAction::Reset; } download_action } From 4a030428c364701327575cf89515f2415b52efa3 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 1 Oct 2018 16:47:59 +0100 Subject: [PATCH 37/42] Add comment explaining reset after useless --- ethcore/sync/src/block_sync.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 7983ac9a1bc..381d093d2c3 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -316,6 +316,8 @@ impl BlockDownloader { if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); if let Some(eh) = expected_hash { + // only reset download if we have multiple subchain heads, to avoid unnecessary resets + // when we are at the head of the chain when we may legitimately receive no useful headers if self.blocks.heads_len() > 1 && !self.useless_expected_hashes.insert(eh) { trace_sync!(self, "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); self.reset(); From 0af4cf6cd96a2d225b3bc9caeb00a35cdf770702 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 2 Oct 2018 10:18:26 +0100 Subject: [PATCH 38/42] Replace HashSet with counter for useless headers --- ethcore/res/ethereum/tests | 2 +- ethcore/sync/src/block_sync.rs | 46 ++++++++++++++++------------------ 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/ethcore/res/ethereum/tests b/ethcore/res/ethereum/tests index 3f5febc9019..b8a21c19369 160000 --- a/ethcore/res/ethereum/tests +++ b/ethcore/res/ethereum/tests @@ -1 +1 @@ -Subproject commit 3f5febc901913ef698f1b09dda8705babd729e4a +Subproject commit b8a21c193696976ca3b33b6d82107601063a5d26 diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 381d093d2c3..3fc9ceffa50 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -36,6 +36,7 @@ const MAX_RECEPITS_TO_REQUEST: usize = 128; const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; +const MAX_USELESS_HEADERS_PER_ROUND: usize = 3; // logging macros prepend BlockSet context for log filtering macro_rules! trace_sync { @@ -137,8 +138,8 @@ pub struct BlockDownloader { retract_step: u64, /// Whether reorg should be limited. limit_reorg: bool, - /// Hashes of header requests which return no useful headers - useless_expected_hashes: HashSet, + /// consecutive useless headers this round + useless_headers_count: usize, } impl BlockDownloader { @@ -164,14 +165,14 @@ impl BlockDownloader { target_hash: None, retract_step: 1, limit_reorg: limit_reorg, - useless_expected_hashes: HashSet::new(), + useless_headers_count: 0, } } /// Reset sync. Clear all local downloaded data. pub fn reset(&mut self) { self.blocks.clear(); - self.useless_expected_hashes.clear(); + self.useless_headers_count = 0; self.state = State::Idle; } @@ -315,19 +316,15 @@ impl BlockDownloader { // At least one of the headers must advance the subchain. Otherwise they are all useless. if count == 0 || !any_known { trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); - if let Some(eh) = expected_hash { - // only reset download if we have multiple subchain heads, to avoid unnecessary resets - // when we are at the head of the chain when we may legitimately receive no useful headers - if self.blocks.heads_len() > 1 && !self.useless_expected_hashes.insert(eh) { - trace_sync!(self, "Received consecutive sets of useless headers from requested header {:?}. Resetting sync", eh); - self.reset(); - } + self.useless_headers_count += 1; + // only reset download if we have multiple subchain heads, to avoid unnecessary resets + // when we are at the head of the chain when we may legitimately receive no useful headers + if self.blocks.heads_len() > 1 && self.useless_headers_count >= MAX_USELESS_HEADERS_PER_ROUND { + trace_sync!(self, "Received {:?} useless responses this round. Resetting sync", MAX_USELESS_HEADERS_PER_ROUND); + self.reset(); } return Err(BlockDownloaderImportError::Useless); } - if let Some(eh) = expected_hash { - self.useless_expected_hashes.remove(&eh); - } self.blocks.insert_headers(headers); trace_sync!(self, "Inserted {} headers", count); }, @@ -616,7 +613,7 @@ mod tests { } #[test] - fn reset_after_consecutive_sets_of_useless_headers() { + fn reset_after_multiple_sets_of_useless_headers() { ::env_logger::try_init().ok(); let headers1 = get_dummy_headers(20); let short_subchain = get_dummy_headers(5); @@ -642,17 +639,17 @@ mod tests { // simulate receiving useless headers let head = vec![short_subchain.last().unwrap().clone()]; - let res1 = import_headers(&head, &mut downloader, &mut io); - let res2 = import_headers(&head, &mut downloader, &mut io); + for _ in 0..MAX_USELESS_HEADERS_PER_ROUND { + let res = import_headers(&head, &mut downloader, &mut io); + assert!(res.is_err()); + } - assert!(res1.is_err()); - assert!(res2.is_err()); assert_eq!(downloader.state, State::Idle); assert!(downloader.blocks.is_empty()); } #[test] - fn dont_reset_after_consecutive_sets_of_useless_headers_for_chain_head() { + fn dont_reset_after_multiple_sets_of_useless_headers_for_chain_head() { ::env_logger::try_init().ok(); let headers = get_dummy_headers(10); let short_subchain = get_dummy_headers(5); @@ -678,11 +675,12 @@ mod tests { // simulate receiving useless headers let head = vec![short_subchain.last().unwrap().clone()]; - let res1 = import_headers(&head, &mut downloader, &mut io); - let res2 = import_headers(&head, &mut downloader, &mut io); - assert!(res1.is_err()); - assert!(res2.is_err()); + for _ in 0..MAX_USELESS_HEADERS_PER_ROUND { + let res = import_headers(&head, &mut downloader, &mut io); + assert!(res.is_err()); + } + // download shouldn't be reset since this is the chain head for a single subchain. // this state usually occurs for NewBlocks when it has reached the chain head. assert_eq!(downloader.state, State::Blocks); From b8418e4786549a0cfbfc111152e2a45d53f155d2 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 2 Oct 2018 17:55:07 +0100 Subject: [PATCH 39/42] Refactor sync reset on bad block/queue full --- ethcore/sync/src/block_sync.rs | 2 +- ethcore/sync/src/chain/handler.rs | 8 ++------ ethcore/sync/src/chain/mod.rs | 30 ++++++++++++++++++------------ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 3fc9ceffa50..3a69b7fc138 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -553,7 +553,7 @@ impl BlockDownloader { if self.blocks.is_empty() { // complete sync round trace_sync!(self, "Sync round complete"); - self.reset(); + download_action = DownloadAction::Reset; } download_action } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 98d4b4bc062..92068fc2dda 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -401,12 +401,8 @@ impl SyncHandler { downloader.import_headers(io, r, expected_hash)? }; - if let DownloadAction::Reset = result { - // mark all outstanding requests as expired - trace!("Resetting downloads for {:?}", block_set); - for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { - p.reset_asking(); - } + if result == DownloadAction::Reset { + sync.reset_downloads(block_set); } sync.collect_blocks(io, block_set); diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index aab89db38fd..7b050257ef5 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -854,25 +854,23 @@ impl ChainSync { match block_set { BlockSet::NewBlocks => { if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == DownloadAction::Reset { - trace!(target: "sync", "Restarting NewBlocks download"); - self.restart(io); + self.reset_downloads(block_set); + self.new_blocks.reset(); } }, BlockSet::OldBlocks => { let mut is_complete = false; + let mut download_action = DownloadAction::None; if let Some(downloader) = self.old_blocks.as_mut() { - if downloader.collect_blocks(io, false) == DownloadAction::Reset { - trace!(target: "sync", "Restarting OldBlocks download"); - // reset in flight requests in order to prevent them being handled in the next round - for (pid, ref mut p) in &mut self.peers { - if p.block_set == Some(BlockSet::OldBlocks) { - p.reset_asking(); - debug!(target: "sync", "Reset peer asking OldBlocks {:?}", pid); - } - } + download_action = downloader.collect_blocks(io, false); + is_complete = downloader.is_complete(); + } + + if download_action == DownloadAction::Reset { + self.reset_downloads(block_set); + if let Some(downloader) = self.old_blocks.as_mut() { downloader.reset(); } - is_complete = downloader.is_complete(); } if is_complete { @@ -880,6 +878,14 @@ impl ChainSync { self.old_blocks = None; } } + }; + } + + /// Mark all outstanding requests as expired + fn reset_downloads(&mut self, block_set: BlockSet) { + trace!("Resetting downloads for {:?}", block_set); + for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { + p.reset_asking(); } } From 0ec672bd79fe56f45d8e49fec7f07906d4fd7b4c Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Tue, 2 Oct 2018 18:22:05 +0100 Subject: [PATCH 40/42] Add missing target for log message --- ethcore/sync/src/chain/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 7b050257ef5..44d27827084 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -883,7 +883,7 @@ impl ChainSync { /// Mark all outstanding requests as expired fn reset_downloads(&mut self, block_set: BlockSet) { - trace!("Resetting downloads for {:?}", block_set); + trace!(target: "sync", "Resetting downloads for {:?}", block_set); for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { p.reset_asking(); } From e72d150d72aba932132e28689972493c2a73db54 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Wed, 3 Oct 2018 12:43:53 +0100 Subject: [PATCH 41/42] Fix compiler errors and test after merge --- ethcore/sync/src/block_sync.rs | 91 +++++++++++++++------------------- 1 file changed, 41 insertions(+), 50 deletions(-) diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index c3dd5a804e6..739523f722a 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -316,7 +316,7 @@ impl BlockDownloader { match self.state { State::ChainHead => { if !headers.is_empty() { - trace_sync!("Received {} subchain heads, proceeding to download", headers.len()); + trace_sync!(self, "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; return Ok(DownloadAction::Reset); @@ -334,8 +334,8 @@ impl BlockDownloader { let count = headers.len(); // At least one of the headers must advance the subchain. Otherwise they are all useless. if count == 0 { - trace_sync!(self, "No useful headers, expected hash {:?}", expected_hash); self.useless_headers_count += 1; + trace_sync!(self, "No useful headers ({:?} this round), expected hash {:?}", self.useless_headers_count, expected_hash); // only reset download if we have multiple subchain heads, to avoid unnecessary resets // when we are at the head of the chain when we may legitimately receive no useful headers if self.blocks.heads_len() > 1 && self.useless_headers_count >= MAX_USELESS_HEADERS_PER_ROUND { @@ -369,11 +369,11 @@ impl BlockDownloader { let hashes = self.blocks.insert_bodies(bodies); if hashes.len() != item_count { - trace_sync!("Deactivating peer for giving invalid block bodies"); + trace_sync!(self, "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) { - trace_sync!("Deactivating peer for giving unexpected block bodies"); + trace_sync!(self, "Deactivating peer for giving unexpected block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -400,11 +400,11 @@ impl BlockDownloader { } let hashes = self.blocks.insert_receipts(receipts); if hashes.len() != item_count { - trace_sync!("Deactivating peer for giving invalid block receipts"); + trace_sync!(self, "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) { - trace_sync!("Deactivating peer for giving unexpected block receipts"); + trace_sync!(self, "Deactivating peer for giving unexpected block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -638,35 +638,21 @@ mod tests { header } - fn get_dummy_headers(count: u32) -> Vec
{ - let mut parent_hash = H256::new(); - let mut tx_root = H256::new(); - let mut headers = Vec::new(); - for i in 0..count { - let mut header = dummy_header(i as u64, parent_hash); - header.set_transactions_root(tx_root); - parent_hash = header.hash(); - tx_root = H256::random(); - headers.push(header); - } - headers - } - fn dummy_signed_tx() -> SignedTransaction { let keypair = Random.generate().unwrap(); Transaction::default().sign(keypair.secret(), None) } - fn import_headers(headers: &[Header], downloader: &mut BlockDownloader, io: &mut SyncIo) -> Result { + fn import_headers(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) -> Result { let mut stream = RlpStream::new(); stream.append_list(headers); let bytes = stream.out(); let rlp = Rlp::new(&bytes); - let expected_hash = headers.first().map(|h| h.hash()); + let expected_hash = headers.first().unwrap().hash(); downloader.import_headers(io, &rlp, expected_hash) } - fn import_headers_ok(headers: &[Header], downloader: &mut BlockDownloader, io: &mut SyncIo) { + fn import_headers_ok(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) { let res = import_headers(headers, downloader, io); assert!(res.is_ok()); } @@ -678,7 +664,7 @@ mod tests { let spec = Spec::new_test(); let genesis_hash = spec.genesis_header().hash(); - let mut downloader = BlockDownloader::new(false, &genesis_hash, 0); + let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0); downloader.state = State::ChainHead; let mut chain = TestBlockChainClient::new(); @@ -760,7 +746,7 @@ mod tests { let parent_hash = headers[1].hash(); headers.push(dummy_header(129, parent_hash)); - let mut downloader = BlockDownloader::new(false, &H256::random(), 0); + let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &H256::random(), 0); downloader.state = State::Blocks; downloader.blocks.reset_to(vec![headers[0].hash()]); @@ -830,7 +816,7 @@ mod tests { headers.push(header); } - let mut downloader = BlockDownloader::new(false, &headers[0].hash(), 0); + let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &headers[0].hash(), 0); downloader.state = State::Blocks; downloader.blocks.reset_to(vec![headers[0].hash()]); @@ -894,7 +880,7 @@ mod tests { headers.push(header); } - let mut downloader = BlockDownloader::new(true, &headers[0].hash(), 0); + let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &headers[0].hash(), 0); downloader.state = State::Blocks; downloader.blocks.reset_to(vec![headers[0].hash()]); @@ -923,21 +909,25 @@ mod tests { #[test] fn reset_after_multiple_sets_of_useless_headers() { ::env_logger::try_init().ok(); - let headers1 = get_dummy_headers(20); - let short_subchain = get_dummy_headers(5); - let mut client = TestBlockChainClient::new(); - let queue = RwLock::new(VecDeque::new()); - let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let spec = Spec::new_test(); + let genesis_hash = spec.genesis_header().hash(); - let heads : Vec<_> = headers1.iter() - .enumerate().filter_map(|(i, h)| if i % 10 == 0 { Some(h.clone()) } else { None }).collect(); - let start_hash = heads[0].hash(); + let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0); + downloader.state = State::ChainHead; - let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &start_hash, 0); + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); - downloader.request_blocks(&mut io, 1); + let heads = [ + spec.genesis_header(), + dummy_header(127, H256::random()), + dummy_header(254, H256::random()), + ]; + + let short_subchain = [dummy_header(1, genesis_hash)]; import_headers_ok(&heads, &mut downloader, &mut io); import_headers_ok(&short_subchain, &mut downloader, &mut io); @@ -959,21 +949,23 @@ mod tests { #[test] fn dont_reset_after_multiple_sets_of_useless_headers_for_chain_head() { ::env_logger::try_init().ok(); - let headers = get_dummy_headers(10); - let short_subchain = get_dummy_headers(5); - let mut client = TestBlockChainClient::new(); - let queue = RwLock::new(VecDeque::new()); - let ss = TestSnapshotService::new(); - let mut io = TestIo::new(&mut client, &ss, &queue, None); + let spec = Spec::new_test(); + let genesis_hash = spec.genesis_header().hash(); - let heads : Vec<_> = headers.iter() - .enumerate().filter_map(|(i, h)| if i % 10 == 0 { Some(h.clone()) } else { None }).collect(); - let start_hash = heads[0].hash(); + let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0); + downloader.state = State::ChainHead; - let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &start_hash, 0); + let mut chain = TestBlockChainClient::new(); + let snapshot_service = TestSnapshotService::new(); + let queue = RwLock::new(VecDeque::new()); + let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None); - downloader.request_blocks(&mut io, 1); + let heads = [ + spec.genesis_header() + ]; + + let short_subchain = [dummy_header(1, genesis_hash)]; import_headers_ok(&heads, &mut downloader, &mut io); import_headers_ok(&short_subchain, &mut downloader, &mut io); @@ -983,7 +975,6 @@ mod tests { // simulate receiving useless headers let head = vec![short_subchain.last().unwrap().clone()]; - for _ in 0..MAX_USELESS_HEADERS_PER_ROUND { let res = import_headers(&head, &mut downloader, &mut io); assert!(res.is_err()); From d77ac446d35231ecdd43b8bb5fbef97928874c31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Tue, 9 Oct 2018 10:07:29 +0100 Subject: [PATCH 42/42] ethcore: revert ethereum tests submodule update --- ethcore/res/ethereum/tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/res/ethereum/tests b/ethcore/res/ethereum/tests index b8a21c19369..3f5febc9019 160000 --- a/ethcore/res/ethereum/tests +++ b/ethcore/res/ethereum/tests @@ -1 +1 @@ -Subproject commit b8a21c193696976ca3b33b6d82107601063a5d26 +Subproject commit 3f5febc901913ef698f1b09dda8705babd729e4a