Skip to content

Commit 0a7fb9f

Browse files
authored
Merge pull request #2103 from input-output-hk/sfa/2075/refactor_pruning_with_upkeep_service
Refactor aggregator pruning with upkeep service
2 parents c936b60 + ef8feb1 commit 0a7fb9f

File tree

11 files changed

+288
-76
lines changed

11 files changed

+288
-76
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mithril-persistence/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-persistence"
3-
version = "0.2.31"
3+
version = "0.2.32"
44
description = "Common types, interfaces, and utilities to persist data for Mithril nodes."
55
authors = { workspace = true }
66
edition = { workspace = true }

internal/mithril-persistence/src/sqlite/connection_extensions.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ pub trait ConnectionExtensions {
3030
fn fetch_collect<Q: Query, B: FromIterator<Q::Entity>>(&self, query: Q) -> StdResult<B> {
3131
Ok(self.fetch(query)?.collect::<B>())
3232
}
33+
34+
/// Apply a query that do not return data from the database(ie: insert, delete, ...).
35+
fn apply<Q: Query>(&self, query: Q) -> StdResult<()> {
36+
self.fetch(query)?.count();
37+
Ok(())
38+
}
3339
}
3440

3541
impl ConnectionExtensions for SqliteConnection {
@@ -79,6 +85,8 @@ fn prepare_statement<'conn>(
7985
mod tests {
8086
use sqlite::Connection;
8187

88+
use crate::sqlite::{HydrationError, SqLiteEntity, WhereCondition};
89+
8290
use super::*;
8391

8492
#[test]
@@ -115,4 +123,56 @@ mod tests {
115123

116124
assert_eq!(value, 45);
117125
}
126+
127+
#[test]
128+
fn test_apply_execute_the_query() {
129+
struct DummySqLiteEntity {}
130+
impl SqLiteEntity for DummySqLiteEntity {
131+
fn hydrate(_row: sqlite::Row) -> Result<Self, HydrationError>
132+
where
133+
Self: Sized,
134+
{
135+
unimplemented!()
136+
}
137+
138+
fn get_projection() -> crate::sqlite::Projection {
139+
unimplemented!()
140+
}
141+
}
142+
143+
struct FakeQuery {
144+
sql: String,
145+
}
146+
impl Query for FakeQuery {
147+
type Entity = DummySqLiteEntity;
148+
149+
fn filters(&self) -> WhereCondition {
150+
WhereCondition::default()
151+
}
152+
153+
fn get_definition(&self, _condition: &str) -> String {
154+
self.sql.clone()
155+
}
156+
}
157+
158+
let connection = Connection::open_thread_safe(":memory:").unwrap();
159+
connection
160+
.execute("create table query_test(text_data);")
161+
.unwrap();
162+
163+
let value: i64 = connection
164+
.query_single_cell("select count(*) from query_test", &[])
165+
.unwrap();
166+
assert_eq!(value, 0);
167+
168+
let query = FakeQuery {
169+
sql: "insert into query_test(text_data) values ('row 1')".to_string(),
170+
};
171+
connection.apply(query).unwrap();
172+
173+
let value: i64 = connection
174+
.query_single_cell("select count(*) from query_test", &[])
175+
.unwrap();
176+
assert_eq!(value, 1);
177+
}
118178
}

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.107"
3+
version = "0.5.108"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/database/repository/epoch_settings_store.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::database::query::{
1212
DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
1313
};
1414
use crate::entities::AggregatorEpochSettings;
15+
use crate::services::EpochPruningTask;
1516
use crate::EpochSettingsStorer;
1617

1718
/// Service to deal with epoch settings (read & write).
@@ -68,17 +69,6 @@ impl EpochSettingsStorer for EpochSettingsStore {
6869
.map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))?
6970
.unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));
7071

71-
// Prune useless old epoch settings.
72-
if let Some(threshold) = self.retention_limit {
73-
let _ = self
74-
.connection
75-
.fetch(DeleteEpochSettingsQuery::below_epoch_threshold(
76-
epoch - threshold,
77-
))
78-
.map_err(AdapterError::QueryError)?
79-
.count();
80-
}
81-
8272
Ok(Some(epoch_settings_record.into()))
8373
}
8474

@@ -102,6 +92,25 @@ impl EpochSettingsStorer for EpochSettingsStore {
10292
}
10393
}
10494

95+
#[async_trait]
96+
impl EpochPruningTask for EpochSettingsStore {
97+
fn pruned_data(&self) -> &'static str {
98+
"Epoch settings"
99+
}
100+
101+
/// Prune useless old epoch settings.
102+
async fn prune(&self, epoch: Epoch) -> StdResult<()> {
103+
if let Some(threshold) = self.retention_limit {
104+
self.connection
105+
.apply(DeleteEpochSettingsQuery::below_epoch_threshold(
106+
epoch - threshold,
107+
))
108+
.map_err(AdapterError::QueryError)?;
109+
}
110+
Ok(())
111+
}
112+
}
113+
105114
#[cfg(test)]
106115
mod tests {
107116
use mithril_common::entities::BlockNumber;
@@ -172,7 +181,7 @@ mod tests {
172181
}
173182

174183
#[tokio::test]
175-
async fn save_epoch_settings_prune_older_epoch_settings() {
184+
async fn prune_epoch_settings_older_than_threshold() {
176185
const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;
177186

178187
let connection = main_db_connection().unwrap();
@@ -183,12 +192,10 @@ mod tests {
183192
);
184193

185194
store
186-
.save_epoch_settings(
187-
Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD,
188-
AggregatorEpochSettings::dummy(),
189-
)
195+
.prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
190196
.await
191-
.expect("saving epoch settings should not fails");
197+
.unwrap();
198+
192199
let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
193200
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
194201

@@ -202,6 +209,27 @@ mod tests {
202209
);
203210
}
204211

212+
#[tokio::test]
213+
async fn without_threshold_nothing_is_pruned() {
214+
let connection = main_db_connection().unwrap();
215+
insert_epoch_settings(&connection, &[1, 2]).unwrap();
216+
let store = EpochSettingsStore::new(Arc::new(connection), None);
217+
218+
store.prune(Epoch(100)).await.unwrap();
219+
220+
let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
221+
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();
222+
223+
assert!(
224+
epoch1_params.is_some(),
225+
"Epoch settings at epoch 1 should have been pruned",
226+
);
227+
assert!(
228+
epoch2_params.is_some(),
229+
"Epoch settings at epoch 2 should still exist",
230+
);
231+
}
232+
205233
#[tokio::test]
206234
async fn save_epoch_settings_stores_in_database() {
207235
let connection = main_db_connection().unwrap();

mithril-aggregator/src/database/repository/stake_pool_store.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::database::query::{
1515
DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
1616
};
1717
use crate::database::record::StakePool;
18+
use crate::services::EpochPruningTask;
1819

1920
/// Service to deal with stake pools (read & write).
2021
pub struct StakePoolStore {
@@ -53,17 +54,6 @@ impl StakeStorer for StakePoolStore {
5354
.with_context(|| format!("persist stakes failure, epoch: {epoch}"))
5455
.map_err(AdapterError::GeneralError)?;
5556

56-
// Prune useless old stake distributions.
57-
if let Some(threshold) = self.retention_limit {
58-
let _ = self
59-
.connection
60-
.fetch(DeleteStakePoolQuery::below_epoch_threshold(
61-
epoch - threshold,
62-
))
63-
.map_err(AdapterError::QueryError)?
64-
.count();
65-
}
66-
6757
Ok(Some(StakeDistribution::from_iter(
6858
pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
6959
)))
@@ -91,9 +81,25 @@ impl StakeStorer for StakePoolStore {
9181
#[async_trait]
9282
impl StakeDistributionRetriever for StakePoolStore {
9383
async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
94-
let stake_distribution = self.get_stakes(epoch).await?;
84+
self.get_stakes(epoch).await
85+
}
86+
}
9587

96-
Ok(stake_distribution)
88+
#[async_trait]
89+
impl EpochPruningTask for StakePoolStore {
90+
fn pruned_data(&self) -> &'static str {
91+
"Stake pool"
92+
}
93+
94+
async fn prune(&self, epoch: Epoch) -> StdResult<()> {
95+
if let Some(threshold) = self.retention_limit {
96+
self.connection
97+
.apply(DeleteStakePoolQuery::below_epoch_threshold(
98+
epoch - threshold,
99+
))
100+
.map_err(AdapterError::QueryError)?;
101+
}
102+
Ok(())
97103
}
98104
}
99105

@@ -104,20 +110,18 @@ mod tests {
104110
use super::*;
105111

106112
#[tokio::test]
107-
async fn save_protocol_parameters_prune_older_epoch_settings() {
113+
async fn prune_epoch_settings_older_than_threshold() {
108114
let connection = main_db_connection().unwrap();
109115
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
110116
insert_stake_pool(&connection, &[1, 2]).unwrap();
111117
let store =
112118
StakePoolStore::new(Arc::new(connection), Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD));
113119

114120
store
115-
.save_stakes(
116-
Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD,
117-
StakeDistribution::from_iter([("pool1".to_string(), 100)]),
118-
)
121+
.prune(Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
119122
.await
120-
.expect("saving stakes should not fails");
123+
.unwrap();
124+
121125
let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
122126
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();
123127

@@ -131,6 +135,27 @@ mod tests {
131135
);
132136
}
133137

138+
#[tokio::test]
139+
async fn without_threshold_nothing_is_pruned() {
140+
let connection = main_db_connection().unwrap();
141+
insert_stake_pool(&connection, &[1, 2]).unwrap();
142+
let store = StakePoolStore::new(Arc::new(connection), None);
143+
144+
store.prune(Epoch(100)).await.unwrap();
145+
146+
let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
147+
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();
148+
149+
assert!(
150+
epoch1_stakes.is_some(),
151+
"Stakes at epoch 1 should have been pruned",
152+
);
153+
assert!(
154+
epoch2_stakes.is_some(),
155+
"Stakes at epoch 2 should still exist",
156+
);
157+
}
158+
134159
#[tokio::test]
135160
async fn retrieve_with_no_stakes_returns_none() {
136161
let connection = main_db_connection().unwrap();

0 commit comments

Comments
 (0)