From de40eb38e3bd63a3ac15e55985e1384fb230996f Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 13 Jun 2019 16:39:23 +0200 Subject: [PATCH 01/10] Add a way to signal shutdown to snapshotting threads --- Cargo.toml | 4 +- ethcore/service/src/service.rs | 11 ++- ethcore/src/client/client.rs | 27 ++++++-- ethcore/src/snapshot/consensus/authority.rs | 6 +- ethcore/src/snapshot/error.rs | 3 + ethcore/src/snapshot/io.rs | 5 +- ethcore/src/snapshot/mod.rs | 77 +++++++++++++++------ ethcore/src/snapshot/service.rs | 26 +++++-- ethcore/src/snapshot/traits.rs | 3 + parity/lib.rs | 3 +- parity/run.rs | 33 ++++++--- 11 files changed, 145 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1a5e32d1011..5dc3f6efd43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,7 +119,9 @@ name = "parity" [profile.release] debug = false -lto = true +# TODO: undo before merge +#lto = true +lto = false [workspace] # This should only list projects that are not diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index c16a071892b..c28d9418378 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -30,8 +30,10 @@ use blockchain::{BlockChainDB, BlockChainDBHandler}; use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage}; use ethcore::miner::Miner; use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; -use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus}; +use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus, Error as SnapshotError}; use ethcore::spec::Spec; +use ethcore::error::Error as EthcoreError; + use ethcore_private_tx::{self, Importer, Signer}; use Error; @@ -197,6 +199,7 @@ impl ClientService { /// Shutdown the Client Service pub fn shutdown(&self) { + trace!(target: "shutdown", "Shutting down Client Service"); self.snapshot.shutdown(); } } @@ -257,7 +260,11 @@ impl IoHandler for ClientIoHandler { let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || { if let Err(e) = snapshot.take_snapshot(&*client, num) { - warn!("Failed to take snapshot at block #{}: {}", num, e); + match e { + EthcoreError::Snapshot(SnapshotError::AbortSnapshot) => info!("Snapshot aborted"), + _ => warn!("Failed to take snapshot at block #{}: {}", num, e), + } + } }); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index d22cff0942d..a997661ac97 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -764,8 +764,8 @@ impl Client { liveness: AtomicBool::new(awake), mode: Mutex::new(config.mode.clone()), chain: RwLock::new(chain), - tracedb: tracedb, - engine: engine, + tracedb, + engine, pruning: config.pruning.clone(), db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), @@ -778,8 +778,8 @@ impl Client { ancient_blocks_import_lock: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), - factories: factories, - history: history, + factories, + history, on_user_defaults_change: Mutex::new(None), registrar_address, exit_handler: Mutex::new(None), @@ -1138,7 +1138,12 @@ impl Client { /// Take a snapshot at the given block. /// If the ID given is "latest", this will default to 1000 blocks behind. - pub fn take_snapshot(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> { + pub fn take_snapshot( + &self, + writer: W, + at: BlockId, + p: &snapshot::Progress, + ) -> Result<(), EthcoreError> { let db = self.state_db.read().journal_db().boxed_clone(); let best_block_number = self.chain_info().best_block_number; let block_number = self.block_number(at).ok_or_else(|| snapshot::Error::InvalidStartingBlock(at))?; @@ -1168,8 +1173,16 @@ impl Client { }; let processing_threads = self.config.snapshot.processing_threads; - snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?; - + let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?; + snapshot::take_snapshot( + chunker, + &self.chain.read(), + start_hash, + db.as_hash_db(), + writer, + p, + processing_threads, + )?; Ok(()) } diff --git a/ethcore/src/snapshot/consensus/authority.rs b/ethcore/src/snapshot/consensus/authority.rs index 4423e074019..589299843f6 100644 --- a/ethcore/src/snapshot/consensus/authority.rs +++ b/ethcore/src/snapshot/consensus/authority.rs @@ -58,7 +58,7 @@ impl SnapshotComponents for PoaSnapshot { chain: &BlockChain, block_at: H256, sink: &mut ChunkSink, - _progress: &Progress, + progress: &Progress, preferred_size: usize, ) -> Result<(), Error> { let number = chain.block_number(&block_at) @@ -70,6 +70,10 @@ impl SnapshotComponents for PoaSnapshot { for (_, transition) in chain.epoch_transitions() .take_while(|&(_, ref t)| t.block_number <= number) { + if progress.abort.load(Ordering::SeqCst) { + return Err(Error::AbortSnapshot); + } + // this can happen when our starting block is non-canonical. if transition.block_number == number && transition.block_hash != block_at { break diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index 0eba4725569..3b1ea3dfac0 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -62,6 +62,8 @@ pub enum Error { ChunkTooLarge, /// Snapshots not supported by the consensus engine. SnapshotsUnsupported, + /// Aborted snapshot + AbortSnapshot, /// Bad epoch transition. BadEpochProof(u64), /// Wrong chunk format. @@ -103,6 +105,7 @@ impl fmt::Display for Error { Error::ChunkTooSmall => write!(f, "Chunk size is too small."), Error::ChunkTooLarge => write!(f, "Chunk size is too large."), Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."), + Error::AbortSnapshot => write!(f, "Snapshot was aborted."), Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i), Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg), Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"), diff --git a/ethcore/src/snapshot/io.rs b/ethcore/src/snapshot/io.rs index c5f178cd329..536862e7bee 100644 --- a/ethcore/src/snapshot/io.rs +++ b/ethcore/src/snapshot/io.rs @@ -310,10 +310,7 @@ impl LooseReader { dir.pop(); - Ok(LooseReader { - dir: dir, - manifest: manifest, - }) + Ok(LooseReader { dir, manifest }) } } diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 155182ed516..5d2fcd2143b 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -119,6 +119,7 @@ pub struct Progress { blocks: AtomicUsize, size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes. done: AtomicBool, + abort: AtomicBool, // TODO: why no Arcs here? } impl Progress { @@ -127,6 +128,7 @@ impl Progress { self.accounts.store(0, Ordering::Release); self.blocks.store(0, Ordering::Release); self.size.store(0, Ordering::Release); + self.abort.store(false, Ordering::Release); // atomic fence here to ensure the others are written first? // logs might very rarely get polluted if not. @@ -148,27 +150,28 @@ impl Progress { } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. pub fn take_snapshot( - engine: &EthEngine, + chunker: Box, chain: &BlockChain, - block_at: H256, + block_hash: H256, state_db: &HashDB, writer: W, p: &Progress, processing_threads: usize, ) -> Result<(), Error> { - let start_header = chain.block_header_data(&block_at) - .ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_at)))?; + let start_header = chain.block_header_data(&block_hash) + .ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?; let state_root = start_header.state_root(); - let number = start_header.number(); + let block_number = start_header.number(); - info!("Taking snapshot starting at block {}", number); + info!("Taking snapshot starting at block {}", block_number); + let version = chunker.current_version(); let writer = Mutex::new(writer); - let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?; - let snapshot_version = chunker.current_version(); let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec, Vec), Error> { let writer = &writer; - let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p)); + let block_guard = scope.spawn(move || { + chunk_secondary(chunker, chain, block_hash, writer, p) + }); // The number of threads must be between 1 and SNAPSHOT_SUBPARTS assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots"); @@ -183,7 +186,7 @@ pub fn take_snapshot( for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) { debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx); - let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?; + let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?; chunk_hashes.append(&mut hashes); } @@ -196,6 +199,10 @@ pub fn take_snapshot( let mut state_hashes = Vec::new(); for guard in state_guards { + if p.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[snapshot::take_snapshot] aborting"); + return Err(Error::AbortSnapshot); + } let part_state_hashes = guard.join().expect("Sub-thread never panics; qed")?; state_hashes.extend(part_state_hashes); } @@ -207,12 +214,12 @@ pub fn take_snapshot( info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); let manifest_data = ManifestData { - version: snapshot_version, - state_hashes: state_hashes, - block_hashes: block_hashes, - state_root: state_root, - block_number: number, - block_hash: block_at, + version, + state_hashes, + block_hashes, + state_root, + block_number, + block_hash, }; writer.into_inner().finish(manifest_data)?; @@ -228,12 +235,22 @@ pub fn take_snapshot( /// Secondary chunks are engine-specific, but they intend to corroborate the state data /// in the state chunks. /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis. -pub fn chunk_secondary<'a>(mut chunker: Box, chain: &'a BlockChain, start_hash: H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { +pub fn chunk_secondary<'a>( + mut chunker: Box, + chain: &'a BlockChain, + start_hash: H256, + writer: &Mutex, + progress: &'a Progress +) -> Result, Error> { let mut chunk_hashes = Vec::new(); let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; { let mut chunk_sink = |raw_data: &[u8]| { + if progress.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[chunk_secondary, sink] Aborting"); + return Ok(()); + } let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer); let compressed = &snappy_buffer[..compressed_size]; let hash = keccak(&compressed); @@ -268,6 +285,7 @@ struct StateChunker<'a> { snappy_buffer: Vec, writer: &'a Mutex, progress: &'a Progress, + thread_idx: usize, } impl<'a> StateChunker<'a> { @@ -284,6 +302,10 @@ impl<'a> StateChunker<'a> { // Write out the buffer to disk, pushing the created chunk's hash to // the list. fn write_chunk(&mut self) -> Result<(), Error> { + if self.progress.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[write_chunk] Thread {} aborting early", self.thread_idx); + return Err(Error::AbortSnapshot); + } let num_entries = self.rlps.len(); let mut stream = RlpStream::new_list(num_entries); for rlp in self.rlps.drain(..) { @@ -297,7 +319,7 @@ impl<'a> StateChunker<'a> { let hash = keccak(&compressed); self.writer.lock().write_state_chunk(hash, compressed)?; - trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len()); + trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len()); self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst); self.progress.size.fetch_add(compressed_size, Ordering::SeqCst); @@ -321,7 +343,14 @@ impl<'a> StateChunker<'a> { /// /// Returns a list of hashes of chunks created, or any error it may /// have encountered. -pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, progress: &'a Progress, part: Option) -> Result, Error> { +pub fn chunk_state<'a>( + db: &HashDB, + root: &H256, + writer: &Mutex, + progress: &'a Progress, + part: Option, + thread_idx: usize, +) -> Result, Error> { let account_trie = TrieDB::new(&db, &root)?; let mut chunker = StateChunker { @@ -329,8 +358,9 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: rlps: Vec::new(), cur_size: 0, snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], - writer: writer, - progress: progress, + writer, + progress, + thread_idx, }; let mut used_code = HashSet::new(); @@ -355,6 +385,11 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: } for item in account_iter { + if progress.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[chunk_state] Thread {} aborting", thread_idx); + return Err(Error::AbortSnapshot); + } + let (account_key, account_data) = item?; let account_key_hash = H256::from_slice(&account_key); diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 8120f47b81f..a996311a632 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -415,7 +415,7 @@ impl Service { _ => break, } - // Writting changes to DB and logging every now and then + // Writing changes to DB and logging every now and then if block_number % 1_000 == 0 { next_db.key_value().write_buffered(batch); next_chain.commit(); @@ -479,15 +479,12 @@ impl Service { let guard = Guard::new(temp_dir.clone()); let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress); - self.taking_snapshot.store(false, Ordering::SeqCst); if let Err(e) = res { if client.chain_info().best_block_number >= num + client.pruning_history() { - // "Cancelled" is mincing words a bit -- what really happened - // is that the state we were snapshotting got pruned out - // before we could finish. - info!("Periodic snapshot failed: block state pruned.\ - Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); + // The state we were snapshotting was pruned before we could finish. + info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); + // TODO: this looks odd to me. I don't think it's dangerous but I also don't understand why we're not returning an error here. return Ok(()) } else { return Err(e); @@ -846,14 +843,29 @@ impl SnapshotService for Service { } } + fn abort_snapshot(&self) { + if self.taking_snapshot.load(Ordering::SeqCst) { + trace!(target: "snapshot", "Aborting snapshot – Snapshot under way"); + self.progress.abort.store(true, Ordering::SeqCst); + } + } + fn shutdown(&self) { + trace!(target: "snapshot", "Shut down SnapshotService"); self.abort_restore(); + trace!(target: "snapshot", "Shut down SnapshotService - restore aborted"); + self.abort_snapshot(); + trace!(target: "snapshot", "Shut down SnapshotService - snapshot aborted"); } } impl Drop for Service { fn drop(&mut self) { + trace!(target: "shutdown", "Dropping Service"); self.abort_restore(); + trace!(target: "shutdown", "Dropping Service - restore aborted"); + self.abort_snapshot(); + trace!(target: "shutdown", "Dropping Service - snapshot aborted"); } } diff --git a/ethcore/src/snapshot/traits.rs b/ethcore/src/snapshot/traits.rs index bb4ab3b3964..aa61b595bf7 100644 --- a/ethcore/src/snapshot/traits.rs +++ b/ethcore/src/snapshot/traits.rs @@ -55,6 +55,9 @@ pub trait SnapshotService : Sync + Send { /// no-op if currently restoring. fn restore_block_chunk(&self, hash: H256, chunk: Bytes); + /// Abort in-progress snapshotting if there is one. + fn abort_snapshot(&self); + /// Shutdown the Snapshot Service by aborting any ongoing restore fn shutdown(&self); } diff --git a/parity/lib.rs b/parity/lib.rs index 7d1bf108b79..2deac114148 100644 --- a/parity/lib.rs +++ b/parity/lib.rs @@ -15,7 +15,8 @@ // along with Parity Ethereum. If not, see . //! Ethcore client application. - +// TODO: remove before merge +#![feature(weak_counts)] #![warn(missing_docs)] extern crate ansi_term; diff --git a/parity/run.rs b/parity/run.rs index c21399290b8..e33dcea1671 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -66,10 +66,14 @@ use signer; use db; // how often to take periodic snapshots. -const SNAPSHOT_PERIOD: u64 = 5000; +// TODO: restore before merge +//const SNAPSHOT_PERIOD: u64 = 5000; +const SNAPSHOT_PERIOD: u64 = 50; // how many blocks to wait before starting a periodic snapshot. -const SNAPSHOT_HISTORY: u64 = 100; +// TODO: restore before merge +//const SNAPSHOT_HISTORY: u64 = 100; +const SNAPSHOT_HISTORY: u64 = 5; // Number of minutes before a given gas price corpus should expire. // Light client only. @@ -900,17 +904,26 @@ impl RunningClient { // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); - // Shutdown and drop the ServiceClient + // Shutdown and drop the ClientService client_service.shutdown(); + trace!(target: "shutdown", "ClientService shut down"); drop(client_service); + trace!(target: "shutdown", "ClientService dropped"); // drop this stuff as soon as exit detected. drop(rpc); + trace!(target: "shutdown", "RPC dropped"); drop(keep_alive); + trace!(target: "shutdown", "KeepAlive dropped"); // to make sure timer does not spawn requests while shutdown is in progress informant.shutdown(); + trace!(target: "shutdown", "Informant shut down"); // just Arc is dropping here, to allow other reference release in its default time drop(informant); + trace!(target: "shutdown", "Informant dropped"); drop(client); + trace!(target: "shutdown", "Client dropped"); + // TODO: remove weak_count before merging. + trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); wait_for_drop(weak_client); } } @@ -944,24 +957,26 @@ fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &Datab } fn wait_for_drop(w: Weak) { - let sleep_duration = Duration::from_secs(1); - let warn_timeout = Duration::from_secs(60); - let max_timeout = Duration::from_secs(300); + const SLEEP_DURATION: Duration = Duration::from_secs(1); + const WARN_TIMEOUT: Duration = Duration::from_secs(60); + const MAX_TIMEOUT: Duration = Duration::from_secs(300); let instant = Instant::now(); let mut warned = false; - while instant.elapsed() < max_timeout { + while instant.elapsed() < MAX_TIMEOUT { if w.upgrade().is_none() { return; } - if !warned && instant.elapsed() > warn_timeout { + if !warned && instant.elapsed() > WARN_TIMEOUT { warned = true; warn!("Shutdown is taking longer than expected."); } - thread::sleep(sleep_duration); + thread::sleep(SLEEP_DURATION); + // TODO: remove weak_count before merging + trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); } warn!("Shutdown timeout reached, exiting uncleanly."); From 9abd4be36d681b137d21940bddd69976b6642a9a Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 13 Jun 2019 22:27:30 +0200 Subject: [PATCH 02/10] Pass Progress to fat_rlps() so we can abort from there too. --- ethcore/src/snapshot/account.rs | 21 +++++++++++++++++++-- ethcore/src/snapshot/mod.rs | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index d18ecacbc3b..4b092b83fe8 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut}; use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP}; use hash_db::HashDB; use rlp::{RlpStream, Rlp}; -use snapshot::Error; +use snapshot::{Error, Progress}; use std::collections::HashSet; use trie::{Trie, TrieMut}; +use std::sync::atomic::Ordering; // An empty account -- these were replaced with RLP null data for a space optimization in v1. const ACC_EMPTY: BasicAccount = BasicAccount { @@ -65,7 +66,15 @@ impl CodeState { // walk the account's storage trie, returning a vector of RLP items containing the // account address hash, account properties and the storage. Each item contains at most `max_storage_items` // storage records split according to snapshot format definition. -pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet, first_chunk_size: usize, max_chunk_size: usize) -> Result, Error> { +pub fn to_fat_rlps( + account_hash: &H256, + acc: &BasicAccount, + acct_db: &AccountDB, + used_code: &mut HashSet, + first_chunk_size: usize, + max_chunk_size: usize, + p: &Progress, +) -> Result, Error> { let db = &(acct_db as &HashDB<_,_>); let db = TrieDB::new(db, &acc.storage_root)?; let mut chunks = Vec::new(); @@ -74,6 +83,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, let mut account_stream = RlpStream::new_list(2); let mut leftover: Option> = None; loop { + if p.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[account, to_fat_rlps] abort (top of the first loop"); + return Err(Error::AbortSnapshot); + } account_stream.append(account_hash); account_stream.begin_list(5); @@ -112,6 +125,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, } loop { + if p.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "[account, to_fat_rlps] abort"); + return Err(Error::AbortSnapshot); + } match db_iter.next() { Some(Ok((k, v))) => { let pair = { diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 5d2fcd2143b..454a1b0f735 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -400,7 +400,7 @@ pub fn chunk_state<'a>( let account = ::rlp::decode(&*account_data)?; let account_db = AccountDB::from_hash(db, account_key_hash); - let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?; + let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?; for (i, fat_rlp) in fat_rlps.into_iter().enumerate() { if i > 0 { chunker.write_chunk()?; From 6f36ecb504a032c2cafa26b6858de8a31de3564f Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 13 Jun 2019 22:56:19 +0200 Subject: [PATCH 03/10] Checking for abort in a single spot --- ethcore/service/src/service.rs | 2 +- ethcore/src/snapshot/account.rs | 8 ++------ ethcore/src/snapshot/consensus/authority.rs | 4 ---- ethcore/src/snapshot/error.rs | 4 ++-- ethcore/src/snapshot/mod.rs | 17 ----------------- 5 files changed, 5 insertions(+), 30 deletions(-) diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index c28d9418378..11bcdfa708f 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -261,7 +261,7 @@ impl IoHandler for ClientIoHandler { let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || { if let Err(e) = snapshot.take_snapshot(&*client, num) { match e { - EthcoreError::Snapshot(SnapshotError::AbortSnapshot) => info!("Snapshot aborted"), + EthcoreError::Snapshot(SnapshotError::SnapshotAborted) => info!("Snapshot aborted"), _ => warn!("Failed to take snapshot at block #{}: {}", num, e), } diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index 4b092b83fe8..1f03e850ab9 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -83,10 +83,6 @@ pub fn to_fat_rlps( let mut account_stream = RlpStream::new_list(2); let mut leftover: Option> = None; loop { - if p.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[account, to_fat_rlps] abort (top of the first loop"); - return Err(Error::AbortSnapshot); - } account_stream.append(account_hash); account_stream.begin_list(5); @@ -126,8 +122,8 @@ pub fn to_fat_rlps( loop { if p.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[account, to_fat_rlps] abort"); - return Err(Error::AbortSnapshot); + trace!(target: "snapshot", "to_fat_rlps: aborting snapshot"); + return Err(Error::SnapshotAborted); } match db_iter.next() { Some(Ok((k, v))) => { diff --git a/ethcore/src/snapshot/consensus/authority.rs b/ethcore/src/snapshot/consensus/authority.rs index 589299843f6..71e76be3d13 100644 --- a/ethcore/src/snapshot/consensus/authority.rs +++ b/ethcore/src/snapshot/consensus/authority.rs @@ -70,10 +70,6 @@ impl SnapshotComponents for PoaSnapshot { for (_, transition) in chain.epoch_transitions() .take_while(|&(_, ref t)| t.block_number <= number) { - if progress.abort.load(Ordering::SeqCst) { - return Err(Error::AbortSnapshot); - } - // this can happen when our starting block is non-canonical. if transition.block_number == number && transition.block_hash != block_at { break diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index 3b1ea3dfac0..f9eafb15255 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -63,7 +63,7 @@ pub enum Error { /// Snapshots not supported by the consensus engine. SnapshotsUnsupported, /// Aborted snapshot - AbortSnapshot, + SnapshotAborted, /// Bad epoch transition. BadEpochProof(u64), /// Wrong chunk format. @@ -105,7 +105,7 @@ impl fmt::Display for Error { Error::ChunkTooSmall => write!(f, "Chunk size is too small."), Error::ChunkTooLarge => write!(f, "Chunk size is too large."), Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."), - Error::AbortSnapshot => write!(f, "Snapshot was aborted."), + Error::SnapshotAborted => write!(f, "Snapshot was aborted."), Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i), Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg), Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"), diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 454a1b0f735..a29b9dee2d7 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -199,10 +199,6 @@ pub fn take_snapshot( let mut state_hashes = Vec::new(); for guard in state_guards { - if p.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[snapshot::take_snapshot] aborting"); - return Err(Error::AbortSnapshot); - } let part_state_hashes = guard.join().expect("Sub-thread never panics; qed")?; state_hashes.extend(part_state_hashes); } @@ -247,10 +243,6 @@ pub fn chunk_secondary<'a>( { let mut chunk_sink = |raw_data: &[u8]| { - if progress.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[chunk_secondary, sink] Aborting"); - return Ok(()); - } let compressed_size = snappy::compress_into(raw_data, &mut snappy_buffer); let compressed = &snappy_buffer[..compressed_size]; let hash = keccak(&compressed); @@ -302,10 +294,6 @@ impl<'a> StateChunker<'a> { // Write out the buffer to disk, pushing the created chunk's hash to // the list. fn write_chunk(&mut self) -> Result<(), Error> { - if self.progress.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[write_chunk] Thread {} aborting early", self.thread_idx); - return Err(Error::AbortSnapshot); - } let num_entries = self.rlps.len(); let mut stream = RlpStream::new_list(num_entries); for rlp in self.rlps.drain(..) { @@ -385,11 +373,6 @@ pub fn chunk_state<'a>( } for item in account_iter { - if progress.abort.load(Ordering::SeqCst) { - trace!(target: "snapshot", "[chunk_state] Thread {} aborting", thread_idx); - return Err(Error::AbortSnapshot); - } - let (account_key, account_data) = item?; let account_key_hash = H256::from_slice(&account_key); From 51fddbba9d71889332c57fbebb14a9d576eeef47 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 14 Jun 2019 09:29:20 +0200 Subject: [PATCH 04/10] Remove nightly-only weak/strong counts --- parity/lib.rs | 2 -- parity/run.rs | 13 +++++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/parity/lib.rs b/parity/lib.rs index 2deac114148..9dbc8c6eafb 100644 --- a/parity/lib.rs +++ b/parity/lib.rs @@ -15,8 +15,6 @@ // along with Parity Ethereum. If not, see . //! Ethcore client application. -// TODO: remove before merge -#![feature(weak_counts)] #![warn(missing_docs)] extern crate ansi_term; diff --git a/parity/run.rs b/parity/run.rs index e33dcea1671..c26ed73e237 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -922,8 +922,9 @@ impl RunningClient { trace!(target: "shutdown", "Informant dropped"); drop(client); trace!(target: "shutdown", "Client dropped"); - // TODO: remove weak_count before merging. - trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); + // This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]` + // trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); + trace!(target: "shutdown", "Waiting for refs to Client to shutdown"); wait_for_drop(weak_client); } } @@ -975,8 +976,12 @@ fn wait_for_drop(w: Weak) { } thread::sleep(SLEEP_DURATION); - // TODO: remove weak_count before merging - trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); + + // When debugging shutdown issues on a nightly build it can help to enable this with the + // `#![feature(weak_counts)]` added to lib.rs (TODO: enable when + // https://github.com/rust-lang/rust/issues/57977 is stable) + // trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); + trace!(target: "shutdown", "Waiting for client to drop"); } warn!("Shutdown timeout reached, exiting uncleanly."); From e8e9d78a6715eca61716ce3c5a391213a560c8ee Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 14 Jun 2019 09:35:09 +0200 Subject: [PATCH 05/10] fix warning --- ethcore/src/snapshot/consensus/authority.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/src/snapshot/consensus/authority.rs b/ethcore/src/snapshot/consensus/authority.rs index 71e76be3d13..4423e074019 100644 --- a/ethcore/src/snapshot/consensus/authority.rs +++ b/ethcore/src/snapshot/consensus/authority.rs @@ -58,7 +58,7 @@ impl SnapshotComponents for PoaSnapshot { chain: &BlockChain, block_at: H256, sink: &mut ChunkSink, - progress: &Progress, + _progress: &Progress, preferred_size: usize, ) -> Result<(), Error> { let number = chain.block_number(&block_at) From 200fdd570110c95c57b3ef2d55afe96f0a2a6e94 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 14 Jun 2019 10:37:11 +0200 Subject: [PATCH 06/10] Fix tests --- ethcore/src/snapshot/account.rs | 23 ++++++++++++++--------- ethcore/src/snapshot/tests/service.rs | 9 +++++---- ethcore/src/snapshot/tests/state.rs | 12 ++++++------ 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index 1f03e850ab9..c64eedf246e 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -224,6 +224,7 @@ mod tests { use types::basic_account::BasicAccount; use test_helpers::get_temp_state_db; use snapshot::tests::helpers::fill_storage; + use snapshot::Progress; use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak}; use ethereum_types::{H256, Address}; @@ -249,8 +250,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - - let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -275,7 +276,9 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + + let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -300,7 +303,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap(); + let p = Progress::default(); + let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap(); let mut root = KECCAK_NULL_RLP; let mut restored_account = None; for rlp in fat_rlps { @@ -332,20 +336,21 @@ mod tests { nonce: 50.into(), balance: 123456789.into(), storage_root: KECCAK_NULL_RLP, - code_hash: code_hash, + code_hash, }; let account2 = BasicAccount { nonce: 400.into(), balance: 98765432123456789usize.into(), storage_root: KECCAK_NULL_RLP, - code_hash: code_hash, + code_hash, }; let mut used_code = HashSet::new(); - - let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); - let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); + let p1 = Progress::default(); + let p2 = Progress::default(); + let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap(); + let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap(); assert_eq!(used_code.len(), 1); let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap(); diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index 37a10048abf..515e5992ffa 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -188,14 +188,15 @@ fn keep_ancient_blocks() { &state_root, &writer, &Progress::default(), - None + None, + 0 ).unwrap(); let manifest = ::snapshot::ManifestData { version: 2, - state_hashes: state_hashes, - state_root: state_root, - block_hashes: block_hashes, + state_hashes, + state_root, + block_hashes, block_number: NUM_BLOCKS, block_hash: best_hash, }; diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index bbf3bfa6271..769534e0298 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -58,7 +58,7 @@ fn snap_and_restore() { let mut state_hashes = Vec::new(); for part in 0..SNAPSHOT_SUBPARTS { - let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap(); + let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap(); state_hashes.append(&mut hashes); } @@ -129,8 +129,8 @@ fn get_code_from_prev_chunk() { let mut make_chunk = |acc, hash| { let mut db = journaldb::new_memory_db(); AccountDBMut::from_hash(&mut db, hash).insert(&code[..]); - - let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap(); let mut stream = RlpStream::new_list(1); stream.append_raw(&fat_rlp[0], 1); stream.out() @@ -174,13 +174,13 @@ fn checks_flag() { let state_root = producer.state_root(); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); - let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap(); + let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap(); writer.into_inner().finish(::snapshot::ManifestData { version: 2, - state_hashes: state_hashes, + state_hashes, block_hashes: Vec::new(), - state_root: state_root, + state_root, block_number: 0, block_hash: H256::zero(), }).unwrap(); From 6b400e7d30290aa54524a45fcbd2b52fce60ecc7 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 14 Jun 2019 11:09:05 +0200 Subject: [PATCH 07/10] Add dummy impl to abort snapshots --- ethcore/sync/src/tests/snapshot.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethcore/sync/src/tests/snapshot.rs b/ethcore/sync/src/tests/snapshot.rs index 60459defb5a..f7a947329aa 100644 --- a/ethcore/sync/src/tests/snapshot.rs +++ b/ethcore/sync/src/tests/snapshot.rs @@ -122,6 +122,8 @@ impl SnapshotService for TestSnapshotService { self.block_restoration_chunks.lock().clear(); } + fn abort_snapshot(&self) {} + fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { if self.restoration_manifest.lock().as_ref().map_or(false, |m| m.state_hashes.iter().any(|h| h == &hash)) { self.state_restoration_chunks.lock().insert(hash, chunk); From b83ab98eabc96e06e5b4c776fd7e7b145ed4620c Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 14 Jun 2019 19:56:18 +0200 Subject: [PATCH 08/10] Add another dummy impl for TestSnapshotService --- rpc/src/v1/tests/helpers/snapshot_service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rpc/src/v1/tests/helpers/snapshot_service.rs b/rpc/src/v1/tests/helpers/snapshot_service.rs index 5450886bb42..881c434e178 100644 --- a/rpc/src/v1/tests/helpers/snapshot_service.rs +++ b/rpc/src/v1/tests/helpers/snapshot_service.rs @@ -48,6 +48,7 @@ impl SnapshotService for TestSnapshotService { fn status(&self) -> RestorationStatus { self.status.lock().clone() } fn begin_restore(&self, _manifest: ManifestData) { } fn abort_restore(&self) { } + fn abort_snapshot(&self) {} fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { } fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { } fn shutdown(&self) { } From bfe795e00b3daf5a82ade744ca6be52f10272523 Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 17 Jun 2019 13:26:46 +0200 Subject: [PATCH 09/10] Remove debugging code --- Cargo.toml | 4 +--- parity/run.rs | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5dc3f6efd43..1a5e32d1011 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,9 +119,7 @@ name = "parity" [profile.release] debug = false -# TODO: undo before merge -#lto = true -lto = false +lto = true [workspace] # This should only list projects that are not diff --git a/parity/run.rs b/parity/run.rs index c26ed73e237..0652e8f9d99 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -66,14 +66,10 @@ use signer; use db; // how often to take periodic snapshots. -// TODO: restore before merge -//const SNAPSHOT_PERIOD: u64 = 5000; -const SNAPSHOT_PERIOD: u64 = 50; +const SNAPSHOT_PERIOD: u64 = 5000; // how many blocks to wait before starting a periodic snapshot. -// TODO: restore before merge -//const SNAPSHOT_HISTORY: u64 = 100; -const SNAPSHOT_HISTORY: u64 = 5; +const SNAPSHOT_HISTORY: u64 = 100; // Number of minutes before a given gas price corpus should expire. // Light client only. From dca02a8c9036db6361e978a4b32c4c8dd2573869 Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 18 Jun 2019 13:20:55 +0200 Subject: [PATCH 10/10] Return error instead of the odd Ok(()) Switch to AtomicU64 --- ethcore/src/snapshot/mod.rs | 14 +++++++------- ethcore/src/snapshot/service.rs | 3 +-- parity/snapshot.rs | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 2f826c102c6..51f6f6de8bb 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -22,7 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::cmp; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY}; use account_db::{AccountDB, AccountDBMut}; @@ -117,9 +117,9 @@ impl Default for SnapshotConfiguration { pub struct Progress { accounts: AtomicUsize, blocks: AtomicUsize, - size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes. + size: AtomicU64, done: AtomicBool, - abort: AtomicBool, // TODO: why no Arcs here? + abort: AtomicBool, } impl Progress { @@ -142,7 +142,7 @@ impl Progress { pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } /// Get the written size of the snapshot in bytes. - pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) } + pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) } /// Whether the snapshot is complete. pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } @@ -150,7 +150,7 @@ impl Progress { } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. pub fn take_snapshot( - chunker: Box, + chunker: Box, chain: &BlockChain, block_hash: H256, state_db: &dyn HashDB, @@ -252,7 +252,7 @@ pub fn chunk_secondary<'a>( trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}", hash, size, raw_data.len()); - progress.size.fetch_add(size, Ordering::SeqCst); + progress.size.fetch_add(size as u64, Ordering::SeqCst); chunk_hashes.push(hash); Ok(()) }; @@ -310,7 +310,7 @@ impl<'a> StateChunker<'a> { trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len()); self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst); - self.progress.size.fetch_add(compressed_size, Ordering::SeqCst); + self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst); self.hashes.push(hash); self.cur_size = 0; diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 15819e73f68..a9ff866ebed 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -484,8 +484,7 @@ impl Service { if client.chain_info().best_block_number >= num + client.pruning_history() { // The state we were snapshotting was pruned before we could finish. info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); - // TODO: this looks odd to me. I don't think it's dangerous but I also don't understand why we're not returning an error here. - return Ok(()) + return Err(e); } else { return Err(e); } diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 70957762f5a..269965c3355 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -261,7 +261,7 @@ impl SnapshotCommand { let cur_size = p.size(); if cur_size != last_size { last_size = cur_size; - let bytes = ::informant::format_bytes(p.size()); + let bytes = ::informant::format_bytes(cur_size as usize); info!("Snapshot: {} accounts {} blocks {}", p.accounts(), p.blocks(), bytes); }