diff --git a/Cargo.lock b/Cargo.lock index 0e13c3fd1f..6bad381af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,6 +786,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -869,6 +870,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1784,6 +1786,14 @@ dependencies = [ "ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "remove_dir_all" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ring" version = "0.13.5" @@ -2025,6 +2035,19 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tempfile" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)", + "remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "term" version = "0.5.2" @@ -2749,6 +2772,7 @@ dependencies = [ "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" "checksum regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "559008764a17de49a3146b234641644ed37d118d1ef641a0bb573d146edc6ce0" "checksum regex-syntax 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfd8681eebe297b81d98498869d4aae052137651ad7b96822f09ceb690d0a96" +"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" "checksum ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2c4db68a2e35f3497146b7e4563df7d4773a2433230c5e4b448328e31740458a" "checksum ripemd160 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "482aa56cc68aaeccdaaff1cc5a72c247da8bbad3beb174ca5741f274c22883fb" "checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288" @@ -2781,6 +2805,7 @@ dependencies = [ "checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741" "checksum syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b4cfac95805274c6afdb12d8f770fa2d27c045953e7b630a81801953699a9a" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" +"checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" "checksum term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "edd106a334b7657c10b7c540a0106114feadeb4dc314513e97df481d5d966f42" "checksum term_size 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9e5b9a66db815dcfd2da92db471106457082577c3c278d4138ab3e3b4e189327" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" diff --git a/api/src/handlers.rs b/api/src/handlers.rs index feb65e3438..a26bed9a94 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -33,6 +33,7 @@ use self::pool_api::PoolInfoHandler; use self::pool_api::PoolPushHandler; use self::server_api::IndexHandler; use self::server_api::StatusHandler; +use self::server_api::KernelDownloadHandler; use self::transactions_api::TxHashSetHandler; use crate::auth::{BasicAuthMiddleware, GRIN_BASIC_REALM}; use crate::chain; @@ -135,6 +136,9 @@ pub fn build_router( chain: Arc::downgrade(&chain), peers: Arc::downgrade(&peers), }; + let kernel_download_handler = KernelDownloadHandler { + peers: Arc::downgrade(&peers), + }; let txhashset_handler = TxHashSetHandler { chain: Arc::downgrade(&chain), }; @@ -165,6 +169,7 @@ pub fn build_router( router.add_route("/v1/chain/validate", Arc::new(chain_validation_handler))?; router.add_route("/v1/txhashset/*", Arc::new(txhashset_handler))?; router.add_route("/v1/status", Arc::new(status_handler))?; + router.add_route("/v1/kerneldownload", Arc::new(kernel_download_handler))?; router.add_route("/v1/pool", Arc::new(pool_info_handler))?; router.add_route("/v1/pool/push", Arc::new(pool_push_handler))?; router.add_route("/v1/peers/all", Arc::new(peers_all_handler))?; diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index d5092b0547..d7dbd6220e 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -19,7 +19,7 @@ use crate::rest::*; use crate::router::{Handler, ResponseFuture}; use crate::types::*; use crate::web::*; -use hyper::{Body, Request}; +use hyper::{Body, Request, StatusCode}; use std::sync::Weak; // RESTful index of available api endpoints @@ -36,6 +36,29 @@ impl Handler for IndexHandler { } } +pub struct KernelDownloadHandler { + pub peers: Weak, +} + +impl Handler for KernelDownloadHandler { + fn post(&self, _req: Request) -> ResponseFuture { + if let Some(peer) = w_fut!(&self.peers).most_work_peer() { + match peer.send_kernel_data_request() { + Ok(_) => response(StatusCode::OK, "{}"), + Err(e) => response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("requesting kernel data from peer failed: {:?}", e), + ), + } + } else { + response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("requesting kernel data from peer failed (no peers)"), + ) + } + } +} + /// Status handler. Post a summary of the server status /// GET /v1/status pub struct StatusHandler { diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 56b6a52bd2..c0d38bad72 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -23,6 +23,7 @@ use crate::core::core::{ }; use crate::core::global; use crate::core::pow; +use crate::core::ser::{Readable, StreamingReader}; use crate::error::{Error, ErrorKind}; use crate::pipe; use crate::store; @@ -36,6 +37,7 @@ use crate::util::{Mutex, RwLock, StopState}; use grin_store::Error::NotFoundErr; use std::collections::HashMap; use std::fs::{self, File}; +use std::io::Read; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -659,6 +661,28 @@ impl Chain { self.txhashset.read().roots() } + /// Provides a reading view into the current kernel state. + pub fn kernel_data_read(&self) -> Result { + let txhashset = self.txhashset.read(); + txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read()) + } + + /// Writes kernels provided to us (via a kernel data download). + /// Currently does not write these to disk and simply deserializes + /// the provided data. + /// TODO - Write this data to disk and validate the rebuilt kernel MMR. + pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> { + let mut count = 0; + let mut stream = StreamingReader::new(reader, Duration::from_secs(1)); + while let Ok(_kernel) = TxKernelEntry::read(&mut stream) { + count += 1; + } + + debug!("kernel_data_write: read {} kernels", count); + + Ok(()) + } + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistent state /// at the provided block hash. diff --git a/chain/src/txhashset/rewindable_kernel_view.rs b/chain/src/txhashset/rewindable_kernel_view.rs index a1be9fc669..a6432e91d8 100644 --- a/chain/src/txhashset/rewindable_kernel_view.rs +++ b/chain/src/txhashset/rewindable_kernel_view.rs @@ -14,6 +14,8 @@ //! Lightweight readonly view into kernel MMR for convenience. +use std::fs::File; + use crate::core::core::pmmr::RewindablePMMR; use crate::core::core::{BlockHeader, TxKernel}; use crate::error::{Error, ErrorKind}; @@ -78,4 +80,14 @@ impl<'a> RewindableKernelView<'a> { } Ok(()) } + + /// Read the "raw" kernel backend data file (via temp file for consistent view on data). + pub fn kernel_data_read(&self) -> Result { + let file = self + .pmmr + .backend() + .data_as_temp_file() + .map_err(|_| ErrorKind::FileReadErr("Data file woes".into()))?; + Ok(file) + } } diff --git a/core/src/core/pmmr/backend.rs b/core/src/core/pmmr/backend.rs index 2b5e174fb2..e5a43d5574 100644 --- a/core/src/core/pmmr/backend.rs +++ b/core/src/core/pmmr/backend.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; + use croaring::Bitmap; use crate::core::hash::Hash; @@ -59,6 +61,11 @@ pub trait Backend { /// triggered removal). fn remove(&mut self, position: u64) -> Result<(), String>; + /// Creates a temp file containing the contents of the underlying data file + /// from the backend storage. This allows a caller to see a consistent view + /// of the data without needing to lock the backend storage. + fn data_as_temp_file(&self) -> Result; + /// Release underlying datafiles and locks fn release_files(&mut self); diff --git a/core/src/core/pmmr/rewindable_pmmr.rs b/core/src/core/pmmr/rewindable_pmmr.rs index e696d67afe..3c5911f287 100644 --- a/core/src/core/pmmr/rewindable_pmmr.rs +++ b/core/src/core/pmmr/rewindable_pmmr.rs @@ -49,6 +49,11 @@ where } } + /// Reference to the underlying storage backend. + pub fn backend(&'a self) -> &Backend { + self.backend + } + /// Build a new readonly PMMR pre-initialized to /// last_pos with the provided backend. pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> { diff --git a/core/tests/vec_backend.rs b/core/tests/vec_backend.rs index 37e9071b62..e2089e80be 100644 --- a/core/tests/vec_backend.rs +++ b/core/tests/vec_backend.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fs::File; + use self::core::core::hash::{DefaultHashable, Hash}; use self::core::core::pmmr::{self, Backend}; use self::core::core::BlockHeader; @@ -103,6 +105,10 @@ impl Backend for VecBackend { Some(data.as_elmt()) } + fn data_as_temp_file(&self) -> Result { + unimplemented!() + } + fn leaf_pos_iter(&self) -> Box + '_> { unimplemented!() } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 1779617f9c..acb1c202e2 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -18,6 +18,7 @@ num = "0.1" rand = "0.6" serde = "1" serde_derive = "1" +tempfile = "3.0.5" log = "0.4" chrono = { version = "0.4.4", features = ["serde"] } diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index e082ed03ad..d8b12646d3 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -74,6 +74,8 @@ enum_from_primitive! { BanReason = 18, GetTransaction = 19, TransactionKernel = 20, + KernelDataRequest = 21, + KernelDataResponse = 22, } } @@ -111,6 +113,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::BanReason => 64, Type::GetTransaction => 32, Type::TransactionKernel => 32, + Type::KernelDataRequest => 0, + Type::KernelDataResponse => 8, } } @@ -714,3 +718,30 @@ impl Readable for TxHashSetArchive { }) } } + +pub struct KernelDataRequest {} + +impl Writeable for KernelDataRequest { + fn write(&self, _writer: &mut W) -> Result<(), ser::Error> { + Ok(()) + } +} + +pub struct KernelDataResponse { + /// Size in bytes of the attached kernel data file. + pub bytes: u64, +} + +impl Writeable for KernelDataResponse { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + writer.write_u64(self.bytes)?; + Ok(()) + } +} + +impl Readable for KernelDataResponse { + fn read(reader: &mut dyn Reader) -> Result { + let bytes = reader.read_u64()?; + Ok(KernelDataResponse { bytes }) + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b1df25d17..08f5b697bc 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -15,6 +15,7 @@ use crate::util::{Mutex, RwLock}; use std::fmt; use std::fs::File; +use std::io::Read; use std::net::{Shutdown, TcpStream}; use std::path::PathBuf; use std::sync::Arc; @@ -26,7 +27,9 @@ use crate::core::pow::Difficulty; use crate::core::ser::Writeable; use crate::core::{core, global}; use crate::handshake::Handshake; -use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type}; +use crate::msg::{ + self, BanReason, GetPeerAddrs, KernelDataRequest, Locator, Ping, TxHashSetRequest, Type, +}; use crate::protocol::Protocol; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, @@ -379,6 +382,13 @@ impl Peer { ) } + pub fn send_kernel_data_request(&self) -> Result<(), Error> { + debug!("Asking {} for kernel data.", self.info.addr); + self.connection + .lock() + .send(&KernelDataRequest {}, msg::Type::KernelDataRequest) + } + /// Stops the peer, closing its connection pub fn stop(&self) { self.connection.lock().close(); @@ -521,6 +531,14 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_block(h) } + fn kernel_data_read(&self) -> Result { + self.adapter.kernel_data_read() + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + self.adapter.kernel_data_write(reader) + } + fn txhashset_read(&self, h: Hash) -> Option { self.adapter.txhashset_read(h) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 3fe18f7085..527a283590 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -15,6 +15,7 @@ use crate::util::RwLock; use std::collections::HashMap; use std::fs::File; +use std::io::Read; use std::path::PathBuf; use std::sync::Arc; @@ -594,6 +595,14 @@ impl ChainAdapter for Peers { self.adapter.get_block(h) } + fn kernel_data_read(&self) -> Result { + self.adapter.kernel_data_read() + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + self.adapter.kernel_data_write(reader) + } + fn txhashset_read(&self, h: Hash) -> Option { self.adapter.txhashset_read(h) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 9b3100ce1f..02f8fe9839 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -15,17 +15,19 @@ use rand::{thread_rng, Rng}; use std::cmp; use std::fs::{self, File, OpenOptions}; -use std::io::{BufWriter, Write}; +use std::io::{BufWriter, Seek, SeekFrom, Write}; use std::sync::Arc; +use chrono::prelude::Utc; +use tempfile::tempfile; + use crate::conn::{Message, MessageHandler, Response}; use crate::core::core::{self, hash::Hash, CompactBlock}; use crate::util::{RateCounter, RwLock}; -use chrono::prelude::Utc; use crate::msg::{ - BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, - TxHashSetRequest, Type, + BanReason, GetPeerAddrs, Headers, KernelDataResponse, Locator, PeerAddrs, Ping, Pong, + TxHashSetArchive, TxHashSetRequest, Type, }; use crate::types::{Error, NetAdapter, PeerInfo}; @@ -244,6 +246,54 @@ impl MessageHandler for Protocol { Ok(None) } + Type::KernelDataRequest => { + debug!("handle_payload: kernel_data_request"); + let kernel_data = self.adapter.kernel_data_read()?; + let bytes = kernel_data.metadata()?.len(); + let kernel_data_response = KernelDataResponse { bytes }; + let mut response = + Response::new(Type::KernelDataResponse, &kernel_data_response, writer)?; + response.add_attachment(kernel_data); + Ok(Some(response)) + } + + Type::KernelDataResponse => { + let response: KernelDataResponse = msg.body()?; + debug!( + "handle_payload: kernel_data_response: bytes: {}", + response.bytes + ); + + let mut writer = BufWriter::new(tempfile()?); + + let total_size = response.bytes as usize; + let mut remaining_size = total_size; + + while remaining_size > 0 { + let size = msg.copy_attachment(remaining_size, &mut writer)?; + remaining_size = remaining_size.saturating_sub(size); + + // Increase received bytes quietly (without affecting the counters). + // Otherwise we risk banning a peer as "abusive". + received_bytes.write().inc_quiet(size as u64); + } + + // Remember to seek back to start of the file as the caller is likely + // to read this file directly without reopening it. + writer.seek(SeekFrom::Start(0))?; + + let mut file = writer.into_inner().map_err(|_| Error::Internal)?; + + debug!( + "handle_payload: kernel_data_response: file size: {}", + file.metadata().unwrap().len() + ); + + self.adapter.kernel_data_write(&mut file)?; + + Ok(None) + } + Type::TxHashSetRequest => { let sm_req: TxHashSetRequest = msg.body()?; debug!( diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 7b7ff05f2d..5e439ecbca 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::fs::File; +use std::io::{self, Read}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::path::PathBuf; use std::sync::Arc; +use std::thread; use std::time::Duration; -use std::{io, thread}; use crate::chain; use crate::core::core; @@ -280,6 +281,12 @@ impl ChainAdapter for DummyAdapter { fn get_block(&self, _: Hash) -> Option { None } + fn kernel_data_read(&self) -> Result { + unimplemented!() + } + fn kernel_data_write(&self, _reader: &mut Read) -> Result { + unimplemented!() + } fn txhashset_read(&self, _h: Hash) -> Option { unimplemented!() } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 23ab44dc15..d766dd8cb6 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -15,7 +15,7 @@ use crate::util::RwLock; use std::convert::From; use std::fs::File; -use std::io; +use std::io::{self, Read}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::path::PathBuf; @@ -527,6 +527,10 @@ pub trait ChainAdapter: Sync + Send { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; + fn kernel_data_read(&self) -> Result; + + fn kernel_data_write(&self, reader: &mut Read) -> Result; + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistant state /// at the provided block hash. diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 77496cacf0..f2af4178ff 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -17,6 +17,7 @@ use crate::util::RwLock; use std::fs::File; +use std::io::Read; use std::path::PathBuf; use std::sync::{Arc, Weak}; use std::thread; @@ -344,6 +345,16 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } + fn kernel_data_read(&self) -> Result { + self.chain().kernel_data_read() + } + + fn kernel_data_write(&self, reader: &mut Read) -> Result { + let res = self.chain().kernel_data_write(reader)?; + error!("***** kernel_data_write: {:?}", res); + Ok(true) + } + /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistent state /// at the provided block hash. diff --git a/store/Cargo.toml b/store/Cargo.toml index fdff6954bd..652b85eb7e 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -18,6 +18,7 @@ failure = "0.1" failure_derive = "0.1" lmdb-zero = "0.4.4" memmap = "0.7" +tempfile = "3.0.5" serde = "1" serde_derive = "1" log = "0.4" diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index ded9679b19..ee01f21579 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -13,7 +13,8 @@ //! Implementation of the persistent Backend for the prunable MMR tree. -use std::{fs, io, time}; +use std::fs::{self, File}; +use std::{io, time}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::pmmr::{self, family, Backend}; @@ -139,6 +140,12 @@ impl Backend for PMMRBackend { } } + fn data_as_temp_file(&self) -> Result { + self.data_file + .as_temp_file() + .map_err(|_| format!("Failed to build temp data file")) + } + /// Rewind the PMMR backend to the given position. fn rewind(&mut self, position: u64, rewind_rm_pos: &Bitmap) -> Result<(), String> { // First rewind the leaf_set with the necessary added and removed positions. diff --git a/store/src/types.rs b/store/src/types.rs index 5f681d8a6b..6629896a3b 100644 --- a/store/src/types.rs +++ b/store/src/types.rs @@ -13,13 +13,14 @@ //! Common storage-related types use memmap; +use tempfile::tempfile; use crate::core::ser::{ self, BinWriter, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer, }; use std::fmt::Debug; use std::fs::{self, File, OpenOptions}; -use std::io::{self, BufReader, BufWriter, Write}; +use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write}; use std::marker; use std::path::{Path, PathBuf}; use std::time; @@ -144,6 +145,13 @@ where self.file.path() } + /// Create a new tempfile containing the contents of this data file. + /// This allows callers to see a consistent view of the data without + /// locking the data file. + pub fn as_temp_file(&self) -> io::Result { + self.file.as_temp_file() + } + /// Drop underlying file handles pub fn release(&mut self) { self.file.release(); @@ -437,6 +445,22 @@ where } } + /// Create a new tempfile containing the contents of this append only file. + /// This allows callers to see a consistent view of the data without + /// locking the append only file. + pub fn as_temp_file(&self) -> io::Result { + let mut reader = BufReader::new(File::open(&self.path)?); + let mut writer = BufWriter::new(tempfile()?); + io::copy(&mut reader, &mut writer)?; + + // Remember to seek back to start of the file as the caller is likely + // to read this file directly without reopening it. + writer.seek(SeekFrom::Start(0))?; + + let file = writer.into_inner()?; + Ok(file) + } + /// Saves a copy of the current file content, skipping data at the provided /// prune positions. prune_pos must be ordered. pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {