Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl BridgeWithdrawer {
sequencer_bridge_address,
sequencer_grpc_endpoint: sequencer_grpc_endpoint.clone(),
expected_fee_asset: fee_asset_denomination,
metrics,
}
.build()
.wrap_err("failed to initialize startup")?;
Expand Down
87 changes: 19 additions & 68 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use std::{
};

use astria_core::{
generated::sequencerblock::v1alpha1::{
sequencer_service_client::{
self,
SequencerServiceClient,
},
GetPendingNonceRequest,
generated::sequencerblock::v1alpha1::sequencer_service_client::{
self,
SequencerServiceClient,
},
primitive::v1::asset,
protocol::{
Expand Down Expand Up @@ -54,10 +51,14 @@ use tracing::{
};
use tryhard::backoff_strategies::ExponentialBackoff;

use super::state::{
self,
State,
use super::{
state::{
self,
State,
},
submitter::get_pending_nonce,
};
use crate::metrics::Metrics;

pub(super) struct Builder {
pub(super) shutdown_token: CancellationToken,
Expand All @@ -67,6 +68,7 @@ pub(super) struct Builder {
pub(super) sequencer_grpc_endpoint: String,
pub(super) sequencer_bridge_address: Address,
pub(super) expected_fee_asset: asset::Denom,
pub(super) metrics: &'static Metrics,
}

impl Builder {
Expand All @@ -79,6 +81,7 @@ impl Builder {
sequencer_bridge_address,
sequencer_grpc_endpoint,
expected_fee_asset,
metrics,
} = self;

let sequencer_cometbft_client =
Expand All @@ -93,6 +96,7 @@ impl Builder {
sequencer_grpc_endpoint,
sequencer_bridge_address,
expected_fee_asset,
metrics,
})
}
}
Expand Down Expand Up @@ -138,6 +142,7 @@ pub(super) struct Startup {
sequencer_grpc_endpoint: String,
sequencer_bridge_address: Address,
expected_fee_asset: asset::Denom,
metrics: &'static Metrics,
}

impl Startup {
Expand All @@ -155,6 +160,7 @@ impl Startup {
self.sequencer_grpc_endpoint.clone(),
self.sequencer_bridge_address,
self.state.clone(),
self.metrics,
)
.await
.wrap_err("failed to wait for mempool to be empty")?;
Expand Down Expand Up @@ -346,8 +352,9 @@ async fn ensure_mempool_empty(
sequencer_client: sequencer_service_client::SequencerServiceClient<Channel>,
address: Address,
state: Arc<State>,
metrics: &'static Metrics,
) -> eyre::Result<()> {
let pending = get_pending_nonce(sequencer_client, state.clone(), address)
let pending = get_pending_nonce(sequencer_client, address, state.clone(), metrics)
.await
.wrap_err("failed to get pending nonce")?;
let latest = get_latest_nonce(cometbft_client, state, address)
Expand Down Expand Up @@ -389,6 +396,7 @@ async fn wait_for_empty_mempool(
sequencer_grpc_endpoint: String,
address: Address,
state: Arc<State>,
metrics: &'static Metrics,
) -> eyre::Result<()> {
let retry_config = tryhard::RetryFutureConfig::new(u32::MAX)
.exponential_backoff(Duration::from_secs(1))
Expand Down Expand Up @@ -420,7 +428,7 @@ async fn wait_for_empty_mempool(
let sequencer_client = sequencer_client.clone();
let cometbft_client = cometbft_client.clone();
let state = state.clone();
ensure_mempool_empty(cometbft_client, sequencer_client, address, state)
ensure_mempool_empty(cometbft_client, sequencer_client, address, state, metrics)
})
.with_config(retry_config)
.await
Expand Down Expand Up @@ -597,38 +605,6 @@ async fn get_latest_nonce(
res
}

// TODO(https://github.com/astriaorg/astria/issues/1274): deduplicate here and in crate::bridge_withdrawer::submitter
#[instrument(skip_all)]
async fn get_pending_nonce(
client: sequencer_service_client::SequencerServiceClient<Channel>,
state: Arc<State>,
address: Address,
) -> eyre::Result<u32> {
let span = Span::current();
let res = tryhard::retry_fn(|| {
let mut client = client.clone();
let span = info_span!(parent: span.clone(), "attempt get pending nonce");
async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
.map(|rsp| rsp.into_inner().inner)
}
.instrument(span)
})
.with_config(make_sequencer_grpc_retry_config(
"attempt to get pending nonce from sequencer; retrying after backoff",
))
.await
.wrap_err("failed getting pending nonce from sequencing after 1024 attempts");

state.set_sequencer_connected(res.is_ok());

res
}

fn make_cometbft_retry_config(
retry_message: &'static str,
) -> tryhard::RetryFutureConfig<
Expand Down Expand Up @@ -684,28 +660,3 @@ fn make_cometbft_ext_retry_config(
},
)
}

fn make_sequencer_grpc_retry_config(
retry_message: &'static str,
) -> tryhard::RetryFutureConfig<
ExponentialBackoff,
impl Fn(u32, Option<Duration>, &tonic::Status) -> futures::future::Ready<()>,
> {
tryhard::RetryFutureConfig::new(u32::MAX)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(20))
.on_retry(
move |attempt: u32, next_delay: Option<Duration>, error: &tonic::Status| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
retry_message,
);
futures::future::ready(())
},
)
}
Loading