Skip to content

Commit 4175974

Browse files
committed
Create a task to prune StakePool and add it to the UpkeepService.
1 parent ef6c72d commit 4175974

File tree

2 files changed

+47
-18
lines changed

2 files changed

+47
-18
lines changed

mithril-aggregator/src/database/repository/stake_pool_store.rs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::database::query::{
1515
DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
1616
};
1717
use crate::database::record::StakePool;
18+
use crate::services::EpochPruningTask;
1819

1920
/// Service to deal with stake pools (read & write).
2021
pub struct StakePoolStore {
@@ -53,17 +54,6 @@ impl StakeStorer for StakePoolStore {
5354
.with_context(|| format!("persist stakes failure, epoch: {epoch}"))
5455
.map_err(AdapterError::GeneralError)?;
5556

56-
// Prune useless old stake distributions.
57-
if let Some(threshold) = self.retention_limit {
58-
let _ = self
59-
.connection
60-
.fetch(DeleteStakePoolQuery::below_epoch_threshold(
61-
epoch - threshold,
62-
))
63-
.map_err(AdapterError::QueryError)?
64-
.count();
65-
}
66-
6757
Ok(Some(StakeDistribution::from_iter(
6858
pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
6959
)))
@@ -95,27 +85,43 @@ impl StakeDistributionRetriever for StakePoolStore {
9585
}
9686
}
9787

88+
#[async_trait]
89+
impl EpochPruningTask for StakePoolStore {
90+
fn pruned_data(&self) -> &'static str {
91+
"Stake pool"
92+
}
93+
94+
async fn prune(&self, epoch: Epoch) -> StdResult<()> {
95+
if let Some(threshold) = self.retention_limit {
96+
self.connection
97+
.apply(DeleteStakePoolQuery::below_epoch_threshold(
98+
epoch - threshold,
99+
))
100+
.map_err(AdapterError::QueryError)?;
101+
}
102+
Ok(())
103+
}
104+
}
105+
98106
#[cfg(test)]
99107
mod tests {
100108
use crate::database::test_helper::{insert_stake_pool, main_db_connection};
101109

102110
use super::*;
103111

104112
#[tokio::test]
105-
async fn save_protocol_parameters_prune_older_epoch_settings() {
113+
async fn prune_epoch_settings_older_than_threshold() {
106114
let connection = main_db_connection().unwrap();
107115
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
108116
insert_stake_pool(&connection, &[1, 2]).unwrap();
109117
let store =
110118
StakePoolStore::new(Arc::new(connection), Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD));
111119

112120
store
113-
.save_stakes(
114-
Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD,
115-
StakeDistribution::from_iter([("pool1".to_string(), 100)]),
116-
)
121+
.prune(Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
117122
.await
118-
.expect("saving stakes should not fails");
123+
.unwrap();
124+
119125
let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
120126
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();
121127

@@ -129,6 +135,27 @@ mod tests {
129135
);
130136
}
131137

138+
#[tokio::test]
139+
async fn without_threshold_nothing_is_pruned() {
140+
let connection = main_db_connection().unwrap();
141+
insert_stake_pool(&connection, &[1, 2]).unwrap();
142+
let store = StakePoolStore::new(Arc::new(connection), None);
143+
144+
store.prune(Epoch(100)).await.unwrap();
145+
146+
let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
147+
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();
148+
149+
assert!(
150+
epoch1_stakes.is_some(),
151+
"Stakes at epoch 1 should have been pruned",
152+
);
153+
assert!(
154+
epoch2_stakes.is_some(),
155+
"Stakes at epoch 2 should still exist",
156+
);
157+
}
158+
132159
#[tokio::test]
133160
async fn retrieve_with_no_stakes_returns_none() {
134161
let connection = main_db_connection().unwrap();

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1337,13 +1337,15 @@ impl DependenciesBuilder {
13371337
}
13381338

13391339
async fn build_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
1340+
let stake_pool_pruning_task = self.get_stake_store().await?;
1341+
13401342
let upkeep_service = Arc::new(AggregatorUpkeepService::new(
13411343
self.get_sqlite_connection().await?,
13421344
self.get_sqlite_connection_cardano_transaction_pool()
13431345
.await?,
13441346
self.get_event_store_sqlite_connection().await?,
13451347
self.get_signed_entity_lock().await?,
1346-
vec![], // TODO XXX Add pruning tasks here
1348+
vec![stake_pool_pruning_task],
13471349
self.root_logger(),
13481350
));
13491351

0 commit comments

Comments
 (0)