diff --git a/Cargo.lock b/Cargo.lock index 3d8bb0ef15..1d08d6dbaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,7 @@ dependencies = [ "regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -865,6 +866,7 @@ dependencies = [ "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -951,6 +953,7 @@ dependencies = [ "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/api/src/handlers.rs b/api/src/handlers.rs index feb65e3438..a26bed9a94 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -33,6 +33,7 @@ use self::pool_api::PoolInfoHandler; use self::pool_api::PoolPushHandler; use self::server_api::IndexHandler; use self::server_api::StatusHandler; +use self::server_api::KernelDownloadHandler; use self::transactions_api::TxHashSetHandler; use crate::auth::{BasicAuthMiddleware, GRIN_BASIC_REALM}; use crate::chain; @@ -135,6 +136,9 @@ pub fn build_router( chain: Arc::downgrade(&chain), peers: Arc::downgrade(&peers), }; + let kernel_download_handler = KernelDownloadHandler { + peers: Arc::downgrade(&peers), + }; let txhashset_handler = TxHashSetHandler { chain: Arc::downgrade(&chain), }; @@ -165,6 +169,7 @@ pub fn build_router( router.add_route("/v1/chain/validate", Arc::new(chain_validation_handler))?; router.add_route("/v1/txhashset/*", Arc::new(txhashset_handler))?; router.add_route("/v1/status", Arc::new(status_handler))?; + router.add_route("/v1/kerneldownload", Arc::new(kernel_download_handler))?; router.add_route("/v1/pool", Arc::new(pool_info_handler))?; router.add_route("/v1/pool/push", Arc::new(pool_push_handler))?; router.add_route("/v1/peers/all", Arc::new(peers_all_handler))?; diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index d5092b0547..3d4f21b03c 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -19,7 +19,7 @@ use crate::rest::*; use crate::router::{Handler, ResponseFuture}; use crate::types::*; use crate::web::*; -use hyper::{Body, Request}; +use hyper::{Body, Request, StatusCode}; use std::sync::Weak; // RESTful index of available api endpoints @@ -36,6 +36,43 @@ impl Handler for IndexHandler { } } +pub struct KernelDownloadHandler { + pub peers: Weak, +} + +impl Handler for KernelDownloadHandler { + fn post(&self, _req: Request) -> ResponseFuture { + if let Some(peer) = w_fut!(&self.peers).most_work_peer() { + match peer.send_kernel_data_request() { + Ok(_) => response(StatusCode::OK, "{}"), + Err(e) => response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("requesting kernel data from peer failed: {:?}", e), + ), + } + } else { + response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("requesting kernel data from peer failed (no peers)"), + ) + } + + // w_fut!(&self.peers).most_work_peer().and_then(|peer| { + // peer.send_kernel_data_request().and_then(|_| { + // response(StatusCode::OK, "{}") + // }) + // })?; + + // match w_fut!(&self.chain).xxx() { + // Ok(_) => response(StatusCode::OK, "{}"), + // Err(e) => response( + // StatusCode::INTERNAL_SERVER_ERROR, + // format!("requesting kernel download failed: {}", e), + // ), + // } + } +} + /// Status handler. Post a summary of the server status /// GET /v1/status pub struct StatusHandler { diff --git a/chain/Cargo.toml b/chain/Cargo.toml index f36bfe2ab5..59234b63ae 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -19,6 +19,7 @@ croaring = "0.3" log = "0.4" serde = "1" serde_derive = "1" +tempfile = "3.0.5" chrono = "0.4.4" lru-cache = "0.1" lazy_static = "1" diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e750f5793d..40785e522b 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -17,31 +17,38 @@ use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; use crate::core::core::merkle_proof::MerkleProof; +use crate::core::core::pmmr::PMMR; use crate::core::core::verifier_cache::VerifierCache; use crate::core::core::{ - Block, BlockHeader, BlockSums, Committed, Output, OutputIdentifier, Transaction, TxKernelEntry, + Block, BlockHeader, BlockSums, Committed, Output, OutputIdentifier, Transaction, TxKernel, + TxKernelEntry, }; use crate::core::global; use crate::core::pow; +use crate::core::ser::{Readable, StreamingReader}; use crate::error::{Error, ErrorKind}; use crate::lmdb; use crate::pipe; use crate::store; use crate::txhashset; +use crate::txhashset::RebuildableKernelView; use crate::txhashset::TxHashSet; use crate::types::{ BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus, }; use crate::util::secp::pedersen::{Commitment, RangeProof}; use crate::util::{Mutex, RwLock, StopState}; +use grin_store::pmmr::{PMMRBackend, PMMR_FILES}; use grin_store::Error::NotFoundErr; use std::collections::HashMap; use std::env; use std::fs::File; +use std::io::Read; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use tempfile; /// Orphan pool size is limited by MAX_ORPHAN_SIZE pub const MAX_ORPHAN_SIZE: usize = 200; @@ -672,6 +679,11 @@ impl Chain { self.txhashset.read().roots() } + pub fn kernel_data_read(&self) -> Result { + let txhashset = self.txhashset.read(); + txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read()) + } + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistent state /// at the provided block hash. @@ -710,7 +722,40 @@ impl Chain { header: &BlockHeader, txhashset: &txhashset::TxHashSet, ) -> Result<(), Error> { - debug!("validate_kernel_history: rewinding and validating kernel history (readonly)"); + debug!("validate_kernel_history: about to validate kernels via temporary view"); + + let mut kernel_data = + txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read())?; + + { + let tempdir = tempfile::tempdir()?; + let mut backend: PMMRBackend = PMMRBackend::new(tempdir.path(), false, None)?; + let mut kernel_view = RebuildableKernelView::new(&mut backend); + + // Rebuilding the kernel view will verify the following - + // * all kernel signatures + // * kernel MMR root matches for each historical header. + kernel_view.rebuild(&mut kernel_data, txhashset, header)?; + } + + Ok(()) + } + + // For completeness we also (re)validate the kernel history by rewinding the kernel MMR + // in the txhashset. + // The call to validate_kernel_history above rebuilds the kernel MMR but we do not yet + // use this rebuilt kernel MMR (we simply discard it after validation). + // This is only a temprorary solution and can be simplified when we split kernels sync + // and txhashset sync. + // This prevents a malicious peer sending a kernel MMR where the data file and hash file + // are inconsistent. Once the kernel sync is fully implemented we will only receive a data + // file and this will no longer be required. + fn validate_kernel_history_via_rewind( + &self, + header: &BlockHeader, + txhashset: &txhashset::TxHashSet, + ) -> Result<(), Error> { + debug!("validate_kernel_history_via_rewind: about to validate kernels via rewind"); let mut count = 0; let mut current = header.clone(); @@ -857,6 +902,26 @@ impl Chain { txhashset::clean_header_folder(&sandbox_dir); } + pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> { + let txhashset = self.txhashset.read(); + let head_header = self.head_header()?; + + let tempdir = tempfile::tempdir()?; + let mut backend: PMMRBackend = PMMRBackend::new(tempdir.path(), false, None)?; + let mut kernel_view = RebuildableKernelView::new(&mut backend); + + // Rebuilding the kernel view will verify the following - + // * all kernel signatures + // * kernel MMR root matches for each historical header + { + let txhashset = self.txhashset.read(); + kernel_view.rebuild(reader, &txhashset, &head_header)?; + } + + // Backend storage will be deleted once the tempdir goes out of scope. + Ok(()) + } + /// Writes a reading view on a txhashset state that's been provided to us. /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be @@ -897,11 +962,19 @@ impl Chain { // We must rebuild the header MMR ourselves based on the headers in our db. self.rebuild_header_mmr(&Tip::from_header(&header), &mut txhashset)?; - // Validate the full kernel history (kernel MMR root for every block header). - self.validate_kernel_history(&header, &txhashset)?; - - // all good, prepare a new batch and update all the required records - debug!("txhashset_write: rewinding a 2nd time (writeable)"); + // Validate the full kernel history - + // * all kernel signatures + // * kernel MMR root at each block height + { + // Validate kernel MMR roots and all kernel signatures by rebuilding the kernel + // MMR from the kernel data file. + self.validate_kernel_history(&header, &txhashset)?; + + // (Re)validate kernel MMR roots here against the kernel MMR in the txhashset. + // Ensures a malicious peer does not send us inconsistent data and hash files. + // This will not be necessary once kernel sync is a separate payload. + self.validate_kernel_history_via_rewind(&header, &txhashset)?; + } let mut batch = self.store.batch()?; diff --git a/chain/src/error.rs b/chain/src/error.rs index 4ba115269e..963d94b218 100644 --- a/chain/src/error.rs +++ b/chain/src/error.rs @@ -256,6 +256,14 @@ impl From for Error { } } +impl From for Error { + fn from(error: ser::Error) -> Error { + Error { + inner: Context::new(ErrorKind::SerErr(error)), + } + } +} + impl From for Error { fn from(e: secp::Error) -> Error { Error { diff --git a/chain/src/txhashset.rs b/chain/src/txhashset.rs index a4f66e8edd..d0cd1aca96 100644 --- a/chain/src/txhashset.rs +++ b/chain/src/txhashset.rs @@ -15,10 +15,12 @@ //! Utility structs to handle the 3 hashtrees (output, range proof, //! kernel) more conveniently and transactionally. +mod rebuildable_kernel_view; mod rewindable_kernel_view; mod txhashset; mod utxo_view; +pub use self::rebuildable_kernel_view::*; pub use self::rewindable_kernel_view::*; pub use self::txhashset::*; pub use self::utxo_view::*; diff --git a/chain/src/txhashset/rebuildable_kernel_view.rs b/chain/src/txhashset/rebuildable_kernel_view.rs new file mode 100644 index 0000000000..133727df01 --- /dev/null +++ b/chain/src/txhashset/rebuildable_kernel_view.rs @@ -0,0 +1,137 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Lightweight rebuildable view of the kernel MMR. +//! Used when receiving a "kernel data" file from a peer to +//! (re)build the kernel MMR locally. + +use std::fs::File; +use std::io; +use std::io::{BufReader, Read}; +use std::time::Duration; + +use croaring::Bitmap; +use tempfile; +use tempfile::TempDir; + +use crate::core::core::pmmr::{self, PMMR}; +use crate::core::core::{BlockHeader, TxKernel, TxKernelEntry}; +use crate::core::ser::{Readable, StreamingReader}; +use crate::error::{Error, ErrorKind}; +use crate::store::Batch; +use crate::txhashset::txhashset::{PMMRHandle, TxHashSet}; +use grin_store::pmmr::PMMRBackend; + +/// Rebuildable kernel view backed by a tempdir. +pub struct RebuildableKernelView<'a> { + pmmr: PMMR<'a, TxKernel, PMMRBackend>, +} + +impl<'a> RebuildableKernelView<'a> { + pub fn new(backend: &'a mut PMMRBackend) -> RebuildableKernelView<'a> { + RebuildableKernelView { + pmmr: PMMR::at(backend, 0), + } + } + + pub fn truncate(&mut self) -> Result<(), Error> { + debug!("Truncating temp kernel view."); + self.pmmr + .rewind(0, &Bitmap::create()) + .map_err(&ErrorKind::TxHashSetErr)?; + Ok(()) + } + + pub fn rebuild( + &mut self, + data: &mut Read, + txhashset: &TxHashSet, + header: &BlockHeader, + ) -> Result<(), Error> { + // Rebuild is all-or-nothing. Truncate everything before we begin. + self.truncate()?; + + let mut stream = StreamingReader::new(data, Duration::from_secs(1)); + + let mut current_pos = 0; + let mut current_header = txhashset.get_header_by_height(0)?; + + loop { + while current_pos < current_header.kernel_mmr_size { + // Read and verify the next kernel from the stream of data. + let kernel = TxKernelEntry::read(&mut stream)?; + kernel.kernel.verify()?; + + // Apply it to the MMR and keep track of last_pos. + let (_, last_pos) = self.apply_kernel(&kernel.kernel)?; + current_pos = last_pos; + } + + // Verify the kernel MMR root is correct for current header. + let root = self.pmmr.root(); + if root != current_header.kernel_root { + return Err(ErrorKind::InvalidTxHashSet(format!( + "Kernel root at {} does not match", + current_header.height + )) + .into()); + } + + // Periodically sync the PMMR backend as we rebuild it. + if current_header.height % 1000 == 0 { + self.pmmr + .sync() + .map_err(|_| ErrorKind::TxHashSetErr("failed to sync pmmr".into()))?; + debug!( + "Rebuilt kernel MMR to height: {}, kernels: {} (MMR size: {}) ...", + current_header.height, + pmmr::n_leaves(self.pmmr.last_pos), + self.pmmr.last_pos, + ); + } + + // Done if we have reached the specified header. + if current_header == *header { + break; + } else if current_header.height >= header.height { + return Err(ErrorKind::InvalidTxHashSet(format!( + "Header mismatch when rebuilding kernel MMR.", + )) + .into()); + } else { + current_header = txhashset.get_header_by_height(current_header.height + 1)?; + } + } + + // One final sync to ensure everything is saved (to the tempdir). + self.pmmr + .sync() + .map_err(|_| ErrorKind::TxHashSetErr("failed to sync pmmr".into()))?; + debug!( + "Rebuilt kernel MMR to height: {}, kernels: {} (MMR size: {}) DONE", + current_header.height, + pmmr::n_leaves(self.pmmr.last_pos), + self.pmmr.last_pos, + ); + + Ok(()) + } + + /// Push kernel onto MMR (hash, data and size files). + /// Returns the pos of the element applies and "last_pos" including all new parents. + pub fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(u64, u64), Error> { + let pos = self.pmmr.push(kernel).map_err(&ErrorKind::TxHashSetErr)?; + Ok(pos) + } +} diff --git a/chain/src/txhashset/rewindable_kernel_view.rs b/chain/src/txhashset/rewindable_kernel_view.rs index a1be9fc669..d2ba7595c5 100644 --- a/chain/src/txhashset/rewindable_kernel_view.rs +++ b/chain/src/txhashset/rewindable_kernel_view.rs @@ -14,6 +14,9 @@ //! Lightweight readonly view into kernel MMR for convenience. +use std::fs::File; +use std::io; + use crate::core::core::pmmr::RewindablePMMR; use crate::core::core::{BlockHeader, TxKernel}; use crate::error::{Error, ErrorKind}; @@ -78,4 +81,14 @@ impl<'a> RewindableKernelView<'a> { } Ok(()) } + + /// Read the "raw" kernel backend data file (via temp file for consistent view on data). + pub fn kernel_data_read(&self) -> Result { + let file = self + .pmmr + .backend() + .temp_data_file() + .map_err(|_| ErrorKind::FileReadErr("Data file woes".into()))?; + Ok(file) + } } diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index fc66479080..1a13e88f73 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -51,7 +51,7 @@ const KERNEL_SUBDIR: &'static str = "kernel"; const TXHASHSET_ZIP: &'static str = "txhashset_snapshot"; -struct PMMRHandle { +pub struct PMMRHandle { backend: PMMRBackend, last_pos: u64, } @@ -924,13 +924,13 @@ impl<'a> Extension<'a> { } } // push the new output to the MMR. - let output_pos = self + let (output_pos, _) = self .output_pmmr .push(out) .map_err(&ErrorKind::TxHashSetErr)?; // push the rangeproof to the MMR. - let rproof_pos = self + let (rproof_pos, _) = self .rproof_pmmr .push(&out.proof) .map_err(&ErrorKind::TxHashSetErr)?; @@ -953,7 +953,7 @@ impl<'a> Extension<'a> { } /// Push kernel onto MMR (hash and data files). - fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> { + pub fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> { self.kernel_pmmr .push(kernel) .map_err(&ErrorKind::TxHashSetErr)?; @@ -1220,7 +1220,8 @@ impl<'a> Extension<'a> { } /// Validate the txhashset state against the provided block header. - /// A "fast validation" will skip rangeproof verification and kernel signature verification. + /// A "fast validation" will skip rangeproof verification. + /// Note: We have already verified the full set of kernels at this point. pub fn validate( &self, fast_validation: bool, @@ -1239,13 +1240,9 @@ impl<'a> Extension<'a> { // sum of unspent outputs minus total supply. let (output_sum, kernel_sum) = self.validate_kernel_sums()?; - // These are expensive verification step (skipped for "fast validation"). if !fast_validation { // Verify the rangeproof associated with each unspent output. self.verify_rangeproofs(status)?; - - // Verify all the kernel signatures. - self.verify_kernel_signatures(status)?; } Ok((output_sum, kernel_sum)) @@ -1313,43 +1310,6 @@ impl<'a> Extension<'a> { ) } - fn verify_kernel_signatures(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { - let now = Instant::now(); - - let mut kern_count = 0; - let total_kernels = pmmr::n_leaves(self.kernel_pmmr.unpruned_size()); - for n in 1..self.kernel_pmmr.unpruned_size() + 1 { - if pmmr::is_leaf(n) { - let kernel = self - .kernel_pmmr - .get_data(n) - .ok_or::(ErrorKind::TxKernelNotFound.into())?; - - kernel.verify()?; - kern_count += 1; - - if kern_count % 20 == 0 { - status.on_validation(kern_count, total_kernels, 0, 0); - } - if kern_count % 1_000 == 0 { - debug!( - "txhashset: verify_kernel_signatures: verified {} signatures", - kern_count, - ); - } - } - } - - debug!( - "txhashset: verified {} kernel signatures, pmmr size {}, took {}s", - kern_count, - self.kernel_pmmr.unpruned_size(), - now.elapsed().as_secs(), - ); - - Ok(()) - } - fn verify_rangeproofs(&self, status: &dyn TxHashsetWriteStatus) -> Result<(), Error> { let now = Instant::now(); diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index c92dd7f317..511050cb6e 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -17,6 +17,9 @@ use croaring::Bitmap; use crate::core::hash::Hash; use crate::core::BlockHeader; use crate::ser::PMMRable; +use std::fs::File; +use std::io; +use std::io::Read; use std::path::Path; /// Storage backend for the MMR, just needs to be indexed by order of insertion. @@ -54,20 +57,19 @@ pub trait Backend { /// Iterator over current (unpruned, unremoved) leaf positions. fn leaf_pos_iter(&self) -> Box + '_>; + fn temp_data_file(&self) -> Result; + /// Remove Hash by insertion position. An index is also provided so the /// underlying backend can implement some rollback of positions up to a /// given index (practically the index is the height of a block that /// triggered removal). fn remove(&mut self, position: u64) -> Result<(), String>; - /// Returns the data file path.. this is a bit of a hack now that doesn't - /// sit well with the design, but TxKernels have to be summed and the - /// fastest way to to be able to allow direct access to the file - fn get_data_file_path(&self) -> &Path; - /// Release underlying datafiles and locks fn release_files(&mut self); + fn sync(&mut self) -> io::Result<()>; + /// Also a bit of a hack... /// Saves a snapshot of the rewound utxo file with the block hash as /// filename suffix. We need this when sending a txhashset zip file to a diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs index 7367f0eef2..f6d45539ac 100644 --- a/core/src/core/pmmr/pmmr.rs +++ b/core/src/core/pmmr/pmmr.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; use std::marker; use std::u64; @@ -178,7 +179,8 @@ where /// Push a new element into the MMR. Computes new related peaks at /// the same time if applicable. - pub fn push(&mut self, elmt: &T) -> Result { + /// Returns the pos of the new element and the "last_pos" (includes all parent pos). + pub fn push(&mut self, elmt: &T) -> Result<(u64, u64), String> { let elmt_pos = self.last_pos + 1; let mut current_hash = elmt.hash_with_index(elmt_pos - 1); @@ -206,7 +208,7 @@ where // append all the new nodes and update the MMR index self.backend.append(elmt, hashes)?; self.last_pos = pos; - Ok(elmt_pos) + Ok((elmt_pos, pos)) } /// Saves a snapshot of the MMR tagged with the block hash. @@ -217,6 +219,11 @@ where Ok(()) } + pub fn sync(&mut self) -> io::Result<()> { + self.backend.sync()?; + Ok(()) + } + /// Rewind the PMMR to a previous position, as if all push operations after /// that had been canceled. Expects a position in the PMMR to rewind and /// bitmaps representing the positions added and removed that we want to @@ -328,11 +335,6 @@ where self.last_pos } - /// Return the path of the data file (needed to sum kernels efficiently) - pub fn data_file_path(&self) -> &Path { - self.backend.get_data_file_path() - } - /// Debugging utility to print information about the MMRs. Short version /// only prints the last 8 nodes. pub fn dump(&self, short: bool) { diff --git a/core/src/core/pmmr/rewindable_pmmr.rs b/core/src/core/pmmr/rewindable_pmmr.rs index e696d67afe..ef783463e0 100644 --- a/core/src/core/pmmr/rewindable_pmmr.rs +++ b/core/src/core/pmmr/rewindable_pmmr.rs @@ -49,6 +49,10 @@ where } } + pub fn backend(&'a self) -> &Backend { + self.backend + } + /// Build a new readonly PMMR pre-initialized to /// last_pos with the provided backend. pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> { diff --git a/core/tests/vec_backend.rs b/core/tests/vec_backend.rs index 73f5e3a7e4..cb894ce676 100644 --- a/core/tests/vec_backend.rs +++ b/core/tests/vec_backend.rs @@ -124,10 +124,6 @@ impl Backend for VecBackend { Ok(()) } - fn get_data_file_path(&self) -> &Path { - Path::new("") - } - fn release_files(&mut self) {} fn dump_stats(&self) {} diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index a0f3793e79..2216a36b82 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -19,6 +19,7 @@ num = "0.1" rand = "0.5" serde = "1" serde_derive = "1" +tempfile = "3.0.5" log = "0.4" chrono = { version = "0.4.4", features = ["serde"] } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 06a8a206ab..f3798c2a84 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -21,7 +21,7 @@ //! stream and make sure we get the right number of bytes out. use std::fs::File; -use std::io::{self, Read, Write}; +use std::io::{self, BufReader, BufWriter, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::{mpsc, Arc}; use std::{cmp, thread, time}; @@ -132,21 +132,18 @@ impl<'a> Response<'a> { let mut sent_bytes = sent_bytes.write(); sent_bytes.inc(msg.len() as u64); } - if let Some(mut file) = self.attachment { - let mut buf = [0u8; 8000]; - loop { - match file.read(&mut buf[..]) { - Ok(0) => break, - Ok(n) => { - write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?; - // Increase sent bytes "quietly" without incrementing the counter. - // (In a loop here for the single attachment). - let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc_quiet(n as u64); - } - Err(e) => return Err(From::from(e)), - } - } + if let Some(file) = self.attachment { + error!( + "***** writing attachment (reading from file): file size: {}", + file.metadata().unwrap().len() + ); + let mut reader = BufReader::new(file); + let mut writer = BufWriter::new(self.stream); + + let bytes_written = io::copy(&mut reader, &mut writer)?; + sent_bytes.write().inc_quiet(bytes_written as u64); + + error!("***** wrote some bytes: {}", bytes_written); } Ok(()) } diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 16fb81c28d..a977ef740c 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -66,6 +66,8 @@ enum_from_primitive! { BanReason = 18, GetTransaction = 19, TransactionKernel = 20, + KernelDataRequest = 21, + KernelDataResponse = 22, } } @@ -98,6 +100,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::BanReason => 64, Type::GetTransaction => 32, Type::TransactionKernel => 32, + Type::KernelDataRequest => 0, + Type::KernelDataResponse => 8, } } @@ -612,3 +616,42 @@ impl Readable for TxHashSetArchive { }) } } + +pub struct KernelDataRequest {} + +impl Writeable for KernelDataRequest { + fn write(&self, _writer: &mut W) -> Result<(), ser::Error> { + Ok(()) + } +} + +pub struct KernelDataResponse { + /// Support for multiple versions of data serialization. + /// 0 - the original "fixed size" kernel serialization format (lock_height and fee required) + /// 1 - "variable size" kernel serialization + /// * lock_height only included in HeightLocked kernels + /// * fee omitted for coinbase kernels + /// + /// Note: Both versions will produce identical hash values (and corresponding MMR root). + /// Only the "data" serialization differs. + /// + pub version: u8, + /// Size in bytes of the kernel data file. + pub bytes: u64, +} + +impl Writeable for KernelDataResponse { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + writer.write_u8(self.version)?; + writer.write_u64(self.bytes)?; + Ok(()) + } +} + +impl Readable for KernelDataResponse { + fn read(reader: &mut dyn Reader) -> Result { + let version = reader.read_u8()?; + let bytes = reader.read_u64()?; + Ok(KernelDataResponse { version, bytes }) + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index ce6daec543..ee65e3d322 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -14,6 +14,7 @@ use crate::util::{Mutex, RwLock}; use std::fs::File; +use std::io::Read; use std::net::{Shutdown, TcpStream}; use std::sync::Arc; @@ -23,7 +24,9 @@ use crate::core::core::hash::{Hash, Hashed}; use crate::core::pow::Difficulty; use crate::core::{core, global}; use crate::handshake::Handshake; -use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest}; +use crate::msg::{ + self, BanReason, GetPeerAddrs, KernelDataRequest, Locator, Ping, TxHashSetRequest, +}; use crate::protocol::Protocol; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, @@ -399,6 +402,11 @@ impl Peer { ) } + pub fn send_kernel_data_request(&self) -> Result<(), Error> { + debug!("Asking {} for kernel data.", self.info.addr); + connection!(self).send(&KernelDataRequest {}, msg::Type::KernelDataRequest) + } + /// Stops the peer, closing its connection pub fn stop(&self) { if let Some(conn) = self.connection.as_ref() { @@ -587,6 +595,14 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_block(h) } + fn kernel_data_read(&self) -> Option { + self.adapter.kernel_data_read() + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + self.adapter.kernel_data_write(reader) + } + fn txhashset_read(&self, h: Hash) -> Option { self.adapter.txhashset_read(h) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 16ac4b5eda..be2e9e7938 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -15,6 +15,7 @@ use crate::util::RwLock; use std::collections::HashMap; use std::fs::File; +use std::io::Read; use std::sync::Arc; use rand::{thread_rng, Rng}; @@ -644,6 +645,14 @@ impl ChainAdapter for Peers { self.adapter.get_block(h) } + fn kernel_data_read(&self) -> Option { + self.adapter.kernel_data_read() + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + self.adapter.kernel_data_write(reader) + } + fn txhashset_read(&self, h: Hash) -> Option { self.adapter.txhashset_read(h) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index b98df4240d..9d4099002a 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -15,17 +15,18 @@ use std::cmp; use std::env; use std::fs::File; -use std::io::{BufWriter, Write}; +use std::io::{BufWriter, Seek, SeekFrom, Write}; use std::sync::Arc; use crate::conn::{Message, MessageHandler, Response}; use crate::core::core::{self, hash::Hash, CompactBlock}; use crate::util::{RateCounter, RwLock}; use chrono::prelude::Utc; +use tempfile::tempfile; use crate::msg::{ - BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, - TxHashSetRequest, Type, + BanReason, GetPeerAddrs, Headers, KernelDataResponse, Locator, PeerAddrs, Ping, Pong, + TxHashSetArchive, TxHashSetRequest, Type, }; use crate::types::{Error, NetAdapter, PeerAddr}; @@ -344,7 +345,66 @@ impl MessageHandler for Protocol { Ok(None) } + Type::KernelDataRequest => { + debug!("handle_payload: kernel_data_request"); + let kernel_data = self.adapter.kernel_data_read(); + + if let Some(kernel_data) = kernel_data { + // We can migrate to version = 1 "variable size" kernels once this is widely supported. + + let version = 0; // original "fixed size" kernel serialization + let bytes = kernel_data.metadata()?.len(); + let kernel_data_response = KernelDataResponse { version, bytes }; + let mut response = + Response::new(Type::KernelDataResponse, &kernel_data_response, writer)?; + response.add_attachment(kernel_data); + Ok(Some(response)) + } else { + Ok(None) + } + } + Type::KernelDataResponse => { + let response: KernelDataResponse = msg.body()?; + debug!( + "handle_payload: kernel_data_response: version: {}, bytes: {}", + response.version, response.bytes + ); + + let mut writer = BufWriter::new(tempfile()?); + + let total_size = response.bytes as usize; + let mut remaining_size = total_size; + + while remaining_size > 0 { + debug!("in loop, remaining: {}", remaining_size); + + let size = msg.copy_attachment(remaining_size, &mut writer)?; + debug!("in loop, size here: {}", size); + + remaining_size = remaining_size.saturating_sub(size); + debug!("in loop, remaining now: {}", remaining_size); + + // Increase received bytes quietly (without affecting the counters). + // Otherwise we risk banning a peer as "abusive". + received_bytes.write().inc_quiet(size as u64); + } + + // Remember to seek back to start of the file as the caller is likely + // to read this file directly without reopening it. + writer.seek(SeekFrom::Start(0))?; + + let mut file = writer.into_inner().map_err(|_| Error::Internal)?; + + debug!( + "handle_payload: kernel_data_response: file size: {}", + file.metadata().unwrap().len() + ); + + self.adapter.kernel_data_write(&mut file)?; + + Ok(None) + } _ => { debug!("unknown message type {:?}", msg.header.msg_type); Ok(None) diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index e7038493a0..54642c6cb1 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -28,6 +28,7 @@ use crate::types::{ use crate::util::{Mutex, StopState}; use chrono::prelude::{DateTime, Utc}; use std::fs::File; +use std::io::Read; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::Arc; use std::time::Duration; @@ -276,6 +277,12 @@ impl ChainAdapter for DummyAdapter { fn get_block(&self, _: Hash) -> Option { None } + fn kernel_data_read(&self) -> Option { + unimplemented!() + } + fn kernel_data_write(&self, _reader: &mut Read) -> Result { + unimplemented!() + } fn txhashset_read(&self, _h: Hash) -> Option { unimplemented!() } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 5bf2b8e4e8..e99037c844 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -16,6 +16,7 @@ use crate::util::RwLock; use std::convert::From; use std::fs::File; use std::io; +use std::io::Read; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::mpsc; @@ -521,6 +522,10 @@ pub trait ChainAdapter: Sync + Send { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; + fn kernel_data_read(&self) -> Option; + + fn kernel_data_write(&self, reader: &mut Read) -> Result; + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistant state /// at the provided block hash. diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 9aeda53346..7179de946b 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -17,6 +17,7 @@ use crate::util::RwLock; use std::fs::File; +use std::io::Read; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; @@ -315,6 +316,22 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } + fn kernel_data_read(&self) -> Option { + match self.chain().kernel_data_read() { + Ok(file) => Some(file), + Err(e) => { + warn!("Couldn't produce kernel data file: {:?}", e); + None + } + } + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + let res = self.chain().kernel_data_write(reader)?; + error!("***** kernel_data_write: {:?}", res); + Ok(true) + } + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistent state /// at the provided block hash. diff --git a/store/Cargo.toml b/store/Cargo.toml index a3750243e4..fce64ec909 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -18,6 +18,7 @@ failure = "0.1" failure_derive = "0.1" lmdb-zero = "0.4.4" memmap = "0.7" +tempfile = "3.0.5" serde = "1" serde_derive = "1" log = "0.4" diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index 8d4c3caffd..c967566223 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -13,6 +13,7 @@ //! Implementation of the persistent Backend for the prunable MMR tree. +use std::fs::File; use std::{fs, io, time}; use crate::core::core::hash::{Hash, Hashed}; @@ -132,6 +133,12 @@ impl Backend for PMMRBackend { } } + fn temp_data_file(&self) -> Result { + self.data_file + .as_temp_file() + .map_err(|_| format!("Failed to build temp data file")) + } + /// Rewind the PMMR backend to the given position. fn rewind(&mut self, position: u64, rewind_rm_pos: &Bitmap) -> Result<(), String> { // First rewind the leaf_set with the necessary added and removed positions. @@ -158,11 +165,6 @@ impl Backend for PMMRBackend { Ok(()) } - /// Return data file path - fn get_data_file_path(&self) -> &Path { - self.data_file.path() - } - /// Release underlying data files fn release_files(&mut self) { self.data_file.release(); @@ -176,6 +178,21 @@ impl Backend for PMMRBackend { Ok(()) } + // /// Syncs all files to disk. A call to sync is required to ensure all the + // /// data has been successfully written to disk. + fn sync(&mut self) -> io::Result<()> { + Ok(()) + .and(self.hash_file.flush()) + .and(self.data_file.flush()) + .and(self.sync_leaf_set()) + .map_err(|e| { + io::Error::new( + io::ErrorKind::Interrupted, + format!("Could not write to state storage, disk full? {:?}", e), + ) + }) + } + fn dump_stats(&self) { debug!( "pmmr backend: unpruned: {}, hashes: {}, data: {}, leaf_set: {}, prune_list: {}", @@ -258,21 +275,6 @@ impl PMMRBackend { self.hash_file.size() } - /// Syncs all files to disk. A call to sync is required to ensure all the - /// data has been successfully written to disk. - pub fn sync(&mut self) -> io::Result<()> { - Ok(()) - .and(self.hash_file.flush()) - .and(self.data_file.flush()) - .and(self.sync_leaf_set()) - .map_err(|e| { - io::Error::new( - io::ErrorKind::Interrupted, - format!("Could not write to state storage, disk full? {:?}", e), - ) - }) - } - // Sync the leaf_set if this is a prunable backend. fn sync_leaf_set(&mut self) -> io::Result<()> { if !self.prunable { diff --git a/store/src/types.rs b/store/src/types.rs index bfe53bf5de..acd8769d9c 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -13,10 +13,12 @@ //! Common storage-related types use memmap; +use tempfile::tempfile; use crate::core::ser::{self, FixedLength, Readable, Writeable}; +use std::env; use std::fs::{self, File, OpenOptions}; -use std::io::{self, BufWriter, ErrorKind, Read, Write}; +use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek, SeekFrom, Write}; use std::marker; use std::path::{Path, PathBuf}; @@ -39,6 +41,10 @@ where }) } + pub fn as_temp_file(&self) -> io::Result { + self.file.as_temp_file() + } + /// Append an element to the file. /// Will not be written to disk until flush() is subsequently called. /// Alternatively discard() may be called to discard any pending changes. @@ -170,6 +176,19 @@ impl AppendOnlyFile { Ok(()) } + pub fn as_temp_file(&self) -> io::Result { + let mut reader = BufReader::new(File::open(&self.path)?); + let mut writer = BufWriter::new(tempfile()?); + io::copy(&mut reader, &mut writer)?; + + // Remember to seek back to start of the file as the caller is likely + // to read this file directly without reopening it. + writer.seek(SeekFrom::Start(0))?; + + let file = writer.into_inner()?; + Ok(file) + } + /// Append data to the file. Until the append-only file is synced, data is /// only written to memory. pub fn append(&mut self, bytes: &mut [u8]) {