diff --git a/crates/astria-bridge-withdrawer/local.env.example b/crates/astria-bridge-withdrawer/local.env.example index 3688ca6043..beb12ecb3a 100644 --- a/crates/astria-bridge-withdrawer/local.env.example +++ b/crates/astria-bridge-withdrawer/local.env.example @@ -34,6 +34,9 @@ ASTRIA_BRIDGE_WITHDRAWER_SEQUENCER_KEY_PATH=/path/to/priv_sequencer_key.json # The fee asset denomination to use for the bridge account's transactions. ASTRIA_BRIDGE_WITHDRAWER_FEE_ASSET_DENOMINATION="nria" +# The minimum expected balance of the fee asset in the bridge account. +ASTRIA_BRIDGE_WITHDRAWER_MIN_EXPECTED_FEE_ASSET_BALANCE=1000000 + # The asset denomination being withdrawn from the rollup. ASTRIA_BRIDGE_WITHDRAWER_ROLLUP_ASSET_DENOMINATION="nria", diff --git a/crates/astria-bridge-withdrawer/src/config.rs b/crates/astria-bridge-withdrawer/src/config.rs index 4156525d09..f6c710ce3a 100644 --- a/crates/astria-bridge-withdrawer/src/config.rs +++ b/crates/astria-bridge-withdrawer/src/config.rs @@ -17,6 +17,8 @@ pub struct Config { pub sequencer_key_path: String, // The fee asset denomination to use for the bridge account's transactions. pub fee_asset_denomination: String, + // The minimum expected balance of the fee asset in the bridge account. + pub min_expected_fee_asset_balance: u64, // The asset denomination being withdrawn from the rollup. pub rollup_asset_denomination: String, // The address of the AstriaWithdrawer contract on the evm rollup. diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs index 47c20862e8..0430ea7e31 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs @@ -113,6 +113,7 @@ fn event_to_bridge_unlock( memo: serde_json::to_vec(&memo).wrap_err("failed to serialize memo to json")?, fee_asset_id, }; + Ok(Action::BridgeUnlock(action)) } diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs index e4ddb3ac5c..bf228135a0 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::{ + sync::Arc, + time::Duration, +}; use astria_core::primitive::v1::{ asset, @@ -6,6 +9,7 @@ use astria_core::primitive::v1::{ }; use astria_eyre::{ eyre::{ + self, eyre, WrapErr as _, }, @@ -17,6 +21,7 @@ use ethers::{ providers::{ Middleware, Provider, + ProviderError, StreamExt as _, Ws, }, @@ -44,6 +49,8 @@ use crate::withdrawer::{ }, }, state::State, + submitter, + SequencerStartupInfo, }; /// Watches for withdrawal events emitted by the `AstriaWithdrawer` contract. @@ -51,8 +58,7 @@ pub(crate) struct Watcher { // contract: AstriaWithdrawer>, contract_address: ethers::types::Address, ethereum_rpc_endpoint: String, - batch_tx: mpsc::Sender, - fee_asset_id: asset::Id, + submitter_handle: submitter::Handle, rollup_asset_denom: Denom, state: Arc, shutdown_token: CancellationToken, @@ -62,10 +68,9 @@ impl Watcher { pub(crate) fn new( ethereum_contract_address: &str, ethereum_rpc_endpoint: &str, - batch_tx: mpsc::Sender, + submitter_handle: submitter::Handle, shutdown_token: &CancellationToken, state: Arc, - fee_asset_id: asset::Id, rollup_asset_denom: Denom, ) -> Result { let contract_address = address_from_string(ethereum_contract_address) @@ -81,8 +86,7 @@ impl Watcher { Ok(Self { contract_address, ethereum_rpc_endpoint: ethereum_rpc_endpoint.to_string(), - batch_tx, - fee_asset_id, + submitter_handle, rollup_asset_denom, state, shutdown_token: shutdown_token.clone(), @@ -91,12 +95,13 @@ impl Watcher { } impl Watcher { - pub(crate) async fn run(self) -> Result<()> { - let Watcher { - contract_address, - ethereum_rpc_endpoint, - batch_tx, - fee_asset_id, + pub(crate) async fn run(mut self) -> Result<()> { + let (provider, contract, fee_asset_id, asset_withdrawal_divisor) = self.startup().await?; + + let Self { + contract_address: _contract_address, + ethereum_rpc_endpoint: _ethereum_rps_endpoint, + submitter_handle, rollup_asset_denom, state, shutdown_token, @@ -104,28 +109,10 @@ impl Watcher { let (event_tx, event_rx) = mpsc::channel(100); - let provider = Arc::new( - Provider::::connect(ethereum_rpc_endpoint) - .await - .wrap_err("failed to connect to ethereum RPC endpoint")?, - ); - let contract = IAstriaWithdrawer::new(contract_address, provider.clone()); - - let base_chain_asset_precision = contract - .base_chain_asset_precision() - .call() - .await - .wrap_err("failed to get asset withdrawal decimals")?; - let asset_withdrawal_divisor = - 10u128.pow(18u32.checked_sub(base_chain_asset_precision).expect( - "base_chain_asset_precision must be <= 18, as the contract constructor enforces \ - this", - )); - let batcher = Batcher::new( event_rx, provider, - batch_tx, + submitter_handle, &shutdown_token, fee_asset_id, rollup_asset_denom, @@ -156,12 +143,90 @@ impl Watcher { info!("ics20 withdrawal event handler exited"); res.context("ics20 withdrawal event handler exited")? } - () = shutdown_token.cancelled() => { + () = shutdown_token.cancelled() => { info!("watcher shutting down"); Ok(()) } } } + + /// Gets the startup data from the submitter and connects to the Ethereum node. + /// + /// Returns the contract handle, the asset ID of the fee asset, and the divisor for the asset + /// withdrawal amount. + /// + /// # Errors + /// - If the fee asset ID provided in the config is not a valid fee asset on the sequencer. + /// - If the Ethereum node cannot be connected to after several retries. + /// - If the asset withdrawal decimals cannot be fetched. + async fn startup( + &mut self, + ) -> eyre::Result<( + Arc>, + IAstriaWithdrawer>, + asset::Id, + u128, + )> { + // wait for submitter to be ready + let SequencerStartupInfo { + fee_asset_id, + } = self.submitter_handle.recv_startup_info().await?; + + // connect to eth node + let retry_config = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(500)) + .max_delay(Duration::from_secs(60)) + .on_retry( + |attempt, next_delay: Option, error: &ProviderError| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to connect to rollup node failed; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let provider = tryhard::retry_fn(|| { + let url = self.ethereum_rpc_endpoint.clone(); + async move { + let websocket_client = Ws::connect_with_reconnects(url, 0).await?; + Ok(Provider::new(websocket_client)) + } + }) + .with_config(retry_config) + .await + .wrap_err("failed connecting to rollup after several retries; giving up")?; + let provider = Arc::new(provider); + + // get contract handle + let contract = IAstriaWithdrawer::new(self.contract_address, provider.clone()); + + // get asset withdrawal decimals + let base_chain_asset_precision = contract + .base_chain_asset_precision() + .call() + .await + .wrap_err("failed to get asset withdrawal decimals")?; + let asset_withdrawal_divisor = + 10u128.pow(18u32.checked_sub(base_chain_asset_precision).expect( + "base_chain_asset_precision must be <= 18, as the contract constructor enforces \ + this", + )); + + self.state.set_watcher_ready(); + + Ok(( + provider.clone(), + contract, + fee_asset_id, + asset_withdrawal_divisor, + )) + } } async fn watch_for_sequencer_withdrawal_events( @@ -174,7 +239,11 @@ async fn watch_for_sequencer_withdrawal_events( .from_block(from_block) .address(contract.address().into()); - let mut stream = events.stream().await.unwrap().with_meta(); + let mut stream = events + .stream() + .await + .wrap_err("failed to subscribe to sequencer withdrawal events")? + .with_meta(); while let Some(item) = stream.next().await { if let Ok((event, meta)) = item { @@ -200,7 +269,11 @@ async fn watch_for_ics20_withdrawal_events( .from_block(from_block) .address(contract.address().into()); - let mut stream = events.stream().await.unwrap().with_meta(); + let mut stream = events + .stream() + .await + .wrap_err("failed to subscribe to ics20 withdrawal events")? + .with_meta(); while let Some(item) = stream.next().await { if let Ok((event, meta)) = item { @@ -219,7 +292,7 @@ async fn watch_for_ics20_withdrawal_events( struct Batcher { event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, provider: Arc>, - batch_tx: mpsc::Sender, + submitter_handle: submitter::Handle, shutdown_token: CancellationToken, fee_asset_id: asset::Id, rollup_asset_denom: Denom, @@ -230,7 +303,7 @@ impl Batcher { pub(crate) fn new( event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, provider: Arc>, - batch_tx: mpsc::Sender, + submitter_handle: submitter::Handle, shutdown_token: &CancellationToken, fee_asset_id: asset::Id, rollup_asset_denom: Denom, @@ -239,7 +312,7 @@ impl Batcher { Self { event_rx, provider, - batch_tx, + submitter_handle, shutdown_token: shutdown_token.clone(), fee_asset_id, rollup_asset_denom, @@ -275,8 +348,7 @@ impl Batcher { if block_number.as_u64() > curr_batch.rollup_height { if !curr_batch.actions.is_empty() { - self.batch_tx - .send(curr_batch) + self.submitter_handle.send_batch(curr_batch) .await .wrap_err("failed to send batched events; receiver dropped?")?; } @@ -306,8 +378,7 @@ impl Batcher { } else { // block number increased; send current batch and start a new one if !curr_batch.actions.is_empty() { - self.batch_tx - .send(curr_batch) + self.submitter_handle.send_batch(curr_batch) .await .wrap_err("failed to send batched events; receiver dropped?")?; } @@ -356,6 +427,7 @@ mod tests { }, utils::hex, }; + use tokio::sync::oneshot; use super::*; use crate::withdrawer::ethereum::{ @@ -460,14 +532,21 @@ mod tests { panic!("expected action to be BridgeUnlock, got {expected_action:?}"); }; - let (event_tx, mut event_rx) = mpsc::channel(100); + let (batch_tx, mut batch_rx) = mpsc::channel(100); + let (startup_tx, startup_rx) = oneshot::channel(); + let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); + startup_tx + .send(SequencerStartupInfo { + fee_asset_id: denom.id(), + }) + .unwrap(); + let watcher = Watcher::new( &hex::encode(contract_address), &anvil.ws_endpoint(), - event_tx, + submitter_handle, &CancellationToken::new(), Arc::new(State::new()), - denom.id(), denom, ) .unwrap(); @@ -477,7 +556,7 @@ mod tests { // make another tx to trigger anvil to make another block send_sequencer_withdraw_transaction(&contract, value, recipient).await; - let batch = event_rx.recv().await.unwrap(); + let batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); let Action::BridgeUnlock(action) = &batch.actions[0] else { panic!( @@ -541,14 +620,21 @@ mod tests { }; expected_action.timeout_time = 0; // zero this for testing - let (event_tx, mut event_rx) = mpsc::channel(100); + let (batch_tx, mut batch_rx) = mpsc::channel(100); + let (startup_tx, startup_rx) = oneshot::channel(); + let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); + startup_tx + .send(SequencerStartupInfo { + fee_asset_id: denom.id(), + }) + .unwrap(); + let watcher = Watcher::new( &hex::encode(contract_address), &anvil.ws_endpoint(), - event_tx, + submitter_handle, &CancellationToken::new(), Arc::new(State::new()), - denom.id(), denom, ) .unwrap(); @@ -558,7 +644,7 @@ mod tests { // make another tx to trigger anvil to make another block send_ics20_withdraw_transaction(&contract, value, recipient).await; - let mut batch = event_rx.recv().await.unwrap(); + let mut batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); let Action::Ics20Withdrawal(ref mut action) = batch.actions[0] else { panic!( @@ -648,14 +734,21 @@ mod tests { panic!("expected action to be BridgeUnlock, got {expected_action:?}"); }; - let (event_tx, mut event_rx) = mpsc::channel(100); + let (batch_tx, mut batch_rx) = mpsc::channel(100); + let (startup_tx, startup_rx) = oneshot::channel(); + let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); + startup_tx + .send(SequencerStartupInfo { + fee_asset_id: denom.id(), + }) + .unwrap(); + let watcher = Watcher::new( &hex::encode(contract_address), &anvil.ws_endpoint(), - event_tx, + submitter_handle, &CancellationToken::new(), Arc::new(State::new()), - denom.id(), denom, ) .unwrap(); @@ -665,7 +758,7 @@ mod tests { // make another tx to trigger anvil to make another block send_sequencer_withdraw_transaction_erc20(&contract, value, recipient).await; - let batch = event_rx.recv().await.unwrap(); + let batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); let Action::BridgeUnlock(action) = &batch.actions[0] else { panic!( @@ -739,14 +832,21 @@ mod tests { }; expected_action.timeout_time = 0; // zero this for testing - let (event_tx, mut event_rx) = mpsc::channel(100); + let (batch_tx, mut batch_rx) = mpsc::channel(100); + let (startup_tx, startup_rx) = oneshot::channel(); + let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); + startup_tx + .send(SequencerStartupInfo { + fee_asset_id: asset::Id::from_denom("transfer/channel-0/utia"), + }) + .unwrap(); + let watcher = Watcher::new( &hex::encode(contract_address), &anvil.ws_endpoint(), - event_tx, + submitter_handle, &CancellationToken::new(), Arc::new(State::new()), - denom.id(), denom, ) .unwrap(); @@ -756,7 +856,7 @@ mod tests { // make another tx to trigger anvil to make another block send_ics20_withdraw_transaction_astria_bridgeable_erc20(&contract, value, recipient).await; - let mut batch = event_rx.recv().await.unwrap(); + let mut batch = batch_rx.recv().await.unwrap(); assert_eq!(batch.actions.len(), 1); let Action::Ics20Withdrawal(ref mut action) = batch.actions[0] else { panic!( diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs index 5c54075b71..85760b95c6 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs @@ -65,18 +65,22 @@ impl Service { fee_asset_denomination, ethereum_contract_address, ethereum_rpc_endpoint, + rollup_asset_denomination, + min_expected_fee_asset_balance, .. } = cfg; let state = Arc::new(State::new()); // make submitter object - let (submitter, batches_tx) = submitter::Builder { + let (submitter, submitter_handle) = submitter::Builder { shutdown_token: shutdown_handle.token(), sequencer_cometbft_endpoint, sequencer_chain_id, sequencer_key_path, state: state.clone(), + expected_fee_asset_id: asset::Id::from_denom(&fee_asset_denomination), + min_expected_fee_asset_balance: u128::from(min_expected_fee_asset_balance), } .build() .wrap_err("failed to initialize submitter")?; @@ -84,11 +88,10 @@ impl Service { let ethereum_watcher = Watcher::new( ðereum_contract_address, ðereum_rpc_endpoint, - batches_tx, + submitter_handle, &shutdown_handle.token(), state.clone(), - asset::Id::from_denom(&fee_asset_denomination), - asset::Denom::from(cfg.rollup_asset_denomination), + asset::Denom::from(rollup_asset_denomination), ) .wrap_err("failed to initialize ethereum watcher")?; @@ -174,6 +177,11 @@ impl Service { } } +#[derive(Debug)] +pub struct SequencerStartupInfo { + pub fee_asset_id: asset::Id, +} + /// A handle for instructing the [`Service`] to shut down. /// /// It is returned along with its related `Service` from [`Service::new`]. The diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs index d58d7f0a48..47f85b69a1 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs @@ -1,46 +1,92 @@ use std::sync::Arc; +use astria_core::primitive::v1::asset; use astria_eyre::eyre::{ self, Context as _, }; -use tokio::sync::mpsc; +use tokio::sync::{ + mpsc, + oneshot, +}; use tokio_util::sync::CancellationToken; use tracing::info; use super::state::State; -use crate::withdrawer::submitter::Batch; +use crate::withdrawer::{ + submitter::Batch, + SequencerStartupInfo, +}; const BATCH_QUEUE_SIZE: usize = 256; +pub(crate) struct Handle { + startup_info_rx: Option>, + batches_tx: mpsc::Sender, +} + +impl Handle { + pub(crate) fn new( + startup_info_rx: oneshot::Receiver, + batches_tx: mpsc::Sender, + ) -> Self { + Self { + startup_info_rx: Some(startup_info_rx), + batches_tx, + } + } + + pub(crate) async fn recv_startup_info(&mut self) -> eyre::Result { + self.startup_info_rx + .take() + .expect("startup info should only be taken once - this is a bug") + .await + .wrap_err("failed to get startup info from submitter. channel was dropped.") + } + + pub(crate) async fn send_batch(&self, batch: Batch) -> eyre::Result<()> { + self.batches_tx + .send(batch) + .await + .wrap_err("failed to send batch") + } +} + pub(crate) struct Builder { pub(crate) shutdown_token: CancellationToken, pub(crate) sequencer_key_path: String, pub(crate) sequencer_chain_id: String, pub(crate) sequencer_cometbft_endpoint: String, pub(crate) state: Arc, + pub(crate) expected_fee_asset_id: asset::Id, + pub(crate) min_expected_fee_asset_balance: u128, } impl Builder { /// Instantiates an `Submitter`. - pub(crate) fn build(self) -> eyre::Result<(super::Submitter, mpsc::Sender)> { + pub(crate) fn build(self) -> eyre::Result<(super::Submitter, Handle)> { let Self { shutdown_token, sequencer_key_path, sequencer_chain_id, sequencer_cometbft_endpoint, state, + expected_fee_asset_id, + min_expected_fee_asset_balance, } = self; let signer = super::signer::SequencerKey::try_from_path(sequencer_key_path) .wrap_err("failed to load sequencer private ky")?; info!(address = %telemetry::display::hex(&signer.address), "loaded sequencer signer"); - let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE); let sequencer_cometbft_client = sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint) .wrap_err("failed constructing cometbft http client")?; + let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE); + let (startup_tx, startup_rx) = tokio::sync::oneshot::channel(); + let handle = Handle::new(startup_rx, batches_tx); + Ok(( super::Submitter { shutdown_token, @@ -49,8 +95,11 @@ impl Builder { sequencer_cometbft_client, signer, sequencer_chain_id, + startup_tx, + expected_fee_asset_id, + min_expected_fee_asset_balance, }, - batches_tx, + handle, )) } } diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs index dd475a43c8..b28f20b32d 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs @@ -3,30 +3,45 @@ use std::{ time::Duration, }; -use astria_core::protocol::transaction::v1alpha1::{ - Action, - TransactionParams, - UnsignedTransaction, +use astria_core::{ + primitive::v1::asset, + protocol::{ + asset::v1alpha1::AllowedFeeAssetIdsResponse, + transaction::v1alpha1::{ + Action, + TransactionParams, + UnsignedTransaction, + }, + }, }; use astria_eyre::eyre::{ self, ensure, eyre, Context, + OptionExt, }; pub(crate) use builder::Builder; +pub(super) use builder::Handle; use sequencer_client::{ - tendermint_rpc, - tendermint_rpc::endpoint::broadcast::tx_commit, + tendermint_rpc::{ + self, + endpoint::broadcast::tx_commit, + }, Address, - SequencerClientExt as _, + SequencerClientExt, SignedTransaction, }; use signer::SequencerKey; use state::State; use tokio::{ select, - sync::mpsc, + sync::{ + mpsc, + oneshot::{ + self, + }, + }, time::Instant, }; use tokio_util::sync::CancellationToken; @@ -44,6 +59,7 @@ use tracing::{ use super::{ batch::Batch, state, + SequencerStartupInfo, }; mod builder; @@ -58,17 +74,18 @@ pub(super) struct Submitter { sequencer_cometbft_client: sequencer_client::HttpClient, signer: SequencerKey, sequencer_chain_id: String, + startup_tx: oneshot::Sender, + expected_fee_asset_id: asset::Id, + min_expected_fee_asset_balance: u128, } impl Submitter { pub(super) async fn run(mut self) -> eyre::Result<()> { - let actual_chain_id = - get_sequencer_chain_id(self.sequencer_cometbft_client.clone()).await?; - ensure!( - self.sequencer_chain_id == actual_chain_id.to_string(), - "sequencer_chain_id provided in config does not match chain_id returned from sequencer" - ); - self.state.set_submitter_ready(); + // call startup + let startup = self.startup().await?; + self.startup_tx + .send(startup) + .map_err(|_startup| eyre!("failed to send startup info to watcher"))?; let reason = loop { select!( @@ -84,7 +101,12 @@ impl Submitter { info!("received None from batch channel, shutting down"); break Err(eyre!("batch channel closed")); }; - if let Err(e) = self.process_batch(actions, rollup_height).await { + if let Err(e) = process_batch( + self.sequencer_cometbft_client.clone(), + &self.signer, + self.state.clone(), + &self.sequencer_chain_id, + actions, rollup_height).await { break Err(e); } } @@ -107,77 +129,129 @@ impl Submitter { Ok(()) } - async fn process_batch( - &mut self, - actions: Vec, - rollup_height: u64, - ) -> eyre::Result<()> { - // get nonce and make unsigned transaction - let nonce = get_latest_nonce( - self.sequencer_cometbft_client.clone(), - self.signer.address, - self.state.clone(), - ) - .await?; - debug!(nonce, "fetched latest nonce"); - - let unsigned = UnsignedTransaction { - actions, - params: TransactionParams::builder() - .nonce(nonce) - .chain_id(&self.sequencer_chain_id) - .try_build() - .context( - "failed to construct transcation parameters from latest nonce and configured \ - sequencer chain ID", - )?, - }; + /// Confirms the config values used for initialization against the sequencer node's cometbft + /// instance and set the submitter state to ready. + /// + /// # Errors + /// + /// - `self.chain_id` does not match the value returned from the sequencer node + /// - `self.fee_asset_id` is not a valid fee asset on the sequencer node + /// - `self.sequencer_key.address` does not have a sufficient balance of `self.fee_asset_id`. + async fn startup(&mut self) -> eyre::Result { + let actual_chain_id = + get_sequencer_chain_id(self.sequencer_cometbft_client.clone(), self.state.clone()) + .await?; + ensure!( + self.sequencer_chain_id == actual_chain_id.to_string(), + "sequencer_chain_id provided in config does not match chain_id returned from sequencer" + ); - // sign transaction - let signed = unsigned.into_signed(&self.signer.signing_key); - debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction"); + // confirm that the fee asset ID is valid + let allowed_fee_asset_ids_resp = + get_allowed_fee_asset_ids(self.sequencer_cometbft_client.clone(), self.state.clone()) + .await?; + ensure!( + allowed_fee_asset_ids_resp + .fee_asset_ids + .contains(&self.expected_fee_asset_id), + "fee_asset_id provided in config is not a valid fee asset on the sequencer" + ); - // submit transaction and handle response - let rsp = submit_tx( - self.sequencer_cometbft_client.clone(), - signed, - self.state.clone(), - ) + // confirm that the sequencer key has a sufficient balance of the fee asset + let fee_asset_balances = self + .sequencer_cometbft_client + .get_latest_balance(self.signer.address) + .await?; + let fee_asset_balance = fee_asset_balances + .balances + .into_iter() + .find(|balance| balance.denom.id() == self.expected_fee_asset_id) + .ok_or_eyre("withdrawer's account does not have the minimum balance of the fee asset")? + .balance; + ensure!( + fee_asset_balance >= self.min_expected_fee_asset_balance, + "sequencer key does not have a sufficient balance of the fee asset" + ); + + self.state.set_submitter_ready(); + + // send startup info to watcher + let startup = SequencerStartupInfo { + fee_asset_id: self.expected_fee_asset_id, + }; + Ok(startup) + } +} + +async fn process_batch( + sequencer_cometbft_client: sequencer_client::HttpClient, + sequnecer_key: &SequencerKey, + state: Arc, + sequencer_chain_id: &str, + actions: Vec, + rollup_height: u64, +) -> eyre::Result<()> { + // get nonce and make unsigned transaction + let nonce = get_latest_nonce( + sequencer_cometbft_client.clone(), + sequnecer_key.address, + state.clone(), + ) + .await?; + debug!(nonce, "fetched latest nonce"); + + let unsigned = UnsignedTransaction { + actions, + params: TransactionParams::builder() + .nonce(nonce) + .chain_id(sequencer_chain_id) + .try_build() + .context( + "failed to construct transcation parameters from latest nonce and configured \ + sequencer chain ID", + )?, + }; + + // sign transaction + let signed = unsigned.into_signed(&sequnecer_key.signing_key); + debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction"); + + // submit transaction and handle response + let rsp = submit_tx(sequencer_cometbft_client.clone(), signed, state.clone()) .await .context("failed to submit transaction to to cometbft")?; - if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { - error!( - abci.code = check_tx_code, - abci.log = rsp.check_tx.log, - rollup.height = rollup_height, - "transaction failed to be included in the mempool, aborting." - ); - Err(eyre!( - "check_tx failure upon submitting transaction to sequencer" - )) - } else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code { - error!( - abci.code = deliver_tx_code, - abci.log = rsp.tx_result.log, - rollup.height = rollup_height, - "transaction failed to be executed in a block, aborting." - ); - Err(eyre!( - "deliver_tx failure upon submitting transaction to sequencer" - )) - } else { - // update state after successful submission - info!( - sequencer.block = rsp.height.value(), - sequencer.tx_hash = %rsp.hash, - rollup.height = rollup_height, - "withdraw batch successfully executed." - ); - self.state.set_last_rollup_height_submitted(rollup_height); - self.state.set_last_sequencer_height(rsp.height.value()); - self.state.set_last_sequencer_tx_hash(rsp.hash); - Ok(()) - } + if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { + error!( + abci.code = check_tx_code, + abci.log = rsp.check_tx.log, + rollup.height = rollup_height, + "transaction failed to be included in the mempool, aborting." + ); + Err(eyre!( + "check_tx failure upon submitting transaction to sequencer" + )) + } else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code { + error!( + abci.code = deliver_tx_code, + abci.log = rsp.tx_result.log, + rollup.height = rollup_height, + "transaction failed to be executed in a block, aborting." + ); + Err(eyre!( + "deliver_tx failure upon submitting transaction to sequencer" + )) + } else { + // update state after successful submission + info!( + sequencer.block = rsp.height.value(), + sequencer.tx_hash = %rsp.hash, + rollup.height = rollup_height, + "withdraw batch successfully executed." + ); + state.set_last_rollup_height_submitted(rollup_height); + state.set_last_sequencer_height(rsp.height.value()); + state.set_last_sequencer_tx_hash(rsp.hash); + Ok(()) } } @@ -295,8 +369,10 @@ async fn submit_tx( res } +#[instrument(skip_all)] async fn get_sequencer_chain_id( client: sequencer_client::HttpClient, + state: Arc, ) -> eyre::Result { use sequencer_client::Client as _; @@ -305,6 +381,9 @@ async fn get_sequencer_chain_id( .max_delay(Duration::from_secs(20)) .on_retry( |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { + let state = Arc::clone(&state); + state.set_sequencer_connected(false); + let wait_duration = next_delay .map(humantime::format_duration) .map(tracing::field::display); @@ -323,5 +402,45 @@ async fn get_sequencer_chain_id( .await .wrap_err("failed to get genesis info from Sequencer after a lot of attempts")?; + state.set_sequencer_connected(true); + Ok(genesis.chain_id) } + +#[instrument(skip_all)] +async fn get_allowed_fee_asset_ids( + client: sequencer_client::HttpClient, + state: Arc, +) -> eyre::Result { + let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, + next_delay: Option, + error: &sequencer_client::extension_trait::Error| { + let state = Arc::clone(&state); + state.set_sequencer_connected(false); + + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to fetch sequencer allowed fee asset ids; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let res = tryhard::retry_fn(|| client.get_allowed_fee_asset_ids()) + .with_config(retry_config) + .await + .wrap_err("failed to get allowed fee asset ids from Sequencer after a lot of attempts"); + + state.set_sequencer_connected(res.is_ok()); + + res +} diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs index 21cdcb91fd..20f2a0b34a 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs @@ -2,21 +2,28 @@ use std::{ io::Write as _, sync::Arc, time::Duration, + vec, }; use astria_core::{ generated::protocol::account::v1alpha1::NonceResponse, primitive::v1::{ - asset::Denom, + asset::{ + self, + Denom, + }, Address, ASTRIA_ADDRESS_PREFIX, }, - protocol::transaction::v1alpha1::{ - action::{ - BridgeUnlockAction, - Ics20Withdrawal, + protocol::{ + account::v1alpha1::AssetBalance, + transaction::v1alpha1::{ + action::{ + BridgeUnlockAction, + Ics20Withdrawal, + }, + Action, }, - Action, }, }; use astria_eyre::eyre; @@ -38,15 +45,13 @@ use tendermint::{ types::ExecTxResult, }, block::Height, + chain, }; use tendermint_rpc::{ endpoint::broadcast::tx_sync, request, }; -use tokio::sync::{ - mpsc, - watch, -}; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::debug; use wiremock::{ @@ -70,7 +75,6 @@ use crate::withdrawer::{ }, state, submitter, - StateSnapshot, }; const SEQUENCER_CHAIN_ID: &str = "test_sequencer-1000"; @@ -94,75 +98,122 @@ static TELEMETRY: Lazy<()> = Lazy::new(|| { } }); -async fn setup() -> ( - Submitter, - mpsc::Sender, - CancellationToken, - MockServer, - MockGuard, -) { - Lazy::force(&TELEMETRY); +struct TestSubmitter { + submitter: Option, + submitter_handle: submitter::Handle, + cometbft_mock: MockServer, + submitter_task_handle: Option>>, +} + +impl TestSubmitter { + async fn setup() -> Self { + Lazy::force(&TELEMETRY); - // set up external resources - let shutdown_token = CancellationToken::new(); + // set up external resources + let shutdown_token = CancellationToken::new(); - // sequencer signer key - let keyfile = NamedTempFile::new().unwrap(); - (&keyfile) - .write_all("2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90".as_bytes()) + // sequencer signer key + let keyfile = NamedTempFile::new().unwrap(); + (&keyfile) + .write_all( + "2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90".as_bytes(), + ) + .unwrap(); + let sequencer_key_path = keyfile.path().to_str().unwrap().to_string(); + + // cometbft + let cometbft_mock = MockServer::start().await; + let sequencer_cometbft_endpoint = format!("http://{}", cometbft_mock.address()); + + // withdrawer state + let state = Arc::new(state::State::new()); + // not testing watcher here so just set it to ready + state.set_watcher_ready(); + + let (submitter, submitter_handle) = submitter::Builder { + shutdown_token: shutdown_token.clone(), + sequencer_key_path, + sequencer_chain_id: SEQUENCER_CHAIN_ID.to_string(), + sequencer_cometbft_endpoint, + state, + expected_fee_asset_id: Denom::from("nria".to_string()).id(), + min_expected_fee_asset_balance: 1_000_000, + } + .build() .unwrap(); - let sequencer_key_path = keyfile.path().to_str().unwrap().to_string(); - - // cometbft - let cometbft_mock = MockServer::start().await; - let sequencer_cometbft_endpoint = format!("http://{}", cometbft_mock.address()); - - // withdrawer state - let state = Arc::new(state::State::new()); - // not testing watcher here so just set it to ready - state.set_watcher_ready(); - - let (submitter, batches_tx) = submitter::Builder { - shutdown_token: shutdown_token.clone(), - sequencer_key_path, - sequencer_chain_id: SEQUENCER_CHAIN_ID.to_string(), - sequencer_cometbft_endpoint, - state, + + Self { + submitter: Some(submitter), + submitter_task_handle: None, + submitter_handle, + cometbft_mock, + } } - .build() - .unwrap(); - // mount submitter startup response - let startup_guard = register_genesis_response(&cometbft_mock).await; + async fn startup_and_spawn_with_guards(&mut self, startup_guards: Vec) { + let submitter = self.submitter.take().unwrap(); - ( - submitter, - batches_tx, - shutdown_token, - cometbft_mock, - startup_guard, - ) + let mut state = submitter.state.subscribe(); + + self.submitter_task_handle = Some(tokio::spawn(submitter.run())); + + // wait for all startup guards to be satisfied + for guard in startup_guards { + tokio::time::timeout(Duration::from_millis(100), guard.wait_until_satisfied()) + .await + .unwrap(); + } + + // consume the startup info in place of the watcher + self.submitter_handle.recv_startup_info().await.unwrap(); + + // wait for the submitter to be ready + state + .wait_for(state::StateSnapshot::is_ready) + .await + .unwrap(); + } + + async fn startup_and_spawn(&mut self) { + let startup_guards = register_startup_guards(&self.cometbft_mock).await; + self.startup_and_spawn_with_guards(startup_guards).await; + } + + async fn spawn() -> Self { + let mut submitter = Self::setup().await; + submitter.startup_and_spawn().await; + submitter + } } -async fn wait_for_startup( - mut status: watch::Receiver, - startup_guard: MockGuard, -) -> eyre::Result<()> { - // wait for the submitter to be ready - status - .wait_for(state::StateSnapshot::is_ready) - .await - .unwrap(); +async fn register_default_chain_id_guard(cometbft_mock: &MockServer) -> MockGuard { + register_genesis_chain_id_response(SEQUENCER_CHAIN_ID, cometbft_mock).await +} - // wait for startup guard to be satisfied - tokio::time::timeout( - Duration::from_millis(1000), - startup_guard.wait_until_satisfied(), +async fn register_default_fee_asset_ids_guard(cometbft_mock: &MockServer) -> MockGuard { + let fee_asset_ids = vec![Denom::from("nria".to_string()).id()]; + register_allowed_fee_asset_ids_response(fee_asset_ids, cometbft_mock).await +} + +async fn register_default_min_expected_fee_asset_balance_guard( + cometbft_mock: &MockServer, +) -> MockGuard { + register_get_latest_balance( + vec![AssetBalance { + denom: Denom::from("nria".to_string()), + balance: 1_000_000u128, + }], + cometbft_mock, ) .await - .unwrap(); +} - Ok(()) +async fn register_startup_guards(cometbft_mock: &MockServer) -> Vec { + vec![ + register_default_chain_id_guard(cometbft_mock).await, + register_default_fee_asset_ids_guard(cometbft_mock).await, + register_default_min_expected_fee_asset_balance_guard(cometbft_mock).await, + ] } fn make_ics20_withdrawal_action() -> Action { @@ -268,7 +319,7 @@ fn signed_tx_from_request(request: &Request) -> SignedTransaction { signed_tx } -async fn register_genesis_response(server: &MockServer) -> MockGuard { +async fn register_genesis_chain_id_response(chain_id: &str, server: &MockServer) -> MockGuard { use tendermint::{ consensus::{ params::{ @@ -283,7 +334,7 @@ async fn register_genesis_response(server: &MockServer) -> MockGuard { let response = tendermint_rpc::endpoint::genesis::Response:: { genesis: Genesis { genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), - chain_id: SEQUENCER_CHAIN_ID.try_into().unwrap(), + chain_id: chain::Id::try_from(chain_id).unwrap(), initial_height: 1, consensus_params: Params { block: tendermint::block::Size { @@ -309,18 +360,73 @@ async fn register_genesis_response(server: &MockServer) -> MockGuard { }; let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json( - json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), - )) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await + Mock::given(body_partial_json(json!({"method": "genesis"}))) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(&wrapper) + .append_header("Content-Type", "application/json"), + ) + .up_to_n_times(1) + .expect(1) + .mount_as_scoped(server) + .await +} + +async fn register_allowed_fee_asset_ids_response( + fee_asset_ids: Vec, + cometbft_mock: &MockServer, +) -> MockGuard { + let response = tendermint_rpc::endpoint::abci_query::Response { + response: tendermint_rpc::endpoint::abci_query::AbciQuery { + value: astria_core::protocol::asset::v1alpha1::AllowedFeeAssetIdsResponse { + fee_asset_ids, + height: 1, + } + .into_raw() + .encode_to_vec(), + ..Default::default() + }, + }; + let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); + Mock::given(body_partial_json(json!({"method": "abci_query"}))) + .and(body_string_contains("asset/allowed_fee_asset_ids")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(&wrapper) + .append_header("Content-Type", "application/json"), + ) + .expect(1) + .mount_as_scoped(cometbft_mock) + .await +} + +async fn register_get_latest_balance( + balances: Vec, + server: &MockServer, +) -> MockGuard { + let response = tendermint_rpc::endpoint::abci_query::Response { + response: tendermint_rpc::endpoint::abci_query::AbciQuery { + value: astria_core::protocol::account::v1alpha1::BalanceResponse { + balances, + height: 1, + } + .into_raw() + .encode_to_vec(), + ..Default::default() + }, + }; + + let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); + Mock::given(body_partial_json(json!({"method": "abci_query"}))) + .and(body_string_contains("accounts/balance")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(&wrapper) + .append_header("Content-Type", "application/json"), + ) + .expect(1) + .mount_as_scoped(server) + .await } async fn register_get_nonce_response(server: &MockServer, response: NonceResponse) -> MockGuard { @@ -373,14 +479,21 @@ fn compare_actions(expected: &Action, actual: &Action) { } } -/// Sanity check to check that it works +/// Test that the submitter starts up successfully +#[tokio::test] +async fn submitter_startup_success() { + let _submitter = TestSubmitter::spawn().await; +} + +/// Sanity check to check that batch submission works #[tokio::test] async fn submitter_submit_success() { - // set up submitter and batch - let (submitter, batches_tx, _shutdown_token, cometbft_mock, startup_guard) = setup().await; - let state = submitter.state.subscribe(); - let _submitter_handle = tokio::spawn(submitter.run()); - wait_for_startup(state, startup_guard).await.unwrap(); + let submitter = TestSubmitter::spawn().await; + let TestSubmitter { + submitter_handle, + cometbft_mock, + .. + } = submitter; // set up guards on mock cometbft let nonce_guard = register_get_nonce_response( @@ -397,7 +510,7 @@ async fn submitter_submit_success() { // send batch to submitter let batch = make_batch_with_bridge_unlock_and_ics20_withdrawal(); - batches_tx.send(batch).await.unwrap(); + submitter_handle.send_batch(batch).await.unwrap(); // wait for the nonce and broadcast guards to be satisfied tokio::time::timeout( @@ -431,11 +544,13 @@ async fn submitter_submit_success() { /// mempool (CheckTx) #[tokio::test] async fn submitter_submit_check_tx_failure() { - // set up submitter and batch - let (submitter, batches_tx, _shutdown_token, cometbft_mock, startup_guard) = setup().await; - let state = submitter.state.subscribe(); - let submitter_handle = tokio::spawn(submitter.run()); - wait_for_startup(state, startup_guard).await.unwrap(); + let submitter = TestSubmitter::spawn().await; + let TestSubmitter { + submitter_handle, + cometbft_mock, + mut submitter_task_handle, + .. + } = submitter; // set up guards on mock cometbft let nonce_guard = register_get_nonce_response( @@ -454,7 +569,7 @@ async fn submitter_submit_check_tx_failure() { // send batch to submitter let batch = make_batch_with_bridge_unlock_and_ics20_withdrawal(); - batches_tx.send(batch).await.unwrap(); + submitter_handle.send_batch(batch).await.unwrap(); // wait for the nonce and broadcast guards to be satisfied tokio::time::timeout( @@ -471,21 +586,26 @@ async fn submitter_submit_check_tx_failure() { .unwrap(); // make sure the submitter halts and the task returns - let _submitter_result = tokio::time::timeout(Duration::from_millis(100), submitter_handle) - .await - .unwrap() - .unwrap(); + let _submitter_result = tokio::time::timeout( + Duration::from_millis(100), + submitter_task_handle.take().unwrap(), + ) + .await + .unwrap() + .unwrap(); } /// Test that the submitter halts when transaction submissions fails to be executed in a block /// (DeliverTx) #[tokio::test] async fn submitter_submit_deliver_tx_failure() { - // set up submitter and batch - let (submitter, batches_tx, _shutdown_token, cometbft_mock, startup_guard) = setup().await; - let state = submitter.state.subscribe(); - let submitter_handle = tokio::spawn(submitter.run()); - wait_for_startup(state, startup_guard).await.unwrap(); + let submitter = TestSubmitter::spawn().await; + let TestSubmitter { + submitter_handle, + cometbft_mock, + mut submitter_task_handle, + .. + } = submitter; // set up guards on mock cometbft let nonce_guard = register_get_nonce_response( @@ -504,7 +624,7 @@ async fn submitter_submit_deliver_tx_failure() { // send batch to submitter let batch = make_batch_with_bridge_unlock_and_ics20_withdrawal(); - batches_tx.send(batch).await.unwrap(); + submitter_handle.send_batch(batch).await.unwrap(); // wait for the nonce and broadcast guards to be satisfied tokio::time::timeout( @@ -521,8 +641,11 @@ async fn submitter_submit_deliver_tx_failure() { .unwrap(); // make sure the submitter halts and the task returns - let _submitter_result = tokio::time::timeout(Duration::from_millis(100), submitter_handle) - .await - .unwrap() - .unwrap(); + let _submitter_result = tokio::time::timeout( + Duration::from_millis(100), + submitter_task_handle.take().unwrap(), + ) + .await + .unwrap() + .unwrap(); }