Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use astria_core::{
primitive::v1::{
asset,
asset::Denom,
Address,
},
protocol::transaction::v1alpha1::{
action::{
Expand Down Expand Up @@ -95,7 +94,7 @@ fn event_to_bridge_unlock(
transaction_hash,
};
let action = BridgeUnlockAction {
to: event.sender.to_fixed_bytes().into(),
to: event.destination_chain_address.to_fixed_bytes().into(),
amount: event
.amount
.as_u128()
Expand Down Expand Up @@ -127,11 +126,7 @@ fn event_to_ics20_withdrawal(
// TODO: make this configurable
const ICS20_WITHDRAWAL_TIMEOUT: Duration = Duration::from_secs(300);

let sender: [u8; 20] = event
.sender
.as_bytes()
.try_into()
.expect("U160 must be 20 bytes");
let sender = event.sender.to_fixed_bytes().into();
let denom = rollup_asset_denom.clone();

let (_, channel) = denom
Expand All @@ -152,7 +147,7 @@ fn event_to_ics20_withdrawal(
// returned to the rollup.
// this is only ok for now because addresses on the sequencer and the rollup are both 20
// bytes, but this won't work otherwise.
return_address: Address::from(sender),
return_address: sender,
amount: event
.amount
.as_u128()
Expand Down Expand Up @@ -207,7 +202,7 @@ mod tests {
};

let expected_action = BridgeUnlockAction {
to: [0u8; 20].into(),
to: [1u8; 20].into(),
amount: 99,
memo: serde_json::to_vec(&BridgeUnlockMemo {
block_number: 1.into(),
Expand Down Expand Up @@ -239,7 +234,7 @@ mod tests {
};

let expected_action = BridgeUnlockAction {
to: [0u8; 20].into(),
to: [1u8; 20].into(),
amount: 99,
memo: serde_json::to_vec(&BridgeUnlockMemo {
block_number: 1.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use astria_eyre::{
};
use ethers::{
contract::LogMeta,
core::types::Block,
providers::{
Middleware,
Provider,
StreamExt as _,
Ws,
Expand All @@ -26,6 +28,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
error,
info,
warn,
};
Expand Down Expand Up @@ -106,7 +109,7 @@ impl Watcher {
.await
.wrap_err("failed to connect to ethereum RPC endpoint")?,
);
let contract = AstriaWithdrawer::new(contract_address, provider);
let contract = AstriaWithdrawer::new(contract_address, provider.clone());

let base_chain_asset_precision = contract
.base_chain_asset_precision()
Expand All @@ -121,6 +124,7 @@ impl Watcher {

let batcher = Batcher::new(
event_rx,
provider,
batch_tx,
&shutdown_token,
fee_asset_id,
Expand Down Expand Up @@ -214,6 +218,7 @@ async fn watch_for_ics20_withdrawal_events(

struct Batcher {
event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>,
provider: Arc<Provider<Ws>>,
batch_tx: mpsc::Sender<Batch>,
shutdown_token: CancellationToken,
fee_asset_id: asset::Id,
Expand All @@ -224,6 +229,7 @@ struct Batcher {
impl Batcher {
pub(crate) fn new(
event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>,
provider: Arc<Provider<Ws>>,
batch_tx: mpsc::Sender<Batch>,
shutdown_token: &CancellationToken,
fee_asset_id: asset::Id,
Expand All @@ -232,17 +238,22 @@ impl Batcher {
) -> Self {
Self {
event_rx,
provider,
batch_tx,
shutdown_token: shutdown_token.clone(),
fee_asset_id,
rollup_asset_denom,
asset_withdrawal_divisor,
}
}
}

impl Batcher {
pub(crate) async fn run(mut self) -> Result<()> {
let mut block_rx = self
.provider
.subscribe_blocks()
.await
.wrap_err("failed to subscribe to blocks")?;

let mut curr_batch = Batch {
actions: Vec::new(),
rollup_height: 0,
Expand All @@ -254,6 +265,32 @@ impl Batcher {
info!("batcher shutting down");
break;
}
block = block_rx.next() => {
if let Some(Block { number, .. }) = block {
let Some(block_number) = number else {
// don't think this should happen
warn!("block number missing; skipping");
continue;
};

if block_number.as_u64() > curr_batch.rollup_height {
if !curr_batch.actions.is_empty() {
self.batch_tx
.send(curr_batch)
.await
.wrap_err("failed to send batched events; receiver dropped?")?;
}

curr_batch = Batch {
actions: Vec::new(),
rollup_height: block_number.as_u64(),
};
}
} else {
error!("block stream closed; shutting down batcher");
break;
}
}
item = self.event_rx.recv() => {
if let Some((event, meta)) = item {
let event_with_metadata = EventWithMetadata {
Expand Down Expand Up @@ -281,7 +318,7 @@ impl Batcher {
};
}
} else {
warn!("event receiver dropped; shutting down batcher");
error!("event receiver dropped; shutting down batcher");
break;
}
}
Expand Down