From cb5d19418d8d6a07c5437eaaf977a9437bbc6fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 30 May 2018 16:00:35 +0200 Subject: [PATCH 1/6] Revert "Fix not downloading old blocks (#8642)" This reverts commit d1934363e7c2c953f17cd451bea8476f46f52b82. --- ethcore/src/client/client.rs | 62 +++++++++++++----------------------- ethcore/sync/src/blocks.rs | 2 +- 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 5cfe8fce82c..993ecdda216 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,7 @@ use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque}; use std::fmt; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; @@ -210,8 +210,6 @@ pub struct Client { queue_transactions: IoChannelQueue, /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, - /// Hashes of pending ancient block wainting to be included - pending_ancient_blocks: RwLock>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -435,7 +433,6 @@ impl Importer { let hash = header.hash(); let _import_lock = self.import_lock.lock(); - trace!(target: "client", "Trying to import old block #{}", header.number()); { trace_time!("import_old_block"); // verify the block, passing the chain for updating the epoch verifier. @@ -764,7 +761,6 @@ impl Client { notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), - pending_ancient_blocks: RwLock::new(HashSet::new()), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -2012,7 +2008,7 @@ impl BlockChainClient for Client { impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| { + self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -2036,32 +2032,23 @@ impl IoClient for Client { { // check block order - if self.chain.read().is_known(&hash) { + if self.chain.read().is_known(&header.hash()) { bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); } - - let parent_hash = *header.parent_hash(); - let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash); - let status = self.block_status(BlockId::Hash(parent_hash)); - if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash))); + let status = self.block_status(BlockId::Hash(*header.parent_hash())); + if status == BlockStatus::Unknown || status == BlockStatus::Pending { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); } } - self.pending_ancient_blocks.write().insert(hash); - - trace!(target: "client", "Queuing old block #{}", header.number()); - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| { - let result = client.importer.import_old_block( + match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + client.importer.import_old_block( &header, &block_bytes, &receipts_bytes, &**client.db.read(), &*client.chain.read() - ); - - client.pending_ancient_blocks.write().remove(&hash); - result.map(|_| ()).unwrap_or_else(|e| { + ).map(|_| ()).unwrap_or_else(|e| { error!(target: "client", "Error importing ancient block: {}", e); }); }) { @@ -2071,7 +2058,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| { + match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2484,38 +2471,35 @@ impl fmt::Display for QueueError { /// Queue some items to be processed by IO client. struct IoChannelQueue { - queue: Arc>>>, + currently_queued: Arc, limit: usize, } impl IoChannelQueue { pub fn new(limit: usize) -> Self { IoChannelQueue { - queue: Default::default(), + currently_queued: Default::default(), limit, } } - pub fn queue(&self, channel: &mut IoChannel, fun: F) -> Result<(), QueueError> - where F: Fn(&Client) + Send + Sync + 'static + pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where + F: Fn(&Client) + Send + Sync + 'static, { - { - let mut queue = self.queue.lock(); - let queue_size = queue.len(); - ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); + ensure!(queue_size < self.limit, QueueError::Full(self.limit)); - queue.push_back(Box::new(fun)); - } - - let queue = self.queue.clone(); + let currently_queued = self.currently_queued.clone(); let result = channel.send(ClientIoMessage::execute(move |client| { - while let Some(fun) = queue.lock().pop_front() { - fun(client); - } + currently_queued.fetch_sub(count, AtomicOrdering::SeqCst); + fun(client); })); match result { - Ok(_) => Ok(()), + Ok(_) => { + self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); + Ok(()) + }, Err(e) => Err(QueueError::Channel(e)), } } diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 283f4ed610d..321c783b4db 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -266,7 +266,7 @@ impl BlockCollection { } } - /// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain. + /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. pub fn drain(&mut self) -> Vec { if self.blocks.is_empty() || self.head.is_none() { return Vec::new(); From 8a92cd354eff689d1013beac8d99fcaa622173a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 30 May 2018 16:44:26 +0200 Subject: [PATCH 2/6] Make sure only one thread actually imports old blocks. --- ethcore/src/client/client.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 993ecdda216..2a69a03a32e 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -210,6 +210,8 @@ pub struct Client { queue_transactions: IoChannelQueue, /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, + /// Queued ancient blocks, make sure they are imported in order. + queued_ancient_blocks: Arc>>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -761,6 +763,7 @@ impl Client { notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), + queued_ancient_blocks: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -2041,16 +2044,24 @@ impl IoClient for Client { } } + // we queue blocks here and trigger an IO message. + self.queued_ancient_blocks.lock().push_back((header, block_bytes, receipts_bytes)); + + let queue = self.queued_ancient_blocks.clone(); match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { - client.importer.import_old_block( - &header, - &block_bytes, - &receipts_bytes, - &**client.db.read(), - &*client.chain.read() - ).map(|_| ()).unwrap_or_else(|e| { - error!(target: "client", "Error importing ancient block: {}", e); - }); + // Make sure to hold the lock here to prevent importing out of order. + let mut ancient = queue.lock(); + if let Some((header, block_bytes, receipts_bytes)) = ancient.pop_front() { + client.importer.import_old_block( + &header, + &block_bytes, + &receipts_bytes, + &**client.db.read(), + &*client.chain.read() + ).map(|_| ()).unwrap_or_else(|e| { + error!(target: "client", "Error importing ancient block: {}", e); + }); + } }) { Ok(_) => Ok(hash), Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), From fded6052f777d9932aad8c5eca73c87bc226e8d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 31 May 2018 11:06:33 +0200 Subject: [PATCH 3/6] Add some trace timers. --- ethcore/src/client/client.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 2a69a03a32e..41033d39235 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2010,6 +2010,7 @@ impl BlockChainClient for Client { impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { + trace_time!("queue_transactions"); let len = transactions.len(); self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { trace_time!("import_queued_transactions"); @@ -2030,6 +2031,7 @@ impl IoClient for Client { } fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result { + trace_time!("queue_ancient_block"); let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?; let hash = header.hash(); @@ -2049,6 +2051,7 @@ impl IoClient for Client { let queue = self.queued_ancient_blocks.clone(); match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. let mut ancient = queue.lock(); if let Some((header, block_bytes, receipts_bytes)) = ancient.pop_front() { From 9f76a0f17c513c681434149d690fe231fd81edd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 31 May 2018 11:46:52 +0200 Subject: [PATCH 4/6] Bring back pending hashes set. --- ethcore/src/client/client.rs | 32 ++++++++++++++++++++++++-------- ethcore/sync/src/blocks.rs | 2 +- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 41033d39235..293c792070a 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -90,6 +90,8 @@ use_contract!(registry, "Registry", "res/contracts/registrar.json"); const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; +// Max number of blocks imported at once. +const MAX_ANCIENT_BLOCKS_IMPORT: usize = 4; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MIN_HISTORY_SIZE: u64 = 8; @@ -211,7 +213,10 @@ pub struct Client { /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, /// Queued ancient blocks, make sure they are imported in order. - queued_ancient_blocks: Arc>>, + queued_ancient_blocks: Arc, + VecDeque<(Header, Bytes, Bytes)> + )>>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -2037,24 +2042,35 @@ impl IoClient for Client { { // check block order - if self.chain.read().is_known(&header.hash()) { + if self.chain.read().is_known(&hash) { bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); } - let status = self.block_status(BlockId::Hash(*header.parent_hash())); - if status == BlockStatus::Unknown || status == BlockStatus::Pending { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); + let parent_hash = header.parent_hash(); + // NOTE To prevent race condition with import, make sure to check queued blocks first + // (and attempt to acquire lock) + let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash); + if !is_parent_pending { + let status = self.block_status(BlockId::Hash(*parent_hash)); + if status == BlockStatus::Unknown || status == BlockStatus::Pending { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash))); + } } } // we queue blocks here and trigger an IO message. - self.queued_ancient_blocks.lock().push_back((header, block_bytes, receipts_bytes)); + { + let mut queued = self.queued_ancient_blocks.write(); + queued.0.insert(hash); + queued.1.push_back((header, block_bytes, receipts_bytes)); + } let queue = self.queued_ancient_blocks.clone(); match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. - let mut ancient = queue.lock(); - if let Some((header, block_bytes, receipts_bytes)) = ancient.pop_front() { + let mut ancient = queue.write(); + while let Some((header, block_bytes, receipts_bytes)) = ancient.1.pop_front() { + ancient.0.remove(&header.hash()); client.importer.import_old_block( &header, &block_bytes, diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 321c783b4db..283f4ed610d 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -266,7 +266,7 @@ impl BlockCollection { } } - /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. + /// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain. pub fn drain(&mut self) -> Vec { if self.blocks.is_empty() || self.head.is_none() { return Vec::new(); From 6d3ed76d04c65aff83003a1f060803c295c0d73b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 31 May 2018 13:44:03 +0200 Subject: [PATCH 5/6] Separate locks so that queue can happen while we are importing. --- ethcore/src/client/client.rs | 39 +++++++++++++++++++++++------------- miner/src/pool/queue.rs | 2 +- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 293c792070a..9c3edb13911 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -91,7 +91,7 @@ use_contract!(registry, "Registry", "res/contracts/registrar.json"); const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; // Max number of blocks imported at once. -const MAX_ANCIENT_BLOCKS_IMPORT: usize = 4; +const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MIN_HISTORY_SIZE: u64 = 8; @@ -217,6 +217,7 @@ pub struct Client { HashSet, VecDeque<(Header, Bytes, Bytes)> )>>, + ancient_blocks_import_lock: Arc>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -769,6 +770,7 @@ impl Client { queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queued_ancient_blocks: Default::default(), + ancient_blocks_import_lock: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -2064,22 +2066,31 @@ impl IoClient for Client { queued.1.push_back((header, block_bytes, receipts_bytes)); } - let queue = self.queued_ancient_blocks.clone(); + let queued = self.queued_ancient_blocks.clone(); + let lock = self.ancient_blocks_import_lock.clone(); match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. - let mut ancient = queue.write(); - while let Some((header, block_bytes, receipts_bytes)) = ancient.1.pop_front() { - ancient.0.remove(&header.hash()); - client.importer.import_old_block( - &header, - &block_bytes, - &receipts_bytes, - &**client.db.read(), - &*client.chain.read() - ).map(|_| ()).unwrap_or_else(|e| { - error!(target: "client", "Error importing ancient block: {}", e); - }); + // We use separate lock, cause we don't want to block queueing. + let _lock = lock.lock(); + for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT { + let first = queued.write().1.pop_front(); + if let Some((header, block_bytes, receipts_bytes)) = first { + let hash = header.hash(); + client.importer.import_old_block( + &header, + &block_bytes, + &receipts_bytes, + &**client.db.read(), + &*client.chain.read() + ).map(|_| ()).unwrap_or_else(|e| { + error!(target: "client", "Error importing ancient block: {}", e); + }); + // remove from pending + queued.write().0.remove(&hash); + } else { + break; + } } }) { Ok(_) => Ok(hash), diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 8cf4534b763..9a18c0c43c0 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -174,7 +174,7 @@ impl TransactionQueue { transactions: Vec, ) -> Vec> { // Run verification - let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport"); + let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import"); let options = self.options.read().clone(); let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); From 5af7b9a3fa1dd0be3807a7a25a361adb76039fea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 5 Jun 2018 14:45:44 +0200 Subject: [PATCH 6/6] Address grumbles. --- ethcore/src/client/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 9c3edb13911..08b49dd06d7 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2083,7 +2083,7 @@ impl IoClient for Client { &receipts_bytes, &**client.db.read(), &*client.chain.read() - ).map(|_| ()).unwrap_or_else(|e| { + ).ok().map_or((), |e| { error!(target: "client", "Error importing ancient block: {}", e); }); // remove from pending