diff --git a/Cargo.lock b/Cargo.lock index 1e21cf160b..c7b5f633b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -502,17 +502,28 @@ dependencies = [ "astria-core", "astria-eyre", "astria-grpc-mock", + "astria-sequencer-client", "astria-telemetry", "axum", + "ed25519-consensus", "ethers", + "futures", + "hex", "http", + "humantime", "hyper", + "ibc-types", "metrics", + "pin-project-lite", + "prost", "serde", "serde_json", + "sha2 0.10.8", + "tendermint 0.34.1", "tokio", "tokio-util 0.7.10", "tracing", + "tryhard", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index cd4ad3a39c..7f0650438c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ itertools = "0.12.1" itoa = "1.0.10" jsonrpsee = { version = "0.20" } once_cell = "1.17.1" +pin-project-lite = "0.2.13" sha2 = "0.10" serde = "1" serde_json = "1" diff --git a/crates/astria-bridge-withdrawer/Cargo.toml b/crates/astria-bridge-withdrawer/Cargo.toml index 38158c7f92..1612bbc2e1 100644 --- a/crates/astria-bridge-withdrawer/Cargo.toml +++ b/crates/astria-bridge-withdrawer/Cargo.toml @@ -15,18 +15,32 @@ name = "astria-bridge-withdrawer" http = "0.2.9" axum = { workspace = true } +ed25519-consensus = { workspace = true } +futures = { workspace = true } +hex = { workspace = true } ethers = { workspace = true, features = ["ethers-solc", "ws"] } hyper = { workspace = true } +humantime = { workspace = true } +ibc-types = { workspace = true } metrics = { workspace = true } +pin-project-lite = { workspace = true } +prost = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +sha2 = { workspace = true } +tendermint = { workspace = true } tracing = { workspace = true } +tryhard = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tokio-util = { workspace = true } astria-build-info = { path = "../astria-build-info", features = ["runtime"] } +astria-core = { path = "../astria-core", features = ["serde", "server"] } astria-eyre = { path = "../astria-eyre" } config = { package = "astria-config", path = "../astria-config" } +sequencer-client = { package = "astria-sequencer-client", path = "../astria-sequencer-client", features = [ + "http", +] } telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [ "display", ] } diff --git a/crates/astria-bridge-withdrawer/local.env.example b/crates/astria-bridge-withdrawer/local.env.example index 0c43abfa99..becc6aaf49 100644 --- a/crates/astria-bridge-withdrawer/local.env.example +++ b/crates/astria-bridge-withdrawer/local.env.example @@ -24,8 +24,23 @@ NO_COLOR= # serves RPCs. ASTRIA_BRIDGE_WITHDRAWER_COMETBFT_ENDPOINT="http://127.0.0.1:26657" +# Chain ID of the sequencer chain which transactions are submitted to. +ASTRIA_BRIDGE_WITHDRAWER_SEQUENCER_CHAIN_ID="astria-dev-1" + +# The path to the file storing the private key for the sequencer account used for signing +# transactions. The file should contain a hex-encoded Ed25519 secret key. +ASTRIA_BRIDGE_WITHDRAWER_SEQUENCER_KEY_PATH=/path/to/priv_sequencer_key.json + +# The fee asset denomination to use for the bridge account's transactions. +ASTRIA_BRIDGE_WITHDRAWER_FEE_ASSET_DENOMINATION="nria" + +# The asset denomination being withdrawn from the rollup. +ASTRIA_BRIDGE_WITHDRAWER_ROLLUP_ASSET_DENOMINATION="nria", + +# The address of the AstriaWithdrawer contract on the evm rollup. ASTRIA_BRIDGE_WITHDRAWER_ETHEREUM_CONTRACT_ADDRESS="0x" +# The rpc endpoint of the evm rollup. ASTRIA_BRIDGE_WITHDRAWER_ETHEREUM_RPC_ENDPOINT="http://127.0.0.1:8545" # The socket address at which the bridge service will server healthz, readyz, and status calls. diff --git a/crates/astria-bridge-withdrawer/src/api.rs b/crates/astria-bridge-withdrawer/src/api.rs index cbc562d022..25e7e786f3 100644 --- a/crates/astria-bridge-withdrawer/src/api.rs +++ b/crates/astria-bridge-withdrawer/src/api.rs @@ -21,38 +21,38 @@ use hyper::server::conn::AddrIncoming; use serde::Serialize; use tokio::sync::watch; -use crate::ethereum; +use crate::withdrawer::StateSnapshot; pub(crate) type ApiServer = axum::Server>; -type BridgeState = watch::Receiver; +type WithdrawerState = watch::Receiver; #[derive(Clone)] /// `AppState` is used for as an axum extractor in its method handlers. struct AppState { - bridge_state: BridgeState, + withdrawer_state: WithdrawerState, } -impl FromRef for BridgeState { +impl FromRef for WithdrawerState { fn from_ref(app_state: &AppState) -> Self { - app_state.bridge_state.clone() + app_state.withdrawer_state.clone() } } -pub(crate) fn start(socket_addr: SocketAddr, bridge_state: BridgeState) -> ApiServer { +pub(crate) fn start(socket_addr: SocketAddr, withdrawer_state: WithdrawerState) -> ApiServer { let app = Router::new() .route("/healthz", get(get_healthz)) .route("/readyz", get(get_readyz)) .route("/status", get(get_status)) .with_state(AppState { - bridge_state, + withdrawer_state, }); axum::Server::bind(&socket_addr).serve(app.into_make_service()) } #[allow(clippy::unused_async)] // Permit because axum handlers must be async -async fn get_healthz(State(bridge_state): State) -> Healthz { - if bridge_state.borrow().is_healthy() { +async fn get_healthz(State(withdrawer_state): State) -> Healthz { + if withdrawer_state.borrow().is_healthy() { Healthz::Ok } else { Healthz::Degraded @@ -66,9 +66,9 @@ async fn get_healthz(State(bridge_state): State) -> Healthz { /// + there is a current sequencer height (implying a block from sequencer was received) /// + there is a current data availability height (implying a height was received from the DA) #[allow(clippy::unused_async)] // Permit because axum handlers must be async -async fn get_readyz(State(bridge_state): State) -> Readyz { - let is_bridge_online = bridge_state.borrow().is_ready(); - if is_bridge_online { +async fn get_readyz(State(withdrawer_state): State) -> Readyz { + let is_withdrawer_online = withdrawer_state.borrow().is_ready(); + if is_withdrawer_online { Readyz::Ok } else { Readyz::NotReady @@ -76,8 +76,8 @@ async fn get_readyz(State(bridge_state): State) -> Readyz { } #[allow(clippy::unused_async)] // Permit because axum handlers must be async -async fn get_status(State(bridge_state): State) -> Json { - Json(bridge_state.borrow().clone()) +async fn get_status(State(withdrawer_state): State) -> Json { + Json(withdrawer_state.borrow().clone()) } enum Healthz { diff --git a/crates/astria-bridge-withdrawer/src/config.rs b/crates/astria-bridge-withdrawer/src/config.rs index a6200e430e..dd5462edea 100644 --- a/crates/astria-bridge-withdrawer/src/config.rs +++ b/crates/astria-bridge-withdrawer/src/config.rs @@ -11,11 +11,18 @@ use serde::{ pub struct Config { // The cometbft rpc endpoint for submitting transactions to the sequencer. pub cometbft_endpoint: String, + // The chain id of the sequencer chain. + pub sequencer_chain_id: String, + // The path to the private key used to sign transactions submitted to the sequencer. + pub sequencer_key_path: String, + // The fee asset denomination to use for the bridge account's transactions. + pub fee_asset_denomination: String, + // The asset denomination being withdrawn from the rollup. + pub rollup_asset_denomination: String, // The address of the AstriaWithdrawer contract on the evm rollup. pub ethereum_contract_address: String, // The rpc endpoint of the evm rollup. pub ethereum_rpc_endpoint: String, - // The socket address at which the bridge service will server healthz, readyz, and status // calls. pub api_addr: String, diff --git a/crates/astria-bridge-withdrawer/src/ethereum/state.rs b/crates/astria-bridge-withdrawer/src/ethereum/state.rs deleted file mode 100644 index aa03cc916a..0000000000 --- a/crates/astria-bridge-withdrawer/src/ethereum/state.rs +++ /dev/null @@ -1,41 +0,0 @@ -use tokio::sync::watch; - -pub(crate) struct State { - inner: tokio::sync::watch::Sender, -} - -impl State { - pub(super) fn new() -> Self { - let (inner, _) = watch::channel(StateSnapshot::default()); - Self { - inner, - } - } - - pub(super) fn set_ready(&self) { - self.inner.send_modify(StateSnapshot::set_ready); - } - - pub(super) fn subscribe(&self) -> watch::Receiver { - self.inner.subscribe() - } -} - -#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize)] -pub(crate) struct StateSnapshot { - ready: bool, -} - -impl StateSnapshot { - pub(crate) fn set_ready(&mut self) { - self.ready = true; - } - - pub(crate) fn is_ready(&self) -> bool { - self.ready - } - - pub(crate) fn is_healthy(&self) -> bool { - todo!() - } -} diff --git a/crates/astria-bridge-withdrawer/src/lib.rs b/crates/astria-bridge-withdrawer/src/lib.rs index 6817436a18..ea25871261 100644 --- a/crates/astria-bridge-withdrawer/src/lib.rs +++ b/crates/astria-bridge-withdrawer/src/lib.rs @@ -1,10 +1,9 @@ pub(crate) mod api; -pub mod bridge_service; mod build_info; pub(crate) mod config; -pub(crate) mod ethereum; pub mod metrics_init; +pub mod withdrawer; -pub use bridge_service::BridgeService; pub use build_info::BUILD_INFO; pub use config::Config; +pub use withdrawer::WithdrawerService; diff --git a/crates/astria-bridge-withdrawer/src/main.rs b/crates/astria-bridge-withdrawer/src/main.rs index 88f715c0ae..9dfc2d4daa 100644 --- a/crates/astria-bridge-withdrawer/src/main.rs +++ b/crates/astria-bridge-withdrawer/src/main.rs @@ -2,8 +2,8 @@ use std::process::ExitCode; use astria_bridge_withdrawer::{ metrics_init, - BridgeService, Config, + WithdrawerService, BUILD_INFO, }; use astria_eyre::eyre::WrapErr as _; @@ -54,10 +54,10 @@ async fn main() -> ExitCode { let mut sigterm = signal(SignalKind::terminate()) .expect("setting a SIGTERM listener should always work on Unix"); - let (bridge, shutdown_handle) = BridgeService::new(cfg) + let (withdrawer, shutdown_handle) = WithdrawerService::new(cfg) .await - .expect("could not initialize bridge"); - let bridge_handle = tokio::spawn(bridge.run()); + .expect("could not initialize withdrawer"); + let withdrawer_handle = tokio::spawn(withdrawer.run()); let shutdown_token = shutdown_handle.token(); tokio::select!( @@ -72,10 +72,10 @@ async fn main() -> ExitCode { } ); - if let Err(error) = bridge_handle.await { - error!(%error, "failed to join main bridge task"); + if let Err(error) = withdrawer_handle.await { + error!(%error, "failed to join main withdrawer task"); } - info!("bridge stopped"); + info!("withdrawer stopped"); ExitCode::SUCCESS } diff --git a/crates/astria-bridge-withdrawer/src/metrics_init.rs b/crates/astria-bridge-withdrawer/src/metrics_init.rs index e88c68efc3..44b039e72b 100644 --- a/crates/astria-bridge-withdrawer/src/metrics_init.rs +++ b/crates/astria-bridge-withdrawer/src/metrics_init.rs @@ -2,5 +2,56 @@ //! //! Registers metrics & lists constants to be used as metric names throughout crate. +use metrics::{ + describe_counter, + describe_gauge, + describe_histogram, + Unit, +}; + /// Registers all metrics used by this crate. -pub fn register() {} +pub fn register() { + describe_counter!( + NONCE_FETCH_COUNT, + Unit::Count, + "The number of times we have attempted to fetch the nonce" + ); + describe_counter!( + NONCE_FETCH_FAILURE_COUNT, + Unit::Count, + "The number of times we have failed to fetch the nonce" + ); + describe_histogram!( + NONCE_FETCH_LATENCY, + Unit::Milliseconds, + "The latency of nonce fetch" + ); + describe_gauge!(CURRENT_NONCE, Unit::Count, "The current nonce"); + describe_histogram!( + SEQUENCER_SUBMISSION_LATENCY, + Unit::Milliseconds, + "The latency of submitting a transaction to the sequencer" + ); + describe_counter!( + SEQUENCER_SUBMISSION_FAILURE_COUNT, + Unit::Count, + "The number of failed transaction submissions to the sequencer" + ); +} + +pub const NONCE_FETCH_COUNT: &str = concat!(env!("CARGO_CRATE_NAME"), "_nonce_fetch_count"); + +pub const NONCE_FETCH_FAILURE_COUNT: &str = + concat!(env!("CARGO_CRATE_NAME"), "_nonce_fetch_failure_count"); + +pub const NONCE_FETCH_LATENCY: &str = concat!(env!("CARGO_CRATE_NAME"), "_nonce_fetch_latency"); + +pub const CURRENT_NONCE: &str = concat!(env!("CARGO_CRATE_NAME"), "_current_nonce"); + +pub const SEQUENCER_SUBMISSION_FAILURE_COUNT: &str = concat!( + env!("CARGO_CRATE_NAME"), + "_sequencer_submission_failure_count" +); + +pub const SEQUENCER_SUBMISSION_LATENCY: &str = + concat!(env!("CARGO_CRATE_NAME"), "_sequencer_submission_latency"); diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/batch.rs b/crates/astria-bridge-withdrawer/src/withdrawer/batch.rs new file mode 100644 index 0000000000..38009ff9e4 --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/withdrawer/batch.rs @@ -0,0 +1,256 @@ +use std::time::Duration; + +use astria_core::{ + primitive::v1::{ + asset, + asset::Denom, + Address, + }, + protocol::transaction::v1alpha1::{ + action::{ + BridgeUnlockAction, + Ics20Withdrawal, + }, + Action, + }, +}; +use astria_eyre::eyre::{ + self, + OptionExt, + WrapErr as _, +}; +use ethers::types::{ + TxHash, + U64, +}; +use ibc_types::core::client::Height as IbcHeight; +use serde::{ + Deserialize, + Serialize, +}; + +use super::ethereum::astria_withdrawer::{ + Ics20WithdrawalFilter, + SequencerWithdrawalFilter, +}; + +#[derive(Debug, PartialEq, Eq)] +pub(crate) enum WithdrawalEvent { + Sequencer(SequencerWithdrawalFilter), + Ics20(Ics20WithdrawalFilter), +} + +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct EventWithMetadata { + pub(crate) event: WithdrawalEvent, + /// The block in which the log was emitted + pub(crate) block_number: U64, + /// The transaction hash in which the log was emitted + pub(crate) transaction_hash: TxHash, +} + +pub(crate) fn event_to_action( + event_with_metadata: EventWithMetadata, + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, +) -> eyre::Result { + let action = match event_with_metadata.event { + WithdrawalEvent::Sequencer(event) => event_to_bridge_unlock( + event, + event_with_metadata.block_number, + event_with_metadata.transaction_hash, + fee_asset_id, + ) + .wrap_err("failed to convert sequencer withdrawal event to action")?, + WithdrawalEvent::Ics20(event) => event_to_ics20_withdrawal( + event, + event_with_metadata.block_number, + event_with_metadata.transaction_hash, + fee_asset_id, + rollup_asset_denom, + ) + .wrap_err("failed to convert ics20 withdrawal event to action")?, + }; + Ok(action) +} + +#[derive(Debug, Serialize, Deserialize)] +struct BridgeUnlockMemo { + block_number: U64, + transaction_hash: TxHash, +} + +fn event_to_bridge_unlock( + event: SequencerWithdrawalFilter, + block_number: U64, + transaction_hash: TxHash, + fee_asset_id: asset::Id, +) -> eyre::Result { + let memo = BridgeUnlockMemo { + block_number, + transaction_hash, + }; + let action = BridgeUnlockAction { + to: event.sender.to_fixed_bytes().into(), + amount: event.amount.as_u128(), + memo: serde_json::to_vec(&memo).wrap_err("failed to serialize memo to json")?, + fee_asset_id, + }; + Ok(Action::BridgeUnlock(action)) +} + +#[derive(Debug, Serialize, Deserialize)] +struct Ics20WithdrawalMemo { + memo: String, + block_number: U64, + transaction_hash: TxHash, +} + +fn event_to_ics20_withdrawal( + event: Ics20WithdrawalFilter, + block_number: U64, + transaction_hash: TxHash, + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, +) -> eyre::Result { + // TODO: make this configurable + const ICS20_WITHDRAWAL_TIMEOUT: Duration = Duration::from_secs(300); + + let sender: [u8; 20] = event + .sender + .as_bytes() + .try_into() + .expect("U160 must be 20 bytes"); + let denom = rollup_asset_denom.clone(); + + let (_, channel) = denom + .prefix() + .rsplit_once('/') + .ok_or_eyre("denom must have a channel to be withdrawn via IBC")?; + + let memo = Ics20WithdrawalMemo { + memo: String::from_utf8(event.memo.to_vec()) + .wrap_err("failed to convert event memo to utf8")?, + block_number, + transaction_hash, + }; + + let action = Ics20Withdrawal { + denom: Denom::from(rollup_asset_denom), + destination_chain_address: event.destination_chain_address, + // note: this is actually a rollup address; we expect failed ics20 withdrawals to be + // returned to the rollup. + // this is only ok for now because addresses on the sequencer and the rollup are both 20 + // bytes, but this won't work otherwise. + return_address: Address::from(sender), + amount: event.amount.as_u128(), + memo: serde_json::to_string(&memo).wrap_err("failed to serialize memo to json")?, + fee_asset_id, + // note: this refers to the timeout on the destination chain, which we are unaware of. + // thus, we set it to the maximum possible value. + timeout_height: IbcHeight::new(u64::MAX, u64::MAX) + .wrap_err("failed to generate timeout height")?, + timeout_time: calculate_packet_timeout_time(ICS20_WITHDRAWAL_TIMEOUT) + .wrap_err("failed to calculate packet timeout time")?, + source_channel: channel + .parse() + .wrap_err("failed to parse channel from denom")?, + }; + Ok(Action::Ics20Withdrawal(action)) +} + +fn calculate_packet_timeout_time(timeout_delta: Duration) -> eyre::Result { + tendermint::Time::now() + .checked_add(timeout_delta) + .ok_or_eyre("time must not overflow from now plus 10 minutes")? + .unix_timestamp_nanos() + .try_into() + .wrap_err("failed to convert packet timeout i128 to u64") +} + +pub(crate) struct Batch { + /// The withdrawal payloads + pub(crate) actions: Vec, + /// The corresponding rollup block height + pub(crate) rollup_height: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::withdrawer::ethereum::astria_withdrawer::SequencerWithdrawalFilter; + + #[test] + fn event_to_bridge_unlock() { + let denom = Denom::from("nria".to_string()); + let event_with_meta = EventWithMetadata { + event: WithdrawalEvent::Sequencer(SequencerWithdrawalFilter { + sender: [0u8; 20].into(), + amount: 99.into(), + destination_chain_address: [1u8; 20].into(), + }), + block_number: 1.into(), + transaction_hash: [2u8; 32].into(), + }; + let action = event_to_action(event_with_meta, denom.id(), denom.clone()).unwrap(); + let Action::BridgeUnlock(action) = action else { + panic!("expected BridgeUnlock action, got {:?}", action); + }; + + let expected_action = BridgeUnlockAction { + to: [0u8; 20].into(), + amount: 99, + memo: serde_json::to_vec(&BridgeUnlockMemo { + block_number: 1.into(), + transaction_hash: [2u8; 32].into(), + }) + .unwrap(), + fee_asset_id: denom.id(), + }; + + assert_eq!(action, expected_action); + } + + #[test] + fn event_to_ics20_withdrawal() { + let denom = Denom::from("transfer/channel-0/utia".to_string()); + let destination_chain_address = "address".to_string(); + let event_with_meta = EventWithMetadata { + event: WithdrawalEvent::Ics20(Ics20WithdrawalFilter { + sender: [0u8; 20].into(), + amount: 99.into(), + destination_chain_address: destination_chain_address.clone(), + memo: b"hello".into(), + }), + block_number: 1.into(), + transaction_hash: [2u8; 32].into(), + }; + + let action = event_to_action(event_with_meta, denom.id(), denom.clone()).unwrap(); + let Action::Ics20Withdrawal(mut action) = action else { + panic!("expected Ics20Withdrawal action, got {:?}", action); + }; + + // TODO: instead of zeroing this, we should pass in the latest block time to the function + // and generate the timeout time from that. + action.timeout_time = 0; // zero this for testing + + let expected_action = Ics20Withdrawal { + denom: denom.clone(), + destination_chain_address, + return_address: [0u8; 20].into(), + amount: 99, + memo: serde_json::to_string(&Ics20WithdrawalMemo { + memo: "hello".to_string(), + block_number: 1.into(), + transaction_hash: [2u8; 32].into(), + }) + .unwrap(), + fee_asset_id: denom.id(), + timeout_height: IbcHeight::new(u64::MAX, u64::MAX).unwrap(), + timeout_time: 0, // zero this for testing + source_channel: "channel-0".parse().unwrap(), + }; + assert_eq!(action, expected_action); + } +} diff --git a/crates/astria-bridge-withdrawer/src/ethereum/astria_withdrawer.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/astria_withdrawer.rs similarity index 100% rename from crates/astria-bridge-withdrawer/src/ethereum/astria_withdrawer.rs rename to crates/astria-bridge-withdrawer/src/withdrawer/ethereum/astria_withdrawer.rs diff --git a/crates/astria-bridge-withdrawer/src/ethereum/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/mod.rs similarity index 69% rename from crates/astria-bridge-withdrawer/src/ethereum/mod.rs rename to crates/astria-bridge-withdrawer/src/withdrawer/ethereum/mod.rs index 435597dbd0..674e368330 100644 --- a/crates/astria-bridge-withdrawer/src/ethereum/mod.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/mod.rs @@ -1,8 +1,6 @@ pub(crate) mod astria_withdrawer; -mod state; mod watcher; -pub(crate) use state::StateSnapshot; pub(crate) use watcher::Watcher; #[cfg(test)] diff --git a/crates/astria-bridge-withdrawer/src/ethereum/test_utils.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/test_utils.rs similarity index 100% rename from crates/astria-bridge-withdrawer/src/ethereum/test_utils.rs rename to crates/astria-bridge-withdrawer/src/withdrawer/ethereum/test_utils.rs diff --git a/crates/astria-bridge-withdrawer/src/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs similarity index 74% rename from crates/astria-bridge-withdrawer/src/ethereum/watcher.rs rename to crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs index 0863ef7140..af8cd3995b 100644 --- a/crates/astria-bridge-withdrawer/src/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs @@ -1,5 +1,9 @@ use std::sync::Arc; +use astria_core::primitive::v1::{ + asset, + asset::Denom, +}; use astria_eyre::{ eyre::{ eyre, @@ -14,10 +18,6 @@ use ethers::{ StreamExt as _, Ws, }, - types::{ - TxHash, - U64, - }, utils::hex, }; use tokio::{ @@ -25,20 +25,20 @@ use tokio::{ sync::mpsc, }; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{ + info, + warn, +}; -use crate::ethereum::{ - astria_withdrawer::{ - astria_withdrawer::{ - Ics20WithdrawalFilter, - SequencerWithdrawalFilter, - }, - AstriaWithdrawer, - }, - state::{ - State, - StateSnapshot, +use crate::withdrawer::{ + batch::{ + event_to_action, + Batch, + EventWithMetadata, + WithdrawalEvent, }, + ethereum::astria_withdrawer::AstriaWithdrawer, + state::State, }; /// Watches for withdrawal events emitted by the `AstriaWithdrawer` contract. @@ -54,8 +54,11 @@ impl Watcher { pub(crate) async fn new( ethereum_contract_address: &str, ethereum_rpc_endpoint: &str, - event_with_metadata_tx: mpsc::Sender>, + batch_tx: mpsc::Sender, shutdown_token: &CancellationToken, + state: Arc, + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, ) -> Result { let provider = Arc::new( Provider::::connect(ethereum_rpc_endpoint) @@ -67,8 +70,22 @@ impl Watcher { let contract = AstriaWithdrawer::new(contract_address, provider); let (event_tx, event_rx) = mpsc::channel(100); - let batcher = Batcher::new(event_rx, event_with_metadata_tx, shutdown_token); - let state = Arc::new(State::new()); + + if rollup_asset_denom.prefix().is_empty() { + warn!( + "rollup asset denomination is not prefixed; Ics20Withdrawal actions will not be \ + submitted" + ); + } + + // TODO: verify fee_asset_id against sequencer + let batcher = Batcher::new( + event_rx, + batch_tx, + shutdown_token, + fee_asset_id, + rollup_asset_denom, + ); Ok(Self { contract, event_tx, @@ -80,10 +97,6 @@ impl Watcher { } impl Watcher { - pub(crate) fn subscribe_to_state(&self) -> tokio::sync::watch::Receiver { - self.state.subscribe() - } - pub(crate) async fn run(self) -> Result<()> { let Watcher { contract, @@ -95,8 +108,6 @@ impl Watcher { tokio::task::spawn(batcher.run()); - state.set_ready(); - // start from block 1 right now // TODO: determine the last block we've seen based on the sequencer data let sequencer_withdrawal_event_handler = tokio::task::spawn( @@ -108,6 +119,8 @@ impl Watcher { 1, )); + state.set_watcher_ready(); + tokio::select! { res = sequencer_withdrawal_event_handler => { info!("sequencer withdrawal event handler exited"); @@ -177,45 +190,38 @@ async fn watch_for_ics20_withdrawal_events( Ok(()) } -#[derive(Debug, PartialEq, Eq)] -pub(crate) enum WithdrawalEvent { - Sequencer(SequencerWithdrawalFilter), - Ics20(Ics20WithdrawalFilter), -} - -#[derive(Debug, PartialEq, Eq)] -pub(crate) struct EventWithMetadata { - event: WithdrawalEvent, - /// The block in which the log was emitted - block_number: U64, - /// The transaction hash in which the log was emitted - transaction_hash: TxHash, -} - struct Batcher { event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, - event_with_metadata_tx: mpsc::Sender>, + batch_tx: mpsc::Sender, shutdown_token: CancellationToken, + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, } impl Batcher { pub(crate) fn new( event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>, - event_with_metadata_tx: mpsc::Sender>, + batch_tx: mpsc::Sender, shutdown_token: &CancellationToken, + fee_asset_id: asset::Id, + rollup_asset_denom: Denom, ) -> Self { Self { event_rx, - event_with_metadata_tx, + batch_tx, shutdown_token: shutdown_token.clone(), + fee_asset_id, + rollup_asset_denom, } } } impl Batcher { pub(crate) async fn run(mut self) -> Result<()> { - let mut events = Vec::new(); - let mut last_block_number: U64 = 0.into(); + let mut curr_batch = Batch { + actions: Vec::new(), + rollup_height: 0, + }; loop { select! { @@ -230,21 +236,24 @@ impl Batcher { block_number: meta.block_number, transaction_hash: meta.transaction_hash, }; + let action = event_to_action(event_with_metadata, self.fee_asset_id, self.rollup_asset_denom.clone())?; - if meta.block_number == last_block_number { + if meta.block_number.as_u64() == curr_batch.rollup_height { // block number was the same; add event to current batch - events.push(event_with_metadata); + curr_batch.actions.push(action); } else { // block number increased; send current batch and start a new one - if !events.is_empty() { - self.event_with_metadata_tx - .send(events) + if !curr_batch.actions.is_empty() { + self.batch_tx + .send(curr_batch) .await .wrap_err("failed to send batched events; receiver dropped?")?; } - events = vec![event_with_metadata]; - last_block_number = meta.block_number; + curr_batch = Batch { + actions: vec![action], + rollup_height: meta.block_number.as_u64(), + }; } } } @@ -268,6 +277,7 @@ fn address_from_string(s: &str) -> Result { #[cfg(test)] mod tests { + use astria_core::protocol::transaction::v1alpha1::Action; use ethers::{ prelude::SignerMiddleware, providers::Middleware, @@ -280,7 +290,16 @@ mod tests { }; use super::*; - use crate::ethereum::test_utils::deploy_astria_withdrawer; + use crate::withdrawer::{ + batch::EventWithMetadata, + ethereum::{ + astria_withdrawer::{ + Ics20WithdrawalFilter, + SequencerWithdrawalFilter, + }, + test_utils::deploy_astria_withdrawer, + }, + }; #[test] fn address_from_string_prefix() { @@ -342,6 +361,14 @@ mod tests { block_number: receipt.block_number.unwrap(), transaction_hash: receipt.transaction_hash, }; + let denom: Denom = Denom::from_base_denom("nria"); + let expected_action = event_to_action(expected_event, denom.id(), denom.clone()).unwrap(); + let Action::BridgeUnlock(expected_action) = expected_action else { + panic!( + "expected action to be BridgeUnlock, got {:?}", + expected_action + ); + }; let (event_tx, mut event_rx) = mpsc::channel(100); let watcher = Watcher::new( @@ -349,6 +376,9 @@ mod tests { &anvil.ws_endpoint(), event_tx, &CancellationToken::new(), + Arc::new(State::new()), + denom.id(), + denom, ) .await .unwrap(); @@ -358,9 +388,15 @@ mod tests { // make another tx to trigger anvil to make another block send_sequencer_withdraw_transaction(&contract, value, recipient).await; - let events = event_rx.recv().await.unwrap(); - assert_eq!(events.len(), 1); - assert_eq!(events[0], expected_event); + let batch = event_rx.recv().await.unwrap(); + assert_eq!(batch.actions.len(), 1); + let Action::BridgeUnlock(action) = &batch.actions[0] else { + panic!( + "expected action to be BridgeUnlock, got {:?}", + batch.actions[0] + ); + }; + assert_eq!(action, &expected_action); } async fn send_ics20_withdraw_transaction( @@ -406,6 +442,13 @@ mod tests { block_number: receipt.block_number.unwrap(), transaction_hash: receipt.transaction_hash, }; + let denom = Denom::from("transfer/channel-0/utia".to_string()); + let Action::Ics20Withdrawal(mut expected_action) = + event_to_action(expected_event, denom.id(), denom.clone()).unwrap() + else { + panic!("expected action to be Ics20Withdrawal"); + }; + expected_action.timeout_time = 0; // zero this for testing let (event_tx, mut event_rx) = mpsc::channel(100); let watcher = Watcher::new( @@ -413,6 +456,9 @@ mod tests { &anvil.ws_endpoint(), event_tx, &CancellationToken::new(), + Arc::new(State::new()), + denom.id(), + denom, ) .await .unwrap(); @@ -422,8 +468,15 @@ mod tests { // make another tx to trigger anvil to make another block send_ics20_withdraw_transaction(&contract, value, recipient).await; - let events = event_rx.recv().await.unwrap(); - assert_eq!(events.len(), 1); - assert_eq!(events[0], expected_event); + let mut batch = event_rx.recv().await.unwrap(); + assert_eq!(batch.actions.len(), 1); + let Action::Ics20Withdrawal(ref mut action) = batch.actions[0] else { + panic!( + "expected action to be Ics20Withdrawal, got {:?}", + batch.actions[0] + ); + }; + action.timeout_time = 0; // zero this for testing + assert_eq!(action, &expected_action); } } diff --git a/crates/astria-bridge-withdrawer/src/bridge_service.rs b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs similarity index 58% rename from crates/astria-bridge-withdrawer/src/bridge_service.rs rename to crates/astria-bridge-withdrawer/src/withdrawer/mod.rs index 2f7f4e1c03..ac9ef623b5 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_service.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs @@ -1,18 +1,17 @@ use std::{ net::SocketAddr, + sync::Arc, time::Duration, }; +use astria_core::primitive::v1::asset; use astria_eyre::eyre::{ self, WrapErr as _, }; use tokio::{ select, - sync::{ - mpsc, - oneshot, - }, + sync::oneshot, task::{ JoinError, JoinHandle, @@ -25,21 +24,33 @@ use tracing::{ info, }; +pub(crate) use self::state::StateSnapshot; +use self::{ + ethereum::Watcher, + state::State, + submitter::Submitter, +}; use crate::{ api, config::Config, - ethereum::Watcher, }; -pub struct BridgeService { +mod batch; +mod ethereum; +mod state; +mod submitter; + +pub struct WithdrawerService { // Token to signal all subtasks to shut down gracefully. shutdown_token: CancellationToken, api_server: api::ApiServer, + submitter: Submitter, ethereum_watcher: Watcher, + state: Arc, } -impl BridgeService { - /// Instantiates a new `BridgeService`. +impl WithdrawerService { + /// Instantiates a new `WithdrawerService`. /// /// # Errors /// @@ -47,22 +58,44 @@ impl BridgeService { pub async fn new(cfg: Config) -> eyre::Result<(Self, ShutdownHandle)> { let shutdown_handle = ShutdownHandle::new(); let Config { - api_addr, .. + api_addr, + cometbft_endpoint, + sequencer_chain_id, + sequencer_key_path, + fee_asset_denomination, + ethereum_contract_address, + ethereum_rpc_endpoint, + .. } = cfg; - // TODO: use event_rx in the sequencer submitter - let (event_tx, _event_rx) = mpsc::channel(100); + let state = Arc::new(State::new()); + + // make submitter object + let (submitter, submitter_handle) = submitter::Builder { + shutdown_token: shutdown_handle.token(), + cometbft_endpoint, + sequencer_chain_id, + sequencer_key_path, + state: state.clone(), + } + .build() + .await + .wrap_err("failed to initialize submitter")?; + let ethereum_watcher = Watcher::new( - &cfg.ethereum_contract_address, - &cfg.ethereum_rpc_endpoint, - event_tx.clone(), + ðereum_contract_address, + ðereum_rpc_endpoint, + submitter_handle.batches_tx, &shutdown_handle.token(), + state.clone(), + asset::Id::from_denom(&fee_asset_denomination), + asset::Denom::from(cfg.rollup_asset_denomination), ) .await .wrap_err("failed to initialize ethereum watcher")?; // make api server - let state_rx = ethereum_watcher.subscribe_to_state(); + let state_rx = state.subscribe(); let api_socket_addr = api_addr.parse::().wrap_err_with(|| { format!("failed to parse provided `api_addr` string as socket address: `{api_addr}`",) })?; @@ -71,7 +104,9 @@ impl BridgeService { let service = Self { shutdown_token: shutdown_handle.token(), api_server, + submitter, ethereum_watcher, + state, }; Ok((service, shutdown_handle)) @@ -81,7 +116,9 @@ impl BridgeService { let Self { shutdown_token, api_server, + submitter, ethereum_watcher, + state: _state, } = self; // Separate the API shutdown signal from the cancellation token because we want it to live @@ -97,17 +134,41 @@ impl BridgeService { }); info!("spawned API server"); + let mut submitter_task = tokio::spawn(submitter.run()); + info!("spawned submitter task"); let mut ethereum_watcher_task = tokio::spawn(ethereum_watcher.run()); info!("spawned ethereum watcher task"); let shutdown = select!( o = &mut api_task => { report_exit("api server", o); - Shutdown { api_task: None, ethereum_watcher_task: Some(ethereum_watcher_task), api_shutdown_signal, token: shutdown_token } + Shutdown { + api_task: None, + submitter_task: Some(submitter_task), + ethereum_watcher_task: Some(ethereum_watcher_task), + api_shutdown_signal, + shutdown_token + } + } + o = &mut submitter_task => { + report_exit("submitter", o); + Shutdown { + api_task: Some(api_task), + submitter_task: None, + ethereum_watcher_task:Some(ethereum_watcher_task), + api_shutdown_signal, + shutdown_token + } } o = &mut ethereum_watcher_task => { report_exit("ethereum watcher", o); - Shutdown { api_task: Some(api_task), ethereum_watcher_task: Some(ethereum_watcher_task), api_shutdown_signal, token: shutdown_token } + Shutdown { + api_task: Some(api_task), + submitter_task: Some(submitter_task), + ethereum_watcher_task: None, + api_shutdown_signal, + shutdown_token + } } ); @@ -115,10 +176,10 @@ impl BridgeService { } } -/// A handle for instructing the [`Bridge`] to shut down. +/// A handle for instructing the [`WithdrawerService`] to shut down. /// -/// It is returned along with its related `Bridge` from [`Bridge::new`]. The -/// `Bridge` will begin to shut down as soon as [`ShutdownHandle::shutdown`] is called or +/// It is returned along with its related `WithdrawerService` from [`WithdrawerService::new`]. The +/// `WithdrawerService` will begin to shut down as soon as [`ShutdownHandle::shutdown`] is called or /// when the `ShutdownHandle` is dropped. pub struct ShutdownHandle { token: CancellationToken, @@ -171,29 +232,55 @@ fn report_exit(task_name: &str, outcome: Result, JoinError>) { struct Shutdown { api_task: Option>>, + submitter_task: Option>>, ethereum_watcher_task: Option>>, api_shutdown_signal: oneshot::Sender<()>, - token: CancellationToken, + shutdown_token: CancellationToken, } impl Shutdown { const API_SHUTDOWN_TIMEOUT_SECONDS: u64 = 4; - const BRIDGE_SHUTDOWN_TIMEOUT_SECONDS: u64 = 25; + const ETHEREUM_WATCHER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 5; + const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 20; async fn run(self) { let Self { api_task, + submitter_task, ethereum_watcher_task, api_shutdown_signal, - token, + shutdown_token: token, } = self; token.cancel(); - // Giving bridge 25 seconds to shutdown because Kubernetes issues a SIGKILL after 30. + // Giving submitter 20 seconds to shutdown because Kubernetes issues a SIGKILL after 30. + if let Some(mut submitter_task) = submitter_task { + info!("waiting for submitter task to shut down"); + let limit = Duration::from_secs(Self::SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS); + match timeout(limit, &mut submitter_task) + .await + .map(flatten_result) + { + Ok(Ok(())) => info!("withdrawer exited gracefully"), + Ok(Err(error)) => error!(%error, "withdrawer exited with an error"), + Err(_) => { + error!( + timeout_secs = limit.as_secs(), + "watcher did not shut down within timeout; killing it" + ); + submitter_task.abort(); + } + } + } else { + info!("submitter task was already dead"); + } + + // Giving ethereum watcher 5 seconds to shutdown because Kubernetes issues a SIGKILL after + // 30. if let Some(mut ethereum_watcher_task) = ethereum_watcher_task { info!("waiting for watcher task to shut down"); - let limit = Duration::from_secs(Self::BRIDGE_SHUTDOWN_TIMEOUT_SECONDS); + let limit = Duration::from_secs(Self::ETHEREUM_WATCHER_SHUTDOWN_TIMEOUT_SECONDS); match timeout(limit, &mut ethereum_watcher_task) .await .map(flatten_result) @@ -212,7 +299,8 @@ impl Shutdown { info!("watcher task was already dead"); } - // Giving the API task 4 seconds. 25 for watcher + 4s = 29s (out of 30s for k8s). + // Giving the API task 4 seconds. 5s for watcher + 20 for submitter + 4s = 29s (out of 30s + // for k8s). if let Some(mut api_task) = api_task { info!("sending shutdown signal to API server"); let _ = api_shutdown_signal.send(()); diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/state.rs b/crates/astria-bridge-withdrawer/src/withdrawer/state.rs new file mode 100644 index 0000000000..647bb489b5 --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/withdrawer/state.rs @@ -0,0 +1,103 @@ +use tokio::sync::watch; + +pub(crate) struct State { + inner: tokio::sync::watch::Sender, +} + +impl State { + pub(super) fn new() -> Self { + let (inner, _) = watch::channel(StateSnapshot::default()); + Self { + inner, + } + } + + pub(super) fn set_watcher_ready(&self) { + self.inner.send_modify(StateSnapshot::set_watcher_ready); + } + + pub(super) fn set_submitter_ready(&self) { + self.inner.send_modify(StateSnapshot::set_submitter_ready); + } + + pub(super) fn subscribe(&self) -> watch::Receiver { + self.inner.subscribe() + } +} + +macro_rules! forward_setter { + ($([$fn:ident <- $val:ty]),*$(,)?) => { + impl State { + $( + pub(super) fn $fn(&self, val: $val) { + self.inner + .send_if_modified(|state| state.$fn(val)); + } + )* + } + }; +} + +forward_setter!( + [set_sequencer_connected <- bool], + [set_last_rollup_height_submitted <- u64], + [set_last_sequencer_height <- u64], + [set_last_sequencer_tx_hash <- tendermint::Hash], +); + +#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize)] +pub(crate) struct StateSnapshot { + watcher_ready: bool, + submitter_ready: bool, + + sequencer_connected: bool, + + last_rollup_height_submitted: Option, + last_sequencer_block: Option, + last_sequencer_tx_hash: Option, +} + +impl StateSnapshot { + pub(crate) fn set_watcher_ready(&mut self) { + self.watcher_ready = true; + } + + pub(crate) fn set_submitter_ready(&mut self) { + self.submitter_ready = true; + } + + pub(crate) fn is_ready(&self) -> bool { + self.submitter_ready && self.watcher_ready + } + + pub(crate) fn is_healthy(&self) -> bool { + self.sequencer_connected + } + + /// Sets the sequencer connection status to `connected`. + fn set_sequencer_connected(&mut self, connected: bool) -> bool { + let changed = self.sequencer_connected ^ connected; + self.sequencer_connected = connected; + changed + } + + fn set_last_rollup_height_submitted(&mut self, height: u64) -> bool { + let changed = self + .last_rollup_height_submitted + .map_or(true, |h| h != height); + self.last_rollup_height_submitted = Some(height); + changed + } + + fn set_last_sequencer_height(&mut self, height: u64) -> bool { + let changed = self.last_sequencer_block.map_or(true, |h| h != height); + self.last_sequencer_block = Some(height); + changed + } + + fn set_last_sequencer_tx_hash(&mut self, hash: tendermint::Hash) -> bool { + let changed = self.last_sequencer_tx_hash.map_or(true, |h| h != hash); + self.last_sequencer_tx_hash = Some(hash); + changed + } +} diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs new file mode 100644 index 0000000000..f6ce8d182f --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs @@ -0,0 +1,95 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use astria_eyre::eyre::{ + self, + ensure, + Context as _, +}; +use sequencer_client::tendermint_rpc; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +use super::state::State; + +const BATCH_QUEUE_SIZE: usize = 256; + +pub(crate) struct Builder { + pub(crate) shutdown_token: CancellationToken, + pub(crate) sequencer_key_path: String, + pub(crate) sequencer_chain_id: String, + pub(crate) cometbft_endpoint: String, + pub(crate) state: Arc, +} + +impl Builder { + /// Instantiates an `Submitter`. + pub(crate) async fn build(self) -> eyre::Result<(super::Submitter, super::Handle)> { + let Self { + shutdown_token, + sequencer_key_path, + sequencer_chain_id, + cometbft_endpoint, + state, + } = self; + + let signer = super::signer::SequencerSigner::from_path(sequencer_key_path)?; + let (batches_tx, batches_rx) = tokio::sync::mpsc::channel(BATCH_QUEUE_SIZE); + + let sequencer_cometbft_client = sequencer_client::HttpClient::new(&*cometbft_endpoint) + .context("failed constructing cometbft http client")?; + + let actual_chain_id = get_sequencer_chain_id(sequencer_cometbft_client.clone()).await?; + ensure!( + sequencer_chain_id == actual_chain_id.to_string(), + "sequencer_chain_id provided in config does not match chain_id returned from sequencer" + ); + + Ok(( + super::Submitter { + shutdown_token, + state, + batches_rx, + signer, + sequencer_chain_id, + sequencer_cometbft_client, + }, + super::Handle { + batches_tx, + }, + )) + } +} + +async fn get_sequencer_chain_id( + client: sequencer_client::HttpClient, +) -> eyre::Result { + use sequencer_client::Client as _; + + let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to fetch sequencer genesis info; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let genesis: tendermint::Genesis = tryhard::retry_fn(|| client.genesis()) + .with_config(retry_config) + .await + .wrap_err("failed to get genesis info from Sequencer after a lot of attempts")?; + + Ok(genesis.chain_id) +} diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs new file mode 100644 index 0000000000..eb705279c0 --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs @@ -0,0 +1,286 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use astria_core::protocol::transaction::v1alpha1::{ + Action, + TransactionParams, + UnsignedTransaction, +}; +use astria_eyre::eyre::{ + self, + eyre, + Context, +}; +pub(crate) use builder::Builder; +use sequencer_client::{ + tendermint_rpc::endpoint::broadcast::tx_commit, + Address, + SequencerClientExt as _, + SignedTransaction, +}; +use signer::SequencerSigner; +use state::State; +use tokio::{ + select, + sync::mpsc, + time::Instant, +}; +use tokio_util::sync::CancellationToken; +use tracing::{ + debug, + error, + info, + info_span, + instrument, + warn, + Instrument as _, + Span, +}; + +use super::{ + batch::Batch, + state, +}; + +mod builder; +mod signer; + +pub(super) struct Handle { + pub(super) batches_tx: mpsc::Sender, +} + +pub(super) struct Submitter { + shutdown_token: CancellationToken, + state: Arc, + batches_rx: mpsc::Receiver, + sequencer_cometbft_client: sequencer_client::HttpClient, + signer: SequencerSigner, + sequencer_chain_id: String, +} + +impl Submitter { + pub(super) async fn run(mut self) -> eyre::Result<()> { + self.state.set_submitter_ready(); + + let reason = loop { + select!( + biased; + + () = self.shutdown_token.cancelled() => { + info!("received shutdown signal"); + break Ok("shutdown requested"); + } + + batch = self.batches_rx.recv() => { + let Some(Batch { actions, rollup_height }) = batch else { + info!("received None from batch channel, shutting down"); + break Err(eyre!("batch channel closed")); + }; + if let Err(e) = self.process_batch(actions, rollup_height).await { + break Err(e); + } + } + ) + }; + + // update status + self.state.set_sequencer_connected(false); + + // close the channel to signal to batcher that the submitter is shutting down + self.batches_rx.close(); + + match reason { + Ok(reason) => info!(reason, "submitter shutting down"), + Err(reason) => { + error!(%reason, "submitter shutting down") + } + } + + Ok(()) + } + + async fn process_batch( + &mut self, + actions: Vec, + rollup_height: u64, + ) -> eyre::Result<()> { + // get nonce and make unsigned transaction + let nonce = get_latest_nonce( + self.sequencer_cometbft_client.clone(), + self.signer.address, + self.state.clone(), + ) + .await?; + debug!(nonce, "fetched latest nonce"); + + let unsigned = UnsignedTransaction { + actions, + params: TransactionParams { + nonce, + chain_id: self.sequencer_chain_id.clone(), + }, + }; + + // sign transaction + let signed = unsigned.into_signed(&self.signer.signing_key); + debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction"); + + // submit transaction and handle response + let rsp = submit_tx( + self.sequencer_cometbft_client.clone(), + signed, + self.state.clone(), + ) + .await + .context("failed to submit transaction to to cometbft")?; + if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { + error!( + abci.code = check_tx_code, + abci.log = rsp.check_tx.log, + rollup.height = rollup_height, + "transaction failed to be included in the mempool, aborting." + ); + Err(eyre!( + "check_tx failure upon submitting transaction to sequencer" + )) + } else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code { + error!( + abci.code = deliver_tx_code, + abci.log = rsp.tx_result.log, + rollup.height = rollup_height, + "transaction failed to be executed in a block, aborting." + ); + Err(eyre!( + "deliver_tx failure upon submitting transaction to sequencer" + )) + } else { + // update state after successful submission + info!( + sequencer.block = rsp.height.value(), + sequencer.tx_hash = ?rsp.hash, + rollup.height = rollup_height, + "withdraw batch successfully executed." + ); + self.state.set_last_rollup_height_submitted(rollup_height); + self.state.set_last_sequencer_height(rsp.height.value()); + self.state.set_last_sequencer_tx_hash(rsp.hash); + Ok(()) + } + } +} + +/// Queries the sequencer for the latest nonce with an exponential backoff +#[instrument(name = "get_latest_nonce", skip_all, fields(%address))] +async fn get_latest_nonce( + client: sequencer_client::HttpClient, + address: Address, + state: Arc, +) -> eyre::Result { + debug!("fetching latest nonce from sequencer"); + metrics::counter!(crate::metrics_init::NONCE_FETCH_COUNT).increment(1); + let span = Span::current(); + let start = Instant::now(); + let retry_config = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(200)) + .max_delay(Duration::from_secs(60)) + .on_retry( + |attempt, + next_delay: Option, + err: &sequencer_client::extension_trait::Error| { + metrics::counter!(crate::metrics_init::NONCE_FETCH_FAILURE_COUNT).increment(1); + + let state = Arc::clone(&state); + state.set_sequencer_connected(false); + + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + parent: span.clone(), + error = err as &dyn std::error::Error, + attempt, + wait_duration, + "failed getting latest nonce from sequencer; retrying after backoff", + ); + async move {} + }, + ); + let res = tryhard::retry_fn(|| { + let client = client.clone(); + let span = info_span!(parent: span.clone(), "attempt get nonce"); + async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span) + }) + .with_config(retry_config) + .await + .wrap_err("failed getting latest nonce from sequencer after 1024 attempts"); + + state.set_sequencer_connected(res.is_ok()); + + metrics::histogram!(crate::metrics_init::NONCE_FETCH_LATENCY).record(start.elapsed()); + + res +} + +/// Submits a `SignedTransaction` to the sequencer with an exponential backoff +#[instrument( + name = "submit_tx", + skip_all, + fields( + nonce = tx.unsigned_transaction().params.nonce, + transaction.hash = %telemetry::display::hex(&tx.sha256_of_proto_encoding()), + ) +)] +async fn submit_tx( + client: sequencer_client::HttpClient, + tx: SignedTransaction, + state: Arc, +) -> eyre::Result { + let nonce = tx.unsigned_transaction().params.nonce; + metrics::gauge!(crate::metrics_init::CURRENT_NONCE).set(nonce); + let start = std::time::Instant::now(); + debug!("submitting signed transaction to sequencer"); + let span = Span::current(); + let retry_config = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(200)) + .max_delay(Duration::from_secs(60)) + .on_retry( + |attempt, + next_delay: Option, + err: &sequencer_client::extension_trait::Error| { + metrics::counter!(crate::metrics_init::SEQUENCER_SUBMISSION_FAILURE_COUNT) + .increment(1); + + let state = Arc::clone(&state); + state.set_sequencer_connected(false); + + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + parent: span.clone(), + attempt, + wait_duration, + error = err as &dyn std::error::Error, + "failed sending transaction to sequencer; retrying after backoff", + ); + async move {} + }, + ); + let res = tryhard::retry_fn(|| { + let client = client.clone(); + let tx = tx.clone(); + let span = info_span!(parent: span.clone(), "attempt send"); + async move { client.submit_transaction_commit(tx).await }.instrument(span) + }) + .with_config(retry_config) + .await + .wrap_err("failed sending transaction after 1024 attempts"); + + state.set_sequencer_connected(res.is_ok()); + + metrics::histogram!(crate::metrics_init::SEQUENCER_SUBMISSION_LATENCY).record(start.elapsed()); + + res +} diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/signer.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/signer.rs new file mode 100644 index 0000000000..0556f82e43 --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/signer.rs @@ -0,0 +1,34 @@ +use std::{ + fs, + path::Path, +}; + +use astria_core::crypto::SigningKey; +use astria_eyre::eyre::{ + self, + eyre, +}; +use sequencer_client::Address; + +pub(super) struct SequencerSigner { + pub(super) address: Address, + pub(super) signing_key: SigningKey, +} + +impl SequencerSigner { + /// Construct a `SequencerSigner` from a file. + /// + /// The file should contain a hex-encoded ed25519 secret key. + pub(super) fn from_path>(path: P) -> eyre::Result { + let hex = fs::read_to_string(path)?; + let bytes: [u8; 32] = hex::decode(hex.trim())? + .try_into() + .map_err(|_| eyre!("invalid private key length; must be 32 bytes"))?; + let signing_key = SigningKey::from(bytes); + + Ok(Self { + address: Address::from_verification_key(signing_key.verification_key()), + signing_key, + }) + } +} diff --git a/crates/astria-composer/Cargo.toml b/crates/astria-composer/Cargo.toml index 68555171a4..d494d0fc9a 100644 --- a/crates/astria-composer/Cargo.toml +++ b/crates/astria-composer/Cargo.toml @@ -20,7 +20,6 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur "display", ] } -pin-project-lite = "0.2.13" tonic-health = "0.10.2" async-trait = { workspace = true } @@ -33,6 +32,7 @@ hyper = { workspace = true } hex = { workspace = true } itertools = { workspace = true } once_cell = { workspace = true } +pin-project-lite = { workspace = true } prost = { workspace = true } reqwest = { workspace = true, features = ["json"] } rand = { workspace = true } diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index 44a2e3c10d..bc00533abb 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -39,6 +39,7 @@ indexmap = { workspace = true } itertools = { workspace = true } itoa = { workspace = true } pbjson-types = { workspace = true } +pin-project-lite = { workspace = true } prost = { workspace = true } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } @@ -54,7 +55,6 @@ tryhard = { workspace = true } async-trait = "0.1.73" futures-bounded = "0.2.3" http = "0.2.9" -pin-project-lite = "0.2" tokio-stream = "0.1.14" tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } diff --git a/crates/astria-core/src/primitive/v1/asset.rs b/crates/astria-core/src/primitive/v1/asset.rs index 2525971f2c..3415d8ec84 100644 --- a/crates/astria-core/src/primitive/v1/asset.rs +++ b/crates/astria-core/src/primitive/v1/asset.rs @@ -23,7 +23,7 @@ pub fn default_native_asset_id() -> Id { /// Note that the full denomination trace of the token is `prefix/base_denom`, /// in the case that a prefix is present. /// This is hashed to create the ID. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Denom { id: Id, diff --git a/crates/astria-core/src/protocol/transaction/v1alpha1/action.rs b/crates/astria-core/src/protocol/transaction/v1alpha1/action.rs index 783f237edc..d30e416ab3 100644 --- a/crates/astria-core/src/protocol/transaction/v1alpha1/action.rs +++ b/crates/astria-core/src/protocol/transaction/v1alpha1/action.rs @@ -694,26 +694,26 @@ enum MintActionErrorKind { /// /// It also contains a `return_address` field which may or may not be the same as the signer /// of the packet. The funds will be returned to the `return_address` in the case of a timeout. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Ics20Withdrawal { // a transparent value consisting of an amount and a denom. - amount: u128, - denom: Denom, + pub amount: u128, + pub denom: Denom, // the address on the destination chain to send the transfer to. - destination_chain_address: String, + pub destination_chain_address: String, // an Astria address to use to return funds from this withdrawal // in the case it fails. - return_address: Address, + pub return_address: Address, // the height (on Astria) at which this transfer expires. - timeout_height: IbcHeight, + pub timeout_height: IbcHeight, // the unix timestamp (in nanoseconds) at which this transfer expires. - timeout_time: u64, + pub timeout_time: u64, // the source channel used for the withdrawal. - source_channel: ChannelId, + pub source_channel: ChannelId, // the asset to use for fee payment. - fee_asset_id: asset::Id, + pub fee_asset_id: asset::Id, // a memo to include with the transfer - memo: String, + pub memo: String, } impl Ics20Withdrawal { @@ -1307,7 +1307,7 @@ enum BridgeLockActionErrorKind { } #[allow(clippy::module_name_repetitions)] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct BridgeUnlockAction { pub to: Address, pub amount: u128, diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index 6fd2c6ccf7..d74f75a0b8 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -18,7 +18,6 @@ dirs = "5.0" futures-bounded = "0.2.3" http = "0.2.9" k256 = "0.13.3" -pin-project-lite = "0.2" serde_path_to_error = "0.1.13" axum = { workspace = true } @@ -33,6 +32,7 @@ itertools = { workspace = true } itoa = { workspace = true } metrics = { workspace = true } pbjson-types = { workspace = true } +pin-project-lite = { workspace = true } prost = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true }