Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 53 additions & 18 deletions evm/src/executor/fork/backend.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! Smart caching and deduplication of requests when using a forking provider
use revm::{db::DatabaseRef, AccountInfo, KECCAK_EMPTY};

use crate::storage::StorageMap;
use ethers::{
core::abi::ethereum_types::BigEndianHash,
providers::Middleware,
types::{Address, BlockId, Bytes, H160, H256, U256},
utils::keccak256,
};
use foundry_utils::RuntimeOrHandle;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
stream::{Fuse, Stream, StreamExt},
Expand All @@ -22,9 +24,7 @@ use std::{
Arc,
},
};

use crate::storage::StorageMap;
use foundry_utils::RuntimeOrHandle;
use tracing::trace;

type StorageInfo = BTreeMap<U256, U256>;

Expand Down Expand Up @@ -54,6 +54,7 @@ impl Drop for FlushStorageCacheOnDrop {
drop(lock);
self.storage_map.set_storage(data);
self.storage_map.save();
trace!(target: "diskmap", "flushed storage map {:?}", self.storage_map.path());
}
}
}
Expand Down Expand Up @@ -97,12 +98,14 @@ struct BackendHandler<M: Middleware> {
block_requests: HashMap<u64, Vec<OneshotSender<H256>>>,
/// Incoming commands.
incoming: Fuse<Receiver<BackendRequest>>,
/// shutdown signal
shutdown: Fuse<Receiver<OneshotSender<()>>>,
/// The block to fetch data from.
// This is an `Option` so that we can have less code churn in the functions below
block_id: Option<BlockId>,
/// If storage caching is enabled, this will flush the data to the configured file once the
/// handler is dropped
_flush_storage: FlushStorageCacheOnDrop,
flush_storage: Option<FlushStorageCacheOnDrop>,
}

impl<M> BackendHandler<M>
Expand All @@ -113,21 +116,23 @@ where
provider: M,
cache: SharedMemCache,
rx: Receiver<BackendRequest>,
shutdown: Receiver<OneshotSender<()>>,
block_id: Option<BlockId>,
storage_map: StorageMap,
) -> Self {
Self {
_flush_storage: FlushStorageCacheOnDrop {
flush_storage: Some(FlushStorageCacheOnDrop {
storage_map,
storage: Arc::clone(&cache.storage),
},
}),
provider,
cache,
pending_requests: Default::default(),
account_requests: Default::default(),
storage_requests: Default::default(),
block_requests: Default::default(),
incoming: rx.fuse(),
shutdown: shutdown.fuse(),
block_id,
}
}
Expand Down Expand Up @@ -283,7 +288,7 @@ where
if let Poll::Ready((resp, addr)) = fut.poll_unpin(cx) {
// get the response
let (balance, nonce, code) = resp.unwrap_or_else(|_| {
tracing::trace!("Failed to get account for {}", addr);
trace!("Failed to get account for {}", addr);
Default::default()
});

Expand All @@ -310,7 +315,7 @@ where
ProviderRequest::Storage(fut) => {
if let Poll::Ready((resp, addr, idx)) = fut.poll_unpin(cx) {
let value = resp.unwrap_or_else(|_| {
tracing::trace!("Failed to get storage for {} at {}", addr, idx);
trace!("Failed to get storage for {} at {}", addr, idx);
Default::default()
});

Expand All @@ -329,7 +334,7 @@ where
ProviderRequest::BlockHash(fut) => {
if let Poll::Ready((block_hash, number)) = fut.poll_unpin(cx) {
let value = block_hash.unwrap_or_else(|_| {
tracing::trace!("Failed to get block hash for {}", number);
trace!("Failed to get block hash for {}", number);
Default::default()
});

Expand All @@ -352,10 +357,19 @@ where

// the handler is finished if the request channel was closed and all requests are processed
if pin.incoming.is_done() && pin.pending_requests.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
if let Poll::Ready(Some(ack)) = Pin::new(&mut pin.shutdown).poll_next(cx) {
// effectively flushing the cached storage if any
let _ = pin.flush_storage.take();
// signaling back
let _ = ack.send(());
}

if pin.shutdown.is_done() {
trace!(target: "backendhandler", "finished");
return Poll::Ready(())
}
}
Poll::Pending
}
}

Expand Down Expand Up @@ -383,7 +397,26 @@ where
/// (`A` and `B`) get notified.
#[derive(Debug, Clone)]
pub struct SharedBackend {
/// channel used for sending commands related to database operations
backend: Sender<BackendRequest>,
/// channel to ensure the [BackendHandler] shutdowns gracefully
///
/// This is essentially used to sync the last [SharedBackend] with the [BackendHandler]
shutdown: Sender<OneshotSender<()>>,
}

impl Drop for SharedBackend {
fn drop(&mut self) {
// disconnect the command channel
self.backend.disconnect();
if self.backend.is_closed() {
// was the last sender, let the handler know and wait until it gracefully shut down
let (ack, rx) = oneshot_channel();
if self.shutdown.try_send(ack).is_ok() {
let _ = rx.recv();
}
}
}
}

impl SharedBackend {
Expand All @@ -401,16 +434,18 @@ impl SharedBackend {
where
M: Middleware + Unpin + 'static + Clone,
{
let (tx, rx) = channel(1);
let handler = BackendHandler::new(provider, cache, rx, pin_block, storage_map);
let (backend, backend_rx) = channel(1);
let (shutdown, shutdown_rx) = channel(1);
let handler =
BackendHandler::new(provider, cache, backend_rx, shutdown_rx, pin_block, storage_map);
// spawn the provider handler to background
let rt = RuntimeOrHandle::new();
std::thread::spawn(move || match rt {
RuntimeOrHandle::Runtime(runtime) => runtime.block_on(handler),
RuntimeOrHandle::Handle(handle) => handle.block_on(handler),
});

Self { backend: tx }
Self { backend, shutdown }
}

fn do_get_basic(&self, address: Address) -> eyre::Result<AccountInfo> {
Expand Down Expand Up @@ -438,7 +473,7 @@ impl SharedBackend {
impl DatabaseRef for SharedBackend {
fn basic(&self, address: H160) -> AccountInfo {
self.do_get_basic(address).unwrap_or_else(|_| {
tracing::trace!("Failed to send/recv `basic` for {}", address);
trace!("Failed to send/recv `basic` for {}", address);
Default::default()
})
}
Expand All @@ -449,7 +484,7 @@ impl DatabaseRef for SharedBackend {

fn storage(&self, address: H160, index: U256) -> U256 {
self.do_get_storage(address, index).unwrap_or_else(|_| {
tracing::trace!("Failed to send/recv `storage` for {} at {}", address, index);
trace!("Failed to send/recv `storage` for {} at {}", address, index);
Default::default()
})
}
Expand All @@ -460,7 +495,7 @@ impl DatabaseRef for SharedBackend {
}
let number = number.as_u64();
self.do_get_block_hash(number).unwrap_or_else(|_| {
tracing::trace!("Failed to send/recv `block_hash` for {}", number);
trace!("Failed to send/recv `block_hash` for {}", number);
Default::default()
})
}
Expand Down
33 changes: 22 additions & 11 deletions evm/src/storage/diskmap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{collections::BTreeMap, fmt, fs, hash, ops, path::PathBuf};
use std::{
collections::BTreeMap,
fmt, fs, hash,
io::{BufReader, BufWriter},
ops,
path::PathBuf,
};

use tracing::{trace, warn};

Expand Down Expand Up @@ -44,11 +50,11 @@ impl<K: hash::Hash + Eq, V> DiskMap<K, V> {
/// Reads the contents of the diskmap file and returns the read cache
pub fn read<F, E>(path: impl Into<PathBuf>, read: F) -> Self
where
F: Fn(fs::File) -> Result<BTreeMap<K, V>, E>,
F: Fn(BufReader<fs::File>) -> Result<BTreeMap<K, V>, E>,
E: fmt::Display,
{
let mut map = Self::new(path);
trace!("reading diskmap path={:?}", map.file_path);
trace!(target: "diskmap" ,"reading diskmap path={:?}", map.file_path);
map.reload(read);
map
}
Expand Down Expand Up @@ -87,16 +93,19 @@ impl<K: hash::Hash + Eq, V> DiskMap<K, V> {
/// overwriting all changes
fn reload<F, E>(&mut self, read: F)
where
F: Fn(fs::File) -> Result<BTreeMap<K, V>, E>,
F: Fn(BufReader<fs::File>) -> Result<BTreeMap<K, V>, E>,
E: fmt::Display,
{
if self.transient {
return
}
trace!("reloading diskmap {:?}", self.file_path);
let _ = fs::File::open(self.file_path.clone())
.map_err(|e| trace!("Failed to open disk map: {}", e))
.and_then(|f| read(f).map_err(|e| warn!("Failed to read disk map: {}", e)))
.map_err(|e| trace!(target: "diskmap" , "Failed to open disk map: {}", e))
.and_then(|f| {
read(BufReader::new(f))
.map_err(|e| warn!(target: "diskmap" ,"Failed to read disk map: {}", e))
})
.map(|m| {
self.cache = m;
});
Expand All @@ -107,20 +116,22 @@ impl<K: hash::Hash + Eq, V> DiskMap<K, V> {
/// The closure is expected to do the actual writing
pub fn save<F, E>(&self, write: F)
where
F: Fn(&BTreeMap<K, V>, &mut fs::File) -> Result<(), E>,
F: Fn(&BTreeMap<K, V>, &mut BufWriter<fs::File>) -> Result<(), E>,
E: fmt::Display,
{
if self.transient {
return
}
trace!("saving diskmap {:?}", self.file_path);
trace!(target: "diskmap", "saving diskmap {:?}", self.file_path);
if let Some(parent) = self.file_path.parent() {
let _ = fs::create_dir_all(parent);
}
let _ = fs::File::create(&self.file_path)
.map_err(|e| warn!("Failed to open disk map for writing: {}", e))
.and_then(|mut f| {
write(&self.cache, &mut f).map_err(|e| warn!("Failed to write to disk map: {}", e))
.map_err(|e| warn!(target: "diskmap", "Failed to open disk map for writing: {}", e))
.and_then(|f| {
let mut f = BufWriter::new(f);
write(&self.cache, &mut f)
.map_err(|e| warn!(target: "diskmap" ,"Failed to write to disk map: {}", e))
});
}
}