diff --git a/charts/deploy.just b/charts/deploy.just index b502c56ce3..513fc92ae3 100644 --- a/charts/deploy.just +++ b/charts/deploy.just @@ -272,7 +272,7 @@ run-smoke-test: sleep 1 fi done - if [ $CHECKS -eq $MAX_CHECKS ]; then + if [ $CHECKS -gt $MAX_CHECKS ]; then echo "Bridge Out Sequencer failure" exit 1 fi @@ -293,8 +293,8 @@ run-smoke-test: sleep 1 fi done - if [ $CHECKS -eq $MAX_CHECKS ]; then - echo "Bridge Out Sequencer failure" + if [ $CHECKS -gt $MAX_CHECKS ]; then + echo "Finalization failure" exit 1 fi exit 0 diff --git a/charts/evm-bridge-withdrawer/Chart.yaml b/charts/evm-bridge-withdrawer/Chart.yaml index 8c87fbe42d..10054ccec2 100644 --- a/charts/evm-bridge-withdrawer/Chart.yaml +++ b/charts/evm-bridge-withdrawer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.0.1 +version: 0.0.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/evm-bridge-withdrawer/templates/configmaps.yaml b/charts/evm-bridge-withdrawer/templates/configmaps.yaml index c0b0e41901..d0e7c8c0e2 100644 --- a/charts/evm-bridge-withdrawer/templates/configmaps.yaml +++ b/charts/evm-bridge-withdrawer/templates/configmaps.yaml @@ -13,7 +13,6 @@ data: ASTRIA_BRIDGE_WITHDRAWER_SEQUENCER_BRIDGE_ADDRESS: "{{ .Values.config.sequencerBridgeAddress }}" ASTRIA_BRIDGE_WITHDRAWER_FEE_ASSET_DENOMINATION: "{{ .Values.config.feeAssetDenom }}" ASTRIA_BRIDGE_WITHDRAWER_ROLLUP_ASSET_DENOMINATION: "{{ .Values.config.rollupAssetDenom }}" - ASTRIA_BRIDGE_WITHDRAWER_MIN_EXPECTED_FEE_ASSET_BALANCE: "{{ .Values.config.minExpectedFeeAssetBalance }}" ASTRIA_BRIDGE_WITHDRAWER_ETHEREUM_CONTRACT_ADDRESS: "{{ .Values.config.evmContractAddress }}" ASTRIA_BRIDGE_WITHDRAWER_ETHEREUM_RPC_ENDPOINT: "{{ .Values.config.evmRpcEndpoint }}" ASTRIA_BRIDGE_WITHDRAWER_NO_METRICS: "{{ not .Values.metrics.enabled }}" @@ -30,6 +29,7 @@ data: OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ .Values.otel.traceHeaders }}" OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceName . }}" {{- if not .Values.global.dev }} + ASTRIA_BRIDGE_WITHDRAWER_MIN_EXPECTED_FEE_ASSET_BALANCE: "{{ .Values.config.minExpectedFeeAssetBalance }}" {{- else }} {{- end }} --- diff --git a/crates/astria-bridge-withdrawer/local.env.example b/crates/astria-bridge-withdrawer/local.env.example index b01f3ce07a..e9f0e77145 100644 --- a/crates/astria-bridge-withdrawer/local.env.example +++ b/crates/astria-bridge-withdrawer/local.env.example @@ -37,9 +37,6 @@ ASTRIA_BRIDGE_WITHDRAWER_SEQUENCER_ADDRESS_PREFIX=astria # The fee asset denomination to use for the bridge account's transactions. ASTRIA_BRIDGE_WITHDRAWER_FEE_ASSET_DENOMINATION="nria" -# The minimum expected balance of the fee asset in the bridge account. -ASTRIA_BRIDGE_WITHDRAWER_MIN_EXPECTED_FEE_ASSET_BALANCE=1000000 - # The asset denomination being withdrawn from the rollup. ASTRIA_BRIDGE_WITHDRAWER_ROLLUP_ASSET_DENOMINATION="nria" diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index 58c2c1cc16..adb7de7457 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -50,6 +50,7 @@ use tokio_util::sync::CancellationToken; use tracing::{ debug, info, + trace, warn, }; @@ -60,19 +61,20 @@ use crate::bridge_withdrawer::{ EventWithMetadata, WithdrawalEvent, }, + startup, state::State, submitter, - SequencerStartupInfo, }; pub(crate) struct Builder { + pub(crate) shutdown_token: CancellationToken, + pub(crate) startup_handle: startup::InfoHandle, pub(crate) ethereum_contract_address: String, pub(crate) ethereum_rpc_endpoint: String, - pub(crate) submitter_handle: submitter::Handle, - pub(crate) shutdown_token: CancellationToken, pub(crate) state: Arc, pub(crate) rollup_asset_denom: Denom, pub(crate) bridge_address: Address, + pub(crate) submitter_handle: submitter::Handle, pub(crate) sequencer_address_prefix: String, } @@ -81,11 +83,12 @@ impl Builder { let Builder { ethereum_contract_address, ethereum_rpc_endpoint, - submitter_handle, shutdown_token, + startup_handle, state, rollup_asset_denom, bridge_address, + submitter_handle, sequencer_address_prefix, } = self; @@ -105,11 +108,12 @@ impl Builder { Ok(Watcher { contract_address, ethereum_rpc_endpoint: ethereum_rpc_endpoint.to_string(), - submitter_handle, rollup_asset_denom, bridge_address, state, shutdown_token: shutdown_token.clone(), + startup_handle, + submitter_handle, sequencer_address_prefix, }) } @@ -117,13 +121,14 @@ impl Builder { /// Watches for withdrawal events emitted by the `AstriaWithdrawer` contract. pub(crate) struct Watcher { + shutdown_token: CancellationToken, + startup_handle: startup::InfoHandle, + submitter_handle: submitter::Handle, contract_address: ethers::types::Address, ethereum_rpc_endpoint: String, - submitter_handle: submitter::Handle, rollup_asset_denom: Denom, bridge_address: Address, state: Arc, - shutdown_token: CancellationToken, sequencer_address_prefix: String, } @@ -135,14 +140,13 @@ impl Watcher { .wrap_err("watcher failed to start up")?; let Self { - contract_address: _contract_address, - ethereum_rpc_endpoint: _ethereum_rps_endpoint, - submitter_handle, rollup_asset_denom, bridge_address, state, shutdown_token, + submitter_handle, sequencer_address_prefix, + .. } = self; let converter = EventToActionConvertConfig { @@ -192,15 +196,19 @@ impl Watcher { u128, u64, )> { - // wait for submitter to be ready - let SequencerStartupInfo { + let startup::Info { fee_asset, - next_batch_rollup_height, - } = self - .submitter_handle - .recv_startup_info() - .await - .wrap_err("failed to get sequencer startup info")?; + starting_rollup_height, + .. + } = select! { + () = self.shutdown_token.cancelled() => { + return Err(eyre!("watcher received shutdown signal while waiting for startup")); + } + + startup_info = self.startup_handle.get_info() => { + startup_info.wrap_err("failed to receive startup info")? + } + }; // connect to eth node let retry_config = tryhard::RetryFutureConfig::new(1024) @@ -255,7 +263,7 @@ impl Watcher { contract, fee_asset, asset_withdrawal_divisor, - next_batch_rollup_height, + starting_rollup_height, )) } } @@ -405,7 +413,7 @@ async fn get_and_send_events_at_block( } if batch.actions.is_empty() { - debug!("no actions to send at block {block_number}"); + trace!("no actions to send at block {block_number}"); } else { let actions_len = batch.actions.len(); submitter_handle @@ -546,10 +554,9 @@ mod tests { }, utils::hex, }; - use tokio::sync::{ - mpsc, - mpsc::error::TryRecvError::Empty, - oneshot, + use tokio::sync::mpsc::{ + self, + error::TryRecvError, }; use super::*; @@ -641,23 +648,23 @@ mod tests { let value = 1_000_000_000.into(); let recipient = crate::astria_address([1u8; 20]); - let bridge_address = crate::astria_address([1u8; 20]); - let denom = default_native_asset(); - + let denom = "nria".parse::().unwrap(); + + let state = Arc::new(State::new()); + let startup_handle = startup::InfoHandle::new(state.subscribe()); + state.set_startup_info(startup::Info { + starting_rollup_height: 1, + fee_asset: denom.clone(), + chain_id: "astria".to_string(), + }); let (batch_tx, mut batch_rx) = mpsc::channel(100); - let (startup_tx, startup_rx) = oneshot::channel(); - let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); - startup_tx - .send(SequencerStartupInfo { - fee_asset: "nria".parse().unwrap(), - next_batch_rollup_height: 1, - }) - .unwrap(); + let submitter_handle = submitter::Handle::new(batch_tx); let watcher = Builder { ethereum_contract_address: hex::encode(contract_address), ethereum_rpc_endpoint: anvil.ws_endpoint(), + startup_handle, submitter_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), @@ -669,7 +676,6 @@ mod tests { .unwrap(); tokio::task::spawn(watcher.run()); - let receipt = send_sequencer_withdraw_transaction(&contract, value, recipient).await; let expected_event = EventWithMetadata { event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { @@ -702,7 +708,7 @@ mod tests { ); }; assert_eq!(action, &expected_action); - assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); + assert_eq!(batch_rx.try_recv().unwrap_err(), TryRecvError::Empty); } #[tokio::test] @@ -744,24 +750,24 @@ mod tests { panic!("expected action to be BridgeUnlock, got {expected_action:?}"); }; + let state = Arc::new(State::new()); + let startup_handle = startup::InfoHandle::new(state.subscribe()); + state.set_startup_info(startup::Info { + starting_rollup_height: 1, + fee_asset: denom.clone(), + chain_id: "astria".to_string(), + }); let (batch_tx, mut batch_rx) = mpsc::channel(100); - let (startup_tx, startup_rx) = oneshot::channel(); - let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); - startup_tx - .send(SequencerStartupInfo { - fee_asset: denom.clone(), - next_batch_rollup_height: 1, - }) - .unwrap(); let watcher = Builder { ethereum_contract_address: hex::encode(contract_address), ethereum_rpc_endpoint: anvil.ws_endpoint(), - submitter_handle, + startup_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), rollup_asset_denom: denom.clone(), bridge_address, + submitter_handle: submitter::Handle::new(batch_tx), sequencer_address_prefix: crate::ASTRIA_ADDRESS_PREFIX.into(), } .build() @@ -821,27 +827,28 @@ mod tests { let value = 1_000_000_000.into(); let recipient = "somebech32address".to_string(); + let bridge_address = crate::astria_address([1u8; 20]); let denom = "transfer/channel-0/utia".parse::().unwrap(); + let state = Arc::new(State::new()); + let startup_handle = startup::InfoHandle::new(state.subscribe()); + state.set_startup_info(startup::Info { + starting_rollup_height: 1, + fee_asset: denom.clone(), + chain_id: "astria".to_string(), + }); let (batch_tx, mut batch_rx) = mpsc::channel(100); - let (startup_tx, startup_rx) = oneshot::channel(); - let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); - startup_tx - .send(SequencerStartupInfo { - fee_asset: denom.clone(), - next_batch_rollup_height: 1, - }) - .unwrap(); let watcher = Builder { ethereum_contract_address: hex::encode(contract_address), ethereum_rpc_endpoint: anvil.ws_endpoint(), - submitter_handle, + startup_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), rollup_asset_denom: denom.clone(), bridge_address, + submitter_handle: submitter::Handle::new(batch_tx), sequencer_address_prefix: crate::ASTRIA_ADDRESS_PREFIX.into(), } .build() @@ -860,6 +867,7 @@ mod tests { block_number: receipt.block_number.unwrap(), transaction_hash: receipt.transaction_hash, }; + let Action::Ics20Withdrawal(mut expected_action) = event_to_action( expected_event, denom.clone(), @@ -883,7 +891,7 @@ mod tests { }; action.timeout_time = 0; // zero this for testing assert_eq!(action, &expected_action); - assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); + assert_eq!(batch_rx.try_recv().unwrap_err(), TryRecvError::Empty); } async fn mint_tokens( @@ -947,27 +955,27 @@ mod tests { let value = 1_000_000_000.into(); let recipient = crate::astria_address([1u8; 20]); - let denom = default_native_asset(); let bridge_address = crate::astria_address([1u8; 20]); + let denom = default_native_asset(); + let state = Arc::new(State::new()); + let startup_handle = startup::InfoHandle::new(state.subscribe()); + state.set_startup_info(startup::Info { + starting_rollup_height: 1, + fee_asset: denom.clone(), + chain_id: "astria".to_string(), + }); let (batch_tx, mut batch_rx) = mpsc::channel(100); - let (startup_tx, startup_rx) = oneshot::channel(); - let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); - startup_tx - .send(SequencerStartupInfo { - fee_asset: "nria".parse().unwrap(), - next_batch_rollup_height: 1, - }) - .unwrap(); let watcher = Builder { ethereum_contract_address: hex::encode(contract_address), ethereum_rpc_endpoint: anvil.ws_endpoint(), - submitter_handle, + startup_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), rollup_asset_denom: denom.clone(), bridge_address, + submitter_handle: submitter::Handle::new(batch_tx), sequencer_address_prefix: crate::ASTRIA_ADDRESS_PREFIX.into(), } .build() @@ -1007,7 +1015,7 @@ mod tests { ); }; assert_eq!(action, &expected_action); - assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); + assert_eq!(batch_rx.try_recv().unwrap_err(), TryRecvError::Empty); } async fn send_ics20_withdraw_transaction_astria_bridgeable_erc20( @@ -1049,27 +1057,27 @@ mod tests { let value = 1_000_000_000.into(); let recipient = "somebech32address".to_string(); - let denom = "transfer/channel-0/utia".parse::().unwrap(); let bridge_address = crate::astria_address([1u8; 20]); + let denom = "transfer/channel-0/utia".parse::().unwrap(); + let state = Arc::new(State::new()); + let startup_handle = startup::InfoHandle::new(state.subscribe()); + state.set_startup_info(startup::Info { + starting_rollup_height: 1, + fee_asset: denom.clone(), + chain_id: "astria".to_string(), + }); let (batch_tx, mut batch_rx) = mpsc::channel(100); - let (startup_tx, startup_rx) = oneshot::channel(); - let submitter_handle = submitter::Handle::new(startup_rx, batch_tx); - startup_tx - .send(SequencerStartupInfo { - fee_asset: "transfer/channel-0/utia".parse().unwrap(), - next_batch_rollup_height: 1, - }) - .unwrap(); let watcher = Builder { ethereum_contract_address: hex::encode(contract_address), ethereum_rpc_endpoint: anvil.ws_endpoint(), - submitter_handle, + startup_handle, shutdown_token: CancellationToken::new(), state: Arc::new(State::new()), rollup_asset_denom: denom.clone(), bridge_address, + submitter_handle: submitter::Handle::new(batch_tx), sequencer_address_prefix: crate::ASTRIA_ADDRESS_PREFIX.into(), } .build() @@ -1116,6 +1124,6 @@ mod tests { }; action.timeout_time = 0; // zero this for testing assert_eq!(action, &expected_action); - assert_eq!(batch_rx.try_recv().unwrap_err(), Empty); + assert_eq!(batch_rx.try_recv().unwrap_err(), TryRecvError::Empty); } } diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs index 4265fb5eb7..d655f9c645 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs @@ -7,10 +7,7 @@ use std::{ time::Duration, }; -use astria_core::primitive::v1::asset::{ - self, - Denom, -}; +use astria_core::primitive::v1::asset::Denom; use astria_eyre::eyre::{ self, WrapErr as _, @@ -44,6 +41,7 @@ use crate::{ mod batch; mod ethereum; +mod startup; mod state; mod submitter; @@ -53,6 +51,7 @@ pub struct BridgeWithdrawer { api_server: api::ApiServer, submitter: Submitter, ethereum_watcher: watcher::Watcher, + startup: startup::Startup, state: Arc, } @@ -77,42 +76,54 @@ impl BridgeWithdrawer { ethereum_contract_address, ethereum_rpc_endpoint, rollup_asset_denomination, - min_expected_fee_asset_balance, + sequencer_bridge_address, .. } = cfg; let state = Arc::new(State::new()); + let sequencer_bridge_address = sequencer_bridge_address + .parse() + .wrap_err("failed to parse sequencer bridge address")?; + + // make startup object + let startup = startup::Builder { + shutdown_token: shutdown_handle.token(), + state: state.clone(), + sequencer_chain_id, + sequencer_cometbft_endpoint: sequencer_cometbft_endpoint.clone(), + sequencer_bridge_address, + expected_fee_asset: fee_asset_denomination, + } + .build() + .wrap_err("failed to initialize startup")?; + + let startup_handle = startup::InfoHandle::new(state.subscribe()); + // make submitter object let (submitter, submitter_handle) = submitter::Builder { shutdown_token: shutdown_handle.token(), + startup_handle: startup_handle.clone(), sequencer_cometbft_endpoint, - sequencer_chain_id, sequencer_key_path, sequencer_address_prefix: sequencer_address_prefix.clone(), state: state.clone(), - expected_fee_asset: fee_asset_denomination, - min_expected_fee_asset_balance: u128::from(min_expected_fee_asset_balance), metrics, } .build() .wrap_err("failed to initialize submitter")?; - let sequencer_bridge_address = cfg - .sequencer_bridge_address - .parse() - .wrap_err("failed to parse sequencer bridge address")?; - let ethereum_watcher = watcher::Builder { ethereum_contract_address, ethereum_rpc_endpoint, - submitter_handle, + startup_handle, shutdown_token: shutdown_handle.token(), state: state.clone(), rollup_asset_denom: rollup_asset_denomination .parse::() .wrap_err("failed to parse ROLLUP_ASSET_DENOMINATION as Denom")?, bridge_address: sequencer_bridge_address, + submitter_handle, sequencer_address_prefix: sequencer_address_prefix.clone(), } .build() @@ -130,18 +141,22 @@ impl BridgeWithdrawer { api_server, submitter, ethereum_watcher, + startup, state, }; Ok((service, shutdown_handle)) } + // Panic won't happen because `startup_task` is unwraped lazily after checking if it's `Some`. + #[allow(clippy::missing_panics_doc)] pub async fn run(self) { let Self { shutdown_token, api_server, submitter, ethereum_watcher, + startup, state: _state, } = self; @@ -158,54 +173,74 @@ impl BridgeWithdrawer { }); info!("spawned API server"); + let mut startup_task = Some(tokio::spawn(startup.run())); + info!("spawned startup task"); + let mut submitter_task = tokio::spawn(submitter.run()); info!("spawned submitter task"); let mut ethereum_watcher_task = tokio::spawn(ethereum_watcher.run()); info!("spawned ethereum watcher task"); - let shutdown = select!( - o = &mut api_task => { - report_exit("api server", o); - Shutdown { - api_task: None, - submitter_task: Some(submitter_task), - ethereum_watcher_task: Some(ethereum_watcher_task), - api_shutdown_signal, - token: shutdown_token + let shutdown = loop { + select!( + o = async { startup_task.as_mut().unwrap().await }, if startup_task.is_none() => { + match o { + Ok(_) => { + info!(task = "startup", "task has exited"); + startup_task = None; + }, + Err(error) => { + error!(task = "startup", %error, "task returned with error"); + break Shutdown { + api_task: Some(api_task), + submitter_task: Some(submitter_task), + ethereum_watcher_task: Some(ethereum_watcher_task), + startup_task: None, + api_shutdown_signal, + token: shutdown_token, + }; + } + } } - } - o = &mut submitter_task => { - report_exit("submitter", o); - Shutdown { - api_task: Some(api_task), - submitter_task: None, - ethereum_watcher_task:Some(ethereum_watcher_task), - api_shutdown_signal, - token: shutdown_token + o = &mut api_task => { + report_exit("api server", o); + break Shutdown { + api_task: None, + submitter_task: Some(submitter_task), + ethereum_watcher_task: Some(ethereum_watcher_task), + startup_task, + api_shutdown_signal, + token: shutdown_token + } } - } - o = &mut ethereum_watcher_task => { - report_exit("ethereum watcher", o); - Shutdown { - api_task: Some(api_task), - submitter_task: Some(submitter_task), - ethereum_watcher_task: None, - api_shutdown_signal, - token: shutdown_token + o = &mut submitter_task => { + report_exit("submitter", o); + break Shutdown { + api_task: Some(api_task), + submitter_task: None, + ethereum_watcher_task:Some(ethereum_watcher_task), + startup_task, + api_shutdown_signal, + token: shutdown_token + } } - } - - ); + o = &mut ethereum_watcher_task => { + report_exit("ethereum watcher", o); + break Shutdown { + api_task: Some(api_task), + submitter_task: Some(submitter_task), + ethereum_watcher_task: None, + startup_task, + api_shutdown_signal, + token: shutdown_token + } + } + ); + }; shutdown.run().await; } } -#[derive(Debug)] -pub struct SequencerStartupInfo { - pub fee_asset: asset::Denom, - pub next_batch_rollup_height: u64, -} - /// A handle for instructing the [`Service`] to shut down. /// /// It is returned along with its related `Service` from [`Service::new`]. The @@ -264,6 +299,7 @@ struct Shutdown { api_task: Option>>, submitter_task: Option>>, ethereum_watcher_task: Option>>, + startup_task: Option>>, api_shutdown_signal: oneshot::Sender<()>, token: CancellationToken, } @@ -271,19 +307,38 @@ struct Shutdown { impl Shutdown { const API_SHUTDOWN_TIMEOUT_SECONDS: u64 = 4; const ETHEREUM_WATCHER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 5; - const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 20; + const STARTUP_SHUTDOWN_TIMEOUT_SECONDS: u64 = 1; + const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 19; async fn run(self) { let Self { api_task, submitter_task, ethereum_watcher_task, + startup_task, api_shutdown_signal, token, } = self; token.cancel(); + // Giving startup 1 second to shutdown because it should be very quick. + if let Some(mut startup_task) = startup_task { + info!("waiting for startup task to shut down"); + let limit = Duration::from_secs(Self::STARTUP_SHUTDOWN_TIMEOUT_SECONDS); + match timeout(limit, &mut startup_task).await.map(flatten_result) { + Ok(Ok(())) => info!("startup exited gracefully"), + Ok(Err(error)) => error!(%error, "startup exited with an error"), + Err(_) => { + error!( + timeout_secs = limit.as_secs(), + "startup did not shut down within timeout; killing it" + ); + startup_task.abort(); + } + } + } + // Giving submitter 20 seconds to shutdown because Kubernetes issues a SIGKILL after 30. if let Some(mut submitter_task) = submitter_task { info!("waiting for submitter task to shut down"); @@ -302,8 +357,6 @@ impl Shutdown { submitter_task.abort(); } } - } else { - info!("submitter task was already dead"); } // Giving ethereum watcher 5 seconds to shutdown because Kubernetes issues a SIGKILL after @@ -325,8 +378,6 @@ impl Shutdown { ethereum_watcher_task.abort(); } } - } else { - info!("watcher task was already dead"); } // Giving the API task 4 seconds. 5s for watcher + 20 for submitter + 4s = 29s (out of 30s @@ -346,8 +397,6 @@ impl Shutdown { api_task.abort(); } } - } else { - info!("API server was already dead"); } } } diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs new file mode 100644 index 0000000000..d68e7392bc --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs @@ -0,0 +1,488 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use astria_core::{ + bridge::{ + self, + Ics20WithdrawalFromRollupMemo, + }, + primitive::v1::asset, + protocol::{ + asset::v1alpha1::AllowedFeeAssetsResponse, + bridge::v1alpha1::BridgeAccountLastTxHashResponse, + transaction::v1alpha1::Action, + }, +}; +use astria_eyre::eyre::{ + self, + bail, + ensure, + OptionExt as _, + WrapErr as _, +}; +use prost::{ + Message as _, + Name as _, +}; +use sequencer_client::{ + tendermint_rpc, + Address, + SequencerClientExt as _, + SignedTransaction, +}; +use tendermint_rpc::{ + endpoint::tx, + Client as _, +}; +use tokio::sync::watch; +use tokio_util::sync::CancellationToken; +use tracing::{ + info, + instrument, + warn, +}; +use tryhard::backoff_strategies::ExponentialBackoff; + +use super::state::{ + self, + State, +}; + +pub(super) struct Builder { + pub(super) shutdown_token: CancellationToken, + pub(super) state: Arc, + pub(super) sequencer_chain_id: String, + pub(super) sequencer_cometbft_endpoint: String, + pub(super) sequencer_bridge_address: Address, + pub(super) expected_fee_asset: asset::Denom, +} + +impl Builder { + pub(super) fn build(self) -> eyre::Result { + let Self { + shutdown_token, + state, + sequencer_chain_id, + sequencer_cometbft_endpoint, + sequencer_bridge_address, + expected_fee_asset, + } = self; + + let sequencer_cometbft_client = + sequencer_client::HttpClient::new(&*sequencer_cometbft_endpoint) + .wrap_err("failed constructing cometbft http client")?; + + Ok(Startup { + shutdown_token, + state, + sequencer_chain_id, + sequencer_cometbft_client, + sequencer_bridge_address, + expected_fee_asset, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +pub(super) struct Info { + pub(super) starting_rollup_height: u64, + pub(super) fee_asset: asset::Denom, + pub(super) chain_id: String, +} + +#[derive(Debug, Clone)] +pub(super) struct InfoHandle { + rx: watch::Receiver, +} + +impl InfoHandle { + pub(super) fn new(rx: watch::Receiver) -> Self { + Self { + rx, + } + } + + pub(super) async fn get_info(&mut self) -> eyre::Result { + let state = self + .rx + .wait_for(|state| state.get_startup_info().is_some()) + .await + .wrap_err("failed to get startup info")?; + + Ok(state + .get_startup_info() + .expect("the previous line guarantes that the state is intialized") + .clone()) + } +} + +pub(super) struct Startup { + shutdown_token: CancellationToken, + state: Arc, + sequencer_chain_id: String, + sequencer_cometbft_client: sequencer_client::HttpClient, + sequencer_bridge_address: Address, + expected_fee_asset: asset::Denom, +} + +impl Startup { + pub(super) async fn run(mut self) -> eyre::Result<()> { + let shutdown_token = self.shutdown_token.clone(); + + let state = self.state.clone(); + let startup_task = async move { + self.confirm_sequencer_config() + .await + .wrap_err("failed to confirm sequencer config")?; + let starting_rollup_height = self + .get_starting_rollup_height() + .await + .wrap_err("failed to get next rollup block height")?; + + // send the startup info to the submitter + let info = Info { + chain_id: self.sequencer_chain_id.clone(), + fee_asset: self.expected_fee_asset, + starting_rollup_height, + }; + + state.set_startup_info(info); + + Ok(()) + }; + + tokio::select!( + () = shutdown_token.cancelled() => { + bail!("startup was cancelled"); + } + res = startup_task => { + res + } + ) + } + + /// Confirms configuration values against the sequencer node. Values checked: + /// + /// - `self.sequencer_chain_id` matches the value returned from the sequencer node's genesis + /// - `self.fee_asset_id` is a valid fee asset on the sequencer node + /// - `self.sequencer_key.address` has a sufficient balance of `self.fee_asset_id` + /// + /// # Errors + /// + /// - `self.chain_id` does not match the value returned from the sequencer node + /// - `self.fee_asset_id` is not a valid fee asset on the sequencer node + /// - `self.sequencer_key.address` does not have a sufficient balance of `self.fee_asset_id`. + async fn confirm_sequencer_config(&self) -> eyre::Result<()> { + // confirm the sequencer chain id + let actual_chain_id = + get_sequencer_chain_id(self.sequencer_cometbft_client.clone(), self.state.clone()) + .await + .wrap_err("failed to get chain id from sequencer")?; + ensure!( + self.sequencer_chain_id == actual_chain_id.to_string(), + "sequencer_chain_id provided in config does not match chain_id returned from sequencer" + ); + + // confirm that the fee asset ID is valid + let allowed_fee_asset_ids_resp = + get_allowed_fee_asset_ids(self.sequencer_cometbft_client.clone(), self.state.clone()) + .await + .wrap_err("failed to get allowed fee asset ids from sequencer")?; + let expected_fee_asset_ibc = self.expected_fee_asset.to_ibc_prefixed(); + ensure!( + allowed_fee_asset_ids_resp + .fee_assets + .iter() + .any(|asset| asset.to_ibc_prefixed() == expected_fee_asset_ibc), + "fee_asset provided in config is not a valid fee asset on the sequencer" + ); + + Ok(()) + } + + /// Gets the last transaction by the bridge account on the sequencer. This is used to + /// determine the starting rollup height for syncing to the latest on-chain state. + /// + /// # Returns + /// The last transaction by the bridge account on the sequencer, if it exists. + /// + /// # Errors + /// + /// 1. Failing to fetch the last transaction hash by the bridge account. + /// 2. Failing to convert the last transaction hash to a tendermint hash. + /// 3. Failing to fetch the last transaction by the bridge account. + /// 4. The last transaction by the bridge account failed to execute (this should not happen + /// in the sequencer logic). + /// 5. Failing to convert the transaction data from bytes to proto. + /// 6. Failing to convert the transaction data from proto to `SignedTransaction`. + async fn get_last_transaction(&self) -> eyre::Result> { + // get last transaction hash by the bridge account, if it exists + let last_transaction_hash_resp = get_bridge_account_last_transaction_hash( + self.sequencer_cometbft_client.clone(), + self.state.clone(), + self.sequencer_bridge_address, + ) + .await + .wrap_err("failed to fetch last transaction hash by the bridge account")?; + + let Some(tx_hash) = last_transaction_hash_resp.tx_hash else { + return Ok(None); + }; + + let tx_hash = tendermint::Hash::try_from(tx_hash.to_vec()) + .wrap_err("failed to convert last transaction hash to tendermint hash")?; + + // get the corresponding transaction + let last_transaction = get_sequencer_transaction_at_hash( + self.sequencer_cometbft_client.clone(), + self.state.clone(), + tx_hash, + ) + .await + .wrap_err("failed to fetch last transaction by the bridge account")?; + + // check that the transaction actually executed + ensure!( + last_transaction.tx_result.code == tendermint::abci::Code::Ok, + "last transaction by the bridge account failed to execute. this should not happen in \ + the sequencer logic." + ); + + let proto_tx = + astria_core::generated::protocol::transaction::v1alpha1::SignedTransaction::decode( + &*last_transaction.tx, + ) + .wrap_err_with(|| format!( + "failed to decode data in Sequencer CometBFT transaction as `{}`", + astria_core::generated::protocol::transaction::v1alpha1::SignedTransaction::full_name(), + ))?; + + let tx = SignedTransaction::try_from_raw(proto_tx) + .wrap_err_with(|| format!("failed to verify {}", astria_core::generated::protocol::transaction::v1alpha1::SignedTransaction::full_name()))?; + + info!( + last_bridge_account_tx.hash = %telemetry::display::hex(&tx_hash), + last_bridge_account_tx.height = %last_transaction.height, + "fetched last transaction by the bridge account" + ); + + Ok(Some(tx)) + } + + /// Gets the data necessary for syncing to the latest on-chain state from the sequencer. + /// Since we batch all events from a given rollup block into a single sequencer + /// transaction, we get the last tx finalized by the bridge account on the sequencer + /// and extract the rollup height from it. + /// + /// The rollup height is extracted from the block height value in the memo of one of the + /// actions in the batch. + /// + /// # Returns + /// The next batch rollup height to process. + /// + /// # Errors + /// + /// 1. Failing to get and deserialize a valid last transaction by the bridge account from the + /// sequencer. + /// 2. The last transaction by the bridge account failed to execute (this should not happen in + /// the sequencer logic) + /// 3. The last transaction by the bridge account did not contain a withdrawal action + /// 4. The memo of the last transaction by the bridge account could not be parsed + async fn get_starting_rollup_height(&mut self) -> eyre::Result { + let signed_transaction = self + .get_last_transaction() + .await + .wrap_err("failed to get the bridge account's last sequencer transaction")?; + let starting_rollup_height = if let Some(signed_transaction) = signed_transaction { + rollup_height_from_signed_transaction(&signed_transaction) + .wrap_err( + "failed to extract rollup height from last transaction by the bridge account", + )? + .checked_add(1) + .ok_or_eyre("failed to increment rollup height by 1")? + } else { + 1 + }; + Ok(starting_rollup_height) + } +} + +/// Extracts the rollup height from the last transaction by the bridge account on the sequencer. +/// Since all the withdrawals from a rollup block are batched into a single sequencer transaction, +/// he rollup height can be extracted from the memo of any withdrawal action in the batch. +/// +/// # Returns +/// +/// The rollup height of the last batch of withdrawals. +/// +/// # Errors +/// +/// 1. The last transaction by the bridge account did not contain a withdrawal action. +/// 2. The memo of the last transaction by the bridge account could not be parsed. +/// 3. The block number in the memo of the last transaction by the bridge account could not be +/// converted to a u64. +fn rollup_height_from_signed_transaction( + signed_transaction: &SignedTransaction, +) -> eyre::Result { + // find the last batch's rollup block height + let withdrawal_action = signed_transaction + .actions() + .iter() + .find(|action| matches!(action, Action::BridgeUnlock(_) | Action::Ics20Withdrawal(_))) + .ok_or_eyre("last transaction by the bridge account did not contain a withdrawal action")?; + + let last_batch_rollup_height = match withdrawal_action { + Action::BridgeUnlock(action) => { + let memo: bridge::UnlockMemo = serde_json::from_slice(&action.memo) + .wrap_err("failed to parse memo from last transaction by the bridge account")?; + Some(memo.block_number) + } + Action::Ics20Withdrawal(action) => { + let memo: Ics20WithdrawalFromRollupMemo = serde_json::from_str(&action.memo) + .wrap_err("failed to parse memo from last transaction by the bridge account")?; + Some(memo.block_number) + } + _ => None, + } + .expect("action is already checked to be either BridgeUnlock or Ics20Withdrawal"); + + info!( + last_batch.tx_hash = %telemetry::display::hex(&signed_transaction.sha256_of_proto_encoding()), + last_batch.rollup_height = last_batch_rollup_height, + "extracted rollup height from last batch of withdrawals", + ); + + Ok(last_batch_rollup_height) +} + +#[instrument(skip_all)] +async fn get_bridge_account_last_transaction_hash( + client: sequencer_client::HttpClient, + state: Arc, + address: Address, +) -> eyre::Result { + let res = tryhard::retry_fn(|| client.get_bridge_account_last_transaction_hash(address)) + .with_config(make_sequencer_retry_config( + "attempt to fetch last bridge account's transaction hash from Sequencer; retrying \ + after backoff", + )) + .await + .wrap_err( + "failed to fetch last bridge account's transaction hash from Sequencer after a lot of \ + attempts", + ); + + state.set_sequencer_connected(res.is_ok()); + + res +} + +#[instrument(skip_all)] +async fn get_sequencer_transaction_at_hash( + client: sequencer_client::HttpClient, + state: Arc, + tx_hash: tendermint::Hash, +) -> eyre::Result { + let res = tryhard::retry_fn(|| client.tx(tx_hash, false)) + .with_config(make_cometbft_retry_config( + "attempt to get transaction from CometBFT; retrying after backoff", + )) + .await + .wrap_err("failed to get transaction from Sequencer after a lot of attempts"); + + state.set_sequencer_connected(res.is_ok()); + + res +} + +#[instrument(skip_all)] +async fn get_sequencer_chain_id( + client: sequencer_client::HttpClient, + state: Arc, +) -> eyre::Result { + let genesis: tendermint::Genesis = tryhard::retry_fn(|| client.genesis()) + .with_config(make_cometbft_retry_config( + "attempt to get genesis from CometBFT; retrying after backoff", + )) + .await + .wrap_err("failed to get genesis info from Sequencer after a lot of attempts")?; + + state.set_sequencer_connected(true); + + Ok(genesis.chain_id) +} + +#[instrument(skip_all)] +async fn get_allowed_fee_asset_ids( + client: sequencer_client::HttpClient, + state: Arc, +) -> eyre::Result { + let res = tryhard::retry_fn(|| client.get_allowed_fee_assets()) + .with_config(make_sequencer_retry_config( + "attempt to get allowed fee assets from Sequencer; retrying after backoff", + )) + .await + .wrap_err("failed to get allowed fee asset ids from Sequencer after a lot of attempts"); + + state.set_sequencer_connected(res.is_ok()); + + res +} + +fn make_cometbft_retry_config( + retry_message: &'static str, +) -> tryhard::RetryFutureConfig< + ExponentialBackoff, + impl Fn(u32, Option, &tendermint_rpc::Error) -> futures::future::Ready<()>, +> { + tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + move |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + retry_message, + ); + futures::future::ready(()) + }, + ) +} + +fn make_sequencer_retry_config( + retry_message: &'static str, +) -> tryhard::RetryFutureConfig< + ExponentialBackoff, + impl Fn( + u32, + Option, + &sequencer_client::extension_trait::Error, + ) -> futures::future::Ready<()>, +> { + tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + move |attempt: u32, + next_delay: Option, + error: &sequencer_client::extension_trait::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + retry_message, + ); + futures::future::ready(()) + }, + ) +} diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/state.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/state.rs index 647bb489b5..bbdc4c561b 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/state.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/state.rs @@ -1,5 +1,7 @@ use tokio::sync::watch; +use super::startup; + pub(crate) struct State { inner: tokio::sync::watch::Sender, } @@ -39,6 +41,7 @@ macro_rules! forward_setter { } forward_setter!( + [set_startup_info <- startup::Info], [set_sequencer_connected <- bool], [set_last_rollup_height_submitted <- u64], [set_last_sequencer_height <- u64], @@ -47,6 +50,8 @@ forward_setter!( #[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize)] pub(crate) struct StateSnapshot { + startup_info: Option, + watcher_ready: bool, submitter_ready: bool, @@ -58,6 +63,19 @@ pub(crate) struct StateSnapshot { } impl StateSnapshot { + pub(super) fn get_startup_info(&self) -> Option { + self.startup_info.clone() + } + + pub(super) fn set_startup_info(&mut self, startup_info: startup::Info) -> bool { + if self.startup_info.is_none() { + self.startup_info = Some(startup_info); + true + } else { + false + } + } + pub(crate) fn set_watcher_ready(&mut self) { self.watcher_ready = true; } diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs index 5332c96129..8e61704e53 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs @@ -1,22 +1,18 @@ use std::sync::Arc; -use astria_core::primitive::v1::asset; use astria_eyre::eyre::{ self, Context as _, }; -use tokio::sync::{ - mpsc, - oneshot, -}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::info; use super::state::State; use crate::{ bridge_withdrawer::{ + startup, submitter::Batch, - SequencerStartupInfo, }, metrics::Metrics, }; @@ -24,29 +20,16 @@ use crate::{ const BATCH_QUEUE_SIZE: usize = 256; pub(crate) struct Handle { - startup_info_rx: Option>, batches_tx: mpsc::Sender, } impl Handle { - pub(crate) fn new( - startup_info_rx: oneshot::Receiver, - batches_tx: mpsc::Sender, - ) -> Self { + pub(crate) fn new(batches_tx: mpsc::Sender) -> Self { Self { - startup_info_rx: Some(startup_info_rx), batches_tx, } } - pub(crate) async fn recv_startup_info(&mut self) -> eyre::Result { - self.startup_info_rx - .take() - .expect("startup info should only be taken once - this is a bug") - .await - .wrap_err("failed to get startup info from submitter. channel was dropped.") - } - pub(crate) async fn send_batch(&self, batch: Batch) -> eyre::Result<()> { self.batches_tx .send(batch) @@ -57,13 +40,11 @@ impl Handle { pub(crate) struct Builder { pub(crate) shutdown_token: CancellationToken, + pub(crate) startup_handle: startup::InfoHandle, pub(crate) sequencer_key_path: String, pub(crate) sequencer_address_prefix: String, - pub(crate) sequencer_chain_id: String, pub(crate) sequencer_cometbft_endpoint: String, pub(crate) state: Arc, - pub(crate) expected_fee_asset: asset::Denom, - pub(crate) min_expected_fee_asset_balance: u128, pub(crate) metrics: &'static Metrics, } @@ -72,13 +53,11 @@ impl Builder { pub(crate) fn build(self) -> eyre::Result<(super::Submitter, Handle)> { let Self { shutdown_token, + startup_handle, sequencer_key_path, sequencer_address_prefix, - sequencer_chain_id, sequencer_cometbft_endpoint, state, - expected_fee_asset, - min_expected_fee_asset_balance, metrics, } = self; @@ -94,20 +73,16 @@ impl Builder { .wrap_err("failed constructing cometbft http client")?; let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE); - let (startup_tx, startup_rx) = tokio::sync::oneshot::channel(); - let handle = Handle::new(startup_rx, batches_tx); + let handle = Handle::new(batches_tx); Ok(( super::Submitter { shutdown_token, + startup_handle, state, batches_rx, sequencer_cometbft_client, signer, - sequencer_chain_id, - startup_tx, - expected_fee_asset, - min_expected_fee_asset_balance, metrics, }, handle, diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs index 4e37e04910..b869d7c7f3 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs @@ -3,56 +3,29 @@ use std::{ time::Duration, }; -use astria_core::{ - bridge::{ - self, - Ics20WithdrawalFromRollupMemo, - }, - primitive::v1::asset, - protocol::{ - asset::v1alpha1::AllowedFeeAssetsResponse, - bridge::v1alpha1::BridgeAccountLastTxHashResponse, - transaction::v1alpha1::{ - Action, - TransactionParams, - UnsignedTransaction, - }, - }, +use astria_core::protocol::transaction::v1alpha1::{ + Action, + TransactionParams, + UnsignedTransaction, }; use astria_eyre::eyre::{ self, - ensure, eyre, Context, - OptionExt, }; pub(crate) use builder::Builder; pub(super) use builder::Handle; -use prost::Message as _; use sequencer_client::{ - tendermint_rpc::{ - self, - endpoint::broadcast::tx_commit, - }, + tendermint_rpc::endpoint::broadcast::tx_commit, Address, - BalanceResponse, SequencerClientExt, SignedTransaction, }; use signer::SequencerKey; use state::State; -use tendermint_rpc::{ - endpoint::tx, - Client, -}; use tokio::{ select, - sync::{ - mpsc, - oneshot::{ - self, - }, - }, + sync::mpsc, time::Instant, }; use tokio_util::sync::CancellationToken; @@ -69,39 +42,40 @@ use tracing::{ use super::{ batch::Batch, + startup, state, - SequencerStartupInfo, }; use crate::metrics::Metrics; mod builder; -mod signer; +pub(crate) mod signer; #[cfg(test)] mod tests; pub(super) struct Submitter { shutdown_token: CancellationToken, + startup_handle: startup::InfoHandle, state: Arc, batches_rx: mpsc::Receiver, sequencer_cometbft_client: sequencer_client::HttpClient, signer: SequencerKey, - sequencer_chain_id: String, - startup_tx: oneshot::Sender, - expected_fee_asset: asset::Denom, - min_expected_fee_asset_balance: u128, metrics: &'static Metrics, } impl Submitter { pub(super) async fn run(mut self) -> eyre::Result<()> { - // call startup - let startup = self - .startup() - .await - .wrap_err("submitter failed to start up")?; - self.startup_tx - .send(startup) - .map_err(|_startup| eyre!("failed to send startup info to watcher"))?; + let sequencer_chain_id = select! { + () = self.shutdown_token.cancelled() => { + info!("submitter received shutdown signal while waiting for startup"); + return Ok(()); + } + + startup_info = self.startup_handle.get_info() => { + let startup::Info { chain_id, .. } = startup_info.wrap_err("submitter failed to get startup info")?; + self.state.set_submitter_ready(); + chain_id + } + }; let reason = loop { select!( @@ -122,10 +96,10 @@ impl Submitter { self.sequencer_cometbft_client.clone(), &self.signer, self.state.clone(), - &self.sequencer_chain_id, + &sequencer_chain_id, actions, rollup_height, - self.metrics + self.metrics, ).await { break Err(e); } @@ -148,175 +122,6 @@ impl Submitter { Ok(()) } - - /// Confirms configuration values against the sequencer node and then syncs the next sequencer - /// nonce and rollup block according to the latest on-chain state. - /// - /// Configuration values checked: - /// - `self.chain_id` matches the value returned from the sequencer node's genesis - /// - `self.fee_asset_id` is a valid fee asset on the sequencer node - /// - `self.sequencer_key.address` has a sufficient balance of `self.fee_asset_id` - /// - /// Sync process: - /// - Fetch the last transaction hash by the bridge account from the sequencer - /// - Fetch the corresponding transaction - /// - Extract the last nonce used from the transaction - /// - Extract the rollup block height from the memo of one of the withdraw actions in the - /// transaction - /// - /// # Returns - /// A struct with the information collected and validated during startup: - /// - `fee_asset_id` - /// - `next_batch_rollup_height` - /// - /// # Errors - /// - /// - `self.chain_id` does not match the value returned from the sequencer node - /// - `self.fee_asset_id` is not a valid fee asset on the sequencer node - /// - `self.sequencer_key.address` does not have a sufficient balance of `self.fee_asset_id`. - async fn startup(&mut self) -> eyre::Result { - let actual_chain_id = - get_sequencer_chain_id(self.sequencer_cometbft_client.clone(), self.state.clone()) - .await - .wrap_err("failed to get chain id from sequencer")?; - ensure!( - self.sequencer_chain_id == actual_chain_id.to_string(), - "sequencer_chain_id provided in config does not match chain_id returned from sequencer" - ); - - let expected_fee_asset_ibc = self.expected_fee_asset.to_ibc_prefixed(); - // confirm that the fee asset ID is valid - let allowed_fee_assets_resp = - get_allowed_fee_assets(self.sequencer_cometbft_client.clone(), self.state.clone()) - .await - .wrap_err("failed to get allowed fee asset ids from sequencer")?; - ensure!( - allowed_fee_assets_resp - .fee_assets - .iter() - .any(|asset| asset.to_ibc_prefixed() == expected_fee_asset_ibc), - "fee_asset_id provided in config is not a valid fee asset on the sequencer" - ); - - // confirm that the sequencer key has a sufficient balance of the fee asset - let fee_asset_balances = get_latest_balance( - self.sequencer_cometbft_client.clone(), - self.state.clone(), - *self.signer.address(), - ) - .await - .wrap_err("failed to get latest balance")?; - let fee_asset_balance = fee_asset_balances - .balances - .into_iter() - .find(|balance| balance.denom.to_ibc_prefixed() == expected_fee_asset_ibc) - .ok_or_eyre("withdrawer's account does not have the minimum balance of the fee asset")? - .balance; - ensure!( - fee_asset_balance >= self.min_expected_fee_asset_balance, - "sequencer key does not have a sufficient balance of the fee asset" - ); - - // sync to latest on-chain state - let next_batch_rollup_height = self - .get_next_rollup_height() - .await - .wrap_err("failed to get next rollup block height")?; - - self.state.set_submitter_ready(); - - // send startup info to watcher - let startup = SequencerStartupInfo { - fee_asset: self.expected_fee_asset.clone(), - next_batch_rollup_height, - }; - Ok(startup) - } - - /// Gets the data necessary for syncing to the latest on-chain state from the sequencer. Since - /// we batch all events from a given rollup block into a single sequencer transaction, we - /// get the last tx finalized by the bridge account on the sequencer and extract the rollup - /// height from it. - /// - /// The rollup height is extracted from the block height value in the memo of one of the actions - /// in the batch. - /// - /// # Returns - /// The next batch rollup height to process. - /// - /// # Errors - /// - /// 1. Failing to get and deserialize a valid last transaction by the bridge account from the - /// sequencer. - /// 2. The last transaction by the bridge account failed to execute (this should not happen in - /// the sequencer logic) - /// 3. The last transaction by the bridge account did not contain a withdrawal action - /// 4. The memo of the last transaction by the bridge account could not be parsed - async fn get_next_rollup_height(&mut self) -> eyre::Result { - let signed_transaction = self - .get_last_transaction() - .await - .wrap_err("failed to get the bridge account's last sequencer transaction")?; - let next_batch_rollup_height = if let Some(signed_transaction) = signed_transaction { - rollup_height_from_signed_transaction(&signed_transaction).wrap_err( - "failed to extract rollup height from last transaction by the bridge account", - )? - } else { - 1 - }; - Ok(next_batch_rollup_height) - } - - async fn get_last_transaction(&self) -> eyre::Result> { - // get last transaction hash by the bridge account, if it exists - let last_transaction_hash_resp = get_bridge_account_last_transaction_hash( - self.sequencer_cometbft_client.clone(), - self.state.clone(), - *self.signer.address(), - ) - .await - .wrap_err("failed to fetch last transaction hash by the bridge account")?; - - let Some(tx_hash) = last_transaction_hash_resp.tx_hash else { - return Ok(None); - }; - - let tx_hash = tendermint::Hash::try_from(tx_hash.to_vec()) - .wrap_err("failed to convert last transaction hash to tendermint hash")?; - - // get the corresponding transaction - let last_transaction = get_tx( - self.sequencer_cometbft_client.clone(), - self.state.clone(), - tx_hash, - ) - .await - .wrap_err("failed to fetch last transaction by the bridge account")?; - - // check that the transaction actually executed - ensure!( - last_transaction.tx_result.code == tendermint::abci::Code::Ok, - "last transaction by the bridge account failed to execute. this should not happen in \ - the sequencer logic." - ); - - let proto_tx = - astria_core::generated::protocol::transaction::v1alpha1::SignedTransaction::decode( - &*last_transaction.tx, - ) - .wrap_err("failed to convert transaction data from CometBFT to proto")?; - - let tx = SignedTransaction::try_from_raw(proto_tx) - .wrap_err("failed to convert transaction data from proto to SignedTransaction")?; - - info!( - last_bridge_account_tx.hash = %telemetry::display::hex(&tx_hash), - last_bridge_account_tx.height = i64::from(last_transaction.height), - "fetched last transaction by the bridge account" - ); - - Ok(Some(tx)) - } } async fn process_batch( @@ -507,232 +312,3 @@ async fn submit_tx( res } - -#[instrument(skip_all)] -async fn get_sequencer_chain_id( - client: sequencer_client::HttpClient, - state: Arc, -) -> eyre::Result { - use sequencer_client::Client as _; - - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(20)) - .on_retry( - |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { - let state = Arc::clone(&state); - state.set_sequencer_connected(false); - - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &dyn std::error::Error, - "attempt to fetch sequencer genesis info; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - let genesis: tendermint::Genesis = tryhard::retry_fn(|| client.genesis()) - .with_config(retry_config) - .await - .wrap_err("failed to get genesis info from Sequencer after a lot of attempts")?; - - state.set_sequencer_connected(true); - - Ok(genesis.chain_id) -} - -#[instrument(skip_all)] -async fn get_allowed_fee_assets( - client: sequencer_client::HttpClient, - state: Arc, -) -> eyre::Result { - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(20)) - .on_retry( - |attempt: u32, - next_delay: Option, - error: &sequencer_client::extension_trait::Error| { - let state = Arc::clone(&state); - state.set_sequencer_connected(false); - - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &dyn std::error::Error, - "attempt to fetch sequencer allowed fee asset ids; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - let res = tryhard::retry_fn(|| client.get_allowed_fee_assets()) - .with_config(retry_config) - .await - .wrap_err("failed to get allowed fee asset ids from Sequencer after a lot of attempts"); - - state.set_sequencer_connected(res.is_ok()); - - res -} - -#[instrument(skip_all)] -async fn get_latest_balance( - client: sequencer_client::HttpClient, - state: Arc, - address: Address, -) -> eyre::Result { - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(20)) - .on_retry( - |attempt: u32, - next_delay: Option, - error: &sequencer_client::extension_trait::Error| { - let state = Arc::clone(&state); - state.set_sequencer_connected(false); - - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &dyn std::error::Error, - "attempt to get latest balance; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - let res = tryhard::retry_fn(|| client.get_latest_balance(address)) - .with_config(retry_config) - .await - .wrap_err("failed to get latest balance from Sequencer after a lot of attempts"); - - state.set_sequencer_connected(res.is_ok()); - - res -} - -#[instrument(skip_all)] -async fn get_bridge_account_last_transaction_hash( - client: sequencer_client::HttpClient, - state: Arc, - address: Address, -) -> eyre::Result { - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(20)) - .on_retry( - |attempt: u32, - next_delay: Option, - error: &sequencer_client::extension_trait::Error| { - let state = Arc::clone(&state); - state.set_sequencer_connected(false); - - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &dyn std::error::Error, - "attempt to fetch last bridge account's transaction hash; retrying after \ - backoff", - ); - futures::future::ready(()) - }, - ); - - let res = tryhard::retry_fn(|| client.get_bridge_account_last_transaction_hash(address)) - .with_config(retry_config) - .await - .wrap_err( - "failed to fetch last bridge account's transaction hash from Sequencer after a lot of \ - attempts", - ); - - state.set_sequencer_connected(res.is_ok()); - - res -} - -#[instrument(skip_all)] -async fn get_tx( - client: sequencer_client::HttpClient, - state: Arc, - tx_hash: tendermint::Hash, -) -> eyre::Result { - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(20)) - .on_retry( - |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { - let state = Arc::clone(&state); - state.set_sequencer_connected(false); - - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &dyn std::error::Error, - "attempt to get transaction from Sequencer; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - let res = tryhard::retry_fn(|| client.tx(tx_hash, false)) - .with_config(retry_config) - .await - .wrap_err("failed to get transaction from Sequencer after a lot of attempts"); - - state.set_sequencer_connected(res.is_ok()); - - res -} - -fn rollup_height_from_signed_transaction( - signed_transaction: &SignedTransaction, -) -> eyre::Result { - // find the last batch's rollup block height - let withdrawal_action = signed_transaction - .actions() - .iter() - .find(|action| matches!(action, Action::BridgeUnlock(_) | Action::Ics20Withdrawal(_))) - .ok_or_eyre("last transaction by the bridge account did not contain a withdrawal action")?; - - let last_batch_rollup_height = match withdrawal_action { - Action::BridgeUnlock(action) => { - let memo: bridge::UnlockMemo = serde_json::from_slice(&action.memo) - .wrap_err("failed to parse memo from last transaction by the bridge account")?; - Some(memo.block_number) - } - Action::Ics20Withdrawal(action) => { - let memo: Ics20WithdrawalFromRollupMemo = serde_json::from_str(&action.memo) - .wrap_err("failed to parse memo from last transaction by the bridge account")?; - Some(memo.block_number) - } - _ => None, - } - .expect("action is already checked to be either BridgeUnlock or Ics20Withdrawal"); - - info!( - last_batch.tx_hash = %telemetry::display::hex(&signed_transaction.sha256_of_proto_encoding()), - last_batch.rollup_height = last_batch_rollup_height, - "extracted rollup height from last batch of withdrawals", - ); - - Ok(last_batch_rollup_height) -} diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/tests.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/tests.rs index eb99bc63b1..edc2615b6c 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/tests.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/tests.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, io::Write as _, sync::Arc, time::Duration, @@ -11,26 +10,18 @@ use astria_core::{ self, Ics20WithdrawalFromRollupMemo, }, - crypto::SigningKey, generated::protocol::account::v1alpha1::NonceResponse, primitive::v1::asset, - protocol::{ - account::v1alpha1::AssetBalance, - bridge::v1alpha1::BridgeAccountLastTxHashResponse, - transaction::v1alpha1::{ - action::{ - BridgeUnlockAction, - Ics20Withdrawal, - }, - Action, - TransactionParams, - UnsignedTransaction, + protocol::transaction::v1alpha1::{ + action::{ + BridgeUnlockAction, + Ics20Withdrawal, }, + Action, }, }; use astria_eyre::eyre::{ self, - Context, }; use ibc_types::core::client::Height as IbcHeight; use once_cell::sync::Lazy; @@ -46,18 +37,13 @@ use serde_json::json; use tempfile::NamedTempFile; use tendermint::{ abci::{ - self, response::CheckTx, types::ExecTxResult, }, block::Height, - chain, }; use tendermint_rpc::{ - endpoint::{ - broadcast::tx_sync, - tx, - }, + endpoint::broadcast::tx_sync, request, }; use tokio::task::JoinHandle; @@ -79,6 +65,7 @@ use super::Submitter; use crate::{ bridge_withdrawer::{ batch::Batch, + startup, state, submitter, }, @@ -87,8 +74,6 @@ use crate::{ const SEQUENCER_CHAIN_ID: &str = "test_sequencer-1000"; const DEFAULT_LAST_ROLLUP_HEIGHT: u64 = 1; -const DEFAULT_LAST_SEQUENCER_HEIGHT: u64 = 0; -const DEFAULT_SEQUENCER_NONCE: u32 = 0; const DEFAULT_IBC_DENOM: &str = "transfer/channel-0/utia"; fn default_native_asset() -> asset::Denom { @@ -141,22 +126,19 @@ impl TestSubmitter { let cometbft_mock = MockServer::start().await; let sequencer_cometbft_endpoint = format!("http://{}", cometbft_mock.address()); - // withdrawer state let state = Arc::new(state::State::new()); - // not testing watcher here so just set it to ready + let startup_handle = startup::InfoHandle::new(state.subscribe()); state.set_watcher_ready(); let metrics = Box::leak(Box::new(Metrics::new())); let (submitter, submitter_handle) = submitter::Builder { shutdown_token: shutdown_token.clone(), + startup_handle, sequencer_key_path, sequencer_address_prefix: "astria".into(), - sequencer_chain_id: SEQUENCER_CHAIN_ID.to_string(), sequencer_cometbft_endpoint, state, - expected_fee_asset: default_native_asset(), - min_expected_fee_asset_balance: 1_000_000, metrics, } .build() @@ -170,23 +152,18 @@ impl TestSubmitter { } } - async fn startup_and_spawn_with_guards(&mut self, startup_guards: HashMap) { + async fn startup(&mut self) { let submitter = self.submitter.take().unwrap(); let mut state = submitter.state.subscribe(); - self.submitter_task_handle = Some(tokio::spawn(submitter.run())); - - // wait for all startup guards to be satisfied - for (name, guard) in startup_guards { - tokio::time::timeout(Duration::from_millis(100), guard.wait_until_satisfied()) - .await - .wrap_err(format!("{name} guard not satisfied in time.")) - .unwrap(); - } + submitter.state.set_startup_info(startup::Info { + fee_asset: "fee-asset".parse::().unwrap(), + starting_rollup_height: 1, + chain_id: SEQUENCER_CHAIN_ID.to_string(), + }); - // consume the startup info in place of the watcher - self.submitter_handle.recv_startup_info().await.unwrap(); + self.submitter_task_handle = Some(tokio::spawn(submitter.run())); // wait for the submitter to be ready state @@ -195,85 +172,13 @@ impl TestSubmitter { .unwrap(); } - async fn startup_and_spawn(&mut self) { - let startup_guards = register_startup_guards(&self.cometbft_mock).await; - let sync_guards = register_sync_guards(&self.cometbft_mock).await; - self.startup_and_spawn_with_guards( - startup_guards - .into_iter() - .chain(sync_guards.into_iter()) - .collect(), - ) - .await; - } - async fn spawn() -> Self { let mut submitter = Self::setup().await; - submitter.startup_and_spawn().await; + submitter.startup().await; submitter } } -async fn register_default_chain_id_guard(cometbft_mock: &MockServer) -> MockGuard { - register_genesis_chain_id_response(SEQUENCER_CHAIN_ID, cometbft_mock).await -} - -async fn register_default_fee_assets_guard(cometbft_mock: &MockServer) -> MockGuard { - let fee_assets = vec![default_native_asset()]; - register_allowed_fee_assets_response(fee_assets, cometbft_mock).await -} - -async fn register_default_min_expected_fee_asset_balance_guard( - cometbft_mock: &MockServer, -) -> MockGuard { - register_get_latest_balance( - vec![AssetBalance { - denom: default_native_asset(), - balance: 1_000_000u128, - }], - cometbft_mock, - ) - .await -} - -async fn register_default_last_bridge_tx_hash_guard(cometbft_mock: &MockServer) -> MockGuard { - register_last_bridge_tx_hash_guard(cometbft_mock, make_last_bridge_tx_hash_response()).await -} - -async fn register_default_last_bridge_tx_guard(cometbft_mock: &MockServer) -> MockGuard { - register_tx_guard(cometbft_mock, make_tx_response()).await -} - -async fn register_startup_guards(cometbft_mock: &MockServer) -> HashMap { - HashMap::from([ - ( - "chain_id".to_string(), - register_default_chain_id_guard(cometbft_mock).await, - ), - ( - "fee_assets".to_string(), - register_default_fee_assets_guard(cometbft_mock).await, - ), - ( - "min_expected_fee_asset_balance".to_string(), - register_default_min_expected_fee_asset_balance_guard(cometbft_mock).await, - ), - ]) -} - -async fn register_sync_guards(cometbft_mock: &MockServer) -> HashMap { - HashMap::from([ - ( - "tx_hash".to_string(), - register_default_last_bridge_tx_hash_guard(cometbft_mock).await, - ), - ( - "last_bridge_tx".to_string(), - register_default_last_bridge_tx_guard(cometbft_mock).await, - ), - ]) -} - fn make_ics20_withdrawal_action() -> Action { let denom = DEFAULT_IBC_DENOM.parse::().unwrap(); let destination_chain_address = "address".to_string(); @@ -355,47 +260,6 @@ fn make_tx_commit_deliver_tx_failure_response() -> tx_commit::Response { } } -fn make_last_bridge_tx_hash_response() -> BridgeAccountLastTxHashResponse { - BridgeAccountLastTxHashResponse { - height: DEFAULT_LAST_ROLLUP_HEIGHT, - tx_hash: Some([0u8; 32]), - } -} - -fn make_signed_bridge_transaction() -> SignedTransaction { - let alice_secret_bytes: [u8; 32] = - hex::decode("2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90") - .unwrap() - .try_into() - .unwrap(); - let alice_key = SigningKey::from(alice_secret_bytes); - - let actions = vec![make_bridge_unlock_action(), make_ics20_withdrawal_action()]; - UnsignedTransaction { - params: TransactionParams::builder() - .nonce(DEFAULT_SEQUENCER_NONCE) - .chain_id(SEQUENCER_CHAIN_ID) - .build(), - actions, - } - .into_signed(&alice_key) -} - -fn make_tx_response() -> tx::Response { - let tx = make_signed_bridge_transaction(); - tx::Response { - hash: tx.sha256_of_proto_encoding().to_vec().try_into().unwrap(), - height: DEFAULT_LAST_SEQUENCER_HEIGHT.try_into().unwrap(), - index: 0, - tx_result: ExecTxResult { - code: abci::Code::Ok, - ..ExecTxResult::default() - }, - tx: tx.into_raw().encode_to_vec(), - proof: None, - } -} - /// Convert a `Request` object to a `SignedTransaction` fn signed_tx_from_request(request: &Request) -> SignedTransaction { use astria_core::generated::protocol::transaction::v1alpha1::SignedTransaction as RawSignedTransaction; @@ -413,139 +277,6 @@ fn signed_tx_from_request(request: &Request) -> SignedTransaction { signed_tx } -async fn register_genesis_chain_id_response(chain_id: &str, server: &MockServer) -> MockGuard { - use tendermint::{ - consensus::{ - params::{ - AbciParams, - ValidatorParams, - }, - Params, - }, - genesis::Genesis, - time::Time, - }; - let response = tendermint_rpc::endpoint::genesis::Response:: { - genesis: Genesis { - genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), - chain_id: chain::Id::try_from(chain_id).unwrap(), - initial_height: 1, - consensus_params: Params { - block: tendermint::block::Size { - max_bytes: 1024, - max_gas: 1024, - time_iota_ms: 1000, - }, - evidence: tendermint::evidence::Params { - max_age_num_blocks: 1000, - max_age_duration: tendermint::evidence::Duration(Duration::from_secs(3600)), - max_bytes: 1_048_576, - }, - validator: ValidatorParams { - pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], - }, - version: None, - abci: AbciParams::default(), - }, - validators: vec![], - app_hash: tendermint::hash::AppHash::default(), - app_state: serde_json::Value::Null, - }, - }; - - let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json(json!({"method": "genesis"}))) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await -} - -async fn register_allowed_fee_assets_response( - fee_assets: Vec, - cometbft_mock: &MockServer, -) -> MockGuard { - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: astria_core::protocol::asset::v1alpha1::AllowedFeeAssetsResponse { - fee_assets, - height: 1, - } - .into_raw() - .encode_to_vec(), - ..Default::default() - }, - }; - let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json(json!({"method": "abci_query"}))) - .and(body_string_contains("asset/allowed_fee_assets")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .expect(1) - .mount_as_scoped(cometbft_mock) - .await -} - -async fn register_get_latest_balance( - balances: Vec, - server: &MockServer, -) -> MockGuard { - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: astria_core::protocol::account::v1alpha1::BalanceResponse { - balances, - height: 1, - } - .into_raw() - .encode_to_vec(), - ..Default::default() - }, - }; - - let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json(json!({"method": "abci_query"}))) - .and(body_string_contains("accounts/balance")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .expect(1) - .mount_as_scoped(server) - .await -} - -async fn register_last_bridge_tx_hash_guard( - server: &MockServer, - response: BridgeAccountLastTxHashResponse, -) -> MockGuard { - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: response.into_raw().encode_to_vec(), - ..Default::default() - }, - }; - let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json(json!({"method": "abci_query"}))) - .and(body_string_contains("bridge/account_last_tx_hash")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .expect(1) - .mount_as_scoped(server) - .await -} - async fn register_get_nonce_response(server: &MockServer, response: NonceResponse) -> MockGuard { let response = tendermint_rpc::endpoint::abci_query::Response { response: tendermint_rpc::endpoint::abci_query::AbciQuery { @@ -566,19 +297,6 @@ async fn register_get_nonce_response(server: &MockServer, response: NonceRespons .await } -async fn register_tx_guard(server: &MockServer, response: tx::Response) -> MockGuard { - let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); - Mock::given(body_partial_json(json!({"method": "tx"}))) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .expect(1) - .mount_as_scoped(server) - .await -} - async fn register_broadcast_tx_commit_response( server: &MockServer, response: tx_commit::Response, diff --git a/crates/astria-bridge-withdrawer/src/config.rs b/crates/astria-bridge-withdrawer/src/config.rs index ad9ed3d1ad..7a2f33f754 100644 --- a/crates/astria-bridge-withdrawer/src/config.rs +++ b/crates/astria-bridge-withdrawer/src/config.rs @@ -18,8 +18,6 @@ pub struct Config { pub sequencer_key_path: String, // The fee asset denomination to use for the bridge account's transactions. pub fee_asset_denomination: asset::Denom, - // The minimum expected balance of the fee asset in the bridge account. - pub min_expected_fee_asset_balance: u64, // The asset denomination being withdrawn from the rollup. pub rollup_asset_denomination: String, // The bridge address corresponding to the bridged rollup asset on the sequencer.