diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs index 90857f853d..5e08e55d57 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs @@ -4,7 +4,6 @@ use astria_core::{ primitive::v1::{ asset, asset::Denom, - Address, }, protocol::transaction::v1alpha1::{ action::{ @@ -95,7 +94,7 @@ fn event_to_bridge_unlock( transaction_hash, }; let action = BridgeUnlockAction { - to: event.sender.to_fixed_bytes().into(), + to: event.destination_chain_address.to_fixed_bytes().into(), amount: event .amount .as_u128() @@ -127,11 +126,7 @@ fn event_to_ics20_withdrawal( // TODO: make this configurable const ICS20_WITHDRAWAL_TIMEOUT: Duration = Duration::from_secs(300); - let sender: [u8; 20] = event - .sender - .as_bytes() - .try_into() - .expect("U160 must be 20 bytes"); + let sender = event.sender.to_fixed_bytes().into(); let denom = rollup_asset_denom.clone(); let (_, channel) = denom @@ -152,7 +147,7 @@ fn event_to_ics20_withdrawal( // returned to the rollup. // this is only ok for now because addresses on the sequencer and the rollup are both 20 // bytes, but this won't work otherwise. - return_address: Address::from(sender), + return_address: sender, amount: event .amount .as_u128() @@ -207,7 +202,7 @@ mod tests { }; let expected_action = BridgeUnlockAction { - to: [0u8; 20].into(), + to: [1u8; 20].into(), amount: 99, memo: serde_json::to_vec(&BridgeUnlockMemo { block_number: 1.into(), @@ -239,7 +234,7 @@ mod tests { }; let expected_action = BridgeUnlockAction { - to: [0u8; 20].into(), + to: [1u8; 20].into(), amount: 99, memo: serde_json::to_vec(&BridgeUnlockMemo { block_number: 1.into(), diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs index 64343d0057..6c97ec3186 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs @@ -13,7 +13,9 @@ use astria_eyre::{ }; use ethers::{ contract::LogMeta, + core::types::Block, providers::{ + Middleware, Provider, StreamExt as _, Ws, @@ -26,6 +28,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + error, info, warn, }; @@ -106,7 +109,7 @@ impl Watcher { .await .wrap_err("failed to connect to ethereum RPC endpoint")?, ); - let contract = AstriaWithdrawer::new(contract_address, provider); + let contract = AstriaWithdrawer::new(contract_address, provider.clone()); let base_chain_asset_precision = contract .base_chain_asset_precision() @@ -121,6 +124,7 @@ impl Watcher { let batcher = Batcher::new( event_rx, + provider, batch_tx, &shutdown_token, fee_asset_id, @@ -214,6 +218,7 @@ async fn watch_for_ics20_withdrawal_events( struct Batcher { event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, + provider: Arc>, batch_tx: mpsc::Sender, shutdown_token: CancellationToken, fee_asset_id: asset::Id, @@ -224,6 +229,7 @@ struct Batcher { impl Batcher { pub(crate) fn new( event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, + provider: Arc>, batch_tx: mpsc::Sender, shutdown_token: &CancellationToken, fee_asset_id: asset::Id, @@ -232,6 +238,7 @@ impl Batcher { ) -> Self { Self { event_rx, + provider, batch_tx, shutdown_token: shutdown_token.clone(), fee_asset_id, @@ -239,10 +246,14 @@ impl Batcher { asset_withdrawal_divisor, } } -} -impl Batcher { pub(crate) async fn run(mut self) -> Result<()> { + let mut block_rx = self + .provider + .subscribe_blocks() + .await + .wrap_err("failed to subscribe to blocks")?; + let mut curr_batch = Batch { actions: Vec::new(), rollup_height: 0, @@ -254,6 +265,32 @@ impl Batcher { info!("batcher shutting down"); break; } + block = block_rx.next() => { + if let Some(Block { number, .. }) = block { + let Some(block_number) = number else { + // don't think this should happen + warn!("block number missing; skipping"); + continue; + }; + + if block_number.as_u64() > curr_batch.rollup_height { + if !curr_batch.actions.is_empty() { + self.batch_tx + .send(curr_batch) + .await + .wrap_err("failed to send batched events; receiver dropped?")?; + } + + curr_batch = Batch { + actions: Vec::new(), + rollup_height: block_number.as_u64(), + }; + } + } else { + error!("block stream closed; shutting down batcher"); + break; + } + } item = self.event_rx.recv() => { if let Some((event, meta)) = item { let event_with_metadata = EventWithMetadata { @@ -281,7 +318,7 @@ impl Batcher { }; } } else { - warn!("event receiver dropped; shutting down batcher"); + error!("event receiver dropped; shutting down batcher"); break; } }