-
Notifications
You must be signed in to change notification settings - Fork 67
/
on_settlement_event_updater.rs
138 lines (124 loc) · 5.03 KB
/
on_settlement_event_updater.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//! This module is responsible for updating the database, for each settlement
//! event that is emitted by the settlement contract.
//
// Each settlement transaction is expected to contain an auction id to uniquely
// identify the auction for which it was allowed to be brought onchain.
// This auction id is used to build the association between the settlement event
// and the auction in the database.
//
// Another responsibility of this module is to observe the settlement and save
// data of interest to the database. This data includes surplus, taken fees, gas
// used etc.
use {
crate::{domain::settlement, infra},
anyhow::{anyhow, Context, Result},
database::PgTransaction,
};
/// Processes settlement events that have not yet been associated with an
/// auction and stores the resulting settlement details.
#[derive(Clone)]
pub struct OnSettlementEventUpdater {
/// Ethereum access; used to fetch a settlement transaction by hash and to
/// read the settlement contract's domain separator.
eth: infra::Ethereum,
/// Persistence layer; used to look up unprocessed settlement events and to
/// save the outcome of processing them.
persistence: infra::Persistence,
}
impl OnSettlementEventUpdater {
    /// Builds an updater from an Ethereum handle and a persistence handle.
    ///
    /// Construction only stores the two handles; processing happens when
    /// [`Self::update`] is called.
    pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self {
        Self { eth, persistence }
    }

    /// Deletes settlement observations and order executions by delegating to
    /// `database::settlements::delete` within the given transaction.
    ///
    /// `from_block` is forwarded unchanged; presumably everything from that
    /// block onwards is removed -- confirm against the `database` crate.
    ///
    /// # Errors
    /// Returns the database error wrapped with the context
    /// `delete_settlement_observations`.
    pub async fn delete_observations(
        transaction: &mut PgTransaction<'_>,
        from_block: u64,
    ) -> Result<()> {
        let deletion = database::settlements::delete(transaction, from_block).await;
        deletion.context("delete_settlement_observations")?;
        Ok(())
    }

    /// Fetches all the available missing data needed for bookkeeping.
    ///
    /// This needs to be called after a new settlement event has been indexed,
    /// because the processing relies on that event already being present in
    /// the DB.
    ///
    /// Runs [`Self::single_update`] repeatedly for as long as each run reports
    /// that it processed an event. The loop ends when a run reports no update
    /// or fails; a failure is logged and not returned to the caller.
    pub async fn update(&self) {
        loop {
            let processed = match self.single_update().await {
                Ok(processed) => processed,
                Err(err) => {
                    tracing::error!(?err, "on settlement event update task failed");
                    return;
                }
            };
            if !processed {
                tracing::debug!("on settlement event updater ran without update");
                return;
            }
            // There might be more pending updates, so go around again right away.
            tracing::debug!("on settlement event updater ran and processed event");
        }
    }

    /// Processes at most one settlement event that has no auction assigned yet.
    ///
    /// Returns `Ok(true)` when an event was processed and saved. Returns
    /// `Ok(false)` when there is no pending event, or when the settlement
    /// transaction could not be fetched from the node.
    ///
    /// # Errors
    /// Fails when looking up the pending event fails, when building the
    /// settlement fails with a retryable error (see [`retryable`]), or when
    /// saving the result fails.
    async fn single_update(&self) -> Result<bool> {
        // Pick the next settlement event that still lacks an auction.
        let Some(event) = self.persistence.get_settlement_without_auction().await? else {
            return Ok(false);
        };
        tracing::debug!(tx = ?event.transaction, "updating settlement details");

        // Rebuild the settlement transaction from the onchain transaction
        // identified by the event's hash. The fetched transaction is scoped to
        // this block so it is released before the awaits further down.
        let parsed = {
            let fetched = match self.eth.transaction(event.transaction).await {
                Ok(fetched) => fetched,
                Err(err) => {
                    tracing::warn!(hash = ?event.transaction, ?err, "no tx found");
                    return Ok(false);
                }
            };
            let separator = self.eth.contracts().settlement_domain_separator();
            settlement::Transaction::new(&fetched, separator)
        };

        // Derive the <auction_id, settlement> pair that gets persisted.
        let (auction_id, settlement) = match parsed {
            Ok(tx) => {
                let auction_id = tx.auction_id;
                match settlement::Settlement::new(tx, &self.persistence).await {
                    Ok(settlement) => (auction_id, Some(settlement)),
                    // Retryable failures abort this run without saving anything.
                    Err(err) if retryable(&err) => return Err(err.into()),
                    Err(err) => {
                        tracing::warn!(hash = ?event.transaction, ?auction_id, ?err, "invalid settlement");
                        (auction_id, None)
                    }
                }
            }
            Err(err) => {
                tracing::warn!(hash = ?event.transaction, ?err, "invalid settlement transaction");
                // Fall back to default values so an invalid settlement
                // transaction does not block processing forever.
                (0.into(), None)
            }
        };

        tracing::debug!(hash = ?event.transaction, ?auction_id, "saving settlement details for tx");
        match self
            .persistence
            .save_settlement(event, auction_id, settlement.as_ref())
            .await
        {
            Ok(_) => Ok(true),
            Err(err) => Err(anyhow!(
                "save settlement: {:?}, {auction_id}, {err}",
                event.transaction
            )),
        }
    }
}
/// Whether OnSettlementEventUpdater loop should retry on the given error.
fn retryable(err: &settlement::Error) -> bool {
match err {
settlement::Error::Infra(_) => true,
settlement::Error::InconsistentData(_) => false,
settlement::Error::WrongEnvironment => false,
}
}