Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/astria-bridge-withdrawer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,11 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
] }
reqwest = { workspace = true, features = ["json"] }
tempfile = { workspace = true }
tendermint-rpc = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
wiremock = { workspace = true }

[build-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ use ethers::{
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
instrument,
warn,
};

Expand Down Expand Up @@ -101,25 +99,38 @@ pub(crate) struct Watcher {
state: Arc<State>,
}

struct FullyInitialized {
shutdown_token: CancellationToken,
submitter_handle: submitter::Handle,
state: Arc<State>,
provider: Arc<Provider<Ws>>,
action_fetcher: GetWithdrawalActions<Provider<Ws>>,
starting_rollup_height: u64,
}

impl Watcher {
pub(crate) async fn run(self) -> Result<()> {
let fully_init = self
pub(crate) async fn run(mut self) -> Result<()> {
let (provider, action_fetcher, next_rollup_block_height) = self
.startup()
.await
.wrap_err("watcher failed to start up")?;

fully_init.state.set_watcher_ready();
let Self {
state,
shutdown_token,
submitter_handle,
..
} = self;

state.set_watcher_ready();

fully_init.run().await
tokio::select! {
res = watch_for_blocks(
provider,
action_fetcher,
next_rollup_block_height,
submitter_handle,
shutdown_token.clone(),
) => {
info!("block handler exited");
res.context("block handler exited")
}
() = shutdown_token.cancelled() => {
info!("watcher shutting down");
Ok(())
}
}
}

/// Gets the startup data from the submitter and connects to the Ethereum node.
Expand All @@ -131,39 +142,23 @@ impl Watcher {
/// - If the fee asset ID provided in the config is not a valid fee asset on the sequencer.
/// - If the Ethereum node cannot be connected to after several retries.
/// - If the asset withdrawal decimals cannot be fetched.
#[instrument(skip_all, err)]
async fn startup(self) -> eyre::Result<FullyInitialized> {
let Self {
shutdown_token,
mut startup_handle,
submitter_handle,
contract_address,
ethereum_rpc_endpoint,
rollup_asset_denom,
bridge_address,
state,
} = self;

async fn startup(
&mut self,
) -> eyre::Result<(Arc<Provider<Ws>>, GetWithdrawalActions<Provider<Ws>>, u64)> {
let startup::Info {
fee_asset,
starting_rollup_height,
..
} = select! {
() = shutdown_token.cancelled() => {
() = self.shutdown_token.cancelled() => {
return Err(eyre!("watcher received shutdown signal while waiting for startup"));
}

startup_info = startup_handle.get_info() => {
startup_info = self.startup_handle.get_info() => {
startup_info.wrap_err("failed to receive startup info")?
}
};

debug!(
fee_asset = %fee_asset,
starting_rollup_height = starting_rollup_height,
"received startup info"
);

// connect to eth node
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
Expand All @@ -184,7 +179,7 @@ impl Watcher {
);

let provider = tryhard::retry_fn(|| {
let url = ethereum_rpc_endpoint.clone();
let url = self.ethereum_rpc_endpoint.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
Expand All @@ -195,79 +190,62 @@ impl Watcher {
.wrap_err("failed connecting to rollup after several retries; giving up")?;

let provider = Arc::new(provider);
let ics20_asset_to_withdraw = if rollup_asset_denom.last_channel().is_some() {
let ics20_asset_to_withdraw = if self.rollup_asset_denom.last_channel().is_some() {
info!(
%rollup_asset_denom,
rollup_asset_denom = %self.rollup_asset_denom,
"configured rollup asset contains an ics20 channel; ics20 withdrawals will be emitted"
);
Some(rollup_asset_denom.clone())
Some(self.rollup_asset_denom.clone())
} else {
info!(
%rollup_asset_denom,
rollup_asset_denom = %self.rollup_asset_denom,
"configured rollup asset does not contain an ics20 channel; ics20 withdrawals will not be emitted"
);
None
};
let action_fetcher = GetWithdrawalActionsBuilder::new()
.provider(provider.clone())
.fee_asset(fee_asset)
.contract_address(contract_address)
.bridge_address(bridge_address)
.sequencer_asset_to_withdraw(rollup_asset_denom.clone().into())
.contract_address(self.contract_address)
.bridge_address(self.bridge_address)
.sequencer_asset_to_withdraw(self.rollup_asset_denom.clone().into())
.set_ics20_asset_to_withdraw(ics20_asset_to_withdraw)
.try_build()
.await
.wrap_err("failed to construct contract event to sequencer action fetcher")?;

Ok(FullyInitialized {
shutdown_token,
submitter_handle,
state,
provider,
action_fetcher,
starting_rollup_height,
})
}
}
self.state.set_watcher_ready();

impl FullyInitialized {
async fn run(self) -> eyre::Result<()> {
tokio::select! {
res = watch_for_blocks(
self.provider,
self.action_fetcher,
self.starting_rollup_height,
self.submitter_handle,
self.shutdown_token.clone(),
) => {
res.context("block handler exited")
}
() = self.shutdown_token.cancelled() => {
Ok(())
}
}
Ok((provider.clone(), action_fetcher, starting_rollup_height))
}
}

#[instrument(skip_all, fields(from_rollup_height, to_rollup_height), err)]
async fn sync_unprocessed_rollup_heights(
async fn sync_from_next_rollup_block_height(
provider: Arc<Provider<Ws>>,
action_fetcher: &GetWithdrawalActions<Provider<Ws>>,
submitter_handle: &submitter::Handle,
from_rollup_height: u64,
to_rollup_height: u64,
next_rollup_block_height_to_check: u64,
current_rollup_block_height: u64,
) -> Result<()> {
for i in from_rollup_height..=to_rollup_height {
let block = provider
if current_rollup_block_height < next_rollup_block_height_to_check {
return Ok(());
}

for i in next_rollup_block_height_to_check..=current_rollup_block_height {
let Some(block) = provider
.get_block(i)
.await
.map_err(eyre::Report::new)
.and_then(|block| block.ok_or_eyre("block is missing"))
.wrap_err_with(|| format!("failed to get block at rollup height `{i}`"))?;
get_and_forward_block_events(action_fetcher, block, submitter_handle)
.wrap_err("failed to get block")?
else {
bail!("block with number {i} missing");
};

get_and_send_events_at_block(action_fetcher, block, submitter_handle)
.await
.wrap_err("failed to get and send events at block")?;
}

info!("synced from {next_rollup_block_height_to_check} to {current_rollup_block_height}");
Ok(())
}

Expand All @@ -293,15 +271,9 @@ async fn watch_for_blocks(
bail!("current rollup block missing block number")
};

info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);

// sync any blocks missing between `next_rollup_block_height` and the current latest
// (inclusive).
sync_unprocessed_rollup_heights(
sync_from_next_rollup_block_height(
provider.clone(),
&action_fetcher,
&submitter_handle,
Expand All @@ -319,7 +291,7 @@ async fn watch_for_blocks(
}
block = block_rx.next() => {
if let Some(block) = block {
get_and_forward_block_events(
get_and_send_events_at_block(
&action_fetcher,
block,
&submitter_handle,
Expand All @@ -334,11 +306,7 @@ async fn watch_for_blocks(
}
}

#[instrument(skip_all, fields(
block.hash = block.hash.map(tracing::field::display),
block.number = block.number.map(tracing::field::display),
), err)]
async fn get_and_forward_block_events(
async fn get_and_send_events_at_block(
actions_fetcher: &GetWithdrawalActions<Provider<Ws>>,
block: Block<H256>,
submitter_handle: &submitter::Handle,
Expand All @@ -351,7 +319,12 @@ async fn get_and_forward_block_events(
let actions = actions_fetcher
.get_for_block_hash(block_hash)
.await
.wrap_err("failed getting actions for block")?;
.wrap_err_with(|| {
format!(
"failed getting actions for block; block hash: `{block_hash}`, block height: \
`{rollup_height}`"
)
})?;

if actions.is_empty() {
info!(
Expand Down
10 changes: 3 additions & 7 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ impl BridgeWithdrawer {
Ok((service, shutdown_handle))
}

pub fn local_addr(&self) -> SocketAddr {
self.api_server.local_addr()
}

// Panic won't happen because `startup_task` is unwraped lazily after checking if it's `Some`.
#[allow(clippy::missing_panics_doc)]
pub async fn run(self) {
Expand Down Expand Up @@ -350,12 +346,12 @@ impl Shutdown {
.await
.map(flatten_result)
{
Ok(Ok(())) => info!("submitter exited gracefully"),
Ok(Err(error)) => error!(%error, "submitter exited with an error"),
Ok(Ok(())) => info!("withdrawer exited gracefully"),
Ok(Err(error)) => error!(%error, "withdrawer exited with an error"),
Err(_) => {
error!(
timeout_secs = limit.as_secs(),
"submitter did not shut down within timeout; killing it"
"watcher did not shut down within timeout; killing it"
);
submitter_task.abort();
}
Expand Down
Loading