diff --git a/evm/src/executor/fork/backend.rs b/evm/src/executor/fork/backend.rs index f77e829350219..887c6f5e69086 100644 --- a/evm/src/executor/fork/backend.rs +++ b/evm/src/executor/fork/backend.rs @@ -1,12 +1,14 @@ //! Smart caching and deduplication of requests when using a forking provider use revm::{db::DatabaseRef, AccountInfo, KECCAK_EMPTY}; +use crate::storage::StorageMap; use ethers::{ core::abi::ethereum_types::BigEndianHash, providers::Middleware, types::{Address, BlockId, Bytes, H160, H256, U256}, utils::keccak256, }; +use foundry_utils::RuntimeOrHandle; use futures::{ channel::mpsc::{channel, Receiver, Sender}, stream::{Fuse, Stream, StreamExt}, @@ -22,9 +24,7 @@ use std::{ Arc, }, }; - -use crate::storage::StorageMap; -use foundry_utils::RuntimeOrHandle; +use tracing::trace; type StorageInfo = BTreeMap; @@ -54,6 +54,7 @@ impl Drop for FlushStorageCacheOnDrop { drop(lock); self.storage_map.set_storage(data); self.storage_map.save(); + trace!(target: "diskmap", "flushed storage map {:?}", self.storage_map.path()); } } } @@ -97,12 +98,14 @@ struct BackendHandler { block_requests: HashMap>>, /// Incoming commands. incoming: Fuse>, + /// shutdown signal + shutdown: Fuse>>, /// The block to fetch data from. // This is an `Option` so that we can have less code churn in the functions below block_id: Option, /// If storage caching is enabled, this will flush the data to the configured file once the /// handler is dropped - _flush_storage: FlushStorageCacheOnDrop, + flush_storage: Option, } impl BackendHandler @@ -113,14 +116,15 @@ where provider: M, cache: SharedMemCache, rx: Receiver, + shutdown: Receiver>, block_id: Option, storage_map: StorageMap, ) -> Self { Self { - _flush_storage: FlushStorageCacheOnDrop { + flush_storage: Some(FlushStorageCacheOnDrop { storage_map, storage: Arc::clone(&cache.storage), - }, + }), provider, cache, pending_requests: Default::default(), @@ -128,6 +132,7 @@ where storage_requests: Default::default(), block_requests: Default::default(), incoming: rx.fuse(), + shutdown: shutdown.fuse(), block_id, } } @@ -283,7 +288,7 @@ where if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) { // get the response let (balance, nonce, code) = resp.unwrap_or_else(|_| { - tracing::trace!("Failed to get account for {}", addr); + trace!("Failed to get account for {}", addr); Default::default() }); @@ -310,7 +315,7 @@ where ProviderRequest::Storage(fut) => { if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) { let value = resp.unwrap_or_else(|_| { - tracing::trace!("Failed to get storage for {} at {}", addr, idx); + trace!("Failed to get storage for {} at {}", addr, idx); Default::default() }); @@ -329,7 +334,7 @@ where ProviderRequest::BlockHash(fut) => { if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) { let value = block_hash.unwrap_or_else(|_| { - tracing::trace!("Failed to get block hash for {}", number); + trace!("Failed to get block hash for {}", number); Default::default() }); @@ -352,10 +357,19 @@ where // the handler is finished if the request channel was closed and all requests are processed if pin.incoming.is_done() && pin.pending_requests.is_empty() { - Poll::Ready(()) - } else { - Poll::Pending + if let Poll::Ready(Some(ack)) = Pin::new(&mut pin.shutdown).poll_next(cx) { + // effectively flushing the cached storage if any + let _ = pin.flush_storage.take(); + // signaling back + let _ = ack.send(()); + } + + if pin.shutdown.is_done() { + trace!(target: "backendhandler", "finished"); + return Poll::Ready(()) + } } + Poll::Pending } } @@ -383,7 +397,26 @@ where /// (`A` and `B`) get notified. #[derive(Debug, Clone)] pub struct SharedBackend { + /// channel used for sending commands related to database operations backend: Sender, + /// channel to ensure the [BackendHandler] shutdowns gracefully + /// + /// This is essentially used to sync the last [SharedBackend] with the [BackendHandler] + shutdown: Sender>, +} + +impl Drop for SharedBackend { + fn drop(&mut self) { + // disconnect the command channel + self.backend.disconnect(); + if self.backend.is_closed() { + // was the last sender, let the handler know and wait until it gracefully shut down + let (ack, rx) = oneshot_channel(); + if self.shutdown.try_send(ack).is_ok() { + let _ = rx.recv(); + } + } + } } impl SharedBackend { @@ -401,8 +434,10 @@ impl SharedBackend { where M: Middleware + Unpin + 'static + Clone, { - let (tx, rx) = channel(1); - let handler = BackendHandler::new(provider, cache, rx, pin_block, storage_map); + let (backend, backend_rx) = channel(1); + let (shutdown, shutdown_rx) = channel(1); + let handler = + BackendHandler::new(provider, cache, backend_rx, shutdown_rx, pin_block, storage_map); // spawn the provider handler to background let rt = RuntimeOrHandle::new(); std::thread::spawn(move || match rt { @@ -410,7 +445,7 @@ impl SharedBackend { RuntimeOrHandle::Handle(handle) => handle.block_on(handler), }); - Self { backend: tx } + Self { backend, shutdown } } fn do_get_basic(&self, address: Address) -> eyre::Result { @@ -438,7 +473,7 @@ impl SharedBackend { impl DatabaseRef for SharedBackend { fn basic(&self, address: H160) -> AccountInfo { self.do_get_basic(address).unwrap_or_else(|_| { - tracing::trace!("Failed to send/recv `basic` for {}", address); + trace!("Failed to send/recv `basic` for {}", address); Default::default() }) } @@ -449,7 +484,7 @@ impl DatabaseRef for SharedBackend { fn storage(&self, address: H160, index: U256) -> U256 { self.do_get_storage(address, index).unwrap_or_else(|_| { - tracing::trace!("Failed to send/recv `storage` for {} at {}", address, index); + trace!("Failed to send/recv `storage` for {} at {}", address, index); Default::default() }) } @@ -460,7 +495,7 @@ impl DatabaseRef for SharedBackend { } let number = number.as_u64(); self.do_get_block_hash(number).unwrap_or_else(|_| { - tracing::trace!("Failed to send/recv `block_hash` for {}", number); + trace!("Failed to send/recv `block_hash` for {}", number); Default::default() }) } diff --git a/evm/src/storage/diskmap.rs b/evm/src/storage/diskmap.rs index ac009ee3dd5bc..dd899173563f7 100644 --- a/evm/src/storage/diskmap.rs +++ b/evm/src/storage/diskmap.rs @@ -1,4 +1,10 @@ -use std::{collections::BTreeMap, fmt, fs, hash, ops, path::PathBuf}; +use std::{ + collections::BTreeMap, + fmt, fs, hash, + io::{BufReader, BufWriter}, + ops, + path::PathBuf, +}; use tracing::{trace, warn}; @@ -44,11 +50,11 @@ impl DiskMap { /// Reads the contents of the diskmap file and returns the read cache pub fn read(path: impl Into, read: F) -> Self where - F: Fn(fs::File) -> Result, E>, + F: Fn(BufReader) -> Result, E>, E: fmt::Display, { let mut map = Self::new(path); - trace!("reading diskmap path={:?}", map.file_path); + trace!(target: "diskmap" ,"reading diskmap path={:?}", map.file_path); map.reload(read); map } @@ -87,7 +93,7 @@ impl DiskMap { /// overwriting all changes fn reload(&mut self, read: F) where - F: Fn(fs::File) -> Result, E>, + F: Fn(BufReader) -> Result, E>, E: fmt::Display, { if self.transient { @@ -95,8 +101,11 @@ impl DiskMap { } trace!("reloading diskmap {:?}", self.file_path); let _ = fs::File::open(self.file_path.clone()) - .map_err(|e| trace!("Failed to open disk map: {}", e)) - .and_then(|f| read(f).map_err(|e| warn!("Failed to read disk map: {}", e))) + .map_err(|e| trace!(target: "diskmap" , "Failed to open disk map: {}", e)) + .and_then(|f| { + read(BufReader::new(f)) + .map_err(|e| warn!(target: "diskmap" ,"Failed to read disk map: {}", e)) + }) .map(|m| { self.cache = m; }); @@ -107,20 +116,22 @@ impl DiskMap { /// The closure is expected to do the actual writing pub fn save(&self, write: F) where - F: Fn(&BTreeMap, &mut fs::File) -> Result<(), E>, + F: Fn(&BTreeMap, &mut BufWriter) -> Result<(), E>, E: fmt::Display, { if self.transient { return } - trace!("saving diskmap {:?}", self.file_path); + trace!(target: "diskmap", "saving diskmap {:?}", self.file_path); if let Some(parent) = self.file_path.parent() { let _ = fs::create_dir_all(parent); } let _ = fs::File::create(&self.file_path) - .map_err(|e| warn!("Failed to open disk map for writing: {}", e)) - .and_then(|mut f| { - write(&self.cache, &mut f).map_err(|e| warn!("Failed to write to disk map: {}", e)) + .map_err(|e| warn!(target: "diskmap", "Failed to open disk map for writing: {}", e)) + .and_then(|f| { + let mut f = BufWriter::new(f); + write(&self.cache, &mut f) + .map_err(|e| warn!(target: "diskmap" ,"Failed to write to disk map: {}", e)) }); } }