Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine onchain order event handling logic #2828

Merged
merged 7 commits into from
Jul 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 88 additions & 111 deletions crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,137 +156,31 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
events: Vec<EthContractEvent<ContractEvent>>,
range: RangeInclusive<u64>,
) -> Result<()> {
let order_placement_events = events
.clone()
.into_iter()
.filter(|EthContractEvent { data, .. }| {
matches!(data, ContractEvent::OrderPlacement(_))
})
.collect();
let invalidation_events = get_invalidation_events(events)?;
let invalided_order_uids = extract_invalidated_order_uids(invalidation_events)?;
let (custom_onchain_data, quotes, broadcasted_order_data, orders) = self
.extract_custom_and_general_order_data(order_placement_events)
.await?;

let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["replace_onchain_order_events"])
.start_timer();

let mut transaction = self.db.pool.begin().await?;

database::onchain_broadcasted_orders::mark_as_reorged(
&mut transaction,
*range.start() as i64,
)
.await
.context("mark_onchain_order_events failed")?;

database::onchain_invalidations::delete_invalidations(
&mut transaction,
*range.start() as i64,
)
.await
.context("invalidating_onchain_order_events failed")?;

database::onchain_invalidations::insert_onchain_invalidations(
&mut transaction,
invalided_order_uids.as_slice(),
)
.await
.context("insert_onchain_invalidations failed")?;

self.custom_onchain_data_parser
.append_custom_order_info_to_db(&mut transaction, custom_onchain_data)
.await
.context("append_custom_onchain_orders failed")?;

database::onchain_broadcasted_orders::append(
&mut transaction,
broadcasted_order_data.as_slice(),
)
.await
.context("append_onchain_orders failed")?;

// We only need to insert quotes for orders that will be included in an
// auction (they are needed to compute solver rewards). If placement
// failed, then the quote is not needed.
insert_quotes(
&mut transaction,
quotes.into_iter().flatten().collect::<Vec<_>>().as_slice(),
)
.await
.context("appending quotes for onchain orders failed")?;
self.delete_events(&mut transaction, range).await?;
self.insert_events(events, &mut transaction).await?;

database::orders::insert_orders_and_ignore_conflicts(&mut transaction, orders.as_slice())
.await
.context("insert_orders failed")?;
transaction.commit().await.context("commit")?;

for order in &invalided_order_uids {
tracing::debug!(?order, "invalidated order");
}
for order in &orders {
tracing::debug!(order =? order.uid, "upserted order");
}

Ok(())
}

async fn append_events(&mut self, events: Vec<EthContractEvent<ContractEvent>>) -> Result<()> {
let order_placement_events = events
.clone()
.into_iter()
.filter(|EthContractEvent { data, .. }| {
matches!(data, ContractEvent::OrderPlacement(_))
})
.collect();
let invalidation_events = get_invalidation_events(events)?;
let invalided_order_uids = extract_invalidated_order_uids(invalidation_events)?;
let (custom_order_data, quotes, broadcasted_order_data, orders) = self
.extract_custom_and_general_order_data(order_placement_events)
.await?;

let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["append_onchain_order_events"])
fleupold marked this conversation as resolved.
Show resolved Hide resolved
.start_timer();
let mut transaction = self.db.pool.begin().await?;

database::onchain_invalidations::insert_onchain_invalidations(
&mut transaction,
invalided_order_uids.as_slice(),
)
.await
.context("insert_onchain_invalidations failed")?;
database::onchain_broadcasted_orders::append(
&mut transaction,
broadcasted_order_data.as_slice(),
)
.await
.context("append_onchain_orders failed")?;

self.custom_onchain_data_parser
.append_custom_order_info_to_db(&mut transaction, custom_order_data)
.await
.context("append_custom_onchain_orders failed")?;

// We only need to insert quotes for orders that will be included in an
// auction (they are needed to compute solver rewards). If placement
// failed, then the quote is not needed.
insert_quotes(
&mut transaction,
quotes.into_iter().flatten().collect::<Vec<_>>().as_slice(),
)
.await
.context("appending quotes for onchain orders failed")?;

database::orders::insert_orders_and_ignore_conflicts(&mut transaction, orders.as_slice())
.await
.context("insert_orders failed")?;

let mut transaction = self.db.pool.begin().await?;
self.insert_events(events, &mut transaction).await?;
transaction.commit().await.context("commit")?;

Ok(())
}

Expand Down Expand Up @@ -372,6 +266,89 @@ impl<T: Send + Sync + Clone, W: Send + Sync> OnchainOrderParser<T, W> {
);
Ok(multiunzip(data_tuple))
}

async fn delete_events(
&self,
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
range: RangeInclusive<u64>,
) -> Result<()> {
database::onchain_broadcasted_orders::mark_as_reorged(
transaction,
i64::try_from(*range.start()).unwrap_or(i64::MAX),
)
.await
.context("mark_onchain_order_events failed")?;

database::onchain_invalidations::delete_invalidations(
transaction,
i64::try_from(*range.start()).unwrap_or(i64::MAX),
)
.await
.context("invalidating_onchain_order_events failed")?;

Ok(())
}

async fn insert_events(
&self,
events: Vec<EthContractEvent<ContractEvent>>,
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
) -> Result<()> {
let order_placement_events = events
.clone()
.into_iter()
.filter(|EthContractEvent { data, .. }| {
matches!(data, ContractEvent::OrderPlacement(_))
})
.collect();
let invalidation_events = get_invalidation_events(events)?;
let invalided_order_uids = extract_invalidated_order_uids(invalidation_events)?;
let (custom_onchain_data, quotes, broadcasted_order_data, orders) = self
.extract_custom_and_general_order_data(order_placement_events)
.await?;

database::onchain_invalidations::insert_onchain_invalidations(
transaction,
invalided_order_uids.as_slice(),
)
.await
.context("insert_onchain_invalidations failed")?;

self.custom_onchain_data_parser
.append_custom_order_info_to_db(transaction, custom_onchain_data)
.await
.context("append_custom_onchain_orders failed")?;

database::onchain_broadcasted_orders::append(
transaction,
broadcasted_order_data.as_slice(),
)
.await
.context("append_onchain_orders failed")?;

// We only need to insert quotes for orders that will be included in an
// auction (they are needed to compute solver rewards). If placement
// failed, then the quote is not needed.
insert_quotes(
transaction,
quotes.into_iter().flatten().collect::<Vec<_>>().as_slice(),
)
.await
.context("appending quotes for onchain orders failed")?;

database::orders::insert_orders_and_ignore_conflicts(transaction, orders.as_slice())
.await
.context("insert_orders failed")?;

for order in &invalided_order_uids {
tracing::debug!(?order, "invalidated order");
}
for order in &orders {
tracing::debug!(order =? order.uid, "upserted order");
}

Ok(())
}
}

async fn get_block_numbers_of_events(
Expand Down
Loading