diff --git a/Cargo.lock b/Cargo.lock index a933343a729..9cc97acb4ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -609,6 +609,7 @@ dependencies = [ "keccak-hasher 0.1.0", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common)", "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common)", + "kvdb-rocksdb 0.1.0 (git+https://github.com/paritytech/parity-common)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "memory-cache 0.1.0", "memorydb 0.2.1 (git+https://github.com/paritytech/parity-common)", @@ -621,7 +622,8 @@ dependencies = [ "rlp_derive 0.1.0", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)", "stats 0.1.0", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "triehash-ethereum 0.2.0", @@ -668,7 +670,7 @@ dependencies = [ "rlp 0.2.1 (git+https://github.com/paritytech/parity-common)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", - "transaction-pool 1.12.1", + "transaction-pool 1.12.2", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -852,6 +854,7 @@ dependencies = [ "hashdb 0.2.0 (git+https://github.com/paritytech/parity-common)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.1.2 (git+https://github.com/paritytech/parity-common)", "keccak-hasher 0.1.0", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common)", @@ -864,7 +867,7 @@ dependencies = [ "rlp 0.2.1 (git+https://github.com/paritytech/parity-common)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", "triehash-ethereum 0.2.0", ] @@ -972,7 +975,7 @@ dependencies = [ "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1331,6 +1334,14 @@ dependencies = [ "either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "itertools" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "itoa" version = "0.3.4" @@ -1845,7 +1856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "num-integer 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2168,7 +2179,7 @@ dependencies = [ "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.1", + "transaction-pool 1.12.2", "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "vm 0.1.0", ] @@ -2276,7 +2287,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2309,7 +2320,7 @@ dependencies = [ "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "petgraph 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "thread-id 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2465,7 +2476,7 @@ dependencies = [ "hamming 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "primal-bit 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "primal-estimate 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2582,7 +2593,7 @@ dependencies = [ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2927,7 +2938,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "smallvec" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3327,12 +3338,12 @@ dependencies = [ [[package]] name = "transaction-pool" -version = "1.12.1" +version = "1.12.2" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", ] @@ -3757,6 +3768,7 @@ dependencies = [ "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" "checksum ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)" = "2134e210e2a024b5684f90e1556d5f71a1ce7f8b12e9ac9924c67fb36f63b336" "checksum itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4833d6978da405305126af4ac88569b5d71ff758581ce5a987dbfa3755f694fc" +"checksum itertools 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)" = "f58856976b776fedd95533137617a02fb25719f40e7d9b01c7043cd65474f450" "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" "checksum jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.11)" = "" "checksum jsonrpc-http-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.11)" = "" @@ -3890,7 +3902,7 @@ dependencies = [ "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" -"checksum smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8fcd03faf178110ab0334d74ca9631d77f94c8c11cc77fcb59538abf0025695d" +"checksum smallvec 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f90c5e5fe535e48807ab94fc611d323935f39d4660c52b26b96446a7b33aef10" "checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" "checksum snappy-sys 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" "checksum socket2 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "06dc9f86ee48652b7c80f3d254e3b9accb67a928c562c64d10d7b016d3d98dab" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 9224f07e95c..4cf8fae76e7 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -38,10 +38,12 @@ triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" } kvdb = { git = "https://github.com/paritytech/parity-common" } memory-cache = { path = "../../util/memory_cache" } error-chain = { version = "0.12", default-features = false } +snappy = { git = "https://github.com/paritytech/rust-snappy" } [dev-dependencies] ethcore = { path = "..", features = ["test-helpers"] } kvdb-memorydb = { git = "https://github.com/paritytech/parity-common" } +kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common" } tempdir = "0.3" [features] diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index 4611adbfe0b..3f5a8909f10 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -190,6 +190,7 @@ pub struct PendingChanges { } /// Whether or not the hardcoded sync feature is allowed. +#[derive(Clone, Debug, Copy)] pub enum HardcodedSync { Allow, Deny, @@ -352,10 +353,10 @@ impl HeaderChain { header: Header, transition_proof: Option>, ) -> Result { - self.insert_inner(transaction, header, None, transition_proof) + self.insert_inner(transaction, header, None, transition_proof, true) } - /// Insert a pre-verified header, with a known total difficulty. Similary to `insert`. + /// Insert a pre-verified header, with a known total difficulty. Similarly to `insert`. /// /// This blindly trusts that the data given to it is sensible. pub fn insert_with_td( @@ -365,15 +366,16 @@ impl HeaderChain { total_difficulty: U256, transition_proof: Option>, ) -> Result { - self.insert_inner(transaction, header, Some(total_difficulty), transition_proof) + self.insert_inner(transaction, header, Some(total_difficulty), transition_proof, true) } - fn insert_inner( + pub(super) fn insert_inner( &self, transaction: &mut DBTransaction, header: Header, total_difficulty: Option, transition_proof: Option>, + allow_producing_cht: bool, ) -> Result { let hash = header.hash(); let number = header.number(); @@ -451,15 +453,24 @@ impl HeaderChain { // reorganize ancestors so canonical entries are first in their // respective candidates vectors. if is_new_best { + const PROOF: &str = "blocks are only inserted if parent is present; \ + or this is the block we just added; qed"; let mut canon_hash = hash; for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) { - if height != number && entry.canonical_hash == canon_hash { break; } + // If we're disallowing producing chts, then blocks are inserted in arbitrary order + // and our assumption in the `PROOF` doesn't hold. + if !allow_producing_cht || + (height != number && entry.canonical_hash == canon_hash) { + break; + } - trace!(target: "chain", "Setting new canonical block {} for block height {}", - canon_hash, height); + trace!( + target: "chain", "Setting new canonical block {} for block height {}", + canon_hash, height + ); let canon_pos = entry.candidates.iter().position(|x| x.hash == canon_hash) - .expect("blocks are only inserted if parent is present; or this is the block we just added; qed"); + .expect(PROOF); // move the new canonical entry to the front and set the // era's canonical hash. @@ -485,65 +496,10 @@ impl HeaderChain { }); // produce next CHT root if it's time. - let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); - if earliest_era + HISTORY + cht::SIZE <= number { - let cht_num = cht::block_to_cht_number(earliest_era) - .expect("fails only for number == 0; genesis never imported; qed"); - - let mut last_canonical_transition = None; - let cht_root = { - let mut i = earliest_era; - let mut live_epoch_proofs = self.live_epoch_proofs.write(); - - // iterable function which removes the candidates as it goes - // along. this will only be called until the CHT is complete. - let iter = || { - let era_entry = candidates.remove(&i) - .expect("all eras are sequential with no gaps; qed"); - transaction.delete(self.col, era_key(i).as_bytes()); - - i += 1; - - // prune old blocks and epoch proofs. - for ancient in &era_entry.candidates { - let maybe_transition = live_epoch_proofs.remove(&ancient.hash); - if let Some(epoch_transition) = maybe_transition { - transaction.delete(self.col, &*transition_key(ancient.hash)); - - if ancient.hash == era_entry.canonical_hash { - last_canonical_transition = match self.db.get(self.col, &ancient.hash) { - Err(e) => { - warn!(target: "chain", "Error reading from DB: {}\n - ", e); - None - } - Ok(None) => panic!("stored candidates always have corresponding headers; qed"), - Ok(Some(header)) => Some(( - epoch_transition, - ::rlp::decode(&header).expect("decoding value from db failed") - )), - }; - } - } - - transaction.delete(self.col, &ancient.hash); - } - - let canon = &era_entry.candidates[0]; - (canon.hash, canon.total_difficulty) - }; - cht::compute_root(cht_num, ::itertools::repeat_call(iter)) - .expect("fails only when too few items; this is checked; qed") - }; - - // write the CHT root to the database. - debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root); - transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root)); - - // update the last canonical transition proof - if let Some((epoch_transition, header)) = last_canonical_transition { - let x = encode_canonical_transition(&header, &epoch_transition.proof); - transaction.put_vec(self.col, LAST_CANONICAL_TRANSITION, x); + if allow_producing_cht { + let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); + if earliest_era + HISTORY + cht::SIZE <= number { + self.produce_next_cht_root(earliest_era, &mut candidates, transaction); } } } @@ -557,6 +513,69 @@ impl HeaderChain { Ok(pending) } + fn produce_next_cht_root( + &self, + from_era: u64, + candidates: &mut BTreeMap, + transaction: &mut DBTransaction, + ) { + let cht_num = cht::block_to_cht_number(from_era) + .expect("fails only for number == 0; genesis never imported; qed"); + + let mut last_canonical_transition = None; + let cht_root = { + let mut i = from_era; + let mut live_epoch_proofs = self.live_epoch_proofs.write(); + + // iterable function which removes the candidates as it goes + // along. this will only be called until the CHT is complete. + let iter = || { + let era_entry = candidates.remove(&i) + .expect("all eras are sequential with no gaps; qed"); + transaction.delete(self.col, era_key(i).as_bytes()); + + i += 1; + + // prune old blocks and epoch proofs. + for ancient in &era_entry.candidates { + let maybe_transition = live_epoch_proofs.remove(&ancient.hash); + if let Some(epoch_transition) = maybe_transition { + transaction.delete(self.col, &*transition_key(ancient.hash)); + + if ancient.hash == era_entry.canonical_hash { + last_canonical_transition = match self.db.get(self.col, &ancient.hash) { + Err(e) => { + warn!(target: "chain", "Error reading from DB: {}", e); + None + } + Ok(None) => panic!("stored candidates always have corresponding headers; qed"), + Ok(Some(header)) => Some((epoch_transition, ::rlp::decode(&header))), + }; + } + } + + transaction.delete(self.col, &ancient.hash); + } + + let canon = &era_entry.candidates[0]; + (canon.hash, canon.total_difficulty) + }; + cht::compute_root(cht_num, ::itertools::repeat_call(iter)) + .expect("fails only when too few items; this is checked; qed") + }; + + // write the CHT root to the database. + debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root); + transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root)); + + // update the last canonical transition proof + if let Some((epoch_transition, header)) = last_canonical_transition { + let header = header.expect("headers from db are always decodable; qed"); + let x = encode_canonical_transition(&header, &epoch_transition.proof); + transaction.put_vec(self.col, LAST_CANONICAL_TRANSITION, x); + } + } + /// Generates the specifications for hardcoded sync. This is typically only called manually /// from time to time by a Parity developer in order to update the chain specifications. /// @@ -750,6 +769,11 @@ impl HeaderChain { self.genesis_header.hash() } + /// Get the genesis header. + pub(super) fn genesis_header(&self) -> encoded::Header { + self.genesis_header.clone() + } + /// Get the best block's data. pub fn best_block(&self) -> BlockDescriptor { self.best_block.read().clone() diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index a1625b0e8f3..191073beddd 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -28,16 +28,17 @@ use ethcore::header::{BlockNumber, Header}; use ethcore::verification::queue::{self, HeaderQueue}; use ethcore::blockchain_info::BlockChainInfo; use ethcore::spec::{Spec, SpecHardcodedSync}; +use ethcore::snapshot::DatabaseRestore; use ethcore::encoded; use io::IoChannel; -use parking_lot::{Mutex, RwLock}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use ethereum_types::{H256, U256}; use futures::{IntoFuture, Future}; use kvdb::KeyValueDB; use self::fetch::ChainDataFetcher; -use self::header_chain::{AncestryIter, HeaderChain, HardcodedSync}; +use self::header_chain::{HeaderChain, HardcodedSync}; use cache::Cache; @@ -47,6 +48,7 @@ mod header_chain; mod service; pub mod fetch; +pub mod snapshot; /// Configuration for the light client. #[derive(Debug, Clone)] @@ -100,7 +102,7 @@ pub trait LightChainClient: Send + Sync { fn score(&self, id: BlockId) -> Option; /// Get an iterator over a block and its ancestry. - fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box + 'a>; + fn ancestry_iter(&self) -> AncestryIter; /// Get the signing chain ID. fn signing_chain_id(&self) -> Option; @@ -159,7 +161,9 @@ impl AsLightClient for T { pub struct Client { queue: HeaderQueue, engine: Arc, - chain: HeaderChain, + // header chain is thread-safe, but we need + // to replace it in a thread-safe way + chain: RwLock, report: RwLock, import_lock: Mutex<()>, db: Arc, @@ -184,7 +188,7 @@ impl Client { engine: spec.engine.clone(), chain: { let hs_cfg = if config.no_hardcoded_sync { HardcodedSync::Deny } else { HardcodedSync::Allow }; - HeaderChain::new(db.clone(), chain_col, &spec, cache, hs_cfg)? + RwLock::new(HeaderChain::new(db.clone(), chain_col, &spec, cache, hs_cfg)?) }, report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), @@ -200,7 +204,7 @@ impl Client { /// /// Returns `None` if we are at the genesis block. pub fn read_hardcoded_sync(&self) -> Result, Error> { - self.chain.read_hardcoded_sync() + self.chain.read().read_hardcoded_sync() } /// Adds a new `LightChainNotify` listener. @@ -216,18 +220,21 @@ impl Client { /// Inquire about the status of a given header. pub fn status(&self, hash: &H256) -> BlockStatus { match self.queue.status(hash) { - queue::Status::Unknown => self.chain.status(hash), + queue::Status::Unknown => self.chain.read().status(hash), other => other.into(), } } /// Get the chain info. pub fn chain_info(&self) -> BlockChainInfo { - let best_hdr = self.chain.best_header(); - let best_td = self.chain.best_block().total_difficulty; + let (best_hdr, best_td, first_block, genesis_hash) = { - let first_block = self.chain.first_block(); - let genesis_hash = self.chain.genesis_hash(); + let chain = self.chain.read(); + ( + chain.best_header(), chain.best_block().total_difficulty, + chain.first_block(), chain.genesis_hash(), + ) + }; BlockChainInfo { total_difficulty: best_td, @@ -250,27 +257,22 @@ impl Client { /// Attempt to get a block hash by block id. pub fn block_hash(&self, id: BlockId) -> Option { - self.chain.block_hash(id) + self.chain.read().block_hash(id) } /// Get a block header by Id. pub fn block_header(&self, id: BlockId) -> Option { - self.chain.block_header(id) + self.chain.read().block_header(id) } /// Get the best block header. pub fn best_block_header(&self) -> encoded::Header { - self.chain.best_header() + self.chain.read().best_header() } /// Get a block's chain score. pub fn score(&self, id: BlockId) -> Option { - self.chain.score(id) - } - - /// Get an iterator over a block and its ancestry. - pub fn ancestry_iter(&self, start: BlockId) -> AncestryIter { - self.chain.ancestry_iter(start) + self.chain.read().score(id) } /// Get the signing chain id. @@ -285,7 +287,7 @@ impl Client { /// Get the `i`th CHT root. pub fn cht_root(&self, i: usize) -> Option { - self.chain.cht_root(i) + self.chain.read().cht_root(i) } /// Import a set of pre-verified headers from the queue. @@ -316,14 +318,17 @@ impl Client { The node may not be able to synchronize further.", e); } - let epoch_proof = self.engine.is_epoch_end( - &verified_header, - &|h| self.chain.block_header(BlockId::Hash(h)).and_then(|hdr| hdr.decode().ok()), - &|h| self.chain.pending_transition(h), - ); + let epoch_proof = { + let chain = self.chain.read(); + self.engine.is_epoch_end( + &verified_header, + &|h| chain.block_header(BlockId::Hash(h)).and_then(|hdr| hdr.decode().ok()), + &|h| chain.pending_transition(h), + ) + }; let mut tx = self.db.transaction(); - let pending = match self.chain.insert(&mut tx, verified_header, epoch_proof) { + let pending = match self.chain.read().insert(&mut tx, verified_header, epoch_proof) { Ok(pending) => { good.push(hash); self.report.write().blocks_imported += 1; @@ -337,7 +342,7 @@ impl Client { }; self.db.write_buffered(tx); - self.chain.apply_pending(pending); + self.chain.read().apply_pending(pending); } if let Err(e) = self.db.flush() { @@ -359,7 +364,7 @@ impl Client { pub fn chain_mem_used(&self) -> usize { use heapsize::HeapSizeOf; - self.chain.heap_size_of_children() + self.chain.read().heap_size_of_children() } /// Get a handle to the verification engine. @@ -416,7 +421,7 @@ impl Client { // should skip. fn check_header(&self, bad: &mut Vec, verified_header: &Header) -> bool { let hash = verified_header.hash(); - let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) { + let parent_header = match self.chain.read().block_header(BlockId::Hash(*verified_header.parent_hash())) { Some(header) => header, None => { trace!(target: "client", "No parent for block ({}, {})", @@ -514,7 +519,7 @@ impl Client { }; let mut batch = self.db.transaction(); - self.chain.insert_pending_transition(&mut batch, header.hash(), epoch::PendingTransition { + self.chain.read().insert_pending_transition(&mut batch, header.hash(), epoch::PendingTransition { proof: proof, }); self.db.write_buffered(batch); @@ -522,6 +527,20 @@ impl Client { } } +// An ugly borrow-checker workaround to allow iterating over a block and its ancestry. +// We need this since `self.chain.read()` does not live long enough for +// `self.chain.read().ancestry_iter(_)` to work. And we probably don't want to expose +// `RwLockReadGuard` in the public API. +/// An iterator over a block and its ancestry. +pub struct AncestryIter<'a>(RwLockReadGuard<'a, HeaderChain>); + +impl<'a> AncestryIter<'a> { + /// Get an iterator over a block and its ancestry. + pub fn iter(&'a self, start: BlockId) -> impl Iterator + 'a { + self.0.ancestry_iter(start) + } +} + impl LightChainClient for Client { fn add_listener(&self, listener: Weak) { Client::add_listener(self, listener) @@ -549,8 +568,8 @@ impl LightChainClient for Client { Client::score(self, id) } - fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box + 'a> { - Box::new(Client::ancestry_iter(self, start)) + fn ancestry_iter(&self) -> AncestryIter { + AncestryIter(self.chain.read()) } fn signing_chain_id(&self) -> Option { @@ -606,7 +625,7 @@ impl ::ethcore::client::EngineClient for Client { fn broadcast_consensus_message(&self, _message: Vec) { } fn epoch_transition_for(&self, parent_hash: H256) -> Option { - self.chain.epoch_transition_for(parent_hash).map(|(hdr, proof)| EpochTransition { + self.chain.read().epoch_transition_for(parent_hash).map(|(hdr, proof)| EpochTransition { block_hash: hdr.hash(), block_number: hdr.number(), proof: proof, @@ -625,3 +644,18 @@ impl ::ethcore::client::EngineClient for Client { Client::block_header(self, id) } } + +impl DatabaseRestore for Client { + /// Restart the client with a new backend + fn restore_db(&self, new_db: &str, params: &snapshot::LightClientRestorationParams) -> Result<(), Error> { + trace!(target: "snapshot", "Replacing light client database with {:?}", new_db); + + let _import_lock = self.import_lock.lock(); + self.db.restore(new_db)?; + + let mut chain = self.chain.write(); + let cache = params.cache.clone(); + *chain = HeaderChain::new(self.db.clone(), params.col, ¶ms.spec, cache, params.allow_hs)?; + Ok(()) + } +} diff --git a/ethcore/light/src/client/service.rs b/ethcore/light/src/client/service.rs index 8c6ef44faef..89f1b4814f0 100644 --- a/ethcore/light/src/client/service.rs +++ b/ethcore/light/src/client/service.rs @@ -19,12 +19,22 @@ use std::fmt; use std::sync::Arc; +use std::path::Path; use ethcore::client::ClientIoMessage; -use ethcore::{db, BlockChainDB}; +use ethcore::{db, BlockChainDB, BlockChainDBHandler}; use ethcore::error::Error as CoreError; use ethcore::spec::Spec; use io::{IoContext, IoError, IoHandler, IoService}; +use ethcore::snapshot::{ + SnapshotService as _SnapshotService, + service::{ + ServiceParams as SnapServiceParams, + Service as SnapService, + }, +}; +use client::header_chain::HardcodedSync; +use client::snapshot::LightClientRestorationParams; use cache::Cache; use parking_lot::Mutex; @@ -56,31 +66,58 @@ impl fmt::Display for Error { } } +type SnapshotService = SnapService; + /// Light client service. pub struct Service { client: Arc>, io_service: IoService, + snapshot: Arc, } impl Service { /// Start the service: initialize I/O workers and client itself. - pub fn start(config: ClientConfig, spec: &Spec, fetcher: T, db: Arc, cache: Arc>) -> Result { + pub fn start( + config: ClientConfig, + spec: Spec, + fetcher: T, + db: Arc, + cache: Arc>, + restoration_db_handler: Box, + snapshot_path: &Path, + ) -> Result { let io_service = IoService::::start().map_err(Error::Io)?; + let hs = if config.no_hardcoded_sync { HardcodedSync::Deny } else { HardcodedSync::Allow }; let client = Arc::new(Client::new(config, db.key_value().clone(), db::COL_LIGHT_CHAIN, - spec, + &spec, fetcher, io_service.channel(), - cache, + cache.clone(), )?); - - io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?; spec.engine.register_client(Arc::downgrade(&client) as _); + let snapshot_params = SnapServiceParams { + engine: spec.engine.clone(), + chain_params: LightClientRestorationParams { + col: db::COL_LIGHT_CHAIN, + spec: spec, + cache: cache, + allow_hs: hs, + }, + restoration_db_handler, + channel: io_service.channel(), + snapshot_root: snapshot_path.into(), + db_restore: client.clone(), + }; + let snapshot = Arc::new(SnapshotService::new(snapshot_params)?); + let client_io = ClientIoHandler::new(client.clone(), snapshot.clone()); + io_service.register_handler(Arc::new(client_io)).map_err(Error::Io)?; Ok(Service { client: client, io_service: io_service, + snapshot: snapshot, }) } @@ -98,36 +135,85 @@ impl Service { pub fn client(&self) -> &Arc> { &self.client } + + /// Get snapshot interface. + pub fn snapshot_service(&self) -> Arc { + self.snapshot.clone() + } + + /// Shutdown the Service. + pub fn shutdown(&self) { + self.snapshot.shutdown(); + } +} + +/// IO interface for the Client handler +struct ClientIoHandler { + client: Arc>, + snapshot: Arc, } -struct ImportBlocks(Arc>); +impl ClientIoHandler { + pub fn new(client: Arc>, snapshot: Arc) -> Self { + ClientIoHandler { + client: client, + snapshot: snapshot, + } + } +} -impl IoHandler for ImportBlocks { +impl IoHandler for ClientIoHandler { fn message(&self, _io: &IoContext, message: &ClientIoMessage) { - if let ClientIoMessage::BlockVerified = *message { - self.0.import_verified(); + match *message { + ClientIoMessage::BlockVerified => { + self.client.import_verified(); + }, + ClientIoMessage::BeginRestoration(ref manifest) => { + if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { + warn!("Failed to initialize snapshot restoration: {}", e); + } + }, + ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), + _ => {} // ignore other messages } } } #[cfg(test)] mod tests { - use super::Service; - use ethcore::spec::Spec; - - use std::sync::Arc; use cache::Cache; use client::fetch; - use std::time::Duration; - use parking_lot::Mutex; + use ethcore::client::ClientConfig; + use ethcore::db::NUM_COLUMNS; + use ethcore::spec::Spec; use ethcore::test_helpers; + use kvdb_rocksdb::{DatabaseConfig, CompactionProfile}; + use parking_lot::Mutex; + use std::sync::Arc; + use std::time::Duration; + use super::Service; + use tempdir::TempDir; #[test] fn it_works() { + let tempdir = TempDir::new("").unwrap(); + let client_path = tempdir.path().join("client"); + let snapshot_path = tempdir.path().join("snapshot"); + + let client_config = ClientConfig::default(); + let mut client_db_config = DatabaseConfig::with_columns(NUM_COLUMNS); + + client_db_config.memory_budget = client_config.db_cache_size; + client_db_config.compaction = CompactionProfile::auto(&client_path); + + let restoration_db_handler = test_helpers::restoration_db_handler(client_db_config); let db = test_helpers::new_db(); let spec = Spec::new_test(); let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::from_secs(6 * 3600)))); - Service::start(Default::default(), &spec, fetch::unavailable(), db, cache).unwrap(); + Service::start( + Default::default(), spec, fetch::unavailable(), db, cache, + restoration_db_handler, &snapshot_path, + ).unwrap(); } } diff --git a/ethcore/light/src/client/snapshot/chain.rs b/ethcore/light/src/client/snapshot/chain.rs new file mode 100644 index 00000000000..82675a4a675 --- /dev/null +++ b/ethcore/light/src/client/snapshot/chain.rs @@ -0,0 +1,116 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! This module contains a struct `LightChain`, which is +//! a wrapper around `HeaderChain` that keeps track of +//! pending changes made to it. +//! +//! The trait `RestorationTargetChain` is implemented for `HeaderChain`, +//! allowing the light client to restore from a snapshot (warp sync). + + +use super::super::HeaderChain; +use client::header_chain::PendingChanges; +use ethcore::engines::EpochTransition; +use ethcore::ids::BlockId; +use ethcore::header::{Header, BlockNumber}; +use ethcore::encoded; +use ethcore::snapshot::RestorationTargetChain; +use ethereum_types::{H256, U256}; +use parking_lot::RwLock; +use kvdb::DBTransaction; +use ethcore::receipt::Receipt; + +/// Wrapper for `HeaderChain` along with pending changes. +pub struct LightChain { + chain: HeaderChain, + pending: RwLock> +} + +impl LightChain { + /// Create a wrapper for `HeaderChain` implementing `RestorationTargetChain` + pub fn new(chain: HeaderChain) -> Self { + LightChain { + chain: chain, + pending: RwLock::new(None) + } + } + + /// Get a reference to the underlying chain + pub fn chain(&self) -> &HeaderChain { + &self.chain + } +} + +impl RestorationTargetChain for LightChain { + fn genesis_hash(&self) -> H256 { + self.chain.genesis_hash() + } + + fn genesis_header(&self) -> Header { + self.chain.genesis_header().decode().expect("genesis header is always decodable; qed") + } + + fn block_hash(&self, index: BlockNumber) -> Option { + self.chain.block_hash(BlockId::Number(index)) + } + + fn block_header_data(&self, hash: &H256) -> Option
{ + self.chain.block_header(BlockId::Hash(hash.clone())).and_then(|h| h.decode().ok()) + } + + fn add_child(&self, _batch: &mut DBTransaction, _block_hash: H256, _child_hash: H256) { + // We don't store parent <-> child relationship in the light client. + } + + fn insert_epoch_transition( + &self, + batch: &mut DBTransaction, + header: Header, + transition: EpochTransition, + ) { + let result = if header.number() == 0 { + let td = self.chain.genesis_header().difficulty(); + self.chain.insert_with_td(batch, header, td, Some(transition.proof)) + } else { + self.chain.insert(batch, header, Some(transition.proof)) + }; + let pending = result.expect("we either supply the total difficulty, or the parent is present; qed"); + *self.pending.write() = Some(pending); + } + + fn insert_unordered_block( + &self, + batch: &mut DBTransaction, + block: encoded::Block, + _receipts: Vec, + parent_td: Option, + _is_best: bool, + _is_ancient: bool, + ) -> bool { + let td = parent_td.map(|pd| pd + block.header().difficulty()); + let result = self.chain.insert_inner(batch, block.decode_header(), td, None, false); + let pending = result.expect("we either supply the total difficulty, or the parent is present; qed"); + *self.pending.write() = Some(pending); + parent_td.is_some() + } + + fn commit(&self) { + if let Some(pending) = self.pending.write().take() { + self.chain.apply_pending(pending) + } + } +} diff --git a/ethcore/light/src/client/snapshot/mod.rs b/ethcore/light/src/client/snapshot/mod.rs new file mode 100644 index 00000000000..c49afc97519 --- /dev/null +++ b/ethcore/light/src/client/snapshot/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot network service implementation for light client. + +pub mod chain; +mod service; + +pub use self::service::*; diff --git a/ethcore/light/src/client/snapshot/service.rs b/ethcore/light/src/client/snapshot/service.rs new file mode 100644 index 00000000000..10db4e87cb7 --- /dev/null +++ b/ethcore/light/src/client/snapshot/service.rs @@ -0,0 +1,83 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot network service implementation for light client. + +use std::path::PathBuf; +use std::sync::Arc; + + +use ethcore::snapshot::{ + ManifestData, + Error as SnapshotError, + io::{LooseWriter}, + service::{ChainRestorationParams, Restoration}, +}; + +use cache::Cache; +use client::HeaderChain; +use client::header_chain::HardcodedSync; +use ethcore::BlockChainDBHandler; +use ethcore::engines::EthEngine; +use ethcore::spec::Spec; +use ethcore::error::Error; +use client::snapshot::chain::LightChain; + +use parking_lot::Mutex; + +/// Light client specific snapshot restoration params. +pub struct LightClientRestorationParams { + pub(crate) spec: Spec, + pub(crate) allow_hs: HardcodedSync, + pub(crate) col: Option, + pub(crate) cache: Arc>, +} + +impl ChainRestorationParams for LightClientRestorationParams { + fn restoration( + &self, + manifest: ManifestData, + rest_db: PathBuf, + restoration_db_handler: &BlockChainDBHandler, + writer: Option, + engine: &EthEngine, + ) -> Result { + let db = restoration_db_handler.open(&rest_db)?; + let chain = HeaderChain::new( + db.key_value().clone(), + self.col, + &self.spec, + self.cache.clone(), + self.allow_hs + )?; + let boxed = Box::new(LightChain::new(chain)); + let restoration = engine.snapshot_components() + .ok_or_else(|| SnapshotError::SnapshotsUnsupported)?; + let rebuilder = restoration.rebuilder(boxed, db.clone(), &manifest)?; + + Ok(Restoration::new_light( + manifest, + rest_db, + writer, + db, + rebuilder, + )) + } + + fn is_light(&self) -> bool { + true + } +} diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index e151267a9c9..cfc10d62f54 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -84,8 +84,11 @@ extern crate kvdb; extern crate memory_cache; #[macro_use] extern crate error_chain; +extern crate snappy; #[cfg(test)] extern crate kvdb_memorydb; #[cfg(test)] extern crate tempdir; +#[cfg(test)] +extern crate kvdb_rocksdb; diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 5e73681a55a..4a15dc01c32 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -37,7 +37,6 @@ use std::time::{Duration, Instant}; use self::request_credits::{Credits, FlowParams}; use self::context::{Ctx, TickCtx}; -use self::error::Punishment; use self::load_timer::{LoadDistribution, NullStore}; use self::request_set::RequestSet; use self::id_guard::IdGuard; @@ -54,7 +53,7 @@ mod tests; pub mod request_credits; pub use self::context::{BasicContext, EventContext, IoContext}; -pub use self::error::Error; +pub use self::error::{Error, Punishment}; pub use self::load_timer::{SampleStore, FileStore}; pub use self::status::{Status, Capabilities, Announcement}; diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 81997be079e..00367033f25 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -28,7 +28,11 @@ use sync::PrivateTxHandler; use ethcore::{BlockChainDB, BlockChainDBHandler}; use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage}; use ethcore::miner::Miner; -use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; +use ethcore::snapshot::service::{ + Service as SnapshotService, + ServiceParams as SnapServiceParams, + FullNodeRestorationParams +}; use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus}; use ethcore::spec::Spec; use ethcore::account_provider::AccountProvider; @@ -67,7 +71,7 @@ impl PrivateTxHandler for PrivateTxService { pub struct ClientService { io_service: Arc>, client: Arc, - snapshot: Arc, + snapshot: Arc>, private_tx: Arc, database: Arc, _stop_guard: StopGuard, @@ -98,9 +102,11 @@ impl ClientService { let snapshot_params = SnapServiceParams { engine: spec.engine.clone(), - genesis_block: spec.genesis_block(), restoration_db_handler: restoration_db_handler, - pruning: pruning, + chain_params: FullNodeRestorationParams { + pruning, + genesis_block: spec.genesis_block(), + }, channel: io_service.channel(), snapshot_root: snapshot_path.into(), db_restore: client.clone(), @@ -148,7 +154,7 @@ impl ClientService { } /// Get snapshot interface. - pub fn snapshot_service(&self) -> Arc { + pub fn snapshot_service(&self) -> Arc> { self.snapshot.clone() } @@ -179,7 +185,7 @@ impl ClientService { /// IO interface for the Client handler struct ClientIoHandler { client: Arc, - snapshot: Arc, + snapshot: Arc>, } const CLIENT_TICK_TIMER: TimerToken = 0; diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 01becd1820e..2fc346d9a2e 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -747,16 +747,16 @@ impl BlockChain { }) } - /// Inserts a verified, known block from the canonical chain. - /// - /// Can be performed out-of-order, but care must be taken that the final chain is in a correct state. - /// This is used by snapshot restoration and when downloading missing blocks for the chain gap. - /// `is_best` forces the best block to be updated to this block. - /// `is_ancient` forces the best block of the first block sequence to be updated to this block. - /// `parent_td` is a parent total diffuculty - /// Supply a dummy parent total difficulty when the parent block may not be in the chain. - /// Returns true if the block is disconnected. - pub fn insert_unordered_block(&self, batch: &mut DBTransaction, block: encoded::Block, receipts: Vec, parent_td: Option, is_best: bool, is_ancient: bool) -> bool { + // `RestorationTargetChain::insert_unordered_block` implementation + pub(crate) fn insert_unordered_block( + &self, + batch: &mut DBTransaction, + block: encoded::Block, + receipts: Vec, + parent_td: Option, + is_best: bool, + is_ancient: bool, + ) -> bool { let block_number = block.header_view().number(); let block_parent_hash = block.header_view().parent_hash(); let block_difficulty = block.header_view().difficulty(); @@ -933,21 +933,14 @@ impl BlockChain { self.db.key_value().read(db::COL_EXTRA, &hash) } - /// Add a child to a given block. Assumes that the block hash is in - /// the chain and the child's parent is this block. - /// - /// Used in snapshots to glue the chunks together at the end. - pub fn add_child(&self, batch: &mut DBTransaction, block_hash: H256, child_hash: H256) { + // `RestorationTargetChain::add_child` implementation + pub(crate) fn add_child(&self, batch: &mut DBTransaction, block_hash: H256, child_hash: H256) { let mut parent_details = self.block_details(&block_hash) .unwrap_or_else(|| panic!("Invalid block hash: {:?}", block_hash)); parent_details.children.push(child_hash); - let mut update = HashMap::new(); - update.insert(block_hash, parent_details); - - let mut write_details = self.block_details.write(); - batch.extend_with_cache(db::COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); + self.update_block_details(batch, block_hash, parent_details); self.cache_man.lock().note_used(CacheId::BlockDetails(block_hash)); } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index efc8b3f2edd..3d2cbee12cb 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1275,9 +1275,9 @@ impl Client { } } -impl snapshot::DatabaseRestore for Client { +impl snapshot::DatabaseRestore for Client { /// Restart the client with a new backend - fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> { + fn restore_db(&self, new_db: &str, _: &snapshot::FullNodeRestorationParams) -> Result<(), EthcoreError> { trace!(target: "snapshot", "Replacing client database with {:?}", new_db); let _import_lock = self.importer.import_lock.lock(); diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index 40d2c4990f1..08f8d67c73f 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -26,7 +26,7 @@ pub use trace::Config as TraceConfig; pub use evm::VMType; /// Client state db compaction profile -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Copy)] pub enum DatabaseCompactionProfile { /// Try to determine compaction profile automatically Auto, diff --git a/ethcore/src/snapshot/consensus/authority.rs b/ethcore/src/snapshot/consensus/authority.rs index 72d828643dd..41e287a5dbb 100644 --- a/ethcore/src/snapshot/consensus/authority.rs +++ b/ethcore/src/snapshot/consensus/authority.rs @@ -19,7 +19,10 @@ //! //! The chunks here contain state proofs of transitions, along with validator proofs. -use super::{SnapshotComponents, Rebuilder, ChunkSink}; +use super::{ + SnapshotComponents, RestorationTargetChain, + RebuilderFactory, Rebuilder, ChunkSink +}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -75,7 +78,7 @@ impl SnapshotComponents for PoaSnapshot { break } - let header = chain.block_header_data(&transition.block_hash) + let header = BlockProvider::block_header_data(chain, &transition.block_hash) .ok_or(Error::BlockNotFound(transition.block_hash))?; let entry = { @@ -123,10 +126,12 @@ impl SnapshotComponents for PoaSnapshot { Ok(()) } +} +impl RebuilderFactory for PoaSnapshot { fn rebuilder( &self, - chain: BlockChain, + chain: Box, db: Arc, manifest: &ManifestData, ) -> Result, ::error::Error> { @@ -163,7 +168,7 @@ fn write_chunk(last: bool, chunk_data: &mut Vec, sink: &mut ChunkSink) -> struct ChunkRebuilder { manifest: ManifestData, warp_target: Option
, - chain: BlockChain, + chain: Box, db: Arc, had_genesis: bool, @@ -203,7 +208,7 @@ impl ChunkRebuilder { Some(ref last) => if last.check_finality_proof(finality_proof).map_or(true, |hashes| !hashes.contains(&hash)) { - return Err(Error::BadEpochProof(header.number()).into()); + bail!(Error::BadEpochProof(header.number())); }, None if header.number() != 0 => { // genesis never requires additional validation. @@ -256,13 +261,13 @@ impl Rebuilder for ChunkRebuilder { }; if num_transitions == 0 && !is_last_chunk { - return Err(Error::WrongChunkFormat("Found non-last chunk without any data.".into()).into()); + bail!(Error::WrongChunkFormat("Found non-last chunk without any data.".into())); } let mut last_verifier = None; let mut last_number = None; for transition_rlp in rlp.iter().skip(1).take(num_transitions).with_position() { - if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } + if !abort_flag.load(Ordering::SeqCst) { bail!(Error::RestorationAborted) } let (is_first, is_last) = match transition_rlp { Position::First(_) => (true, false), @@ -279,7 +284,7 @@ impl Rebuilder for ChunkRebuilder { )?; if last_number.map_or(false, |num| verified.header.number() <= num) { - return Err(Error::WrongChunkFormat("Later epoch transition in earlier or same block.".into()).into()); + bail!(Error::WrongChunkFormat("Later epoch transition in earlier or same block.".into())); } last_number = Some(verified.header.number()); @@ -290,7 +295,7 @@ impl Rebuilder for ChunkRebuilder { // but it doesn't need verification later. if verified.header.number() == 0 { if verified.header.hash() != self.chain.genesis_hash() { - return Err(Error::WrongBlockHash(0, verified.header.hash(), self.chain.genesis_hash()).into()); + bail!(Error::WrongBlockHash(0, verified.header.hash(), self.chain.genesis_hash())); } self.had_genesis = true; @@ -310,8 +315,10 @@ impl Rebuilder for ChunkRebuilder { // write epoch transition into database. let mut batch = self.db.transaction(); - self.chain.insert_epoch_transition(&mut batch, verified.header.number(), - verified.epoch_transition); + self.chain.insert_epoch_transition( + &mut batch, verified.header.clone(), + verified.epoch_transition + ); self.db.write_buffered(batch); trace!(target: "snapshot", "Verified epoch transition for epoch at block {}", verified.header.number()); @@ -333,7 +340,7 @@ impl Rebuilder for ChunkRebuilder { let hash = block.header.hash(); let best_hash = self.manifest.block_hash; if hash != best_hash { - return Err(Error::WrongBlockHash(block.header.number(), best_hash, hash).into()) + bail!(Error::WrongBlockHash(block.header.number(), best_hash, hash)) } } @@ -351,12 +358,12 @@ impl Rebuilder for ChunkRebuilder { fn finalize(&mut self, _engine: &EthEngine) -> Result<(), ::error::Error> { if !self.had_genesis { - return Err(Error::WrongChunkFormat("No genesis transition included.".into()).into()); + bail!(Error::WrongChunkFormat("No genesis transition included.".into())); } let target_header = match self.warp_target.take() { Some(x) => x, - None => return Err(Error::WrongChunkFormat("Warp target block not included.".into()).into()), + None => bail!(Error::WrongChunkFormat("Warp target block not included.".into())), }; // verify the first entries of chunks we couldn't before. @@ -368,7 +375,7 @@ impl Rebuilder for ChunkRebuilder { while let Some(&(ref last_header, ref last_verifier)) = lasts_reversed.next() { if last_header.number() < header.number() { if last_verifier.check_finality_proof(&finality_proof).map_or(true, |hashes| !hashes.contains(&hash)) { - return Err(Error::BadEpochProof(header.number()).into()); + bail!(Error::BadEpochProof(header.number())); } found = true; break; @@ -376,7 +383,7 @@ impl Rebuilder for ChunkRebuilder { } if !found { - return Err(Error::WrongChunkFormat("Inconsistent chunk ordering.".into()).into()); + bail!(Error::WrongChunkFormat("Inconsistent chunk ordering.".into())); } } diff --git a/ethcore/src/snapshot/consensus/mod.rs b/ethcore/src/snapshot/consensus/mod.rs index 7b6b03a3ee2..1a2cec9cb04 100644 --- a/ethcore/src/snapshot/consensus/mod.rs +++ b/ethcore/src/snapshot/consensus/mod.rs @@ -20,11 +20,15 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; -use blockchain::{BlockChain, BlockChainDB}; -use engines::EthEngine; +use types::{BlockNumber, receipt::Receipt}; +use blockchain::{BlockChain, BlockChainDB, BlockProvider}; +use encoded; +use engines::{EthEngine, EpochTransition}; +use header::Header; use snapshot::{Error, ManifestData}; -use ethereum_types::H256; +use ethereum_types::{H256, U256}; +use kvdb::DBTransaction; mod authority; mod work; @@ -36,7 +40,7 @@ pub use self::work::*; pub type ChunkSink<'a> = FnMut(&[u8]) -> ::std::io::Result<()> + 'a; /// Components necessary for snapshot creation and restoration. -pub trait SnapshotComponents: Send { +pub trait SnapshotComponents: RebuilderFactory + Send { /// Create secondary snapshot chunks; these corroborate the state data /// in the state chunks. /// @@ -51,17 +55,75 @@ pub trait SnapshotComponents: Send { chunk_sink: &mut ChunkSink, preferred_size: usize, ) -> Result<(), Error>; +} + +/// A common trait for `BlockChain` (full node) and `HeaderChain` (light client) +/// used in snapshot restoration. +pub trait RestorationTargetChain: Send { + /// Returns reference to genesis hash. + fn genesis_hash(&self) -> H256; + + /// Returns the header of the genesis block. + fn genesis_header(&self) -> Header; + + /// Get the hash of given block's number. + fn block_hash(&self, index: BlockNumber) -> Option; + + /// Get the header of a block. + fn block_header_data(&self, hash: &H256) -> Option
; + + /// Add a child to a given block. Assumes that the block hash is in + /// the chain and the child's parent is this block. + /// + /// Used in snapshots to glue the chunks together at the end. + fn add_child(&self, batch: &mut DBTransaction, block_hash: H256, child_hash: H256); + + /// Insert an epoch transition. Provide an epoch number being transitioned to + /// and epoch transition object. + /// + /// The block the transition occurred at should have already been inserted into the chain. + fn insert_epoch_transition( + &self, + batch: &mut DBTransaction, + header: Header, + transition: EpochTransition, + ); - /// Create a rebuilder, which will have chunks fed into it in aribtrary + /// Inserts a verified, known block from the canonical chain. + /// + /// Can be performed out-of-order, but care must be taken that the final chain is in a correct state. + /// This is used by snapshot restoration and when downloading missing blocks for the chain gap. + /// `is_best` forces the best block to be updated to this block. + /// `is_ancient` forces the best block of the first block sequence to be updated to this block. + /// `parent_td` is a parent total diffuculty + /// Supply a dummy parent total difficulty when the parent block may not be in the chain. + /// Returns true if the block is disconnected. + fn insert_unordered_block( + &self, + batch: &mut DBTransaction, + block: encoded::Block, + receipts: Vec, + parent_td: Option, + is_best: bool, + is_ancient: bool, + ) -> bool; + + /// Apply pending insertion updates. + fn commit(&self); +} + +/// A factory producing `Rebuilder`s needed for snapshot restoration. +pub trait RebuilderFactory: Send { + /// Create a `Rebuilder`, which will have chunks fed into it in arbitrary /// order and then be finalized. /// - /// The manifest, a database, and fresh `BlockChain` are supplied. + /// The manifest, a database, and fresh `RestorationTargetChain` are supplied. /// /// The engine passed to the `Rebuilder` methods will be the same instance /// that created the `SnapshotComponents`. fn rebuilder( &self, - chain: BlockChain, + chain: Box, db: Arc, manifest: &ManifestData, ) -> Result, ::error::Error>; @@ -93,3 +155,48 @@ pub trait Rebuilder: Send { /// and verify against the restored state. fn finalize(&mut self, engine: &EthEngine) -> Result<(), ::error::Error>; } + +impl RestorationTargetChain for BlockChain { + fn genesis_hash(&self) -> H256 { + BlockProvider::genesis_hash(self) + } + + fn genesis_header(&self) -> Header { + BlockProvider::genesis_header(self).decode().expect("genesis header is always decodable; qed") + } + + fn block_hash(&self, index: BlockNumber) -> Option { + BlockProvider::block_hash(self, index) + } + + fn block_header_data(&self, hash: &H256) -> Option
{ + BlockProvider::block_header_data(self, hash).and_then(|h| h.decode().ok()) + } + + fn add_child(&self, batch: &mut DBTransaction, block_hash: H256, child_hash: H256) { + BlockChain::add_child(self, batch, block_hash, child_hash); + } + + fn insert_epoch_transition( + &self, batch: &mut DBTransaction, + header: Header, transition: EpochTransition + ) { + BlockChain::insert_epoch_transition(self, batch, header.number(), transition); + } + + fn insert_unordered_block( + &self, + batch: &mut DBTransaction, + block: encoded::Block, + receipts: Vec, + parent_td: Option, + is_best: bool, + is_ancient: bool, + ) -> bool { + BlockChain::insert_unordered_block(self, batch, block, receipts, parent_td, is_best, is_ancient) + } + + fn commit(&self) { + BlockChain::commit(self); + } +} diff --git a/ethcore/src/snapshot/consensus/work.rs b/ethcore/src/snapshot/consensus/work.rs index ded004fe89f..713eefe18a7 100644 --- a/ethcore/src/snapshot/consensus/work.rs +++ b/ethcore/src/snapshot/consensus/work.rs @@ -20,7 +20,10 @@ //! The secondary chunks in this instance are 30,000 "abridged blocks" from the head //! of the chain, which serve as an indication of valid chain. -use super::{SnapshotComponents, Rebuilder, ChunkSink}; +use super::{ + SnapshotComponents, RestorationTargetChain, + RebuilderFactory, Rebuilder, ChunkSink +}; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; @@ -75,10 +78,12 @@ impl SnapshotComponents for PowSnapshot { preferred_size: preferred_size, }.chunk_all(self.blocks) } +} +impl RebuilderFactory for PowSnapshot { fn rebuilder( &self, - chain: BlockChain, + chain: Box, db: Arc, manifest: &ManifestData, ) -> Result, ::error::Error> { @@ -106,7 +111,7 @@ impl<'a> PowWorker<'a> { let mut loaded_size = 0; let mut last = self.current_hash; - let genesis_hash = self.chain.genesis_hash(); + let genesis_hash = RestorationTargetChain::genesis_hash(self.chain); for _ in 0..snapshot_blocks { if self.current_hash == genesis_hash { break } @@ -154,7 +159,7 @@ impl<'a> PowWorker<'a> { fn write_chunk(&mut self, last: H256) -> Result<(), Error> { trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len()); - let (last_header, last_details) = self.chain.block_header_data(&last) + let (last_header, last_details) = BlockProvider::block_header_data(self.chain, &last) .and_then(|n| self.chain.block_details(&last).map(|d| (n, d))) .ok_or(Error::BlockNotFound(last))?; @@ -189,7 +194,7 @@ impl<'a> PowWorker<'a> { /// /// After all chunks have been submitted, we "glue" the chunks together. pub struct PowRebuilder { - chain: BlockChain, + chain: Box, db: Arc, rng: OsRng, disconnected: Vec<(u64, H256)>, @@ -202,7 +207,12 @@ pub struct PowRebuilder { impl PowRebuilder { /// Create a new PowRebuilder. - fn new(chain: BlockChain, db: Arc, manifest: &ManifestData, snapshot_blocks: u64) -> Result { + fn new( + chain: Box, + db: Arc, + manifest: &ManifestData, + snapshot_blocks: u64 + ) -> Result { Ok(PowRebuilder { chain: chain, db: db, @@ -232,7 +242,7 @@ impl Rebuilder for PowRebuilder { trace!(target: "snapshot", "restoring block chunk with {} blocks.", num_blocks); if self.fed_blocks + num_blocks > self.snapshot_blocks { - return Err(Error::TooManyBlocks(self.snapshot_blocks, self.fed_blocks + num_blocks).into()) + bail!(Error::TooManyBlocks(self.snapshot_blocks, self.fed_blocks + num_blocks)) } // todo: assert here that these values are consistent with chunks being in order. @@ -241,7 +251,7 @@ impl Rebuilder for PowRebuilder { let parent_total_difficulty = rlp.val_at::(2)?; for idx in 3..item_count { - if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } + if !abort_flag.load(Ordering::SeqCst) { bail!(Error::RestorationAborted) } let pair = rlp.at(idx)?; let abridged_rlp = pair.at(0)?.as_raw().to_owned(); @@ -255,11 +265,11 @@ impl Rebuilder for PowRebuilder { if is_best { if block.header.hash() != self.best_hash { - return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into()) + bail!(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash())) } if block.header.state_root() != &self.best_root { - return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into()) + bail!(Error::WrongStateRoot(self.best_root, *block.header.state_root())) } } @@ -267,7 +277,7 @@ impl Rebuilder for PowRebuilder { &mut self.rng, &block.header, engine, - &self.chain, + &*self.chain, is_best )?; @@ -310,11 +320,14 @@ impl Rebuilder for PowRebuilder { } let genesis_hash = self.chain.genesis_hash(); - self.chain.insert_epoch_transition(&mut batch, 0, ::engines::EpochTransition { - block_number: 0, - block_hash: genesis_hash, - proof: vec![], - }); + self.chain.insert_epoch_transition( + &mut batch, self.chain.genesis_header(), + ::engines::EpochTransition { + block_number: 0, + block_hash: genesis_hash, + proof: vec![], + } + ); self.db.write_buffered(batch); Ok(()) diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 03f2eebfbf7..ee36f62c226 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -55,7 +55,7 @@ use rand::{Rng, OsRng}; pub use self::error::Error; pub use self::consensus::*; -pub use self::service::{Service, DatabaseRestore}; +pub use self::service::{Service, DatabaseRestore, FullNodeRestorationParams}; pub use self::traits::SnapshotService; pub use self::watcher::Watcher; pub use types::snapshot_manifest::ManifestData; @@ -81,7 +81,7 @@ const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; // Maximal chunk size (decompressed) // Snappy::decompressed_len estimation may sometimes yield results greater -// than PREFERRED_CHUNK_SIZE so allow some threshold here. +// than `PREFERRED_CHUNK_SIZE` so allow some threshold here. const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5; // Minimum supported state chunk version. @@ -132,7 +132,7 @@ pub fn take_snapshot( writer: W, p: &Progress ) -> Result<(), Error> { - let start_header = chain.block_header_data(&block_at) + let start_header = BlockProvider::block_header_data(chain, &block_at) .ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?; let state_root = start_header.state_root(); let number = start_header.number(); @@ -176,7 +176,13 @@ pub fn take_snapshot( /// Secondary chunks are engine-specific, but they intend to corroborate the state data /// in the state chunks. /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis. -pub fn chunk_secondary<'a>(mut chunker: Box, chain: &'a BlockChain, start_hash: H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { +pub fn chunk_secondary<'a>( + mut chunker: Box, + chain: &'a BlockChain, + start_hash: H256, + writer: &Mutex, + progress: &'a Progress +) -> Result, Error> { let mut chunk_hashes = Vec::new(); let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; @@ -483,13 +489,13 @@ const POW_VERIFY_RATE: f32 = 0.02; /// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform /// the fullest verification possible. If not, it will take a random sample to determine whether it will /// do heavy or light verification. -pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> { +pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &EthEngine, chain: &RestorationTargetChain, always: bool) -> Result<(), ::error::Error> { engine.verify_block_basic(header)?; if always || rng.gen::() <= POW_VERIFY_RATE { engine.verify_block_unordered(header)?; match chain.block_header_data(header.parent_hash()) { - Some(parent) => engine.verify_block_family(header, &parent.decode()?), + Some(parent) => engine.verify_block_family(header, &parent), None => Ok(()), } } else { diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 2e76153cff4..4e22f0c4ae3 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -30,7 +30,7 @@ use blockchain::{BlockChain, BlockChainDB, BlockChainDBHandler}; use client::{Client, ChainInfo, ClientIoMessage}; use engines::EthEngine; use error::Error; -use hash::keccak; +use hash::{keccak, KECCAK_NULL_RLP}; use ids::BlockId; use io::IoChannel; @@ -45,11 +45,13 @@ use snappy; struct Guard(bool, PathBuf); impl Guard { + // Create a new path guard fn new(path: PathBuf) -> Self { Guard(true, path) } #[cfg(test)] fn benign() -> Self { Guard(false, PathBuf::default()) } + // Don't remove the directory on drop fn disarm(mut self) { self.0 = false } } @@ -62,17 +64,17 @@ impl Drop for Guard { } /// External database restoration handler -pub trait DatabaseRestore: Send + Sync { +pub trait DatabaseRestore: Send + Sync { /// Restart with a new backend. Takes ownership of passed database and moves it to a new location. - fn restore_db(&self, new_db: &str) -> Result<(), Error>; + fn restore_db(&self, new_db: &str, params: &T) -> Result<(), Error>; } /// State restoration manager. -struct Restoration { +pub struct Restoration { manifest: ManifestData, state_chunks_left: HashSet, block_chunks_left: HashSet, - state: StateRebuilder, + state: Option, secondary: Box, writer: Option, snappy_buffer: Bytes, @@ -92,6 +94,29 @@ struct RestorationParams<'a> { } impl Restoration { + /// Create a new `Restoration` instance for a light client. + pub fn new_light( + manifest: ManifestData, + rest_db: PathBuf, + writer: Option, + db: Arc, + rebuilder: Box, + ) -> Self { + let block_chunks_left = manifest.block_hashes.iter().cloned().collect(); + Self { + manifest, + state_chunks_left: HashSet::new(), + block_chunks_left, + state: None, + secondary: rebuilder, + writer, + snappy_buffer: Vec::new(), + final_state_root: KECCAK_NULL_RLP, + guard: Guard::new(rest_db), + db, + } + } + // make a new restoration using the given parameters. fn new(params: RestorationParams) -> Result { let manifest = params.manifest; @@ -102,10 +127,11 @@ impl Restoration { let raw_db = params.db; let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); + let boxed = Box::new(chain); let components = params.engine.snapshot_components() .ok_or_else(|| ::snapshot::Error::SnapshotsUnsupported)?; - let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?; + let secondary = components.rebuilder(boxed, raw_db.clone(), &manifest)?; let root = manifest.state_root.clone(); @@ -113,7 +139,7 @@ impl Restoration { manifest: manifest, state_chunks_left: state_chunks, block_chunks_left: block_chunks, - state: StateRebuilder::new(raw_db.key_value().clone(), params.pruning), + state: Some(StateRebuilder::new(raw_db.key_value().clone(), params.pruning)), secondary: secondary, writer: params.writer, snappy_buffer: Vec::new(), @@ -125,15 +151,22 @@ impl Restoration { // feeds a state chunk, aborts early if `flag` becomes false. fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> { + if self.state.is_none() { + trace!(target: "snapshot", "Feeding a chunk state to a light client"); + return Ok(()); + } if self.state_chunks_left.contains(&hash) { let expected_len = snappy::decompressed_len(chunk)?; if expected_len > MAX_CHUNK_SIZE { trace!(target: "snapshot", "Discarding large chunk: {} vs {}", expected_len, MAX_CHUNK_SIZE); - return Err(::snapshot::Error::ChunkTooLarge.into()); + bail!(::snapshot::Error::ChunkTooLarge); } let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?; - self.state.feed(&self.snappy_buffer[..len], flag)?; + self.state + .as_mut() + .expect("checked above; qed") + .feed(&self.snappy_buffer[..len], flag)?; if let Some(ref mut writer) = self.writer.as_mut() { writer.write_state_chunk(hash, chunk)?; @@ -151,7 +184,7 @@ impl Restoration { let expected_len = snappy::decompressed_len(chunk)?; if expected_len > MAX_CHUNK_SIZE { trace!(target: "snapshot", "Discarding large chunk: {} vs {}", expected_len, MAX_CHUNK_SIZE); - return Err(::snapshot::Error::ChunkTooLarge.into()); + bail!(::snapshot::Error::ChunkTooLarge); } let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?; @@ -172,15 +205,17 @@ impl Restoration { if !self.is_done() { return Ok(()) } - // verify final state root. - let root = self.state.state_root(); - if root != self.final_state_root { - warn!("Final restored state has wrong state root: expected {:?}, got {:?}", self.final_state_root, root); - return Err(TrieError::InvalidStateRoot(root).into()); - } + if let Some(state) = self.state { + // verify final state root. + let root = state.state_root(); + if root != self.final_state_root { + warn!("Final restored state has wrong state root: expected {:?}, got {:?}", self.final_state_root, root); + bail!(TrieError::InvalidStateRoot(root)); + } - // check for missing code. - self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?; + // check for missing code. + state.finalize(self.manifest.block_number, self.manifest.block_hash)?; + } // connect out-of-order chunks and verify chain integrity. self.secondary.finalize(engine)?; @@ -203,13 +238,11 @@ impl Restoration { pub type Channel = IoChannel; /// Snapshot service parameters. -pub struct ServiceParams { +pub struct ServiceParams { /// The consensus engine this is built on. pub engine: Arc, - /// The chain's genesis block. - pub genesis_block: Bytes, - /// State pruning algorithm. - pub pruning: Algorithm, + /// Additional chain specific restoration params. + pub chain_params: T, /// Handler for opening a restoration DB. pub restoration_db_handler: Box, /// Async IO channel for sending messages. @@ -218,42 +251,88 @@ pub struct ServiceParams { /// Usually "/snapshot" pub snapshot_root: PathBuf, /// A handle for database restoration. - pub db_restore: Arc, + pub db_restore: Arc>, +} + +/// Full node specific snapshot restoration parameters. +pub struct FullNodeRestorationParams { + /// The chain's genesis block. + pub genesis_block: Bytes, + /// State pruning algorithm. + pub pruning: Algorithm, +} + +/// Chain-specific snapshot restoration parameters. +pub trait ChainRestorationParams: Send + Sync { + /// Create a new `Restoration` instance. + fn restoration( + &self, + manifest: ManifestData, + rest_db: PathBuf, + restoration_db_handler: &BlockChainDBHandler, + writer: Option, + engine: &EthEngine, + ) -> Result; + + /// Whether the client is light. + fn is_light(&self) -> bool { + false + } +} + +impl ChainRestorationParams for FullNodeRestorationParams { + fn restoration( + &self, + manifest: ManifestData, + rest_db: PathBuf, + restoration_db_handler: &BlockChainDBHandler, + writer: Option, + engine: &EthEngine, + ) -> Result { + let params = RestorationParams { + manifest: manifest.clone(), + pruning: self.pruning, + db: restoration_db_handler.open(&rest_db)?, + writer: writer, + genesis: &self.genesis_block, + guard: Guard::new(rest_db), + engine: engine, + }; + Restoration::new(params) + } } /// `SnapshotService` implementation. /// This controls taking snapshots and restoring from them. -pub struct Service { +pub struct Service { restoration: Mutex>, restoration_db_handler: Box, snapshot_root: PathBuf, io_channel: Mutex, - pruning: Algorithm, status: Mutex, reader: RwLock>, engine: Arc, - genesis_block: Bytes, + chain_params: T, state_chunks: AtomicUsize, block_chunks: AtomicUsize, - db_restore: Arc, + db_restore: Arc>, progress: super::Progress, taking_snapshot: AtomicBool, restoring_snapshot: AtomicBool, } -impl Service { +impl Service { /// Create a new snapshot service from the given parameters. - pub fn new(params: ServiceParams) -> Result { + pub fn new(params: ServiceParams) -> Result { let mut service = Service { restoration: Mutex::new(None), restoration_db_handler: params.restoration_db_handler, snapshot_root: params.snapshot_root, io_channel: Mutex::new(params.channel), - pruning: params.pruning, + chain_params: params.chain_params, status: Mutex::new(RestorationStatus::Inactive), reader: RwLock::new(None), engine: params.engine, - genesis_block: params.genesis_block, state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), db_restore: params.db_restore, @@ -265,21 +344,21 @@ impl Service { // create the root snapshot dir if it doesn't exist. if let Err(e) = fs::create_dir_all(&service.snapshot_root) { if e.kind() != ErrorKind::AlreadyExists { - return Err(e.into()) + bail!(e) } } // delete the temporary restoration DB dir if it does exist. if let Err(e) = fs::remove_dir_all(service.restoration_db()) { if e.kind() != ErrorKind::NotFound { - return Err(e.into()) + bail!(e) } } // delete the temporary snapshot dir if it does exist. if let Err(e) = fs::remove_dir_all(service.temp_snapshot_dir()) { if e.kind() != ErrorKind::NotFound { - return Err(e.into()) + bail!(e) } } @@ -335,7 +414,7 @@ impl Service { fn replace_client_db(&self) -> Result<(), Error> { let our_db = self.restoration_db(); - self.db_restore.restore_db(&*our_db.to_string_lossy())?; + self.db_restore.restore_db(&*our_db.to_string_lossy(), &self.chain_params)?; Ok(()) } @@ -386,7 +465,7 @@ impl Service { Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); return Ok(()) } else { - return Err(e); + bail!(e); } } @@ -443,7 +522,7 @@ impl Service { if let Err(e) = fs::remove_dir_all(&rest_dir) { match e.kind() { ErrorKind::NotFound => {}, - _ => return Err(e.into()), + _ => bail!(e), } } @@ -459,20 +538,21 @@ impl Service { false => None }; - let params = RestorationParams { - manifest: manifest.clone(), - pruning: self.pruning, - db: self.restoration_db_handler.open(&rest_db)?, - writer: writer, - genesis: &self.genesis_block, - guard: Guard::new(rest_db), - engine: &*self.engine, + let state_chunks = if self.chain_params.is_light() { + 0 + } else { + manifest.state_hashes.len() }; - - let state_chunks = manifest.state_hashes.len(); let block_chunks = manifest.block_hashes.len(); - *res = Some(Restoration::new(params)?); + *res = Some(ChainRestorationParams::restoration( + &self.chain_params, + manifest.clone(), + rest_db, + &*self.restoration_db_handler, + writer, + &*self.engine, + )?); self.restoring_snapshot.store(true, Ordering::SeqCst); @@ -666,7 +746,7 @@ impl Service { } } -impl SnapshotService for Service { +impl SnapshotService for Service { fn manifest(&self) -> Option { self.reader.read().as_ref().map(|r| r.manifest().clone()) } @@ -691,7 +771,10 @@ impl SnapshotService for Service { .chain( restoration.manifest.state_hashes .iter() - .filter(|h| !restoration.state_chunks_left.contains(h)) + .filter(|h| + !self.chain_params.is_light() && + !restoration.state_chunks_left.contains(h) + ) ) .map(|h| *h) .collect(); @@ -750,7 +833,7 @@ impl SnapshotService for Service { } } -impl Drop for Service { +impl Drop for Service { fn drop(&mut self) { self.abort_restore(); } @@ -764,14 +847,14 @@ mod tests { use spec::Spec; use journaldb::Algorithm; use error::Error; - use snapshot::{ManifestData, RestorationStatus, SnapshotService}; + use snapshot::{ManifestData, RestorationStatus, SnapshotService, FullNodeRestorationParams as ChainParams}; use super::*; use tempdir::TempDir; use test_helpers::restoration_db_handler; struct NoopDBRestore; - impl DatabaseRestore for NoopDBRestore { - fn restore_db(&self, _new_db: &str) -> Result<(), Error> { + impl DatabaseRestore for NoopDBRestore { + fn restore_db(&self, _new_db: &str, _: &ChainParams) -> Result<(), Error> { Ok(()) } } @@ -786,9 +869,11 @@ mod tests { let snapshot_params = ServiceParams { engine: spec.engine.clone(), - genesis_block: spec.genesis_block(), restoration_db_handler: restoration_db_handler(Default::default()), - pruning: Algorithm::Archive, + chain_params: ChainParams { + genesis_block: spec.genesis_block(), + pruning: Algorithm::Archive, + }, channel: service.channel(), snapshot_root: dir, db_restore: Arc::new(NoopDBRestore), diff --git a/ethcore/src/snapshot/tests/helpers.rs b/ethcore/src/snapshot/tests/helpers.rs index 19f50e94690..fe696cb6069 100644 --- a/ethcore/src/snapshot/tests/helpers.rs +++ b/ethcore/src/snapshot/tests/helpers.rs @@ -175,7 +175,7 @@ pub fn restore( let mut state = StateRebuilder::new(db.key_value().clone(), journaldb::Algorithm::Archive); let mut secondary = { let chain = BlockChain::new(Default::default(), genesis, db.clone()); - components.rebuilder(chain, db, manifest).unwrap() + components.rebuilder(Box::new(chain), db, manifest).unwrap() }; let mut snappy_buffer = Vec::new(); diff --git a/ethcore/src/snapshot/tests/proof_of_work.rs b/ethcore/src/snapshot/tests/proof_of_work.rs index 2572699682f..ad7a39ecfa5 100644 --- a/ethcore/src/snapshot/tests/proof_of_work.rs +++ b/ethcore/src/snapshot/tests/proof_of_work.rs @@ -22,7 +22,7 @@ use error::{Error, ErrorKind}; use blockchain::generator::{BlockGenerator, BlockBuilder}; use blockchain::{BlockChain, ExtrasInsert}; -use snapshot::{chunk_secondary, Error as SnapshotError, Progress, SnapshotComponents}; +use snapshot::{chunk_secondary, Error as SnapshotError, Progress, RebuilderFactory}; use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use parking_lot::Mutex; @@ -83,7 +83,7 @@ fn chunk_and_restore(amount: u64) { // restore it. let new_db = test_helpers::new_db(); let new_chain = BlockChain::new(Default::default(), genesis.encoded().raw(), new_db.clone()); - let mut rebuilder = SNAPSHOT_MODE.rebuilder(new_chain, new_db.clone(), &manifest).unwrap(); + let mut rebuilder = SNAPSHOT_MODE.rebuilder(Box::new(new_chain), new_db.clone(), &manifest).unwrap(); let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); let flag = AtomicBool::new(true); @@ -140,7 +140,7 @@ fn checks_flag() { block_hash: H256::default(), }; - let mut rebuilder = SNAPSHOT_MODE.rebuilder(chain, db.clone(), &manifest).unwrap(); + let mut rebuilder = SNAPSHOT_MODE.rebuilder(Box::new(chain), db.clone(), &manifest).unwrap(); match rebuilder.feed(&chunk, engine.as_ref(), &AtomicBool::new(false)) { Err(Error(ErrorKind::Snapshot(SnapshotError::RestorationAborted), _)) => {} diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index a5af63b01e2..560999d0cb3 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -22,7 +22,7 @@ use tempdir::TempDir; use client::{Client, BlockInfo}; use ids::BlockId; use snapshot::service::{Service, ServiceParams}; -use snapshot::{self, ManifestData, SnapshotService}; +use snapshot::{self, ManifestData, SnapshotService, FullNodeRestorationParams as ChainParams}; use spec::Spec; use test_helpers::{generate_dummy_client_with_spec_and_data, restoration_db_handler}; @@ -31,8 +31,8 @@ use kvdb_rocksdb::DatabaseConfig; struct NoopDBRestore; -impl snapshot::DatabaseRestore for NoopDBRestore { - fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> { +impl snapshot::DatabaseRestore for NoopDBRestore { + fn restore_db(&self, _new_db: &str, _: &ChainParams) -> Result<(), ::error::Error> { Ok(()) } } @@ -68,9 +68,11 @@ fn restored_is_equivalent() { let service_params = ServiceParams { engine: spec.engine.clone(), - genesis_block: spec.genesis_block(), restoration_db_handler: restoration, - pruning: ::journaldb::Algorithm::Archive, + chain_params: ChainParams { + genesis_block: spec.genesis_block(), + pruning: ::journaldb::Algorithm::Archive, + }, channel: IoChannel::disconnected(), snapshot_root: path, db_restore: client2.clone(), @@ -110,9 +112,11 @@ fn guards_delete_folders() { let tempdir = TempDir::new("").unwrap(); let service_params = ServiceParams { engine: spec.engine.clone(), - genesis_block: spec.genesis_block(), restoration_db_handler: restoration_db_handler(DatabaseConfig::with_columns(::db::NUM_COLUMNS)), - pruning: ::journaldb::Algorithm::Archive, + chain_params: ChainParams { + genesis_block: spec.genesis_block(), + pruning: ::journaldb::Algorithm::Archive, + }, channel: IoChannel::disconnected(), snapshot_root: tempdir.path().to_owned(), db_restore: Arc::new(NoopDBRestore), diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index 6cc87df31f2..f36e675d307 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -34,6 +34,7 @@ smallvec = { version = "0.4", features = ["heapsizeof"] } parking_lot = "0.6" trace-time = { path = "../../util/trace-time" } ipnetwork = "0.12.6" +itertools = "0.7.8" [dev-dependencies] ethcore-io = { path = "../../util/io", features = ["mio"] } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index ef54a4802d9..01d26277384 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -40,6 +40,7 @@ use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, use light::client::AsLightClient; use light::Provider; use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; +use light_sync::{SnapshotSyncLightHandler as WarpProto, LightSync as SyncHandler}; use network::IpFilter; use private_tx::PrivateTxHandler; use transaction::UnverifiedTransaction; @@ -748,6 +749,7 @@ pub struct LightSyncParams { /// Service for light synchronization. pub struct LightSync { proto: Arc, + warp_proto: Arc, sync: Arc<::light_sync::SyncInfo + Sync + Send>, attached_protos: Vec, network: NetworkService, @@ -757,13 +759,15 @@ pub struct LightSync { impl LightSync { /// Create a new light sync service. - pub fn new(params: LightSyncParams) -> Result + pub fn new( + params: LightSyncParams, + warp_sync: WarpSync, + snapshot_service: Arc, + ) -> Result where L: AsLightClient + Provider + Sync + Send + 'static { - use light_sync::LightSync as SyncHandler; - // initialize light protocol handler and attach sync module. - let (sync, light_proto) = { + let (sync, light_proto, warp_proto) = { let light_params = LightParams { network_id: params.network_id, config: Default::default(), @@ -776,21 +780,23 @@ impl LightSync { sample_store: None, }; + let sync_handler = Arc::new(SyncHandler::new(params.client.clone(), warp_sync)?); let mut light_proto = LightProtocol::new(params.client.clone(), light_params); - let sync_handler = Arc::new(SyncHandler::new(params.client.clone())?); + let mut warp_proto = WarpProto::new(params.network_id, sync_handler.clone(), snapshot_service); light_proto.add_handler(sync_handler.clone()); for handler in params.handlers { light_proto.add_handler(handler); } - (sync_handler, Arc::new(light_proto)) + (sync_handler, Arc::new(light_proto), Arc::new(warp_proto)) }; let service = NetworkService::new(params.network_config, None)?; Ok(LightSync { proto: light_proto, + warp_proto: warp_proto, sync: sync, attached_protos: params.attached_protos, network: service, @@ -850,6 +856,8 @@ impl ManageNetwork for LightSync { self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PROTOCOL_VERSIONS) .unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e)); + self.network.register_protocol(self.warp_proto.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1]) + .unwrap_or_else(|e| warn!("Error registering warp sync protocol: {:?}", e)); for proto in &self.attached_protos { proto.register(&self.network) } } @@ -870,7 +878,12 @@ impl ManageNetwork for LightSync { impl LightSyncProvider for LightSync { fn peer_numbers(&self) -> PeerNumbers { - let (connected, active) = self.proto.peer_count(); + let (connected, active) = if self.sync.is_snapshot_syncing() { + let connected = self.sync.connected_snapshot_peers(); + (connected, connected) + } else { + self.proto.peer_count() + }; let peers_range = self.num_peers_range(); debug_assert!(peers_range.end > peers_range.start); PeerNumbers { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 625ccb30d22..7580d9dc2ff 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -149,11 +149,11 @@ const MAX_NEW_HASHES: usize = 64; const MAX_NEW_BLOCK_AGE: BlockNumber = 20; // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; -// Min number of blocks to be behind for a snapshot sync -const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; -const SNAPSHOT_MIN_PEERS: usize = 3; +/// Min number of blocks to be behind for a snapshot sync +pub(crate) const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; +pub(crate) const SNAPSHOT_MIN_PEERS: usize = 3; -const STATUS_PACKET: u8 = 0x00; +pub(crate) const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; const TRANSACTIONS_PACKET: u8 = 0x02; pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; @@ -175,7 +175,7 @@ pub const CONSENSUS_DATA_PACKET: u8 = 0x15; const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; -const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; +pub const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5); const STATUS_TIMEOUT: Duration = Duration::from_secs(5); @@ -183,8 +183,8 @@ const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); const BODIES_TIMEOUT: Duration = Duration::from_secs(20); const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); -const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); -const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); +pub const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); +pub const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state @@ -886,21 +886,45 @@ impl ChainSync { let warp_protocol = warp_protocol_version != 0; let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 }; trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); - let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); let chain = io.chain().chain_info(); - packet.append(&(protocol as u32)); - packet.append(&self.network_id); - packet.append(&chain.total_difficulty); - packet.append(&chain.best_block_hash); - packet.append(&chain.genesis_hash); - if warp_protocol { + let (manifest_number, manifest_hash) = if warp_protocol { let manifest = io.snapshot_service().manifest(); - let block_number = manifest.as_ref().map_or(0, |m| m.block_number); - let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp())); + ( + Some(manifest.as_ref().map_or(0, |m| m.block_number)), + Some(manifest.map_or(H256::new(), |m| keccak(m.into_rlp()))), + ) + } else { + (None, None) + }; + let packet = ChainSync::status_packet( + protocol as u32, + self.network_id, + &chain, + manifest_hash, + manifest_number, + ); + io.respond(STATUS_PACKET, packet) + } + + pub(crate) fn status_packet( + protocol_version: u32, + network_id: u64, + chain_info: &BlockChainInfo, + manifest_hash: Option, + manifest_number: Option, + ) -> Vec { + let warp_protocol = manifest_hash.is_some() && manifest_number.is_some(); + let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); + packet.append(&protocol_version); + packet.append(&network_id); + packet.append(&chain_info.total_difficulty); + packet.append(&chain_info.best_block_hash); + packet.append(&chain_info.genesis_hash); + if let (Some(manifest_hash), Some(manifest_number)) = (manifest_hash, manifest_number) { packet.append(&manifest_hash); - packet.append(&block_number); + packet.append(&manifest_number); } - io.respond(STATUS_PACKET, packet.out()) + packet.out() } pub fn maintain_peers(&mut self, io: &mut SyncIo) { diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index eb38d09d9d5..9e89bc234c7 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -38,6 +38,7 @@ extern crate parking_lot; extern crate smallvec; extern crate rlp; extern crate ipnetwork; +extern crate itertools; extern crate keccak_hash as hash; extern crate keccak_hasher; extern crate triehash_ethereum; diff --git a/ethcore/sync/src/light_sync/mod.rs b/ethcore/sync/src/light_sync/mod.rs index 32e3a0dbfde..6a3fc8e750d 100644 --- a/ethcore/sync/src/light_sync/mod.rs +++ b/ethcore/sync/src/light_sync/mod.rs @@ -31,29 +31,49 @@ //! - When within a certain distance of the head of the chain, aggressively download all //! announced blocks. //! - On bad block/response, punish peer and reset. +//! +//! When `warp_sync` is enabled (by default), a light client will first try to +//! sync using snapshots. If a client fails to get a snapshot manifest within a certain period of time, +//! it will fall back to normal sync. use std::collections::{HashMap, HashSet}; use std::mem; use std::sync::Arc; use std::time::{Instant, Duration}; +use bytes::Bytes; use ethcore::encoded; +use super::WarpSync; +use chain; use light::client::{AsLightClient, LightChainClient}; use light::net::{ PeerStatus, Announcement, Handler, BasicContext, EventContext, Capabilities, ReqId, Status, Error as NetError, }; + +use hash::keccak; use light::request::{self, CompleteHeadersRequest as HeadersRequest}; -use network::PeerId; +use network::{PeerId}; use ethereum_types::{H256, U256}; +use rlp::Rlp; +use ethcore::snapshot::{ManifestData, RestorationStatus, SnapshotService}; +use snapshot::ChunkType; +use self::sync_round::{AbortReason, SyncRound, ResponseContext}; +use self::warp_sync::{ + SnapshotSyncHandler, SnapshotSyncContext, SnapshotSyncEvent, + SnapshotManager, WarpSyncError, SnapshotPeerAsking, WarpSyncState, + GroupedPeers, +}; + +pub use self::warp_sync::SnapshotSyncLightHandler; + use parking_lot::{Mutex, RwLock}; use rand::{Rng, OsRng}; -use self::sync_round::{AbortReason, SyncRound, ResponseContext}; - mod response; mod sync_round; +mod warp_sync; #[cfg(test)] mod tests; @@ -64,6 +84,8 @@ const REQ_TIMEOUT_BASE: Duration = Duration::from_secs(7); // If we request N headers, then the timeout will be: // REQ_TIMEOUT_BASE + N * REQ_TIMEOUT_PER_HEADER const REQ_TIMEOUT_PER_HEADER: Duration = Duration::from_millis(10); +// This number is pretty random. +pub(super) const MAX_BLOCK_CHUNKS_DOWNLOAD_AHEAD: usize = 10; /// Peer chain info. #[derive(Debug, Clone, PartialEq, Eq)] @@ -211,6 +233,8 @@ enum SyncState { AncestorSearch(AncestorSearch), // Doing sync rounds. Rounds(SyncRound), + // warp sync + Snapshot(WarpSyncState), } struct ResponseCtx<'a> { @@ -236,6 +260,7 @@ pub struct LightSync { client: Arc, rng: Mutex, state: Mutex, + snapshot_manager: RwLock, } #[derive(Debug, Clone)] @@ -275,6 +300,11 @@ impl Handler for LightSync { } fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { + match *self.state.lock() { + SyncState::Snapshot(_) => return, + _ => {}, + }; + let peer_id = ctx.peer(); let peer = match self.peers.write().remove(&peer_id).map(|p| p.into_inner()) { @@ -314,10 +344,13 @@ impl Handler for LightSync { let mut state = self.state.lock(); *state = match mem::replace(&mut *state, SyncState::Idle) { - SyncState::Idle => SyncState::Idle, + SyncState::Idle => + SyncState::Idle, SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)), - SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)), + SyncState::Rounds(round) => + SyncState::Rounds(round.requests_abandoned(unfulfilled)), + other => other, }; } @@ -391,10 +424,13 @@ impl Handler for LightSync { }; *state = match mem::replace(&mut *state, SyncState::Idle) { - SyncState::Idle => SyncState::Idle, + SyncState::Idle => + SyncState::Idle, SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)), - SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)), + SyncState::Rounds(round) => + SyncState::Rounds(round.process_response(&ctx)), + other => other, }; } @@ -406,6 +442,405 @@ impl Handler for LightSync { } } +// snapshot sync handler methods +impl SnapshotSyncHandler for LightSync { + fn on_connect(&self, event: &SnapshotSyncEvent) { + match *self.state.lock() { + SyncState::Snapshot(_) => {} + _ => { + return; + } + } + if self.send_warp_sync_status_packet(event) { + self.maintain_sync_with_snap(event.as_context()); + } + } + + fn on_disconnect(&self, event: &SnapshotSyncEvent) { + match *self.state.lock() { + SyncState::Snapshot(_) => {} + _ => { + return; + } + } + trace!(target: "warp", "Disconnected from peer {}", event.peer()); + let asking = self.snapshot_manager.write().on_peer_aborting(&event.peer()); + if let Some(SnapshotPeerAsking::SnapshotManifest) = asking { + // TODO: if this was the last peer with the same snapshot, + // maybe we should restart warp sync from scratch + self.restart(event.as_context().snapshot_service()); + self.maintain_sync_with_snap(event.as_context()); + } + } + + fn on_warp_peer_status(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError> { + let genesis_hash = self.client.as_light_client().chain_info().genesis_hash; + self.snapshot_manager.write().on_peer_status(event, rlp, genesis_hash) + } + + fn on_snap_manifest(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError> { + let peer_id = event.peer(); + if self.snapshot_manager.read().get(&peer_id).is_none() { + trace!(target: "warp", "Ignoring snapshot manifest from unknown peer {}", peer_id); + return Ok(()); + } + + let (manifest, manifest_hash) = { + let is_manifest_state = { + match *self.state.lock() { + SyncState::Snapshot(WarpSyncState::Manifest) => true, + _ => false + } + }; + + let asked_for_manifest = match self.snapshot_manager.write().reset_peer_asking(&peer_id) { + Some(SnapshotPeerAsking::SnapshotManifest) => true, + _ => false, + }; + if !asked_for_manifest || !is_manifest_state { + trace!(target: "warp", "{}: Ignored unexpected manifest", peer_id); + return Ok(()); + } + + let manifest_rlp = rlp.at(0)?; + match ManifestData::from_rlp(manifest_rlp.as_raw()) { + Err(e) => { + trace!(target: "warp", "{}: Ignored bad manifest: {:?}", peer_id, e); + return Err(WarpSyncError::BadManifest); + } + Ok(manifest) => (manifest, keccak(manifest_rlp.as_raw())), + } + }; + + let is_supported_version = event.as_context().snapshot_service().supported_versions() + .map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h); + + if !is_supported_version { + trace!(target: "warp", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version); + return Err(WarpSyncError::UnsupportedManifestVersion(manifest.version)); + } + + trace!(target: "warp", "Received a manifest ({}) from {}", manifest.block_hash, peer_id); + + self.snapshot_manager.write().reset_manifest_to(&manifest, &manifest_hash); + event.as_context().snapshot_service().begin_restore(manifest); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::Blocks); + + self.maintain_sync_with_snap(event.as_context()); + + Ok(()) + } + + fn on_snap_data(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError> { + let peer_id = event.peer(); + if self.snapshot_manager.read().get(&peer_id).is_none() { + trace!(target: "warp", "Ignoring snapshot chunk from unknown peer {}", peer_id); + return Ok(()); + } + + self.snapshot_manager.write().clear_peer_download(&peer_id); + + let (chunk, hash) = { + let is_blocks_state = { + match *self.state.lock() { + SyncState::Snapshot(WarpSyncState::Blocks) => true, + _ => false + } + }; + + // always lock the snapshot manager before the state + let mut snap = self.snapshot_manager.write(); + + let asking = snap.reset_peer_asking(&peer_id); + let asked_for_snapshot_data = match asking { + Some(SnapshotPeerAsking::SnapshotData(_)) => true, + _ => false, + }; + + if !asked_for_snapshot_data || !is_blocks_state { + trace!(target: "warp", "Peer {}: Ignored unexpected snapshot chunk", peer_id); + return Ok(()); + } + + let status = event.as_context().snapshot_service().status(); + match status { + RestorationStatus::Inactive | RestorationStatus::Failed => { + trace!(target: "warp", "Snapshot restoration aborted"); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingPeers); + + // only note bad if restoration failed. + if let (Some(hash), RestorationStatus::Failed) = (snap.snapshot_hash(), status) { + trace!(target: "warp", "Noting snapshot hash {} as bad", hash); + snap.note_bad(hash); + } + + snap.clear(); + return Ok(()); + }, + RestorationStatus::Initializing { .. } => { + trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id); + return Ok(()); + }, + RestorationStatus::Ongoing { .. } => { + trace!(target: "warp", "{}: Snapshot restoration is ongoing", peer_id); + }, + } + + let snapshot_data: Bytes = rlp.val_at(0)?; + match snap.validate_chunk(&snapshot_data) { + Ok(ChunkType::Block(hash)) => { + match asking { + Some(SnapshotPeerAsking::SnapshotData(h)) if h != hash => { + trace!(target: "warp", "{}: Asked for a different block chunk", peer_id); + }, + _ => {}, + } + (snapshot_data, hash) + } + _ => { + trace!(target: "warp", "{}: Got a state or a bad block chunk on light client", peer_id); + return Err(WarpSyncError::BadBlockChunk); + } + } + }; + + trace!(target: "warp", "{}: Processing block chunk", peer_id); + event.as_context().snapshot_service().restore_block_chunk(hash, chunk); + + if self.snapshot_manager.read().is_complete() { + // wait for snapshot restoration process to complete + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingService); + } + + Ok(()) + } + + fn on_tick(&self, ctx: &SnapshotSyncContext) { + self.maintain_sync_with_snap(ctx); + self.snapshot_manager.write().disconnect_slowpokes(ctx); + } +} + +// private warp sync helpers +impl LightSync { + + fn get_init_state(client: &LightChainClient, warp_sync: WarpSync) -> SyncState { + let best_block = client.chain_info().best_block_number; + let waiting_peers = SyncState::Snapshot(WarpSyncState::WaitingPeers); + match warp_sync { + WarpSync::Enabled => waiting_peers, + WarpSync::OnlyAndAfter(block) if block > best_block => waiting_peers, + _ => SyncState::Idle, + } + } + + fn maintain_sync_with_snap(&self, ctx: &SnapshotSyncContext) { + if !self.warp_sync_enabled(ctx.snapshot_service()) { + trace!(target: "warp", "Skipping warp sync. Disabled or not supported."); + return; + } + + let our_best_block = self.client.as_light_client().chain_info().best_block_number; + let best_seen = SyncInfo::highest_block(self); + let peers = self.snapshot_manager.read().best_peer_group(our_best_block, best_seen); + + self.maybe_start_snapshot_sync(ctx, &peers); + + match *self.state.lock() { + SyncState::Snapshot(_) => {} + _ => { + return; + } + } + + let old_state = match *self.state.lock() { + SyncState::Snapshot(s) => s, + _ => { return; } + }; + match old_state { + WarpSyncState::WaitingService => { + match ctx.snapshot_service().status() { + RestorationStatus::Initializing { .. } => { + trace!(target: "warp", "Snapshot restoration is initializing"); + return; + }, + RestorationStatus::Inactive => { + trace!(target: "warp", "Snapshot restoration is complete"); + self.restart(ctx.snapshot_service()); + return; + }, + RestorationStatus::Ongoing { block_chunks_done, .. } => { + // Initialize the snapshot if not already done + self.snapshot_manager.write().initialize(ctx.snapshot_service()); + let left_chunks = self.snapshot_manager.read() + .done_chunks() + .saturating_sub(block_chunks_done as usize); + if !self.snapshot_manager.read().is_complete() && + left_chunks <= MAX_BLOCK_CHUNKS_DOWNLOAD_AHEAD + { + trace!(target: "warp", "Resuming snapshot sync"); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::Blocks); + } + }, + RestorationStatus::Failed => { + trace!(target: "warp", "Snapshot restoration aborted"); + self.snapshot_manager.write().clear(); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingPeers); + }, + } + self.continue_warp_sync(ctx, &peers); + }, + WarpSyncState::Blocks => { + self.continue_warp_sync(ctx, &peers); + } + _ => {}, + }; + } + + fn restart(&self, service: &SnapshotService) { + trace!(target: "sync", "Restarting"); + if let SyncState::Snapshot(WarpSyncState::Blocks) = *self.state.lock() { + debug!(target: "warp", "Aborting snapshot restore"); + service.abort_restore(); + } + self.snapshot_manager.write().clear(); + let warp_sync = self.snapshot_manager.read().warp_sync(); + let init_state = Self::get_init_state(self.client.as_light_client(), warp_sync); + *self.state.lock() = init_state; + } + + fn continue_warp_sync(&self, ctx: &SnapshotSyncContext, peers: &Option) { + let old_state = match *self.state.lock() { + SyncState::Snapshot(s) => s, + _ => { return; } + }; + match old_state { + WarpSyncState::Blocks => { + match ctx.snapshot_service().status() { + RestorationStatus::Initializing { .. } => { + self.snapshot_manager.write().initialize(ctx.snapshot_service()); + trace!(target: "warp", "Snapshot service is initializing, pausing sync"); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingService); + return; + } + RestorationStatus::Ongoing { block_chunks_done, .. } => { + // Initialize the snapshot if not already done + self.snapshot_manager.write().initialize(ctx.snapshot_service()); + let processed_chunks = self.snapshot_manager.read() + .done_chunks() + .saturating_sub(block_chunks_done as usize); + if processed_chunks > MAX_BLOCK_CHUNKS_DOWNLOAD_AHEAD { + trace!(target: "warp", "Snapshot queue full, pausing sync"); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingService); + return; + } + } + RestorationStatus::Failed => { + trace!(target: "warp", "Snapshot restoration aborted"); + self.snapshot_manager.write().clear(); + *self.state.lock() = SyncState::Snapshot(WarpSyncState::WaitingPeers); + return; + } + s => { + trace!(target: "warp", "Downloading chunks, but snapshot service state is {:?}", s); + } + } + if let Some(peers) = peers { + self.snapshot_manager.write().request_snapshot_blocks(ctx, &peers.peers); + } else { + debug!(target: "warp", "No peers to download snapshot blocks from"); + } + }, + _ => {}, + } + } + + fn warp_sync_enabled(&self, service: &SnapshotService) -> bool { + let warp_sync = self.snapshot_manager.read().warp_sync(); + warp_sync.is_enabled() && service.supported_versions().is_some() + } + + fn maybe_start_snapshot_sync(&self, ctx: &SnapshotSyncContext, peers: &Option) { + match *self.state.lock() { + SyncState::Snapshot(WarpSyncState::WaitingPeers) => {} + _ => { + return; + } + } + + let has_manifest = self.snapshot_manager.read().has_manifest(); + let timeout = self.snapshot_manager.read().timeout(); + + if !has_manifest { + let requested = match *peers { + Some(ref p) => { + if p.peers.len() >= chain::SNAPSHOT_MIN_PEERS || timeout { + self.maybe_request_manifest(ctx, &p.peers) + } else { + false + } + }, + None => false, + }; + + if !requested { + trace!(target: "warp", "No appropriate snapshots found"); + } else { + return; + } + } + + let warp_sync = self.snapshot_manager.read().warp_sync(); + + let timeout = match *self.state.lock() { + SyncState::Snapshot(WarpSyncState::WaitingPeers) if timeout => true, + _ => false, + }; + + if timeout && !warp_sync.is_warp_only() { + trace!(target: "warp", "No snapshots found, starting header sync"); + *self.state.lock() = SyncState::Idle; + } + } + + fn maybe_request_manifest(&self, ctx: &SnapshotSyncContext, peers: &[PeerId]) -> bool { + let peer = self.snapshot_manager.write().request_manifest(ctx, peers); + if let Some(id) = peer { + *self.state.lock() = SyncState::Snapshot(WarpSyncState::Manifest); + trace!(target: "warp", "Requested a snapshot manifest from peer {}", id); + } + peer.is_some() + } + + fn send_warp_sync_status_packet(&self, event: &SnapshotSyncEvent) -> bool { + let protocol = event.as_context().protocol_version(event.peer()).unwrap_or(0); + let is_warp_protocol = protocol != 0; + if !is_warp_protocol { + trace!(target: "warp", "Peer {} doesn't support warp protocol", event.peer()); + return false; + } + + trace!(target: "warp", "Sending status to {}", event.peer()); + + let network_id = event.as_context().network_id(); + let chain_info = self.client.as_light_client().chain_info(); + + let manifest_hash = H256::new(); + let manifest_number: u64 = 0; + + let packet = chain::ChainSync::status_packet( + protocol as u32, + network_id, + &chain_info, + Some(manifest_hash), + Some(manifest_number), + ); + + event.as_context().send(event.peer(), chain::STATUS_PACKET, packet); + true + } +} + // private helpers impl LightSync { // Begins a search for the common ancestor and our best block. @@ -421,7 +856,7 @@ impl LightSync { let chain_info = self.client.as_light_client().chain_info(); trace!(target: "sync", "Beginning search for common ancestor from {:?}", - (chain_info.best_block_number, chain_info.best_block_hash)); + (chain_info.best_block_number, chain_info.best_block_hash)); *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); } @@ -435,6 +870,16 @@ impl LightSync { let chain_info = client.chain_info(); let mut state = self.state.lock(); + + // skip normal sync if we're warp syncing + match *state { + SyncState::Snapshot(s) => { + trace!(target: "sync", "Skipping non-warp sync. State: {:?}", s); + return; + }, + _ => {}, + }; + debug!(target: "sync", "Maintaining sync ({:?})", &*state); // drain any pending blocks into the queue. @@ -544,10 +989,13 @@ impl LightSync { drop(pending_reqs); *state = match mem::replace(&mut *state, SyncState::Idle) { - SyncState::Idle => SyncState::Idle, + SyncState::Idle => + SyncState::Idle, SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)), - SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)), + SyncState::Rounds(round) => + SyncState::Rounds(round.requests_abandoned(&unfulfilled)), + other => other, }; } } @@ -622,15 +1070,18 @@ impl LightSync { /// /// This won't do anything until registered as a handler /// so it can act on events. - pub fn new(client: Arc) -> Result { + pub fn new(client: Arc, warp_sync: WarpSync) -> Result { + let best_block = client.as_light_client().chain_info().best_block_number; + let state = Self::get_init_state(client.as_light_client(), warp_sync); Ok(LightSync { - start_block_number: client.as_light_client().chain_info().best_block_number, + start_block_number: best_block, best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), pending_reqs: Mutex::new(HashMap::new()), client: client, rng: Mutex::new(OsRng::new()?), - state: Mutex::new(SyncState::Idle), + state: Mutex::new(state), + snapshot_manager: RwLock::new(SnapshotManager::new(warp_sync)), }) } } @@ -645,6 +1096,12 @@ pub trait SyncInfo { /// Whether major sync is underway. fn is_major_importing(&self) -> bool; + + /// Whether warp sync is underway. + fn is_snapshot_syncing(&self) -> bool; + + /// Count the number of connected peers with snapshots. + fn connected_snapshot_peers(&self) -> usize; } impl SyncInfo for LightSync { @@ -668,4 +1125,15 @@ impl SyncInfo for LightSync { _ => true, } } + + fn is_snapshot_syncing(&self) -> bool { + match *self.state.lock() { + SyncState::Snapshot(_) => true, + _ => false, + } + } + + fn connected_snapshot_peers(&self) -> usize { + self.snapshot_manager.read().peers_count() + } } diff --git a/ethcore/sync/src/light_sync/sync_round.rs b/ethcore/sync/src/light_sync/sync_round.rs index 79684efe53d..0e6c49d124b 100644 --- a/ethcore/sync/src/light_sync/sync_round.rs +++ b/ethcore/sync/src/light_sync/sync_round.rs @@ -59,7 +59,7 @@ pub enum AbortReason { // A request for headers with a known starting header hash. // and a known parent hash for the last block. -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] struct SubchainRequest { subchain_parent: (u64, H256), headers_request: HeadersRequest, @@ -84,6 +84,7 @@ impl Ord for SubchainRequest { } /// Manages downloading of interior blocks of a sparse header chain. +#[derive(Debug)] pub struct Fetcher { sparse: VecDeque
, // sparse header chain. requests: BinaryHeap, @@ -316,6 +317,7 @@ fn scaffold_params(diff: u64) -> (u64, u64) { /// If the sync target is within ROUND_SKIP of the start, we request /// only those blocks. If the sync target is within (ROUND_SKIP + 1) * (ROUND_PIVOTS - 1) of /// the start, we reduce the number of pivots so the target is outside it. +#[derive(Debug)] pub struct RoundStart { start_block: (u64, H256), target: (u64, H256), diff --git a/ethcore/sync/src/light_sync/tests/test_net.rs b/ethcore/sync/src/light_sync/tests/test_net.rs index 5995bd7c6c0..c68484bafae 100644 --- a/ethcore/sync/src/light_sync/tests/test_net.rs +++ b/ethcore/sync/src/light_sync/tests/test_net.rs @@ -24,6 +24,7 @@ use tests::helpers::{TestNet, Peer as PeerLike, TestPacket}; use ethcore::client::TestBlockChainClient; use ethcore::spec::Spec; +use super::super::WarpSync; use io::IoChannel; use kvdb_memorydb; use light::client::fetch::{self, Unavailable}; @@ -110,7 +111,7 @@ impl Peer { // create a new light-client peer to sync to full peers. pub fn new_light(chain: Arc) -> Self { - let sync = Arc::new(LightSync::new(chain.clone()).unwrap()); + let sync = Arc::new(LightSync::new(chain.clone(), WarpSync::Disabled).unwrap()); let params = LightParams { network_id: NETWORK_ID, config: Default::default(), diff --git a/ethcore/sync/src/light_sync/warp_sync.rs b/ethcore/sync/src/light_sync/warp_sync.rs new file mode 100644 index 00000000000..1bd4eec5546 --- /dev/null +++ b/ethcore/sync/src/light_sync/warp_sync.rs @@ -0,0 +1,656 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Warp sync implementation for light client. + +use ethcore::snapshot::SnapshotService; +use ethcore::header::BlockNumber; +use super::WarpSync; +use chain; +use snapshot::{Snapshot, ChunkType}; +use ethereum_types::{H256, U256}; +use io::TimerToken; +use itertools::Itertools; +use network::{self, PeerId, PacketId, NetworkProtocolHandler, NetworkContext}; +use light::net::Punishment; +use std::mem; +use std::sync::Arc; +use std::time::{Instant, Duration}; +use std::collections::HashMap; +use rlp::{self, Rlp, RlpStream}; +use api::WARP_SYNC_PROTOCOL_ID; +use ethcore::snapshot::ManifestData; + +const TICK_TIMEOUT: TimerToken = 0; +const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); +const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(15); + +/// Snapshot sync network handler for light client. +pub trait SnapshotSyncHandler: Send + Sync { + /// Action on a new peer is connected. + fn on_connect(&self, event: &SnapshotSyncEvent); + /// Action on a previously connected peer disconnects. + fn on_disconnect(&self, event: &SnapshotSyncEvent); + /// Action on status packet received. + fn on_warp_peer_status(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError>; + /// Action on snapshot manifest received. + fn on_snap_manifest(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError>; + /// Action on snapshot block chunk received. + fn on_snap_data(&self, event: &SnapshotSyncEvent, rlp: Rlp) -> Result<(), WarpSyncError>; + /// On tick handler. + fn on_tick(&self, ctx: &SnapshotSyncContext); +} + +/// Snapshot sync context. +pub trait SnapshotSyncContext { + /// Send a warp sync request to a specific peer. + fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec); + /// Disconnect peer. Reconnect can be attempted later. + fn disconnect_peer(&self, peer: PeerId); + /// Disconnect a peer and prevent it from connecting again. + fn disable_peer(&self, peer: PeerId); + /// Return max version for the warp sync protocol. + fn protocol_version(&self, peer: PeerId) -> Option; + /// Return a reference to a snapshot service. + fn snapshot_service(&self) -> &SnapshotService; + /// Get network id. + fn network_id(&self) -> u64; +} + +/// Snapshot sync event context. +pub trait SnapshotSyncEvent { + /// Return event's peer id. + fn peer(&self) -> PeerId; + /// Treat the event context as a context. + fn as_context(&self) -> &SnapshotSyncContext; +} + +/// Light client warp sync network protocol handler. +pub struct SnapshotSyncLightHandler { + network_id: u64, + sync: Arc, + snapshot_service: Arc, +} + +impl SnapshotSyncLightHandler { + /// Creates a new instance of `SnapshotSyncLightHandler`. + pub fn new(network_id: u64, handler: Arc, service: Arc) -> Self { + Self { + network_id: network_id, + sync: handler, + snapshot_service: service, + } + } +} + +/// `SnapshotSyncContext` implementation. +struct SyncCtx<'a> { + network: &'a NetworkContext, + network_id: u64, + snapshot_service: &'a SnapshotService, +} +/// `SnapshotSyncEvent` implementation. +struct SyncEvent<'a> { + context: SyncCtx<'a>, + peer_id: PeerId, +} + + +impl<'a> SyncCtx<'a> { + /// Creates a new instance of `SyncCtx`. + pub fn new( + network: &'a NetworkContext, + network_id: u64, + snapshot_service: &'a SnapshotService, + ) -> Self { + Self { + network: network, + network_id: network_id, + snapshot_service: snapshot_service, + } + } +} + +impl<'a> SyncEvent<'a> { + /// Creates a new instance of `SyncEvent`. + pub fn new( + network: &'a NetworkContext, + network_id: u64, + snapshot_service: &'a SnapshotService, + peer_id: PeerId, + ) -> Self { + Self { + context: SyncCtx::new(network, network_id, snapshot_service), + peer_id: peer_id, + } + } +} + +impl<'a> SnapshotSyncContext for SyncCtx<'a> { + fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec) { + if let Err(e) = self.network.send_protocol(WARP_SYNC_PROTOCOL_ID, peer, packet_id, packet_body) { + debug!(target: "warp", "Error sending sync packet to peer {}: {}", peer, e); + } + } + + fn disconnect_peer(&self, peer: PeerId) { + trace!(target: "warp", "Initiating disconnect of peer {}", peer); + self.network.disconnect_peer(peer); + } + + fn disable_peer(&self, peer: PeerId) { + trace!(target: "warp", "Initiating disable of peer {}", peer); + self.network.disable_peer(peer); + } + + fn protocol_version(&self, peer: PeerId) -> Option { + self.network.protocol_version(WARP_SYNC_PROTOCOL_ID, peer) + } + + fn snapshot_service(&self) -> &SnapshotService { + self.snapshot_service + } + + fn network_id(&self) -> u64 { + self.network_id + } +} + +impl<'a> SnapshotSyncEvent for SyncEvent<'a> { + fn peer(&self) -> PeerId { + self.peer_id + } + + fn as_context(&self) -> &SnapshotSyncContext { + &self.context + } +} + +fn send_empty_packet(io: &NetworkContext, peer: &PeerId, packet_id: PacketId) -> Result<(), WarpSyncError> { + let packet = RlpStream::new_list(0).out(); + io.send_protocol(WARP_SYNC_PROTOCOL_ID, *peer, packet_id, packet).map_err(|e| e.into()) +} + +impl NetworkProtocolHandler for SnapshotSyncLightHandler { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL) + .expect("Error registering sync timer for a light client."); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + let event = SyncEvent::new(io, self.network_id, &*self.snapshot_service, *peer); + + let rlp = Rlp::new(data); + + let result = match packet_id { + chain::STATUS_PACKET => self.sync.on_warp_peer_status(&event, rlp), + chain::SNAPSHOT_MANIFEST_PACKET => self.sync.on_snap_manifest(&event, rlp), + chain::SNAPSHOT_DATA_PACKET => self.sync.on_snap_data(&event, rlp), + chain::GET_SNAPSHOT_MANIFEST_PACKET => send_empty_packet(io, peer, chain::SNAPSHOT_MANIFEST_PACKET), + chain::GET_SNAPSHOT_DATA_PACKET => send_empty_packet(io, peer, chain::SNAPSHOT_DATA_PACKET), + chain::GET_BLOCK_HEADERS_PACKET => send_empty_packet(io, peer, chain::BLOCK_HEADERS_PACKET), + other => { + trace!(target: "warp", "Unrecognized packet {} from peer {}", other, peer); + Err(WarpSyncError::UnrecognizedPacket(other)) + }, + }; + + if let Err(e) = result { + punish(&event, e); + } + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + let protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0); + if protocol == 0 { + trace!(target: "warp", "Connected to a peer that doesn't support warp protocol: {}", peer); + return; + } + let event = SyncEvent::new(io, self.network_id, &*self.snapshot_service, *peer); + self.sync.on_connect(&event); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + let event = SyncEvent::new(io, self.network_id, &*self.snapshot_service, *peer); + self.sync.on_disconnect(&event); + } + + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { + assert_eq!(timer, TICK_TIMEOUT, "warp: unexpected timer token {}", timer); + let context = SyncCtx::new(io, self.network_id, &*self.snapshot_service); + self.sync.on_tick(&context); + } +} + +fn punish(event: &SnapshotSyncEvent, e: WarpSyncError) { + match e.punishment() { + Punishment::None => {} + Punishment::Disconnect => { + trace!(target: "warp", "Disconnecting peer {}: {:?}", event.peer(), e); + event.as_context().disconnect_peer(event.peer()) + } + Punishment::Disable => { + trace!(target: "warp", "Disabling peer {}: {:?}", event.peer(), e); + event.as_context().disable_peer(event.peer()) + } + } +} + +/// Snapshot download state machine. +#[derive(Debug)] +pub struct SnapshotManager { + snapshot: Snapshot, + peers: HashMap, + warp_sync: WarpSync, + sync_start_time: Option, +} + +/// Data type requested from a peer with a snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SnapshotPeerAsking { + /// Asking peer for snapshot block chunk with given hash. + SnapshotData(H256), + /// Asking peer for snapshot manifest. + SnapshotManifest, +} + +/// Peer with snapshot info. +#[derive(Debug, Clone)] +pub struct SnapshotPeer { + asking: Option, + /// Request timestamp. + ask_time: Instant, + /// Best snapshot hash. + snapshot_hash: H256, + /// Best snapshot block number. + snapshot_number: BlockNumber, +} + +impl SnapshotPeer { + /// Create a new instance of `SnapshotPeer` + pub fn new(snapshot_hash: H256, snapshot_number: BlockNumber) -> Self { + Self { + asking: None, + ask_time: Instant::now(), + snapshot_hash: snapshot_hash, + snapshot_number: snapshot_number, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct GroupedPeers { + pub snapshot_hash: H256, + pub peers: Vec, +} + +impl SnapshotManager { + /// Create a new instance of `SnapshotManager`. + pub fn new(warp_sync: WarpSync) -> Self { + Self { + snapshot: Snapshot::new_light(), + peers: HashMap::new(), + warp_sync: warp_sync, + sync_start_time: None, + } + } + + pub fn on_peer_aborting(&mut self, peer_id: &PeerId) -> Option { + self.clear_peer_download(&peer_id); + self.peers.remove(&peer_id).and_then(|p| p.asking) + } + + pub fn clear_peer_download(&mut self, peer: &PeerId) { + let asking = self.peers.get(peer).and_then(|p| p.asking.clone()); + if let Some(SnapshotPeerAsking::SnapshotData(hash)) = asking { + self.snapshot.clear_chunk_download(&hash); + } + } + + /// Reset peer asking status after request is complete. + /// Returns the old asking status. + pub fn reset_peer_asking(&mut self, peer: &PeerId) -> Option { + self.reset_peer_asking_to(peer, None) + } + + fn reset_peer_asking_to(&mut self, peer: &PeerId, to: Option) -> Option { + self.peers + .get_mut(peer) + .and_then(|p| mem::replace(&mut p.asking, to)) + } + + /// Count the number of peers. + pub fn peers_count(&self) -> usize { + self.peers.len() + } + + /// Get peer by id. + pub fn get(&self, peer: &PeerId) -> Option<&SnapshotPeer> { + self.peers.get(peer) + } + + /// Returns the largest group of peers with the same snapshot hash. + /// If we already have a manifest, prefer peers with that manifest. + /// If there are several largest groups, choose the one with the highest snapshot number. + pub fn best_peer_group(&self, best_block: BlockNumber, best_seen: Option) -> Option { + self.grouped_peers_max_by_key(|p| { + let snapshot_number = p.peers + .first() + .and_then(|id| self.peers.get(id).map(|p| p.snapshot_number)) + .unwrap_or(0); + + let same_snapshot = Some(p.snapshot_hash) == self.snapshot.snapshot_hash(); + (same_snapshot, p.peers.len(), snapshot_number) + }, best_block, best_seen) + } + + /// Returns peers grouped by snapshot hash maximizing the `key`. + fn grouped_peers_max_by_key( + &self, + key: K, + best_block: BlockNumber, + best_seen: Option, + ) -> Option + where + B: Ord, + K: FnMut(&GroupedPeers) -> B + { + let expected_warp_block = match self.warp_sync { + WarpSync::OnlyAndAfter(block) => block, + _ => 0, + }; + + self.peers + .iter() + .filter(|&(_, p)| + p.snapshot_number > expected_warp_block && + best_block < p.snapshot_number && + (p.snapshot_number - best_block) > chain::SNAPSHOT_RESTORE_THRESHOLD && + best_seen.map_or(true, |highest| + highest >= p.snapshot_number && + (highest - p.snapshot_number) <= chain::SNAPSHOT_RESTORE_THRESHOLD) + ) + .map(|(id, p)| (p.snapshot_hash.clone(), id)) + .filter(|&(ref hash, _)| !self.snapshot.is_known_bad(hash)) + .sorted_by_key(|&(hash, _)| hash) + .into_iter() + .group_by(|&(hash, _)| hash) + .into_iter() + .map(|(hash, peers)| GroupedPeers { + snapshot_hash: hash, + peers: peers.map(|(_, id)| *id).collect(), + }) + .max_by_key(key) + } + + pub fn initialize(&mut self, service: &SnapshotService) { + self.snapshot.initialize(service); + } + + pub fn request_manifest(&mut self, ctx: &SnapshotSyncContext, peers: &[PeerId]) -> Option { + if self.snapshot.have_manifest() { + return None; + } + for peer_id in peers { + if let Some(ref mut peer) = self.peers.get_mut(peer_id) { + if peer.asking.is_some() { + continue; + } + peer.asking = Some(SnapshotPeerAsking::SnapshotManifest); + peer.ask_time = Instant::now(); + + trace!(target: "warp", "Requesting a snapshot manifest from {}", peer_id); + let rlp = RlpStream::new_list(0); + + ctx.send(*peer_id, chain::GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); + return Some(*peer_id); + } + } + None + } + + pub fn request_snapshot_blocks(&mut self, ctx: &SnapshotSyncContext, peers: &[PeerId]) { + use super::MAX_BLOCK_CHUNKS_DOWNLOAD_AHEAD; + + let snapshot_ref = &mut self.snapshot; + let mut needed_chunks = (0..MAX_BLOCK_CHUNKS_DOWNLOAD_AHEAD) + .into_iter() + .filter_map(|_| snapshot_ref.needed_chunk()); + + for peer_id in peers.iter() { + let mut maybe_peer = self.peers.get_mut(peer_id); + let peer = match maybe_peer { + Some(ref mut peer) if peer.asking.is_none() => peer, + _ => continue, + }; + let hash = match needed_chunks.next() { + Some(hash) => hash, + None => return, + }; + peer.asking = Some(SnapshotPeerAsking::SnapshotData(hash)); + peer.ask_time = Instant::now(); + + trace!(target: "warp", "Requesting a snapshot chunk {:?} from peer {}", &hash, peer_id); + let mut rlp = RlpStream::new_list(1); + rlp.append(&hash); + + ctx.send(*peer_id, chain::GET_SNAPSHOT_DATA_PACKET, rlp.out()); + } + } + + pub fn disconnect_slowpokes(&mut self, ctx: &SnapshotSyncContext) { + let now = Instant::now(); + let aborting: Vec = self.peers + .iter() + .filter_map(|(id, peer)| { + let elapsed = now - peer.ask_time; + let timeout = match peer.asking { + None => false, + Some(SnapshotPeerAsking::SnapshotData(_)) => elapsed > chain::SNAPSHOT_DATA_TIMEOUT, + Some(SnapshotPeerAsking::SnapshotManifest) => elapsed > chain::SNAPSHOT_MANIFEST_TIMEOUT, + }; + match timeout { + true => Some(*id), + false => None, + } + }) + .collect(); + for peer_id in &aborting { + trace!(target: "warp", "Timeout {}", peer_id); + ctx.disconnect_peer(*peer_id); + self.on_peer_aborting(peer_id); + } + } + + pub fn timeout(&self) -> bool { + self.sync_start_time.map_or(false, |t| t.elapsed() > WAIT_PEERS_TIMEOUT) + } + + /// Get the warp sync settings. + pub fn warp_sync(&self) -> WarpSync { + self.warp_sync + } + + /// Clear snapshot state. + pub fn clear(&mut self) { + self.snapshot.clear(); + } + + pub fn has_manifest(&self) -> bool { + return self.snapshot.have_manifest() + } + + pub fn is_complete(&self) -> bool { + self.snapshot.is_complete() + } + + pub fn done_chunks(&self) -> usize { + self.snapshot.done_chunks() + } + + /// Validate chunk and mark it as downloaded. + pub fn validate_chunk(&mut self, chunk: &[u8]) -> Result { + self.snapshot.validate_chunk(chunk) + } + + /// Get the snapshot hash. + pub fn snapshot_hash(&self) -> Option { + self.snapshot.snapshot_hash() + } + + /// Note snapshot hash as bad. + pub fn note_bad(&mut self, hash: H256) { + self.snapshot.note_bad(hash) + } + + pub fn reset_manifest_to(&mut self, manifest: &ManifestData, hash: &H256) { + self.snapshot.reset_to(manifest, hash); + } + + pub fn on_peer_status( + &mut self, + event: &SnapshotSyncEvent, + rlp: Rlp, + genesis_hash: H256, + ) -> Result<(), WarpSyncError> { + use chain::{PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; + + let supported_versions = [PAR_PROTOCOL_VERSION_1.0, PAR_PROTOCOL_VERSION_2.0, PAR_PROTOCOL_VERSION_3.0]; + let peer_id = event.peer(); + let warp_protocol = event.as_context().protocol_version(event.peer()).unwrap_or(0) != 0; + + if !warp_protocol { + trace!(target: "warp", "Peer doesn't support warp sync {}", peer_id); + return Err(WarpSyncError::NotServer); + } + + if self.peers.contains_key(&peer_id) { + debug!(target: "warp", "Unexpected status packet from a known peer {}", peer_id); + return Ok(()); + } + + let protocol_version: u8 = rlp.val_at(0)?; + let network_id: u64 = rlp.val_at(1)?; + + // make sure the status message is well formed + // even though we're not going to use these fields + let _difficulty: U256 = rlp.val_at(2)?; + let _latest_hash: H256 = rlp.val_at(3)?; + + let genesis: H256 = rlp.val_at(4)?; + let snapshot_hash: H256 = rlp.val_at(5)?; + let snapshot_number: BlockNumber = rlp.val_at(6)?; + + if !supported_versions.contains(&protocol_version) { + trace!(target: "warp", "Peer {} unsupported protocol version ({})", peer_id, protocol_version); + return Err(WarpSyncError::UnsupportedProtocolVersion(protocol_version)); + } + + let expected_network_id = event.as_context().network_id(); + if network_id != expected_network_id || genesis != genesis_hash { + trace!( + target: "warp", + "Wrong network: peer {} (genesis hash, network id): found ({}, {}), expected ({}, {})", + peer_id, + genesis, network_id, + genesis_hash, expected_network_id, + ); + return Err(WarpSyncError::WrongNetwork); + } + + // may be a light client + if snapshot_number == 0 { + trace!(target: "warp", "Received a status message with snapshot number 0 from {}", peer_id); + return Ok(()); + } + + let peer = SnapshotPeer::new(snapshot_hash, snapshot_number); + self.peers.insert(peer_id, peer); + + if self.sync_start_time.is_none() { + self.sync_start_time = Some(Instant::now()); + } + + debug!(target: "warp", "Connected {}", peer_id); + + Ok(()) + } +} + +/// Warp sync download state machine. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WarpSyncState { + /// Collecting enough peers to start warp sync. + WaitingPeers, + /// Downloading manifest. + Manifest, + /// Downloading block chunks. + Blocks, + /// Waiting for snapshot restoration progress. + WaitingService, +} + +/// Kinds of errors which can be encountered in the course of warp sync. +#[derive(Debug)] +pub enum WarpSyncError { + /// An RLP decoding error. + Rlp(rlp::DecoderError), + /// A network error. + Network(network::Error), + /// Unrecognized packet code. + UnrecognizedPacket(u8), + /// Peer on wrong network (wrong NetworkId or genesis hash) + WrongNetwork, + /// Not a server, + NotServer, + /// Unsupported protocol version. + UnsupportedProtocolVersion(u8), + /// Unsupported manifest version. + UnsupportedManifestVersion(u64), + /// Invalid manifest packet. + BadManifest, + /// Invalid block chunk. + BadBlockChunk, +} + +impl WarpSyncError { + /// What level of punishment does this error warrant? + pub fn punishment(&self) -> Punishment { + match *self { + WarpSyncError::Rlp(_) => Punishment::Disable, + WarpSyncError::Network(_) => Punishment::None, + WarpSyncError::UnrecognizedPacket(_) => Punishment::None, + WarpSyncError::WrongNetwork => Punishment::Disable, + WarpSyncError::NotServer => Punishment::Disconnect, + WarpSyncError::UnsupportedProtocolVersion(_) => Punishment::Disable, + WarpSyncError::UnsupportedManifestVersion(_) => Punishment::Disable, + WarpSyncError::BadManifest => Punishment::Disable, + WarpSyncError::BadBlockChunk => Punishment::Disable, + } + } +} + +impl From for WarpSyncError { + fn from(err: rlp::DecoderError) -> Self { + WarpSyncError::Rlp(err) + } +} + +impl From for WarpSyncError { + fn from(err: network::Error) -> Self { + WarpSyncError::Network(err) + } +} diff --git a/ethcore/sync/src/snapshot.rs b/ethcore/sync/src/snapshot.rs index c7f0d284f39..0740501b395 100644 --- a/ethcore/sync/src/snapshot.rs +++ b/ethcore/sync/src/snapshot.rs @@ -27,6 +27,7 @@ pub enum ChunkType { Block(H256), } +#[derive(Default, Debug)] pub struct Snapshot { pending_state_chunks: Vec, pending_block_chunks: Vec, @@ -35,19 +36,21 @@ pub struct Snapshot { snapshot_hash: Option, bad_hashes: HashSet, initialized: bool, + /// Whether to ignore state chunks. + is_light: bool, } impl Snapshot { /// Create a new instance. pub fn new() -> Snapshot { + Default::default() + } + + /// Create a new light instance. + pub fn new_light() -> Snapshot { Snapshot { - pending_state_chunks: Vec::new(), - pending_block_chunks: Vec::new(), - downloading_chunks: HashSet::new(), - completed_chunks: HashSet::new(), - snapshot_hash: None, - bad_hashes: HashSet::new(), - initialized: false, + is_light: true, + ..Default::default() } } @@ -88,7 +91,9 @@ impl Snapshot { /// Reset collection for a manifest RLP pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) { self.clear(); - self.pending_state_chunks = manifest.state_hashes.clone(); + if !self.is_light { + self.pending_state_chunks = manifest.state_hashes.clone(); + } self.pending_block_chunks = manifest.block_hashes.clone(); self.snapshot_hash = Some(hash.clone()); } diff --git a/parity/blockchain.rs b/parity/blockchain.rs index cc92419dabf..d75e7df7dce 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -180,6 +180,7 @@ fn execute_import_light(cmd: ImportBlockchain) -> Result<(), String> { // prepare client and snapshot paths. let client_path = db_dirs.client_path(algorithm); + let snapshot_path = db_dirs.snapshot_path(); // execute upgrades execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, &cmd.compaction)?; @@ -202,6 +203,24 @@ fn execute_import_light(cmd: ImportBlockchain) -> Result<(), String> { config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; config.queue.verifier_settings = cmd.verifier_settings; + // initialize snapshot restoration db handler + let tracing = false; + let fat_db = false; + let client_config = to_client_config( + &cmd.cache_config, + spec.name.to_lowercase(), + Mode::Active, + tracing, + fat_db, + cmd.compaction, + cmd.vm_type, + /*name: */"".into(), + algorithm, + cmd.pruning_history, + cmd.pruning_memory, + cmd.check_seal, + ); + let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); // initialize database. let db = db::open_db(&client_path.to_str().expect("DB path could not be converted to string."), &cmd.cache_config, @@ -209,11 +228,10 @@ fn execute_import_light(cmd: ImportBlockchain) -> Result<(), String> { // TODO: could epoch signals be avilable at the end of the file? let fetch = ::light::client::fetch::unavailable(); - let service = LightClientService::start(config, &spec, fetch, db, cache) - .map_err(|e| format!("Failed to start client: {}", e))?; - - // free up the spec in memory. - drop(spec); + let service = LightClientService::start( + config, spec, fetch, db, cache, + restoration_db_handler, &snapshot_path, + ).map_err(|e| format!("Failed to start client: {}", e))?; let client = service.client(); diff --git a/parity/export_hardcoded_sync.rs b/parity/export_hardcoded_sync.rs index b3121f08616..995d3f79906 100644 --- a/parity/export_hardcoded_sync.rs +++ b/parity/export_hardcoded_sync.rs @@ -17,13 +17,13 @@ use std::sync::Arc; use std::time::Duration; -use ethcore::client::DatabaseCompactionProfile; +use ethcore::client::{DatabaseCompactionProfile, Mode, VMType}; use ethcore::spec::{SpecParams, OptimizeFor}; use light::client::fetch::Unavailable as UnavailableDataFetcher; use light::Cache as LightDataCache; use params::{SpecType, Pruning}; -use helpers::execute_upgrades; +use helpers::{execute_upgrades, to_client_config}; use dir::Directories; use cache::CacheConfig; use user_defaults::UserDefaults; @@ -85,13 +85,38 @@ pub fn execute(cmd: ExportHsyncCmd) -> Result { config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; + // prepare client and snapshot paths. + let client_path = db_dirs.client_path(algorithm); + let snapshot_path = db_dirs.snapshot_path(); + + // initialize snapshot restoration db handler + let tracing = false; + let fat_db = false; + let client_config = to_client_config( + &cmd.cache_config, + spec.name.to_lowercase(), + Mode::Active, + tracing, + fat_db, + cmd.compaction, + VMType::Interpreter, + /*name: */"".into(), + /*pruning_algorithm: */algorithm, + /*pruning_history: */0, + /*pruning_memory: */0, + /*check_seal: */true, + ); + let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); + // initialize database. - let db = db::open_db(&db_dirs.client_path(algorithm).to_str().expect("DB path could not be converted to string."), + let db = db::open_db(&client_path.to_str().expect("DB path could not be converted to string."), &cmd.cache_config, &cmd.compaction).map_err(|e| format!("Failed to open database {:?}", e))?; - let service = light_client::Service::start(config, &spec, UnavailableDataFetcher, db, cache) - .map_err(|e| format!("Error starting light client: {}", e))?; + let service = light_client::Service::start( + config, spec, UnavailableDataFetcher, db, cache, + restoration_db_handler, &snapshot_path, + ).map_err(|e| format!("Error starting light client: {}", e))?; let hs = service.client().read_hardcoded_sync() .map_err(|e| format!("Error reading hardcoded sync: {}", e))?; diff --git a/parity/informant.rs b/parity/informant.rs index d3489a52f3b..80879559390 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -28,8 +28,7 @@ use ethcore::client::{ BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage }; use ethcore::header::BlockNumber; -use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; -use ethcore::snapshot::service::Service as SnapshotService; +use ethcore::snapshot::{RestorationStatus, SnapshotService}; use sync::{LightSyncProvider, LightSync, SyncProvider, ManageNetwork}; use io::{TimerToken, IoContext, IoHandler}; use light::Cache as LightDataCache; @@ -201,7 +200,7 @@ impl InformantData for LightNodeInformantData { last_imported_old_block_number: None, num_peers: peer_numbers.connected, max_peers: peer_numbers.max as u32, - snapshot_sync: false, + snapshot_sync: self.sync.is_snapshot_syncing(), }); Report { diff --git a/parity/run.rs b/parity/run.rs index f9ff9611112..9f11dff4b86 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -214,24 +214,53 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc) -> Result) -> Result sync::WarpSync::OnlyAndAfter(block), + (true, _) => sync::WarpSync::Enabled, + _ => sync::WarpSync::Disabled, + }; + + let light_sync = LightSync::new(sync_params, warp_sync, service.snapshot_service()) + .map_err(|e| format!("Error starting network: {}", e))?; let light_sync = Arc::new(light_sync); *sync_handle.write() = Arc::downgrade(&light_sync); @@ -279,7 +316,7 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc) -> Result) -> Result>, informant: Arc>, client: Arc, + client_service: Arc<::light::client::Service<::light_helpers::EpochFetch>>, keep_alive: Box, }, Full { @@ -856,10 +896,13 @@ impl RunningClient { /// Shuts down the client. pub fn shutdown(self) { match self.inner { - RunningClientInner::Light { rpc, informant, client, keep_alive } => { + RunningClientInner::Light { rpc, informant, client, client_service, keep_alive } => { // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); + // Shutdown and drop the ServiceClient + client_service.shutdown(); + drop(client_service); drop(rpc); drop(keep_alive); informant.shutdown(); diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 0611734fd33..26b0315a239 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use hash::keccak; use ethcore::account_provider::AccountProvider; -use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS}; +use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS, FullNodeRestorationParams}; use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; use ethcore::snapshot::service::Service as SnapshotService; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType}; @@ -66,7 +66,11 @@ pub struct SnapshotCommand { // helper for reading chunks from arbitrary reader and feeding them into the // service. -fn restore_using(snapshot: Arc, reader: &R, recover: bool) -> Result<(), String> { +fn restore_using( + snapshot: Arc>, + reader: &R, + recover: bool +) -> Result<(), String> { let manifest = reader.manifest(); info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash); diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 9b789a56bb2..6de700b4bf7 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -204,7 +204,8 @@ pub fn fetch_gas_price_corpus( let eventual_corpus = sync.with_context(|ctx| { // get some recent headers with gas used, // and request each of the blocks from the network. - let block_requests = client.ancestry_iter(BlockId::Latest) + let ancestry_iter = client.ancestry_iter(); + let block_requests = ancestry_iter.iter(BlockId::Latest) .filter(|hdr| hdr.gas_used() != U256::default()) .take(GAS_PRICE_SAMPLE_SIZE) .map(|hdr| request::Body(hdr.into())) diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 1da2fdf1ad3..c6a781a4db2 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -331,7 +331,8 @@ impl LightFetch { // find all headers which match the filter, and fetch the receipts for each one. // match them with their numbers for easy sorting later. let bit_combos = filter.bloom_possibilities(); - let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) + let ancestry_iter = self.client.ancestry_iter(); + let receipts_futures: Vec<_> = ancestry_iter.iter(filter.to_block) .take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block) .take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block) .filter(|ref hdr| {