Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
8149513
add error messages
itamarreif Jun 11, 2024
5948dfe
fix service name
itamarreif Jun 12, 2024
5917967
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 12, 2024
f6d2bae
fix name
itamarreif Jun 13, 2024
86a1a03
fix renaming
itamarreif Jun 13, 2024
3018c16
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 13, 2024
ad934e7
startup refactor no tests
itamarreif Jun 15, 2024
45552bc
cleanup tests
itamarreif Jun 17, 2024
b8b5c4e
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 17, 2024
6cdaa25
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 18, 2024
9d83093
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 18, 2024
2ff6227
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 20, 2024
e57e7d9
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 24, 2024
dc5109f
address review comments
itamarreif Jun 24, 2024
7daee50
fix clippy
itamarreif Jun 24, 2024
6370462
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 26, 2024
f36ed88
wait for empty mempool
itamarreif Jun 26, 2024
db3531a
config fixes
itamarreif Jun 26, 2024
24ddada
add pending nonce to submitter
itamarreif Jun 27, 2024
d73d66f
simplify startup handle
itamarreif Jun 28, 2024
b4dc531
fix tests
itamarreif Jun 28, 2024
1a19561
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 28, 2024
c65412c
fix problems after merge
itamarreif Jun 28, 2024
2019dd6
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jun 28, 2024
ebae9aa
clean up retries
itamarreif Jul 1, 2024
c57b5b9
comment
itamarreif Jul 1, 2024
a8b67a7
fix clippy
itamarreif Jul 1, 2024
277c179
add comment for lint
itamarreif Jul 1, 2024
e2510dd
files
itamarreif Jun 28, 2024
d3736ff
test migration
itamarreif Jul 1, 2024
9da9ed9
Merge branch 'itamarreif/bridge-withdrawer-startup-fixes' of github.c…
itamarreif Jul 1, 2024
c812eec
Merge branch 'itamarreif/bridge-withdrawer-startup-fixes' of github.c…
itamarreif Jul 1, 2024
c385713
clean up comments
itamarreif Jul 2, 2024
d0a8414
fix tests
itamarreif Jul 2, 2024
72c600c
clean up comment
itamarreif Jul 2, 2024
fb0178a
Merge branch 'itamarreif/bridge-withdrawer/blackbox' of github.com:as…
itamarreif Jul 2, 2024
6cf2c76
Merge branch 'itamarreif/bridge-withdrawer-startup-fixes' of github.c…
itamarreif Jul 2, 2024
0a8bff4
fix bad merge
itamarreif Jul 2, 2024
2b78d52
more bad merge fix
itamarreif Jul 2, 2024
56162c6
clean up spawn
itamarreif Jul 2, 2024
1c57c2e
black box helpers
itamarreif Jul 2, 2024
bc31a6f
eth test object
itamarreif Jul 3, 2024
aeecb65
move memo out to core
itamarreif Jul 3, 2024
f932b75
startup test works
itamarreif Jul 4, 2024
d0bbdcc
clippy cleanups
itamarreif Jul 4, 2024
6c5bfbe
test env works, need to fix eth
itamarreif Jul 4, 2024
a1a471a
fix startup task handle
itamarreif Jul 5, 2024
ac806fd
fix contract addr
itamarreif Jul 5, 2024
33767a9
sanity check test
itamarreif Jul 9, 2024
ea1692a
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jul 10, 2024
f939986
Merge branch 'itamarreif/bridge-withdrawer/nonce-fix' of github.com:a…
itamarreif Jul 10, 2024
973c1b2
missed merge conflict
itamarreif Jul 10, 2024
cf73380
test passes except shutdown
itamarreif Jul 10, 2024
c583fc3
watcher logs
itamarreif Jul 10, 2024
a6620b4
fix hanging tests
itamarreif Jul 10, 2024
a8a75ce
test works
itamarreif Jul 10, 2024
b60f139
black box test works
itamarreif Jul 11, 2024
7c31561
native erc20 tests
itamarreif Jul 11, 2024
fad8e3b
fix tests
itamarreif Jul 11, 2024
697d247
ibc timeout nonsense
itamarreif Jul 11, 2024
ea9bf4c
erc20 tests
itamarreif Jul 12, 2024
72b2bd4
remove submitter tests
itamarreif Jul 12, 2024
68d578a
tests work
itamarreif Jul 12, 2024
052c597
make the crab happy
itamarreif Jul 12, 2024
c6ff49f
cleanup
itamarreif Jul 12, 2024
1333e13
fix weird merge conflict remnant
itamarreif Jul 12, 2024
ee42f6c
clean up comment
itamarreif Jul 12, 2024
896c632
less noisy logs?
itamarreif Jul 12, 2024
8cfa67f
remove unnecessary lint
itamarreif Jul 15, 2024
d6ca924
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jul 23, 2024
ce0c200
fix ws endpoint and test action helpers
itamarreif Jul 23, 2024
312b74b
fix toml lint
itamarreif Jul 23, 2024
21b0ba5
toml fix?
itamarreif Jul 23, 2024
cc18578
remove ethers from astria core
itamarreif Jul 26, 2024
feed1bb
Merge branch 'main' of github.com:astriaorg/astria into itamarreif/br…
itamarreif Jul 26, 2024
f58a178
fix memo imports
itamarreif Jul 26, 2024
9894136
add ignore annotations on tests that need anvil
SuperFluffy Jul 26, 2024
8c69624
clean up black test imports
SuperFluffy Jul 26, 2024
4ff58ed
clean up instrumentation and tracing, remove new trace events from lo…
SuperFluffy Jul 29, 2024
f2bc000
remove unecessary scoping from nonce-fetch
SuperFluffy Jul 29, 2024
b0b8659
rm bad import
SuperFluffy Jul 29, 2024
e35b8d1
address clippy
SuperFluffy Jul 29, 2024
eedd73e
streamline expected<>actual action assertion
SuperFluffy Jul 29, 2024
8e63d79
import trait as _
SuperFluffy Jul 29, 2024
c6f9780
more clippy
SuperFluffy Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/astria-bridge-withdrawer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
] }
reqwest = { workspace = true, features = ["json"] }
tempfile = { workspace = true }
tendermint-rpc = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
wiremock = { workspace = true }

[build-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use ethers::{
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
instrument,
warn,
};

Expand Down Expand Up @@ -99,38 +101,25 @@ pub(crate) struct Watcher {
state: Arc<State>,
}

struct FullyInitialized {
shutdown_token: CancellationToken,
submitter_handle: submitter::Handle,
state: Arc<State>,
provider: Arc<Provider<Ws>>,
action_fetcher: GetWithdrawalActions<Provider<Ws>>,
starting_rollup_height: u64,
}

impl Watcher {
pub(crate) async fn run(mut self) -> Result<()> {
let (provider, action_fetcher, next_rollup_block_height) = self
pub(crate) async fn run(self) -> Result<()> {
let fully_init = self
.startup()
.await
.wrap_err("watcher failed to start up")?;

let Self {
state,
shutdown_token,
submitter_handle,
..
} = self;

state.set_watcher_ready();
fully_init.state.set_watcher_ready();

tokio::select! {
res = watch_for_blocks(
provider,
action_fetcher,
next_rollup_block_height,
submitter_handle,
shutdown_token.clone(),
) => {
info!("block handler exited");
res.context("block handler exited")
}
() = shutdown_token.cancelled() => {
info!("watcher shutting down");
Ok(())
}
}
fully_init.run().await
}

/// Gets the startup data from the submitter and connects to the Ethereum node.
Expand All @@ -142,23 +131,39 @@ impl Watcher {
/// - If the fee asset ID provided in the config is not a valid fee asset on the sequencer.
/// - If the Ethereum node cannot be connected to after several retries.
/// - If the asset withdrawal decimals cannot be fetched.
async fn startup(
&mut self,
) -> eyre::Result<(Arc<Provider<Ws>>, GetWithdrawalActions<Provider<Ws>>, u64)> {
#[instrument(skip_all, err)]
async fn startup(self) -> eyre::Result<FullyInitialized> {
let Self {
shutdown_token,
mut startup_handle,
submitter_handle,
contract_address,
ethereum_rpc_endpoint,
rollup_asset_denom,
bridge_address,
state,
} = self;

let startup::Info {
fee_asset,
starting_rollup_height,
..
} = select! {
() = self.shutdown_token.cancelled() => {
() = shutdown_token.cancelled() => {
return Err(eyre!("watcher received shutdown signal while waiting for startup"));
}

startup_info = self.startup_handle.get_info() => {
startup_info = startup_handle.get_info() => {
startup_info.wrap_err("failed to receive startup info")?
}
};

debug!(
fee_asset = %fee_asset,
starting_rollup_height = starting_rollup_height,
"received startup info"
);

// connect to eth node
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
Expand All @@ -179,7 +184,7 @@ impl Watcher {
);

let provider = tryhard::retry_fn(|| {
let url = self.ethereum_rpc_endpoint.clone();
let url = ethereum_rpc_endpoint.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
Expand All @@ -190,62 +195,79 @@ impl Watcher {
.wrap_err("failed connecting to rollup after several retries; giving up")?;

let provider = Arc::new(provider);
let ics20_asset_to_withdraw = if self.rollup_asset_denom.last_channel().is_some() {
let ics20_asset_to_withdraw = if rollup_asset_denom.last_channel().is_some() {
info!(
rollup_asset_denom = %self.rollup_asset_denom,
%rollup_asset_denom,
"configured rollup asset contains an ics20 channel; ics20 withdrawals will be emitted"
);
Some(self.rollup_asset_denom.clone())
Some(rollup_asset_denom.clone())
} else {
info!(
rollup_asset_denom = %self.rollup_asset_denom,
%rollup_asset_denom,
"configured rollup asset does not contain an ics20 channel; ics20 withdrawals will not be emitted"
);
None
};
let action_fetcher = GetWithdrawalActionsBuilder::new()
.provider(provider.clone())
.fee_asset(fee_asset)
.contract_address(self.contract_address)
.bridge_address(self.bridge_address)
.sequencer_asset_to_withdraw(self.rollup_asset_denom.clone().into())
.contract_address(contract_address)
.bridge_address(bridge_address)
.sequencer_asset_to_withdraw(rollup_asset_denom.clone().into())
.set_ics20_asset_to_withdraw(ics20_asset_to_withdraw)
.try_build()
.await
.wrap_err("failed to construct contract event to sequencer action fetcher")?;

self.state.set_watcher_ready();
Ok(FullyInitialized {
shutdown_token,
submitter_handle,
state,
provider,
action_fetcher,
starting_rollup_height,
})
}
}

Ok((provider.clone(), action_fetcher, starting_rollup_height))
impl FullyInitialized {
async fn run(self) -> eyre::Result<()> {
tokio::select! {
res = watch_for_blocks(
self.provider,
self.action_fetcher,
self.starting_rollup_height,
self.submitter_handle,
self.shutdown_token.clone(),
) => {
res.context("block handler exited")
}
() = self.shutdown_token.cancelled() => {
Ok(())
}
}
}
}

async fn sync_from_next_rollup_block_height(
#[instrument(skip_all, fields(from_rollup_height, to_rollup_height), err)]
async fn sync_unprocessed_rollup_heights(
provider: Arc<Provider<Ws>>,
action_fetcher: &GetWithdrawalActions<Provider<Ws>>,
submitter_handle: &submitter::Handle,
next_rollup_block_height_to_check: u64,
current_rollup_block_height: u64,
from_rollup_height: u64,
to_rollup_height: u64,
) -> Result<()> {
if current_rollup_block_height < next_rollup_block_height_to_check {
return Ok(());
}

for i in next_rollup_block_height_to_check..=current_rollup_block_height {
let Some(block) = provider
for i in from_rollup_height..=to_rollup_height {
let block = provider
.get_block(i)
.await
.wrap_err("failed to get block")?
else {
bail!("block with number {i} missing");
};

get_and_send_events_at_block(action_fetcher, block, submitter_handle)
.map_err(eyre::Report::new)
.and_then(|block| block.ok_or_eyre("block is missing"))
.wrap_err_with(|| format!("failed to get block at rollup height `{i}`"))?;
get_and_forward_block_events(action_fetcher, block, submitter_handle)
.await
.wrap_err("failed to get and send events at block")?;
}

info!("synced from {next_rollup_block_height_to_check} to {current_rollup_block_height}");
Ok(())
}

Expand All @@ -271,9 +293,15 @@ async fn watch_for_blocks(
bail!("current rollup block missing block number")
};

info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);

// sync any blocks missing between `next_rollup_block_height` and the current latest
// (inclusive).
sync_from_next_rollup_block_height(
sync_unprocessed_rollup_heights(
provider.clone(),
&action_fetcher,
&submitter_handle,
Expand All @@ -291,7 +319,7 @@ async fn watch_for_blocks(
}
block = block_rx.next() => {
if let Some(block) = block {
get_and_send_events_at_block(
get_and_forward_block_events(
&action_fetcher,
block,
&submitter_handle,
Expand All @@ -306,7 +334,11 @@ async fn watch_for_blocks(
}
}

async fn get_and_send_events_at_block(
#[instrument(skip_all, fields(
block.hash = block.hash.map(tracing::field::display),
block.number = block.number.map(tracing::field::display),
), err)]
async fn get_and_forward_block_events(
actions_fetcher: &GetWithdrawalActions<Provider<Ws>>,
block: Block<H256>,
submitter_handle: &submitter::Handle,
Expand All @@ -319,12 +351,7 @@ async fn get_and_send_events_at_block(
let actions = actions_fetcher
.get_for_block_hash(block_hash)
.await
.wrap_err_with(|| {
format!(
"failed getting actions for block; block hash: `{block_hash}`, block height: \
`{rollup_height}`"
)
})?;
.wrap_err("failed getting actions for block")?;

if actions.is_empty() {
info!(
Expand Down
10 changes: 7 additions & 3 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl BridgeWithdrawer {
Ok((service, shutdown_handle))
}

pub fn local_addr(&self) -> SocketAddr {
self.api_server.local_addr()
}

// Panic won't happen because `startup_task` is unwraped lazily after checking if it's `Some`.
#[allow(clippy::missing_panics_doc)]
pub async fn run(self) {
Expand Down Expand Up @@ -346,12 +350,12 @@ impl Shutdown {
.await
.map(flatten_result)
{
Ok(Ok(())) => info!("withdrawer exited gracefully"),
Ok(Err(error)) => error!(%error, "withdrawer exited with an error"),
Ok(Ok(())) => info!("submitter exited gracefully"),
Ok(Err(error)) => error!(%error, "submitter exited with an error"),
Err(_) => {
error!(
timeout_secs = limit.as_secs(),
"watcher did not shut down within timeout; killing it"
"submitter did not shut down within timeout; killing it"
);
submitter_task.abort();
}
Expand Down
Loading