Skip to content

Commit 8fdf36b

Browse files
committed
refactor: make upkeep pruning tasks apply themself the retention limit
Instead of making the upkeep service itself compute the threshold based on the limit. This design allow each task to use different limit if needed.
1 parent 4d66fb9 commit 8fdf36b

File tree

5 files changed

+86
-90
lines changed

5 files changed

+86
-90
lines changed

mithril-signer/src/database/repository/signed_beacon_repository.rs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@ use crate::services::{EpochPruningTask, SignedBeaconStore};
1515
/// A [SignedBeaconStore] implementation using SQLite.
1616
pub struct SignedBeaconRepository {
1717
connection: Arc<SqliteConnection>,
18+
store_retention_limit: Option<u64>,
1819
}
1920

2021
impl SignedBeaconRepository {
2122
/// Create a new instance of the `SignedBeaconRepository`.
22-
pub fn new(connection: Arc<SqliteConnection>) -> Self {
23-
Self { connection }
23+
pub fn new(connection: Arc<SqliteConnection>, store_retention_limit: Option<u64>) -> Self {
24+
Self {
25+
connection,
26+
store_retention_limit,
27+
}
2428
}
2529

2630
/// Get the last signed beacon.
@@ -71,8 +75,14 @@ impl EpochPruningTask for SignedBeaconRepository {
7175
"Signed Beacon"
7276
}
7377

74-
async fn prune_below_epoch_threshold(&self, epoch_threshold: Epoch) -> StdResult<()> {
75-
self.prune_below_epoch(epoch_threshold)
78+
async fn prune(&self, current_epoch: Epoch) -> StdResult<()> {
79+
match self
80+
.store_retention_limit
81+
.map(|limit| current_epoch - limit)
82+
{
83+
Some(threshold) if *threshold > 0 => self.prune_below_epoch(threshold),
84+
_ => Ok(()),
85+
}
7686
}
7787
}
7888

@@ -102,7 +112,7 @@ mod tests {
102112
#[test]
103113
fn get_last_stored_signed_beacon() {
104114
let connection = Arc::new(main_db_connection().unwrap());
105-
let repository = SignedBeaconRepository::new(connection.clone());
115+
let repository = SignedBeaconRepository::new(connection.clone(), None);
106116

107117
let last_signed_beacon = repository.get_last().unwrap();
108118
assert_eq!(None, last_signed_beacon);
@@ -151,7 +161,7 @@ mod tests {
151161
#[tokio::test]
152162
async fn filter_out_nothing_if_nothing_was_previously_signed() {
153163
let connection = Arc::new(main_db_connection().unwrap());
154-
let repository = SignedBeaconRepository::new(connection.clone());
164+
let repository = SignedBeaconRepository::new(connection.clone(), None);
155165

156166
let to_filter = all_signed_entity_type_for(&TimePoint::dummy());
157167
let available_entities = repository
@@ -165,7 +175,7 @@ mod tests {
165175
#[tokio::test]
166176
async fn filter_out_nothing_if_previously_signed_entities_doesnt_match_passed_entities() {
167177
let connection = Arc::new(main_db_connection().unwrap());
168-
let repository = SignedBeaconRepository::new(connection.clone());
178+
let repository = SignedBeaconRepository::new(connection.clone(), None);
169179

170180
let time_point = TimePoint::dummy();
171181
insert_signed_beacons(
@@ -189,7 +199,7 @@ mod tests {
189199
#[tokio::test]
190200
async fn filter_out_everything_if_previously_signed_entities_match_all_passed_entities() {
191201
let connection = Arc::new(main_db_connection().unwrap());
192-
let repository = SignedBeaconRepository::new(connection.clone());
202+
let repository = SignedBeaconRepository::new(connection.clone(), None);
193203

194204
let to_filter = all_signed_entity_type_for(&TimePoint::dummy());
195205
insert_signed_beacons(
@@ -210,7 +220,7 @@ mod tests {
210220
#[tokio::test]
211221
async fn filter_out_partially_if_some_previously_signed_entities_match_passed_entities() {
212222
let connection = Arc::new(main_db_connection().unwrap());
213-
let repository = SignedBeaconRepository::new(connection.clone());
223+
let repository = SignedBeaconRepository::new(connection.clone(), None);
214224

215225
let time_point = TimePoint::dummy();
216226
let signed_beacons = [
@@ -253,7 +263,7 @@ mod tests {
253263
#[tokio::test]
254264
async fn mark_beacon_as_signed() {
255265
let connection = Arc::new(main_db_connection().unwrap());
256-
let repository = SignedBeaconRepository::new(connection.clone());
266+
let repository = SignedBeaconRepository::new(connection.clone(), None);
257267

258268
let beacon_to_sign = BeaconToSign {
259269
epoch: Epoch(13),
@@ -278,10 +288,50 @@ mod tests {
278288
assert_eq!(beacon_to_sign, signed_beacon);
279289
}
280290

281-
#[test]
282-
fn test_prune() {
291+
#[tokio::test]
292+
async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
293+
let connection = Arc::new(main_db_connection().unwrap());
294+
let repository = SignedBeaconRepository::new(connection.clone(), None);
295+
insert_signed_beacons(
296+
&connection,
297+
SignedBeaconRecord::fakes(&[(
298+
Epoch(8),
299+
vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
300+
)]),
301+
);
302+
303+
EpochPruningTask::prune(&repository, Epoch(1000))
304+
.await
305+
.unwrap();
306+
307+
let cursor = connection.fetch(GetSignedBeaconQuery::all()).unwrap();
308+
assert_eq!(1, cursor.count(),);
309+
}
310+
311+
#[tokio::test]
312+
async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
283313
let connection = Arc::new(main_db_connection().unwrap());
284-
let repository = SignedBeaconRepository::new(connection.clone());
314+
let repository = SignedBeaconRepository::new(connection.clone(), Some(10));
315+
insert_signed_beacons(
316+
&connection,
317+
SignedBeaconRecord::fakes(&[(
318+
Epoch(8),
319+
vec![SignedEntityType::MithrilStakeDistribution(Epoch(8))],
320+
)]),
321+
);
322+
323+
EpochPruningTask::prune(&repository, Epoch(9))
324+
.await
325+
.unwrap();
326+
327+
let cursor = connection.fetch(GetSignedBeaconQuery::all()).unwrap();
328+
assert_eq!(1, cursor.count(),);
329+
}
330+
331+
#[tokio::test]
332+
async fn test_prune_task_substract_set_retention_limit_to_given_epoch() {
333+
let connection = Arc::new(main_db_connection().unwrap());
334+
let repository = SignedBeaconRepository::new(connection.clone(), Some(10));
285335
insert_signed_beacons(
286336
&connection,
287337
SignedBeaconRecord::fakes(&[
@@ -299,7 +349,9 @@ mod tests {
299349
]),
300350
);
301351

302-
repository.prune_below_epoch(Epoch(8)).unwrap();
352+
EpochPruningTask::prune(&repository, Epoch(18))
353+
.await
354+
.unwrap();
303355

304356
let signed_beacons: Vec<SignedBeaconRecord> = connection
305357
.fetch_collect(GetSignedBeaconQuery::all())

mithril-signer/src/dependency_injection/builder.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,14 +347,15 @@ impl<'a> DependenciesBuilder<'a> {
347347
slog_scope::logger(),
348348
Arc::new(preloader_activation),
349349
));
350-
let signed_beacon_repository =
351-
Arc::new(SignedBeaconRepository::new(sqlite_connection.clone()));
350+
let signed_beacon_repository = Arc::new(SignedBeaconRepository::new(
351+
sqlite_connection.clone(),
352+
self.config.store_retention_limit.map(|limit| limit as u64),
353+
));
352354
let upkeep_service = Arc::new(SignerUpkeepService::new(
353355
sqlite_connection.clone(),
354356
sqlite_connection_cardano_transaction_pool,
355357
signed_entity_type_lock.clone(),
356358
vec![signed_beacon_repository.clone()],
357-
self.config.store_retention_limit.map(|limit| limit as u64),
358359
slog_scope::logger(),
359360
));
360361
let certifier = Arc::new(SignerCertifierService::new(

mithril-signer/src/runtime/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ mod tests {
642642
let upkeep_service = Arc::new(MockUpkeepService::new());
643643
let certifier = Arc::new(SignerCertifierService::new(
644644
ticker_service.clone(),
645-
Arc::new(SignedBeaconRepository::new(sqlite_connection.clone())),
645+
Arc::new(SignedBeaconRepository::new(sqlite_connection.clone(), None)),
646646
Arc::new(SignerSignedEntityConfigProvider::new(
647647
network,
648648
epoch_service.clone(),

mithril-signer/src/services/upkeep_service.rs

Lines changed: 14 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub trait EpochPruningTask: Send + Sync {
3333
/// Get the name of the data that will be pruned.
3434
fn pruned_data(&self) -> &'static str;
3535

36-
/// Prune the datasource below the given epoch threshold.
37-
async fn prune_below_epoch_threshold(&self, epoch_threshold: Epoch) -> StdResult<()>;
36+
/// Prune the datasource based on the given current epoch.
37+
async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
3838
}
3939

4040
/// Implementation of the upkeep service for the signer.
@@ -46,7 +46,6 @@ pub struct SignerUpkeepService {
4646
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
4747
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
4848
pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
49-
store_retention_limit: Option<u64>,
5049
logger: Logger,
5150
}
5251

@@ -57,37 +56,27 @@ impl SignerUpkeepService {
5756
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
5857
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
5958
pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
60-
store_retention_limit: Option<u64>,
6159
logger: Logger,
6260
) -> Self {
6361
Self {
6462
main_db_connection,
6563
cardano_tx_connection_pool,
6664
signed_entity_type_lock,
6765
pruning_tasks,
68-
store_retention_limit,
6966
logger,
7067
}
7168
}
7269

7370
async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
74-
match self
75-
.store_retention_limit
76-
.map(|limit| current_epoch - limit)
77-
{
78-
Some(threshold) if *threshold > 0 => {
79-
for task in &self.pruning_tasks {
80-
info!(
81-
self.logger, "UpkeepService::Pruning stale data";
82-
"pruned_data" => task.pruned_data(), "below_epoch_threshold" => ?threshold
83-
);
84-
85-
task.prune_below_epoch_threshold(threshold).await?;
86-
}
87-
Ok(())
88-
}
89-
_ => Ok(()),
71+
for task in &self.pruning_tasks {
72+
info!(
73+
self.logger, "UpkeepService::Pruning stale data";
74+
"pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
75+
);
76+
task.prune(current_epoch).await?;
9077
}
78+
79+
Ok(())
9180
}
9281

9382
async fn upkeep_all_databases(&self) -> StdResult<()> {
@@ -198,7 +187,6 @@ mod tests {
198187
)),
199188
Arc::new(SignedEntityTypeLock::default()),
200189
vec![],
201-
None,
202190
TestLogger::file(&log_path),
203191
);
204192

@@ -241,7 +229,6 @@ mod tests {
241229
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
242230
signed_entity_type_lock.clone(),
243231
vec![],
244-
None,
245232
TestLogger::file(&log_path),
246233
);
247234

@@ -265,15 +252,15 @@ mod tests {
265252
#[tokio::test]
266253
async fn test_execute_all_pruning_tasks() {
267254
let task1 = mock_epoch_pruning_task(|mock| {
268-
mock.expect_prune_below_epoch_threshold()
255+
mock.expect_prune()
269256
.once()
270-
.with(eq(Epoch(4)))
257+
.with(eq(Epoch(14)))
271258
.returning(|_| Ok(()));
272259
});
273260
let task2 = mock_epoch_pruning_task(|mock| {
274-
mock.expect_prune_below_epoch_threshold()
261+
mock.expect_prune()
275262
.once()
276-
.with(eq(Epoch(4)))
263+
.with(eq(Epoch(14)))
277264
.returning(|_| Ok(()));
278265
});
279266

@@ -282,52 +269,9 @@ mod tests {
282269
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
283270
Arc::new(SignedEntityTypeLock::default()),
284271
vec![task1, task2],
285-
Some(10),
286272
TestLogger::stdout(),
287273
);
288274

289275
service.run(Epoch(14)).await.expect("Upkeep service failed");
290276
}
291-
292-
#[tokio::test]
293-
async fn test_dont_execute_pruning_tasks_if_current_epoch_minus_retention_limit_is_0() {
294-
let task = mock_epoch_pruning_task(|mock| {
295-
mock.expect_prune_below_epoch_threshold()
296-
.never()
297-
.returning(|_| Ok(()));
298-
});
299-
300-
let service = SignerUpkeepService::new(
301-
Arc::new(main_db_connection().unwrap()),
302-
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
303-
Arc::new(SignedEntityTypeLock::default()),
304-
vec![task],
305-
Some(17),
306-
TestLogger::stdout(),
307-
);
308-
309-
service.run(Epoch(1)).await.expect("Upkeep service failed");
310-
service.run(Epoch(16)).await.expect("Upkeep service failed");
311-
service.run(Epoch(17)).await.expect("Upkeep service failed");
312-
}
313-
314-
#[tokio::test]
315-
async fn test_dont_execute_pruning_tasks_if_no_retention_limit_set() {
316-
let task = mock_epoch_pruning_task(|mock| {
317-
mock.expect_prune_below_epoch_threshold()
318-
.never()
319-
.returning(|_| Ok(()));
320-
});
321-
322-
let service = SignerUpkeepService::new(
323-
Arc::new(main_db_connection().unwrap()),
324-
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
325-
Arc::new(SignedEntityTypeLock::default()),
326-
vec![task],
327-
None,
328-
TestLogger::stdout(),
329-
);
330-
331-
service.run(Epoch(29)).await.expect("Upkeep service failed");
332-
}
333277
}

mithril-signer/tests/test_extensions/state_machine_tester.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,10 @@ impl StateMachineTester {
247247
sqlite_connection_cardano_transaction_pool,
248248
signed_entity_type_lock.clone(),
249249
vec![],
250-
None,
251250
slog_scope::logger(),
252251
));
253252
let signed_beacon_repository =
254-
Arc::new(SignedBeaconRepository::new(sqlite_connection.clone()));
253+
Arc::new(SignedBeaconRepository::new(sqlite_connection.clone(), None));
255254
let certifier = Arc::new(SignerCertifierService::new(
256255
ticker_service.clone(),
257256
signed_beacon_repository.clone(),

0 commit comments

Comments
 (0)