diff --git a/crates/astria-bridge-withdrawer/src/api.rs b/crates/astria-bridge-withdrawer/src/api.rs index 81eb2fa13e..2a1abe248c 100644 --- a/crates/astria-bridge-withdrawer/src/api.rs +++ b/crates/astria-bridge-withdrawer/src/api.rs @@ -20,6 +20,7 @@ use http::status::StatusCode; use hyper::server::conn::AddrIncoming; use serde::Serialize; use tokio::sync::watch; +use tracing::instrument; use crate::bridge_withdrawer::StateSnapshot; @@ -51,6 +52,7 @@ pub(crate) fn start(socket_addr: SocketAddr, withdrawer_state: WithdrawerState) } #[allow(clippy::unused_async)] // Permit because axum handlers must be async +#[instrument(skip_all)] async fn get_healthz(State(withdrawer_state): State) -> Healthz { if withdrawer_state.borrow().is_healthy() { Healthz::Ok @@ -66,6 +68,7 @@ async fn get_healthz(State(withdrawer_state): State) -> Healthz /// + there is a current sequencer height (implying a block from sequencer was received) /// + there is a current data availability height (implying a height was received from the DA) #[allow(clippy::unused_async)] // Permit because axum handlers must be async +#[instrument(skip_all)] async fn get_readyz(State(withdrawer_state): State) -> Readyz { let is_withdrawer_online = withdrawer_state.borrow().is_ready(); if is_withdrawer_online { @@ -76,6 +79,7 @@ async fn get_readyz(State(withdrawer_state): State) -> Readyz { } #[allow(clippy::unused_async)] // Permit because axum handlers must be async +#[instrument(skip_all)] async fn get_status(State(withdrawer_state): State) -> Json { Json(withdrawer_state.borrow().clone()) } diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs index 3de1a631fd..fcda31c177 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/ethereum/watcher.rs @@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken; use tracing::{ debug, info, + info_span, instrument, warn, }; @@ -293,11 +294,13 @@ async fn watch_for_blocks( bail!("current rollup block missing block number") }; - info!( - block.height = current_rollup_block_height.as_u64(), - block.hash = current_rollup_block.hash.map(tracing::field::display), - "got current block" - ); + info_span!("watch_for_blocks").in_scope(|| { + info!( + block.height = current_rollup_block_height.as_u64(), + block.hash = current_rollup_block.hash.map(tracing::field::display), + "got current block" + ); + }); // sync any blocks missing between `next_rollup_block_height` and the current latest // (inclusive). @@ -314,7 +317,7 @@ async fn watch_for_blocks( loop { select! { () = shutdown_token.cancelled() => { - info!("block watcher shutting down"); + info_span!("watch_for_blocks").in_scope(|| info!("block watcher shutting down")); return Ok(()); } block = block_rx.next() => { diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs index d886331a31..82b2d66af3 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs @@ -11,9 +11,20 @@ use astria_eyre::eyre::{ self, WrapErr as _, }; +use axum::{ + routing::IntoMakeService, + Router, + Server, +}; +use ethereum::watcher::Watcher; +use hyper::server::conn::AddrIncoming; +use startup::Startup; use tokio::{ select, - sync::oneshot, + sync::oneshot::{ + self, + Receiver, + }, task::{ JoinError, JoinHandle, @@ -24,6 +35,7 @@ use tokio_util::sync::CancellationToken; use tracing::{ error, info, + instrument, }; pub(crate) use self::state::StateSnapshot; @@ -167,34 +179,29 @@ impl BridgeWithdrawer { // Separate the API shutdown signal from the cancellation token because we want it to live // until the very end. let (api_shutdown_signal, api_shutdown_signal_rx) = oneshot::channel::<()>(); - let mut api_task = tokio::spawn(async move { - api_server - .with_graceful_shutdown(async move { - let _ = api_shutdown_signal_rx.await; - }) - .await - .wrap_err("api server ended unexpectedly") - }); - info!("spawned API server"); - - let mut startup_task = Some(tokio::spawn(startup.run())); - info!("spawned startup task"); - - let mut submitter_task = tokio::spawn(submitter.run()); - info!("spawned submitter task"); - let mut ethereum_watcher_task = tokio::spawn(ethereum_watcher.run()); - info!("spawned ethereum watcher task"); + let TaskHandles { + mut api_task, + mut startup_task, + mut submitter_task, + mut ethereum_watcher_task, + } = spawn_tasks( + api_server, + api_shutdown_signal_rx, + startup, + submitter, + ethereum_watcher, + ); let shutdown = loop { select!( o = async { startup_task.as_mut().unwrap().await }, if startup_task.is_none() => { match o { Ok(_) => { - info!(task = "startup", "task has exited"); + report_exit("startup", Ok(Ok(()))); startup_task = None; }, Err(error) => { - error!(task = "startup", %error, "task returned with error"); + report_exit("startup", Err(error)); break Shutdown { api_task: Some(api_task), submitter_task: Some(submitter_task), @@ -245,6 +252,48 @@ impl BridgeWithdrawer { } } +#[allow(clippy::struct_field_names)] // allow: for parity with the `Shutdown` struct. +struct TaskHandles { + api_task: JoinHandle>, + startup_task: Option>>, + submitter_task: JoinHandle>, + ethereum_watcher_task: JoinHandle>, +} + +#[instrument(skip_all)] +fn spawn_tasks( + api_server: Server>, + api_shutdown_signal_rx: Receiver<()>, + startup: Startup, + submitter: Submitter, + ethereum_watcher: Watcher, +) -> TaskHandles { + let api_task = tokio::spawn(async move { + api_server + .with_graceful_shutdown(async move { + let _ = api_shutdown_signal_rx.await; + }) + .await + .wrap_err("api server ended unexpectedly") + }); + info!("spawned API server"); + + let startup_task = Some(tokio::spawn(startup.run())); + info!("spawned startup task"); + + let submitter_task = tokio::spawn(submitter.run()); + info!("spawned submitter task"); + let ethereum_watcher_task = tokio::spawn(ethereum_watcher.run()); + info!("spawned ethereum watcher task"); + + TaskHandles { + api_task, + startup_task, + submitter_task, + ethereum_watcher_task, + } +} + /// A handle for instructing the [`Service`] to shut down. /// /// It is returned along with its related `Service` from [`Service::new`]. The @@ -275,6 +324,7 @@ impl ShutdownHandle { } impl Drop for ShutdownHandle { + #[instrument(skip_all)] fn drop(&mut self) { if !self.token.is_cancelled() { info!("shutdown handle dropped, issuing shutdown to all services"); @@ -283,6 +333,7 @@ impl Drop for ShutdownHandle { } } +#[instrument(skip_all)] fn report_exit(task_name: &str, outcome: Result, JoinError>) { match outcome { Ok(Ok(())) => info!(task = task_name, "task has exited"), @@ -314,6 +365,7 @@ impl Shutdown { const STARTUP_SHUTDOWN_TIMEOUT_SECONDS: u64 = 1; const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 19; + #[instrument(skip_all)] async fn run(self) { let Self { api_task, diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs index 52412ccdc6..e4c1e95250 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs @@ -47,6 +47,7 @@ use tracing::{ instrument, warn, Instrument as _, + Level, Span, }; use tryhard::backoff_strategies::ExponentialBackoff; @@ -120,6 +121,7 @@ impl InfoHandle { } } + #[instrument(skip_all, err)] pub(super) async fn get_info(&mut self) -> eyre::Result { let state = self .rx @@ -202,6 +204,7 @@ impl Startup { /// - `self.chain_id` does not match the value returned from the sequencer node /// - `self.fee_asset` is not a valid fee asset on the sequencer node /// - `self.sequencer_bridge_address` does not have a sufficient balance of `self.fee_asset`. + #[instrument(skip_all, err)] async fn confirm_sequencer_config(&self) -> eyre::Result<()> { // confirm the sequencer chain id let actual_chain_id = @@ -250,6 +253,7 @@ impl Startup { /// in the sequencer logic). /// 5. Failing to convert the transaction data from bytes to proto. /// 6. Failing to convert the transaction data from proto to `SignedTransaction`. + #[instrument(skip_all, err)] async fn get_last_transaction(&self) -> eyre::Result> { // get last transaction hash by the bridge account, if it exists let last_transaction_hash_resp = get_bridge_account_last_transaction_hash( @@ -323,6 +327,7 @@ impl Startup { /// the sequencer logic) /// 3. The last transaction by the bridge account did not contain a withdrawal action /// 4. The memo of the last transaction by the bridge account could not be parsed + #[instrument(skip_all, err)] async fn get_starting_rollup_height(&mut self) -> eyre::Result { let signed_transaction = self .get_last_transaction() @@ -347,6 +352,7 @@ impl Startup { } } +#[instrument(skip_all, err(level = Level::WARN))] async fn ensure_mempool_empty( cometbft_client: sequencer_client::HttpClient, sequencer_client: sequencer_service_client::SequencerServiceClient, @@ -391,6 +397,7 @@ async fn ensure_mempool_empty( /// 2. Failing to get the latest nonce from cometBFT's mempool. /// 3. The pending nonce from the Sequencer's app-side mempool does not match the latest nonce from /// cometBFT's mempool after the exponential backoff times out. +#[instrument(skip_all, err)] async fn wait_for_empty_mempool( cometbft_client: sequencer_client::HttpClient, sequencer_grpc_endpoint: String, @@ -485,7 +492,7 @@ fn rollup_height_from_signed_transaction( Ok(last_batch_rollup_height) } -#[instrument(skip_all)] +#[instrument(skip_all, err)] async fn get_bridge_account_last_transaction_hash( client: sequencer_client::HttpClient, state: Arc, @@ -507,7 +514,7 @@ async fn get_bridge_account_last_transaction_hash( res } -#[instrument(skip_all)] +#[instrument(skip_all, err)] async fn get_sequencer_transaction_at_hash( client: sequencer_client::HttpClient, state: Arc, @@ -525,7 +532,7 @@ async fn get_sequencer_transaction_at_hash( res } -#[instrument(skip_all)] +#[instrument(skip_all, err)] async fn get_sequencer_chain_id( client: sequencer_client::HttpClient, state: Arc, @@ -542,7 +549,7 @@ async fn get_sequencer_chain_id( Ok(genesis.chain_id) } -#[instrument(skip_all)] +#[instrument(skip_all, err)] async fn get_allowed_fee_assets( client: sequencer_client::HttpClient, state: Arc, @@ -559,7 +566,7 @@ async fn get_allowed_fee_assets( res } -#[instrument(skip_all)] +#[instrument(skip_all, err)] async fn get_latest_nonce( client: sequencer_client::HttpClient, state: Arc, diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs index e93551138e..e20335cbc3 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/builder.rs @@ -6,7 +6,10 @@ use astria_eyre::eyre::{ }; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{ + info, + instrument, +}; use super::state::State; use crate::{ @@ -30,6 +33,7 @@ impl Handle { } } + #[instrument(skip_all, err)] pub(crate) async fn send_batch(&self, batch: Batch) -> eyre::Result<()> { self.batches_tx .send(batch) diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs index 4febe56ecf..21ffba98b5 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs @@ -74,7 +74,7 @@ impl Submitter { pub(super) async fn run(mut self) -> eyre::Result<()> { let (sequencer_chain_id, sequencer_grpc_client) = select! { () = self.shutdown_token.cancelled() => { - info!("submitter received shutdown signal while waiting for startup"); + report_exit(Ok("submitter received shutdown signal while waiting for startup")); return Ok(()); } @@ -96,7 +96,6 @@ impl Submitter { biased; () = self.shutdown_token.cancelled() => { - info!("received shutdown signal"); break Ok("shutdown requested"); } @@ -124,17 +123,12 @@ impl Submitter { // close the channel to signal to batcher that the submitter is shutting down self.batches_rx.close(); - match reason { - Ok(reason) => info!(reason, "submitter shutting down"), - Err(reason) => { - error!(%reason, "submitter shutting down"); - } - } + report_exit(reason); Ok(()) } - #[instrument(skip_all)] + #[instrument(skip_all, err)] async fn process_batch( &self, sequencer_grpc_client: SequencerServiceClient, @@ -182,24 +176,18 @@ impl Submitter { .await .context("failed to submit transaction to cometbft")?; if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { - error!( - abci.code = check_tx_code, - abci.log = rsp.check_tx.log, - rollup.height = rollup_height, - "transaction failed to be included in the mempool, aborting." - ); Err(eyre!( - "check_tx failure upon submitting transaction to sequencer" + "check_tx failure upon submitting transaction to sequencer: transaction failed to \ + be included in the mempool, aborting. abci.code = {check_tx_code}, abci.log = \ + {}, rollup.height = {rollup_height}", + rsp.check_tx.log )) } else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code { - error!( - abci.code = deliver_tx_code, - abci.log = rsp.tx_result.log, - rollup.height = rollup_height, - "transaction failed to be executed in a block, aborting." - ); Err(eyre!( - "deliver_tx failure upon submitting transaction to sequencer" + "deliver_tx failure upon submitting transaction to sequencer: transaction failed \ + to be executed in a block, aborting. abci.code = {deliver_tx_code}, abci.log = \ + {}, rollup.height = {rollup_height}", + rsp.tx_result.log, )) } else { // update state after successful submission @@ -217,6 +205,16 @@ impl Submitter { } } +#[instrument(skip_all)] +fn report_exit(reason: eyre::Result<&str>) { + match reason { + Ok(reason) => info!(%reason, "submitter shutting down"), + Err(reason) => { + error!(%reason, "submitter shutting down"); + } + } +} + /// Submits a `SignedTransaction` to the sequencer with an exponential backoff #[instrument( name = "submit_tx", @@ -224,7 +222,8 @@ impl Submitter { fields( nonce = tx.nonce(), transaction.hash = %telemetry::display::hex(&tx.sha256_of_proto_encoding()), - ) + ), + err )] async fn submit_tx( client: sequencer_client::HttpClient, @@ -279,6 +278,7 @@ async fn submit_tx( res } +#[instrument(skip_all, err)] pub(crate) async fn get_pending_nonce( client: sequencer_service_client::SequencerServiceClient, address: Address,