diff --git a/Cargo.lock b/Cargo.lock index a69c3e1de8e1a..dd75eab4a1064 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1096,6 +1096,7 @@ dependencies = [ "evm", "evmodin", "eyre", + "futures", "hex", "once_cell", "proptest", diff --git a/evm-adapters/Cargo.toml b/evm-adapters/Cargo.toml index a4ac173ebe006..7115bc7eedd5f 100644 --- a/evm-adapters/Cargo.toml +++ b/evm-adapters/Cargo.toml @@ -25,6 +25,7 @@ tokio = { version = "1.12.0", features = ["rt-multi-thread"] } hex = "0.4.3" thiserror = "1.0.29" proptest = "1.0.0" +futures = "0.3.17" [dev-dependencies] evmodin = { git = "https://github.com/vorot93/evmodin", features = ["util"] } diff --git a/evm-adapters/src/blocking_provider.rs b/evm-adapters/src/blocking_provider.rs index 084cd5af6198e..6d1c34eb7be9f 100644 --- a/evm-adapters/src/blocking_provider.rs +++ b/evm-adapters/src/blocking_provider.rs @@ -1,6 +1,20 @@ use ethers::{ providers::Middleware, - types::{Address, Block, BlockId, Bytes, TxHash, H256, U256, U64}, + types::{ + transaction::eip2718::TypedTransaction, Address, Block, BlockId, BlockNumber, Bytes, + EIP1186ProofResponse, NameOrAddress, Transaction, TransactionReceipt, TxHash, H256, U256, + U64, + }, +}; +use futures::{ + channel::mpsc::{channel, Receiver, Sender}, + stream::{Fuse, Stream, StreamExt}, + task::{Context, Poll}, + Future, FutureExt, +}; +use std::{ + pin::Pin, + sync::mpsc::{channel as oneshot_channel, Sender as OneshotSender}, }; use tokio::runtime::Runtime; @@ -64,6 +78,10 @@ where self.block_on(self.provider.get_balance(address, block)) } + pub fn get_accounts(&self) -> Result, M::Error> { + self.block_on(self.provider.get_accounts()) + } + pub fn get_transaction_count( &self, address: Address, @@ -85,3 +103,441 @@ where self.block_on(self.provider.get_storage_at(address, slot, block)) } } + +/// The Request type the ProviderHandler listens for +#[derive(Debug)] +enum ProviderRequest { + GetBlockNumber(OneshotSender>), + ResolveName { + ens_name: String, + sender: OneshotSender>, + }, + LookupAddress { + address: Address, + sender: OneshotSender>, + }, + GetBlock { + block: BlockId, + sender: OneshotSender>, Err>>, + }, + GetBlockWithTxs { + block: BlockId, + sender: OneshotSender>, Err>>, + }, + GetUncleCount { + block_hash_or_number: BlockId, + sender: OneshotSender>, + }, + GetUncle { + block: BlockId, + idx: U64, + sender: OneshotSender>, Err>>, + }, + GetTransactionCount { + from: NameOrAddress, + block: Option, + sender: OneshotSender>, + }, + EstimateGas { + tx: TypedTransaction, + sender: OneshotSender>, + }, + Call { + tx: TypedTransaction, + block: Option, + sender: OneshotSender>, + }, + GetChainId(OneshotSender>), + GetBalance { + from: NameOrAddress, + block: Option, + sender: OneshotSender>, + }, + GetTransaction { + transaction_hash: TxHash, + sender: OneshotSender, Err>>, + }, + GetTransactionReceipt { + transaction_hash: TxHash, + sender: OneshotSender, Err>>, + }, + GetBlockReceipts { + block: BlockNumber, + sender: OneshotSender, Err>>, + }, + GetGasPrice(OneshotSender>), + GetAccounts(OneshotSender, Err>>), + GetStorageAt { + from: NameOrAddress, + location: H256, + block: Option, + sender: OneshotSender>, + }, + GetProof { + from: NameOrAddress, + locations: Vec, + block: Option, + sender: OneshotSender>, + }, +} + +type ProviderRequestFut = Pin + Send>>; + +/// Handles an internal provider and listens for commands to delegate to the provider and respond +/// with the provider's response. +#[must_use = "ProviderHandler does nothing unless polled."] +struct ProviderHandler { + provider: M, + /// Commands that are being processed and awaiting a response from the + /// provider. + pending_requests: Vec, + /// Incoming commands + incoming: Fuse>>, +} + +impl ProviderHandler +where + M: Middleware + Clone + 'static, +{ + fn new(provider: M, rx: Receiver>) -> Self { + Self { provider, pending_requests: Default::default(), incoming: rx.fuse() } + } + + /// handle the request in queue in the future + fn on_request(&mut self, cmd: ProviderRequest) { + let provider = self.provider.clone(); + let fut = Box::pin(async move { + match cmd { + ProviderRequest::GetBlockNumber(sender) => { + let resp = provider.get_block_number().await; + let _ = sender.send(resp); + } + ProviderRequest::ResolveName { ens_name, sender } => { + let resp = provider.resolve_name(&ens_name).await; + let _ = sender.send(resp); + } + ProviderRequest::LookupAddress { address, sender } => { + let resp = provider.lookup_address(address).await; + let _ = sender.send(resp); + } + ProviderRequest::GetBlock { block, sender } => { + let resp = provider.get_block(block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetBlockWithTxs { block, sender } => { + let resp = provider.get_block_with_txs(block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetUncleCount { block_hash_or_number, sender } => { + let resp = provider.get_uncle_count(block_hash_or_number).await; + let _ = sender.send(resp); + } + ProviderRequest::GetUncle { block, idx, sender } => { + let resp = provider.get_uncle(block, idx).await; + let _ = sender.send(resp); + } + ProviderRequest::GetTransactionCount { from, block, sender } => { + let resp = provider.get_transaction_count(from, block).await; + let _ = sender.send(resp); + } + ProviderRequest::EstimateGas { tx, sender } => { + let resp = provider.estimate_gas(&tx).await; + let _ = sender.send(resp); + } + ProviderRequest::Call { tx, block, sender } => { + let resp = provider.call(&tx, block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetChainId(sender) => { + let resp = provider.get_chainid().await; + let _ = sender.send(resp); + } + ProviderRequest::GetBalance { from, block, sender } => { + let resp = provider.get_balance(from, block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetTransaction { transaction_hash, sender } => { + let resp = provider.get_transaction(transaction_hash).await; + let _ = sender.send(resp); + } + ProviderRequest::GetTransactionReceipt { transaction_hash, sender } => { + let resp = provider.get_transaction_receipt(transaction_hash).await; + let _ = sender.send(resp); + } + ProviderRequest::GetBlockReceipts { block, sender } => { + let resp = provider.get_block_receipts(block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetGasPrice(sender) => { + let resp = provider.get_gas_price().await; + let _ = sender.send(resp); + } + ProviderRequest::GetAccounts(sender) => { + let resp = provider.get_accounts().await; + let _ = sender.send(resp); + } + ProviderRequest::GetStorageAt { from, location, block, sender } => { + let resp = provider.get_storage_at(from, location, block).await; + let _ = sender.send(resp); + } + ProviderRequest::GetProof { from, locations, block, sender } => { + let resp = provider.get_proof(from, locations, block).await; + let _ = sender.send(resp); + } + } + }); + self.pending_requests.push(fut); + } +} + +impl Future for ProviderHandler +where + M: Middleware + Clone + Unpin + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pin = self.get_mut(); + + // receive new commands to delegate to the underlying provider + while let Poll::Ready(Some(req)) = Pin::new(&mut pin.incoming).poll_next(cx) { + pin.on_request(req) + } + + // poll all futures + for n in (0..pin.pending_requests.len()).rev() { + let mut request = pin.pending_requests.swap_remove(n); + if request.poll_unpin(cx).is_pending() { + pin.pending_requests.push(request); + } + } + + // the handler is finished if the command channel was closed and all commands are processed + if pin.incoming.is_done() && pin.pending_requests.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +/// A blocking alternative to the async `Middleware`. +#[derive(Debug)] +pub struct SyncProvider { + provider: Sender>, +} + +impl Clone for SyncProvider { + fn clone(&self) -> Self { + Self { provider: self.provider.clone() } + } +} + +impl SyncProvider +where + M: Middleware + Unpin + 'static + Clone, +{ + /// NOTE: this should be called with `Arc` + pub fn new(provider: M) -> eyre::Result { + let (tx, rx) = channel(1); + let handler = ProviderHandler::new(provider, rx); + // spawn the provider handler to background for which we need a new Runtime + let rt = Runtime::new()?; + std::thread::spawn(move || rt.block_on(handler)); + + Ok(Self { provider: tx }) + } + + pub fn get_block_number(&self) -> eyre::Result { + let (tx, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetBlockNumber(tx); + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn resolve_name(&self, ens_name: &str) -> eyre::Result
{ + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::ResolveName { ens_name: ens_name.to_string(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn lookup_address(&self, address: Address) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::LookupAddress { address, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_block>(&self, block: T) -> eyre::Result>> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetBlock { block: block.into(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_block_with_txs>( + &self, + block: T, + ) -> eyre::Result>> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetBlockWithTxs { block: block.into(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_uncle_count>(&self, block: T) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetUncleCount { block_hash_or_number: block.into(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_uncle>( + &self, + block: T, + idx: U64, + ) -> eyre::Result>> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetUncle { block: block.into(), idx, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_transaction_count>( + &self, + from: T, + block: Option, + ) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetTransactionCount { from: from.into(), block, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn estimate_gas(&self, tx: TypedTransaction) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::EstimateGas { tx, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn call(&self, tx: TypedTransaction, block: Option) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::Call { tx, block, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_chainid(&self) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetChainId(sender); + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_balance>( + &self, + from: T, + block: Option, + ) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetBalance { from: from.into(), block, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_transaction>( + &self, + transaction_hash: T, + ) -> eyre::Result> { + let (sender, rx) = oneshot_channel(); + let cmd = + ProviderRequest::GetTransaction { transaction_hash: transaction_hash.into(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_transaction_receipt>( + &self, + transaction_hash: T, + ) -> eyre::Result> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetTransactionReceipt { + transaction_hash: transaction_hash.into(), + sender, + }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_block_receipts>( + &self, + block: T, + ) -> eyre::Result> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetBlockReceipts { block: block.into(), sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_gas_price(&self) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetGasPrice(sender); + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_accounts(&self) -> eyre::Result> { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetAccounts(sender); + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_storage_at>( + &self, + from: T, + location: H256, + block: Option, + ) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetStorageAt { from: from.into(), location, block, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } + + pub fn get_proof>( + &self, + from: T, + locations: Vec, + block: Option, + ) -> eyre::Result { + let (sender, rx) = oneshot_channel(); + let cmd = ProviderRequest::GetProof { from: from.into(), locations, block, sender }; + self.provider.clone().try_send(cmd).map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(rx.recv()??) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethers::{ + providers::{Http, Provider}, + utils::Ganache, + }; + use std::{convert::TryFrom, sync::Arc}; + + #[test] + fn sync_provider_test_poc() { + let ganache = Ganache::new().spawn(); + + // connect to the network + let provider = Provider::::try_from(ganache.endpoint()).unwrap(); + + let provider = SyncProvider::new(Arc::new(provider)).unwrap(); + + let _ = provider.get_accounts().unwrap(); + } +} diff --git a/evm-adapters/src/lib.rs b/evm-adapters/src/lib.rs index 5e97504dcf71f..61d9f525c8db2 100644 --- a/evm-adapters/src/lib.rs +++ b/evm-adapters/src/lib.rs @@ -7,7 +7,7 @@ pub mod sputnik; pub mod evmodin; mod blocking_provider; -pub use blocking_provider::BlockingProvider; +pub use blocking_provider::{BlockingProvider, SyncProvider}; pub mod fuzz;