From b2b22f03d6457bca3973dc7202d852c21c5b0ac6 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 24 Jun 2024 11:58:44 -0400 Subject: [PATCH 1/5] wip refactor --- .../src/bridge_withdrawer/ethereum/watcher.rs | 182 +++++++++++------- 1 file changed, 115 insertions(+), 67 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index 812bec4e32..0f4b8627a5 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -136,7 +136,7 @@ impl Watcher { let batcher = Batcher { event_rx, - provider, + provider: provider.clone(), submitter_handle, shutdown_token: shutdown_token.clone(), fee_asset_id, @@ -147,28 +147,25 @@ impl Watcher { tokio::task::spawn(batcher.run()); - let sequencer_withdrawal_event_handler = - tokio::task::spawn(watch_for_sequencer_withdrawal_events( - contract.clone(), - event_tx.clone(), - next_rollup_block_height, - )); - let ics20_withdrawal_event_handler = tokio::task::spawn(watch_for_ics20_withdrawal_events( - contract, - event_tx.clone(), - next_rollup_block_height, - )); + // let sequencer_withdrawal_event_handler = + // tokio::task::spawn(watch_for_sequencer_withdrawal_events( + // contract.clone(), + // event_tx.clone(), + // next_rollup_block_height, + // )); + // let ics20_withdrawal_event_handler = tokio::task::spawn(watch_for_ics20_withdrawal_events( + // contract, + // event_tx.clone(), + // next_rollup_block_height, + // )); + let block_handler = tokio::task::spawn(watch_for_blocks(provider, contract.address(), shutdown_token.clone())); state.set_watcher_ready(); tokio::select! { - res = sequencer_withdrawal_event_handler => { - info!("sequencer withdrawal event handler exited"); - res.context("sequencer withdrawal event handler exited")? - } - res = ics20_withdrawal_event_handler => { - info!("ics20 withdrawal event handler exited"); - res.context("ics20 withdrawal event handler exited")? + res = block_handler => { + info!("block handler exited"); + res.context("block handler exited")? } () = shutdown_token.cancelled() => { info!("watcher shutting down"); @@ -263,66 +260,117 @@ impl Watcher { } } -async fn watch_for_sequencer_withdrawal_events( - contract: IAstriaWithdrawer>, - event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, - from_block: u64, +async fn watch_for_blocks( + provider: Arc>, + contract_address: ethers::types::Address, + mut shutdown_token: CancellationToken, ) -> Result<()> { - let events = contract - .sequencer_withdrawal_filter() - .from_block(from_block) - .address(contract.address().into()); + use ethers::{ + contract::EthEvent as _, + types::Filter, + }; - let mut stream = events - .stream() - .await - .wrap_err("failed to subscribe to sequencer withdrawal events")? - .with_meta(); + use crate::bridge_withdrawer::ethereum::astria_withdrawer_interface::Ics20WithdrawalFilter; + use crate::bridge_withdrawer::ethereum::astria_withdrawer_interface::SequencerWithdrawalFilter; - while let Some(item) = stream.next().await { - if let Ok((event, meta)) = item { - event_tx - .send((WithdrawalEvent::Sequencer(event), meta)) - .await - .wrap_err("failed to send sequencer withdrawal event; receiver dropped?")?; - } else if item.is_err() { - item.wrap_err("failed to read from event stream; event stream closed?")?; - } - } + let mut block_rx = provider + .subscribe_blocks() + .await + .wrap_err("failed to subscribe to blocks")?; - Ok(()) -} + let sequencer_withdrawal_event_sig = SequencerWithdrawalFilter::signature(); + let ics20_withdrawal_event_sig = Ics20WithdrawalFilter::signature(); -async fn watch_for_ics20_withdrawal_events( - contract: IAstriaWithdrawer>, - event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, - from_block: u64, -) -> Result<()> { - let events = contract - .ics_20_withdrawal_filter() - .from_block(from_block) - .address(contract.address().into()); + loop { + select! { + () = shutdown_token.cancelled() => { + info!("block watcher shutting down"); + return Ok(()); + } + block = block_rx.next() => { + if let Some(Block { number, hash, .. }) = block { + let Some(block_hash) = hash else { + // don't think this should happen + warn!("block hash missing; skipping"); + continue; + }; + + let sequencer_withdrawal_filter = Filter::new().at_block_hash(block_hash).address(contract_address).topic0(sequencer_withdrawal_event_sig); + let ics20_withdrawal_filter = Filter::new().at_block_hash(block_hash).address(contract_address).topic0(ics20_withdrawal_event_sig); + let logs = provider.get_logs(&ics20_withdrawal_filter).await?; + let events = logs.into_iter().map(|log| { + let event = Ics20WithdrawalFilter::decode_log(log)?; + Ok((WithdrawalEvent::Ics20(event), log)) + }).collect::>>()?; + } - let mut stream = events - .stream() - .await - .wrap_err("failed to subscribe to ics20 withdrawal events")? - .with_meta(); - while let Some(item) = stream.next().await { - if let Ok((event, meta)) = item { - event_tx - .send((WithdrawalEvent::Ics20(event), meta)) - .await - .wrap_err("failed to send ics20 withdrawal event; receiver dropped?")?; - } else if item.is_err() { - item.wrap_err("failed to read from event stream; event stream closed?")?; + } } } - Ok(()) } +// async fn watch_for_sequencer_withdrawal_events( +// contract: IAstriaWithdrawer>, +// event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, +// from_block: u64, +// ) -> Result<()> { +// let events = contract +// .sequencer_withdrawal_filter() +// .from_block(from_block) +// .address(contract.address().into()); + +// let mut stream = events +// .stream() +// .await +// .wrap_err("failed to subscribe to sequencer withdrawal events")? +// .with_meta(); + +// while let Some(item) = stream.next().await { +// if let Ok((event, meta)) = item { +// event_tx +// .send((WithdrawalEvent::Sequencer(event), meta)) +// .await +// .wrap_err("failed to send sequencer withdrawal event; receiver dropped?")?; +// } else if item.is_err() { +// item.wrap_err("failed to read from event stream; event stream closed?")?; +// } +// } + +// Ok(()) +// } + +// async fn watch_for_ics20_withdrawal_events( +// contract: IAstriaWithdrawer>, +// event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, +// from_block: u64, +// ) -> Result<()> { +// let events = contract +// .ics_20_withdrawal_filter() +// .from_block(from_block) +// .address(contract.address().into()); + +// let mut stream = events +// .stream() +// .await +// .wrap_err("failed to subscribe to ics20 withdrawal events")? +// .with_meta(); + +// while let Some(item) = stream.next().await { +// if let Ok((event, meta)) = item { +// event_tx +// .send((WithdrawalEvent::Ics20(event), meta)) +// .await +// .wrap_err("failed to send ics20 withdrawal event; receiver dropped?")?; +// } else if item.is_err() { +// item.wrap_err("failed to read from event stream; event stream closed?")?; +// } +// } + +// Ok(()) +// } + struct Batcher { event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, provider: Arc>, From c875bb2e2d4794034495ccb8ec3cc2788d55c4b1 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 24 Jun 2024 15:57:49 -0400 Subject: [PATCH 2/5] refactor to watch for new blocks --- .../src/bridge_withdrawer/ethereum/watcher.rs | 502 ++++++++---------- 1 file changed, 223 insertions(+), 279 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index 0f4b8627a5..e7bbeadb41 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -3,13 +3,16 @@ use std::{ time::Duration, }; -use astria_core::primitive::v1::{ - asset::{ - self, - denom, - Denom, +use astria_core::{ + primitive::v1::{ + asset::{ + self, + denom, + Denom, + }, + Address, }, - Address, + protocol::transaction::v1alpha1::Action, }; use astria_eyre::{ eyre::{ @@ -20,7 +23,7 @@ use astria_eyre::{ Result, }; use ethers::{ - contract::LogMeta, + contract::EthEvent as _, core::types::Block, providers::{ Middleware, @@ -29,15 +32,16 @@ use ethers::{ StreamExt as _, Ws, }, + types::{ + Filter, + Log, + H256, + }, utils::hex, }; -use tokio::{ - select, - sync::mpsc, -}; +use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{ - error, info, warn, }; @@ -45,7 +49,11 @@ use tracing::{ use crate::bridge_withdrawer::{ batch::Batch, ethereum::{ - astria_withdrawer_interface::IAstriaWithdrawer, + astria_withdrawer_interface::{ + IAstriaWithdrawer, + Ics20WithdrawalFilter, + SequencerWithdrawalFilter, + }, convert::{ event_to_action, EventWithMetadata, @@ -132,33 +140,23 @@ impl Watcher { shutdown_token, } = self; - let (event_tx, event_rx) = mpsc::channel(100); - - let batcher = Batcher { - event_rx, - provider: provider.clone(), - submitter_handle, - shutdown_token: shutdown_token.clone(), + let converter = EventToActionConvertConfig { fee_asset_id, - rollup_asset_denom, + rollup_asset_denom: rollup_asset_denom.clone(), bridge_address, asset_withdrawal_divisor, }; - tokio::task::spawn(batcher.run()); - - // let sequencer_withdrawal_event_handler = - // tokio::task::spawn(watch_for_sequencer_withdrawal_events( - // contract.clone(), - // event_tx.clone(), - // next_rollup_block_height, - // )); - // let ics20_withdrawal_event_handler = tokio::task::spawn(watch_for_ics20_withdrawal_events( - // contract, - // event_tx.clone(), - // next_rollup_block_height, - // )); - let block_handler = tokio::task::spawn(watch_for_blocks(provider, contract.address(), shutdown_token.clone())); + // sync any blocks missing between `next_rollup_block_height` and the current latest + // (inclusive). + + let block_handler = tokio::task::spawn(watch_for_blocks( + provider, + contract.address(), + converter, + submitter_handle, + shutdown_token.clone(), + )); state.set_watcher_ready(); @@ -263,24 +261,15 @@ impl Watcher { async fn watch_for_blocks( provider: Arc>, contract_address: ethers::types::Address, - mut shutdown_token: CancellationToken, + converter: EventToActionConvertConfig, + submitter_handle: submitter::Handle, + shutdown_token: CancellationToken, ) -> Result<()> { - use ethers::{ - contract::EthEvent as _, - types::Filter, - }; - - use crate::bridge_withdrawer::ethereum::astria_withdrawer_interface::Ics20WithdrawalFilter; - use crate::bridge_withdrawer::ethereum::astria_withdrawer_interface::SequencerWithdrawalFilter; - let mut block_rx = provider .subscribe_blocks() .await .wrap_err("failed to subscribe to blocks")?; - let sequencer_withdrawal_event_sig = SequencerWithdrawalFilter::signature(); - let ics20_withdrawal_event_sig = Ics20WithdrawalFilter::signature(); - loop { select! { () = shutdown_token.cancelled() => { @@ -295,171 +284,127 @@ async fn watch_for_blocks( continue; }; - let sequencer_withdrawal_filter = Filter::new().at_block_hash(block_hash).address(contract_address).topic0(sequencer_withdrawal_event_sig); - let ics20_withdrawal_filter = Filter::new().at_block_hash(block_hash).address(contract_address).topic0(ics20_withdrawal_event_sig); - let logs = provider.get_logs(&ics20_withdrawal_filter).await?; - let events = logs.into_iter().map(|log| { - let event = Ics20WithdrawalFilter::decode_log(log)?; - Ok((WithdrawalEvent::Ics20(event), log)) - }).collect::>>()?; + let Some(number) = number else { + // don't think this should happen + warn!("block number missing; skipping"); + continue; + }; + + let mut batch = Batch { + actions: Vec::new(), + rollup_height: number.as_u64(), + }; + + let sequencer_withdrawal_events = get_sequencer_withdrawal_events(provider.clone(), contract_address, block_hash).await.wrap_err("failed to get sequencer withdrawal events")?; + let ics20_withdrawal_events = get_ics20_withdrawal_events(provider.clone(), contract_address, block_hash).await.wrap_err("failed to get ics20 withdrawal events")?; + let events = vec![sequencer_withdrawal_events, ics20_withdrawal_events].into_iter().flatten(); + for (event, log) in events { + let Some(transaction_hash) = log.transaction_hash else { + warn!("transaction hash missing; skipping"); + continue; + }; + + let event_with_metadata = EventWithMetadata { + event, + block_number: number, + transaction_hash, + }; + let action = converter.convert(event_with_metadata).wrap_err("failed to convert event to action")?; + batch.actions.push(action); + } + + if !batch.actions.is_empty() { + submitter_handle.send_batch(batch) + .await + .wrap_err("failed to send batched events; receiver dropped?")?; + } + } } } } - Ok(()) } -// async fn watch_for_sequencer_withdrawal_events( -// contract: IAstriaWithdrawer>, -// event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, -// from_block: u64, -// ) -> Result<()> { -// let events = contract -// .sequencer_withdrawal_filter() -// .from_block(from_block) -// .address(contract.address().into()); - -// let mut stream = events -// .stream() -// .await -// .wrap_err("failed to subscribe to sequencer withdrawal events")? -// .with_meta(); - -// while let Some(item) = stream.next().await { -// if let Ok((event, meta)) = item { -// event_tx -// .send((WithdrawalEvent::Sequencer(event), meta)) -// .await -// .wrap_err("failed to send sequencer withdrawal event; receiver dropped?")?; -// } else if item.is_err() { -// item.wrap_err("failed to read from event stream; event stream closed?")?; -// } -// } - -// Ok(()) -// } - -// async fn watch_for_ics20_withdrawal_events( -// contract: IAstriaWithdrawer>, -// event_tx: mpsc::Sender<(WithdrawalEvent, LogMeta)>, -// from_block: u64, -// ) -> Result<()> { -// let events = contract -// .ics_20_withdrawal_filter() -// .from_block(from_block) -// .address(contract.address().into()); - -// let mut stream = events -// .stream() -// .await -// .wrap_err("failed to subscribe to ics20 withdrawal events")? -// .with_meta(); - -// while let Some(item) = stream.next().await { -// if let Ok((event, meta)) = item { -// event_tx -// .send((WithdrawalEvent::Ics20(event), meta)) -// .await -// .wrap_err("failed to send ics20 withdrawal event; receiver dropped?")?; -// } else if item.is_err() { -// item.wrap_err("failed to read from event stream; event stream closed?")?; -// } -// } - -// Ok(()) -// } - -struct Batcher { - event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, +async fn get_sequencer_withdrawal_events( provider: Arc>, - submitter_handle: submitter::Handle, - shutdown_token: CancellationToken, - fee_asset_id: asset::Id, - rollup_asset_denom: Denom, - bridge_address: Address, - asset_withdrawal_divisor: u128, + contract_address: ethers::types::Address, + block_hash: H256, +) -> Result> { + let sequencer_withdrawal_event_sig = SequencerWithdrawalFilter::signature(); + let sequencer_withdrawal_filter = Filter::new() + .at_block_hash(block_hash) + .address(contract_address) + .topic0(sequencer_withdrawal_event_sig); + + let logs = provider + .get_logs(&sequencer_withdrawal_filter) + .await + .wrap_err("failed to get sequencer withdrawal events")?; + + let events = logs + .into_iter() + .map(|log| { + let raw_log = ethers::abi::RawLog { + topics: log.topics.clone(), + data: log.data.to_vec(), + }; + let event = SequencerWithdrawalFilter::decode_log(&raw_log)?; + Ok((WithdrawalEvent::Sequencer(event), log)) + }) + .collect::>>()?; + + Ok(events) } -impl Batcher { - pub(crate) async fn run(mut self) -> Result<()> { - let mut block_rx = self - .provider - .subscribe_blocks() - .await - .wrap_err("failed to subscribe to blocks")?; +async fn get_ics20_withdrawal_events( + provider: Arc>, + contract_address: ethers::types::Address, + block_hash: H256, +) -> Result> { + let ics20_withdrawal_event_sig = Ics20WithdrawalFilter::signature(); + let ics20_withdrawal_filter = Filter::new() + .at_block_hash(block_hash) + .address(contract_address) + .topic0(ics20_withdrawal_event_sig); - let mut curr_batch = Batch { - actions: Vec::new(), - rollup_height: 0, - }; + let logs = provider + .get_logs(&ics20_withdrawal_filter) + .await + .wrap_err("failed to get ics20 withdrawal events")?; + + let events = logs + .into_iter() + .map(|log| { + let raw_log = ethers::abi::RawLog { + topics: log.topics.clone(), + data: log.data.to_vec(), + }; + let event = Ics20WithdrawalFilter::decode_log(&raw_log)?; + Ok((WithdrawalEvent::Ics20(event), log)) + }) + .collect::>>()?; - loop { - select! { - () = self.shutdown_token.cancelled() => { - info!("batcher shutting down"); - break; - } - block = block_rx.next() => { - if let Some(Block { number, .. }) = block { - let Some(block_number) = number else { - // don't think this should happen - warn!("block number missing; skipping"); - continue; - }; + Ok(events) +} - if block_number.as_u64() > curr_batch.rollup_height { - if !curr_batch.actions.is_empty() { - self.submitter_handle.send_batch(curr_batch) - .await - .wrap_err("failed to send batched events; receiver dropped?")?; - } - - curr_batch = Batch { - actions: Vec::new(), - rollup_height: block_number.as_u64(), - }; - } - } else { - error!("block stream closed; shutting down batcher"); - break; - } - } - item = self.event_rx.recv() => { - if let Some((event, meta)) = item { - let event_with_metadata = EventWithMetadata { - event, - block_number: meta.block_number, - transaction_hash: meta.transaction_hash, - }; - let action = event_to_action(event_with_metadata, self.fee_asset_id, self.rollup_asset_denom.clone(), self.asset_withdrawal_divisor, self.bridge_address).wrap_err("failed to convert event to action")?; - - if meta.block_number.as_u64() == curr_batch.rollup_height { - // block number was the same; add event to current batch - curr_batch.actions.push(action); - } else { - // block number increased; send current batch and start a new one - if !curr_batch.actions.is_empty() { - self.submitter_handle.send_batch(curr_batch) - .await - .wrap_err("failed to send batched events; receiver dropped?")?; - } - - curr_batch = Batch { - actions: vec![action], - rollup_height: meta.block_number.as_u64(), - }; - } - } else { - error!("event receiver dropped; shutting down batcher"); - break; - } - } - } - } +struct EventToActionConvertConfig { + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, + bridge_address: Address, + asset_withdrawal_divisor: u128, +} - Ok(()) +impl EventToActionConvertConfig { + fn convert(&self, event: EventWithMetadata) -> Result { + event_to_action( + event, + self.fee_asset_id, + self.rollup_asset_denom.clone(), + self.asset_withdrawal_divisor, + self.bridge_address, + ) } } @@ -497,7 +442,10 @@ mod tests { }, utils::hex, }; - use tokio::sync::oneshot; + use tokio::sync::{ + mpsc, + oneshot, + }; use super::*; use crate::bridge_withdrawer::ethereum::{ @@ -598,23 +546,9 @@ mod tests { .prefix(ASTRIA_ADDRESS_PREFIX) .try_build() .unwrap(); - let receipt = send_sequencer_withdraw_transaction(&contract, value, recipient).await; - let expected_event = EventWithMetadata { - event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { - sender: wallet.address(), - destination_chain_address: recipient.to_string(), - amount: value, - }), - block_number: receipt.block_number.unwrap(), - transaction_hash: receipt.transaction_hash, - }; + let bridge_address = crate::astria_address([1u8; 20]); let denom = default_native_asset(); - let expected_action = - event_to_action(expected_event, denom.id(), denom.clone(), 1, bridge_address).unwrap(); - let Action::BridgeUnlock(expected_action) = expected_action else { - panic!("expected action to be BridgeUnlock, got {expected_action:?}"); - }; let (batch_tx, mut batch_rx) = mpsc::channel(100); let (startup_tx, startup_rx) = oneshot::channel(); @@ -632,7 +566,7 @@ mod tests { submitter_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), - rollup_asset_denom: denom, + rollup_asset_denom: denom.clone(), bridge_address, } .build() @@ -640,8 +574,21 @@ mod tests { tokio::task::spawn(watcher.run()); - // make another tx to trigger anvil to make another block - send_sequencer_withdraw_transaction(&contract, value, recipient).await; + let receipt = send_sequencer_withdraw_transaction(&contract, value, recipient).await; + let expected_event = EventWithMetadata { + event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { + sender: wallet.address(), + destination_chain_address: recipient.to_string(), + amount: value, + }), + block_number: receipt.block_number.unwrap(), + transaction_hash: receipt.transaction_hash, + }; + let expected_action = + event_to_action(expected_event, denom.id(), denom, 1, bridge_address).unwrap(); + let Action::BridgeUnlock(expected_action) = expected_action else { + panic!("expected action to be BridgeUnlock, got {expected_action:?}"); + }; let batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); @@ -688,25 +635,9 @@ mod tests { let value = 1_000_000_000.into(); let recipient = "somebech32address".to_string(); - let receipt = send_ics20_withdraw_transaction(&contract, value, recipient.clone()).await; - let expected_event = EventWithMetadata { - event: WithdrawalEvent::Ics20(Ics20WithdrawalFilter { - sender: wallet.address(), - destination_chain_address: recipient.clone(), - amount: value, - memo: "nootwashere".to_string(), - }), - block_number: receipt.block_number.unwrap(), - transaction_hash: receipt.transaction_hash, - }; + let bridge_address = crate::astria_address([1u8; 20]); let denom = "transfer/channel-0/utia".parse::().unwrap(); - let Action::Ics20Withdrawal(mut expected_action) = - event_to_action(expected_event, denom.id(), denom.clone(), 1, bridge_address).unwrap() - else { - panic!("expected action to be Ics20Withdrawal"); - }; - expected_action.timeout_time = 0; // zero this for testing let (batch_tx, mut batch_rx) = mpsc::channel(100); let (startup_tx, startup_rx) = oneshot::channel(); @@ -724,7 +655,7 @@ mod tests { submitter_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), - rollup_asset_denom: denom, + rollup_asset_denom: denom.clone(), bridge_address, } .build() @@ -732,8 +663,23 @@ mod tests { tokio::task::spawn(watcher.run()); - // make another tx to trigger anvil to make another block - send_ics20_withdraw_transaction(&contract, value, recipient).await; + let receipt = send_ics20_withdraw_transaction(&contract, value, recipient.clone()).await; + let expected_event = EventWithMetadata { + event: WithdrawalEvent::Ics20(Ics20WithdrawalFilter { + sender: wallet.address(), + destination_chain_address: recipient.clone(), + amount: value, + memo: "nootwashere".to_string(), + }), + block_number: receipt.block_number.unwrap(), + transaction_hash: receipt.transaction_hash, + }; + let Action::Ics20Withdrawal(mut expected_action) = + event_to_action(expected_event, denom.id(), denom, 1, bridge_address).unwrap() + else { + panic!("expected action to be Ics20Withdrawal"); + }; + expected_action.timeout_time = 0; // zero this for testing let mut batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); @@ -812,23 +758,9 @@ mod tests { .prefix(ASTRIA_ADDRESS_PREFIX) .try_build() .unwrap(); - let receipt = send_sequencer_withdraw_transaction_erc20(&contract, value, recipient).await; - let expected_event = EventWithMetadata { - event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { - sender: wallet.address(), - destination_chain_address: recipient.to_string(), - amount: value, - }), - block_number: receipt.block_number.unwrap(), - transaction_hash: receipt.transaction_hash, - }; + let denom = default_native_asset(); let bridge_address = crate::astria_address([1u8; 20]); - let expected_action = - event_to_action(expected_event, denom.id(), denom.clone(), 1, bridge_address).unwrap(); - let Action::BridgeUnlock(expected_action) = expected_action else { - panic!("expected action to be BridgeUnlock, got {expected_action:?}"); - }; let (batch_tx, mut batch_rx) = mpsc::channel(100); let (startup_tx, startup_rx) = oneshot::channel(); @@ -846,7 +778,7 @@ mod tests { submitter_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), - rollup_asset_denom: denom, + rollup_asset_denom: denom.clone(), bridge_address, } .build() @@ -854,8 +786,21 @@ mod tests { tokio::task::spawn(watcher.run()); - // make another tx to trigger anvil to make another block - send_sequencer_withdraw_transaction_erc20(&contract, value, recipient).await; + let receipt = send_sequencer_withdraw_transaction_erc20(&contract, value, recipient).await; + let expected_event = EventWithMetadata { + event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { + sender: wallet.address(), + destination_chain_address: recipient.to_string(), + amount: value, + }), + block_number: receipt.block_number.unwrap(), + transaction_hash: receipt.transaction_hash, + }; + let expected_action = + event_to_action(expected_event, denom.id(), denom, 1, bridge_address).unwrap(); + let Action::BridgeUnlock(expected_action) = expected_action else { + panic!("expected action to be BridgeUnlock, got {expected_action:?}"); + }; let batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); @@ -907,30 +852,9 @@ mod tests { let value = 1_000_000_000.into(); let recipient = "somebech32address".to_string(); - let receipt = send_ics20_withdraw_transaction_astria_bridgeable_erc20( - &contract, - value, - recipient.clone(), - ) - .await; - let expected_event = EventWithMetadata { - event: WithdrawalEvent::Ics20(Ics20WithdrawalFilter { - sender: wallet.address(), - destination_chain_address: recipient.clone(), - amount: value, - memo: "nootwashere".to_string(), - }), - block_number: receipt.block_number.unwrap(), - transaction_hash: receipt.transaction_hash, - }; + let denom = "transfer/channel-0/utia".parse::().unwrap(); let bridge_address = crate::astria_address([1u8; 20]); - let Action::Ics20Withdrawal(mut expected_action) = - event_to_action(expected_event, denom.id(), denom.clone(), 1, bridge_address).unwrap() - else { - panic!("expected action to be Ics20Withdrawal"); - }; - expected_action.timeout_time = 0; // zero this for testing let (batch_tx, mut batch_rx) = mpsc::channel(100); let (startup_tx, startup_rx) = oneshot::channel(); @@ -948,7 +872,7 @@ mod tests { submitter_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), - rollup_asset_denom: denom, + rollup_asset_denom: denom.clone(), bridge_address, } .build() @@ -956,8 +880,28 @@ mod tests { tokio::task::spawn(watcher.run()); - // make another tx to trigger anvil to make another block - send_ics20_withdraw_transaction_astria_bridgeable_erc20(&contract, value, recipient).await; + let receipt = send_ics20_withdraw_transaction_astria_bridgeable_erc20( + &contract, + value, + recipient.clone(), + ) + .await; + let expected_event = EventWithMetadata { + event: WithdrawalEvent::Ics20(Ics20WithdrawalFilter { + sender: wallet.address(), + destination_chain_address: recipient.clone(), + amount: value, + memo: "nootwashere".to_string(), + }), + block_number: receipt.block_number.unwrap(), + transaction_hash: receipt.transaction_hash, + }; + let Action::Ics20Withdrawal(mut expected_action) = + event_to_action(expected_event, denom.id(), denom, 1, bridge_address).unwrap() + else { + panic!("expected action to be Ics20Withdrawal"); + }; + expected_action.timeout_time = 0; // zero this for testing let mut batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); From f3eabb48502cf602fd44a97b240a1949947c6dda Mon Sep 17 00:00:00 2001 From: elizabeth Date: Tue, 25 Jun 2024 17:13:10 -0400 Subject: [PATCH 3/5] add logic for syncing from next to batch to current --- .../src/bridge_withdrawer/batch.rs | 1 + .../src/bridge_withdrawer/ethereum/watcher.rs | 277 ++++++++++++++---- 2 files changed, 226 insertions(+), 52 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/batch.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/batch.rs index c99d7937c1..7a56861819 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/batch.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/batch.rs @@ -1,5 +1,6 @@ use astria_core::protocol::transaction::v1alpha1::Action; +#[derive(Debug)] pub(crate) struct Batch { /// The withdrawal payloads pub(crate) actions: Vec, diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index e7bbeadb41..81be90aa56 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -17,6 +17,7 @@ use astria_core::{ use astria_eyre::{ eyre::{ self, + bail, eyre, WrapErr as _, }, @@ -42,6 +43,7 @@ use ethers::{ use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{ + debug, info, warn, }; @@ -147,12 +149,10 @@ impl Watcher { asset_withdrawal_divisor, }; - // sync any blocks missing between `next_rollup_block_height` and the current latest - // (inclusive). - let block_handler = tokio::task::spawn(watch_for_blocks( - provider, + provider.clone(), contract.address(), + next_rollup_block_height, converter, submitter_handle, shutdown_token.clone(), @@ -258,9 +258,47 @@ impl Watcher { } } +async fn sync_from_next_rollup_block_height( + provider: Arc>, + contract_address: ethers::types::Address, + converter: &EventToActionConvertConfig, + submitter_handle: &submitter::Handle, + next_rollup_block_height_to_check: u64, + current_rollup_block_height: u64, +) -> Result<()> { + if current_rollup_block_height < next_rollup_block_height_to_check { + return Ok(()); + } + + for i in next_rollup_block_height_to_check..=current_rollup_block_height { + let Some(block) = provider + .get_block(i) + .await + .wrap_err("failed to get block")? + else { + warn!("block with number {i} missing; skipping"); + continue; + }; + + get_and_send_events_at_block( + provider.clone(), + contract_address, + block, + converter, + submitter_handle, + ) + .await + .wrap_err("failed to get and send events at block")?; + } + + info!("synced from {next_rollup_block_height_to_check} to {current_rollup_block_height}"); + Ok(()) +} + async fn watch_for_blocks( provider: Arc>, contract_address: ethers::types::Address, + next_rollup_block_height: u64, converter: EventToActionConvertConfig, submitter_handle: submitter::Handle, shutdown_token: CancellationToken, @@ -270,6 +308,29 @@ async fn watch_for_blocks( .await .wrap_err("failed to subscribe to blocks")?; + // read latest block height from subscription; + // use this value for syncing from the next height to submit to current. + let Some(current_rollup_block) = block_rx.next().await else { + bail!("failed to get current rollup block from subscription") + }; + + let Some(current_rollup_block_height) = current_rollup_block.number else { + bail!("current rollup block missing block number") + }; + + // sync any blocks missing between `next_rollup_block_height` and the current latest + // (inclusive). + sync_from_next_rollup_block_height( + provider.clone(), + contract_address, + &converter, + &submitter_handle, + next_rollup_block_height, + current_rollup_block_height.as_u64(), + ) + .await + .wrap_err("failed to sync from next rollup block height")?; + loop { select! { () = shutdown_token.cancelled() => { @@ -277,56 +338,83 @@ async fn watch_for_blocks( return Ok(()); } block = block_rx.next() => { - if let Some(Block { number, hash, .. }) = block { - let Some(block_hash) = hash else { - // don't think this should happen - warn!("block hash missing; skipping"); - continue; - }; - - let Some(number) = number else { - // don't think this should happen - warn!("block number missing; skipping"); - continue; - }; - - let mut batch = Batch { - actions: Vec::new(), - rollup_height: number.as_u64(), - }; - - let sequencer_withdrawal_events = get_sequencer_withdrawal_events(provider.clone(), contract_address, block_hash).await.wrap_err("failed to get sequencer withdrawal events")?; - let ics20_withdrawal_events = get_ics20_withdrawal_events(provider.clone(), contract_address, block_hash).await.wrap_err("failed to get ics20 withdrawal events")?; - let events = vec![sequencer_withdrawal_events, ics20_withdrawal_events].into_iter().flatten(); - for (event, log) in events { - let Some(transaction_hash) = log.transaction_hash else { - warn!("transaction hash missing; skipping"); - continue; - }; - - let event_with_metadata = EventWithMetadata { - event, - block_number: number, - transaction_hash, - }; - let action = converter.convert(event_with_metadata).wrap_err("failed to convert event to action")?; - batch.actions.push(action); - } - - if !batch.actions.is_empty() { - submitter_handle.send_batch(batch) - .await - .wrap_err("failed to send batched events; receiver dropped?")?; - } - + if let Some(block) = block { + get_and_send_events_at_block( + provider.clone(), + contract_address, + block, + &converter, + &submitter_handle, + ) + .await + .wrap_err("failed to get and send events at block")?; } - - } } } } +async fn get_and_send_events_at_block( + provider: Arc>, + contract_address: ethers::types::Address, + block: Block, + converter: &EventToActionConvertConfig, + submitter_handle: &submitter::Handle, +) -> Result<()> { + let Some(block_hash) = block.hash else { + bail!("block hash missing; skipping") + }; + + let Some(number) = block.number else { + bail!("block number missing; skipping") + }; + + let sequencer_withdrawal_events = + get_sequencer_withdrawal_events(provider.clone(), contract_address, block_hash) + .await + .wrap_err("failed to get sequencer withdrawal events")?; + let ics20_withdrawal_events = + get_ics20_withdrawal_events(provider.clone(), contract_address, block_hash) + .await + .wrap_err("failed to get ics20 withdrawal events")?; + let events = vec![sequencer_withdrawal_events, ics20_withdrawal_events] + .into_iter() + .flatten(); + let mut batch = Batch { + actions: Vec::new(), + rollup_height: 0, + }; + for (event, log) in events { + let Some(transaction_hash) = log.transaction_hash else { + warn!("transaction hash missing; skipping"); + continue; + }; + + let event_with_metadata = EventWithMetadata { + event, + block_number: number, + transaction_hash, + }; + let action = converter + .convert(event_with_metadata) + .wrap_err("failed to convert event to action")?; + batch.actions.push(action); + } + + if batch.actions.is_empty() { + debug!("no actions to send at block {number}"); + } else { + let actions_len = batch.actions.len(); + submitter_handle + .send_batch(batch) + .await + .wrap_err("failed to send batched events; receiver dropped?")?; + debug!("sent batch with {} actions at block {number}", actions_len); + } + + Ok(()) +} + async fn get_sequencer_withdrawal_events( provider: Arc>, contract_address: ethers::types::Address, @@ -389,6 +477,7 @@ async fn get_ics20_withdrawal_events( Ok(events) } +#[derive(Clone)] struct EventToActionConvertConfig { fee_asset_id: asset::Id, rollup_asset_denom: Denom, @@ -444,6 +533,7 @@ mod tests { }; use tokio::sync::{ mpsc, + mpsc::error::TryRecvError::Empty, oneshot, }; @@ -556,7 +646,7 @@ mod tests { startup_tx .send(SequencerStartupInfo { fee_asset_id: denom.id(), - next_batch_rollup_height: 0, + next_batch_rollup_height: 1, }) .unwrap(); @@ -599,6 +689,86 @@ mod tests { ); }; assert_eq!(action, &expected_action); + assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); + } + + #[tokio::test] + #[ignore = "requires foundry to be installed"] + async fn watcher_can_watch_sequencer_withdrawals_astria_withdrawer_sync_from_next_rollup_height() + { + let (contract_address, provider, wallet, anvil) = + ConfigureAstriaWithdrawerDeployer::default().deploy().await; + let signer = Arc::new(SignerMiddleware::new(provider, wallet.clone())); + let contract = AstriaWithdrawer::new(contract_address, signer.clone()); + + let value = 1_000_000_000.into(); + let recipient = Address::builder() + .array([1u8; 20]) + .prefix(ASTRIA_ADDRESS_PREFIX) + .try_build() + .unwrap(); + + let bridge_address = crate::astria_address([1u8; 20]); + let denom = default_native_asset(); + + // send tx before watcher starts + let receipt = send_sequencer_withdraw_transaction(&contract, value, recipient).await; + + let expected_event = EventWithMetadata { + event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { + sender: wallet.address(), + destination_chain_address: recipient.to_string(), + amount: value, + }), + block_number: receipt.block_number.unwrap(), + transaction_hash: receipt.transaction_hash, + }; + let expected_action = + event_to_action(expected_event, denom.id(), denom.clone(), 1, bridge_address).unwrap(); + let Action::BridgeUnlock(expected_action) = expected_action else { + panic!("expected action to be BridgeUnlock, got {expected_action:?}"); + }; + + let (batch_tx, mut batch_rx) = mpsc::channel(100); + let (startup_tx, startup_rx) = oneshot::channel(); + let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); + startup_tx + .send(SequencerStartupInfo { + fee_asset_id: denom.id(), + next_batch_rollup_height: 1, + }) + .unwrap(); + + let watcher = Builder { + ethereum_contract_address: hex::encode(contract_address), + ethereum_rpc_endpoint: anvil.ws_endpoint(), + submitter_handle, + shutdown_token: CancellationToken::new(), + state: Arc::new(State::new()), + rollup_asset_denom: denom.clone(), + bridge_address, + } + .build() + .unwrap(); + + tokio::task::spawn(watcher.run()); + + // send another tx to trigger a new block + send_sequencer_withdraw_transaction(&contract, value, recipient).await; + + let batch = batch_rx.recv().await.unwrap(); + assert_eq!(batch.actions.len(), 1); + let Action::BridgeUnlock(action) = &batch.actions[0] else { + panic!( + "expected action to be BridgeUnlock, got {:?}", + batch.actions[0] + ); + }; + assert_eq!(action, &expected_action); + + // should receive a second batch containing the second tx + let batch = batch_rx.recv().await.unwrap(); + assert_eq!(batch.actions.len(), 1); } async fn send_ics20_withdraw_transaction( @@ -645,7 +815,7 @@ mod tests { startup_tx .send(SequencerStartupInfo { fee_asset_id: denom.id(), - next_batch_rollup_height: 0, + next_batch_rollup_height: 1, }) .unwrap(); @@ -691,6 +861,7 @@ mod tests { }; action.timeout_time = 0; // zero this for testing assert_eq!(action, &expected_action); + assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); } async fn mint_tokens( @@ -768,7 +939,7 @@ mod tests { startup_tx .send(SequencerStartupInfo { fee_asset_id: denom.id(), - next_batch_rollup_height: 0, + next_batch_rollup_height: 1, }) .unwrap(); @@ -811,6 +982,7 @@ mod tests { ); }; assert_eq!(action, &expected_action); + assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); } async fn send_ics20_withdraw_transaction_astria_bridgeable_erc20( @@ -862,7 +1034,7 @@ mod tests { startup_tx .send(SequencerStartupInfo { fee_asset_id: asset::Id::from_str_unchecked("transfer/channel-0/utia"), - next_batch_rollup_height: 0, + next_batch_rollup_height: 1, }) .unwrap(); @@ -913,5 +1085,6 @@ mod tests { }; action.timeout_time = 0; // zero this for testing assert_eq!(action, &expected_action); + assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); } } From 5f5df83a0537546d4422d5a99164a67a86c0153d Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 26 Jun 2024 14:20:19 -0400 Subject: [PATCH 4/5] fmt --- .../src/bridge_withdrawer/ethereum/watcher.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index b0aec7772f..5a795ba272 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -672,8 +672,15 @@ mod tests { block_number: receipt.block_number.unwrap(), transaction_hash: receipt.transaction_hash, }; - let expected_action = - event_to_action(expected_event, denom.id(), denom, 1, bridge_address, crate::ASTRIA_ADDRESS_PREFIX).unwrap(); + let expected_action = event_to_action( + expected_event, + denom.id(), + denom, + 1, + bridge_address, + crate::ASTRIA_ADDRESS_PREFIX, + ) + .unwrap(); let Action::BridgeUnlock(expected_action) = expected_action else { panic!("expected action to be BridgeUnlock, got {expected_action:?}"); }; From b5f991bc7a031bd5611e43a27912edd20bddf9d3 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 26 Jun 2024 14:27:08 -0400 Subject: [PATCH 5/5] address comments --- .../src/bridge_withdrawer/ethereum/watcher.rs | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index 5a795ba272..54f1d2ba62 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -149,27 +149,25 @@ impl Watcher { let converter = EventToActionConvertConfig { fee_asset_id, - rollup_asset_denom: rollup_asset_denom.clone(), + rollup_asset_denom, bridge_address, asset_withdrawal_divisor, sequencer_address_prefix, }; - let block_handler = tokio::task::spawn(watch_for_blocks( - provider.clone(), - contract.address(), - next_rollup_block_height, - converter, - submitter_handle, - shutdown_token.clone(), - )); - state.set_watcher_ready(); tokio::select! { - res = block_handler => { + res = watch_for_blocks( + provider, + contract.address(), + next_rollup_block_height, + converter, + submitter_handle, + shutdown_token.clone(), + ) => { info!("block handler exited"); - res.context("block handler exited")? + res.context("block handler exited") } () = shutdown_token.cancelled() => { info!("watcher shutting down"); @@ -282,8 +280,7 @@ async fn sync_from_next_rollup_block_height( .await .wrap_err("failed to get block")? else { - warn!("block with number {i} missing; skipping"); - continue; + bail!("block with number {i} missing"); }; get_and_send_events_at_block( @@ -354,6 +351,8 @@ async fn watch_for_blocks( ) .await .wrap_err("failed to get and send events at block")?; + } else { + bail!("block subscription ended") } } } @@ -371,7 +370,7 @@ async fn get_and_send_events_at_block( bail!("block hash missing; skipping") }; - let Some(number) = block.number else { + let Some(block_number) = block.number else { bail!("block number missing; skipping") }; @@ -388,7 +387,7 @@ async fn get_and_send_events_at_block( .flatten(); let mut batch = Batch { actions: Vec::new(), - rollup_height: 0, + rollup_height: block_number.as_u64(), }; for (event, log) in events { let Some(transaction_hash) = log.transaction_hash else { @@ -398,7 +397,7 @@ async fn get_and_send_events_at_block( let event_with_metadata = EventWithMetadata { event, - block_number: number, + block_number, transaction_hash, }; let action = converter @@ -408,14 +407,17 @@ async fn get_and_send_events_at_block( } if batch.actions.is_empty() { - debug!("no actions to send at block {number}"); + debug!("no actions to send at block {block_number}"); } else { let actions_len = batch.actions.len(); submitter_handle .send_batch(batch) .await .wrap_err("failed to send batched events; receiver dropped?")?; - debug!("sent batch with {} actions at block {number}", actions_len); + debug!( + "sent batch with {} actions at block {block_number}", + actions_len + ); } Ok(())