Skip to content

Commit b573a6d

Browse files
committed
Implement pruning behavior in the aggregator
1 parent 2b3eb1f commit b573a6d

File tree

4 files changed

+108
-23
lines changed

4 files changed

+108
-23
lines changed

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,6 +1343,7 @@ impl DependenciesBuilder {
13431343
.await?,
13441344
self.get_event_store_sqlite_connection().await?,
13451345
self.get_signed_entity_lock().await?,
1346+
vec![], // TODO XXX Add pruning tasks here
13461347
self.root_logger(),
13471348
));
13481349

mithril-aggregator/src/runtime/runner.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub trait AggregatorRunnerTrait: Sync + Send {
110110
async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
111111

112112
/// Perform the upkeep tasks.
113-
async fn upkeep(&self) -> StdResult<()>;
113+
async fn upkeep(&self, epoch: Epoch) -> StdResult<()>;
114114

115115
/// Precompute what doesn't change for the actual epoch
116116
async fn precompute_epoch_data(&self) -> StdResult<()>;
@@ -522,9 +522,9 @@ impl AggregatorRunnerTrait for AggregatorRunner {
522522
Ok(())
523523
}
524524

525-
async fn upkeep(&self) -> StdResult<()> {
525+
async fn upkeep(&self, epoch: Epoch) -> StdResult<()> {
526526
debug!(self.logger, ">> upkeep");
527-
self.dependencies.upkeep_service.run().await
527+
self.dependencies.upkeep_service.run(epoch).await
528528
}
529529

530530
async fn create_open_message(
@@ -1010,16 +1010,20 @@ pub mod tests {
10101010
}
10111011

10121012
#[tokio::test]
1013-
async fn test_upkeep() {
1013+
async fn test_upkeep_calls_run_on_upkeep_service() {
10141014
let mut upkeep_service = MockUpkeepService::new();
1015-
upkeep_service.expect_run().returning(|| Ok(())).times(1);
1015+
upkeep_service
1016+
.expect_run()
1017+
.with(eq(Epoch(5)))
1018+
.returning(|_| Ok(()))
1019+
.times(1);
10161020

10171021
let mut deps = initialize_dependencies().await;
10181022
deps.upkeep_service = Arc::new(upkeep_service);
10191023

10201024
let runner = AggregatorRunner::new(Arc::new(deps));
10211025

1022-
runner.upkeep().await.unwrap();
1026+
runner.upkeep(Epoch(5)).await.unwrap();
10231027
}
10241028

10251029
#[tokio::test]

mithril-aggregator/src/runtime/state_machine.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl AggregatorRuntime {
245245
trace!(self.logger, "Trying transition from IDLE to READY state");
246246

247247
if maybe_current_time_point.is_none()
248-
|| maybe_current_time_point.unwrap().epoch < new_time_point.epoch
248+
|| maybe_current_time_point.as_ref().unwrap().epoch < new_time_point.epoch
249249
{
250250
self.runner.close_signer_registration_round().await?;
251251
self.runner
@@ -256,7 +256,7 @@ impl AggregatorRuntime {
256256
.update_stake_distribution(&new_time_point)
257257
.await?;
258258
self.runner.inform_new_epoch(new_time_point.epoch).await?;
259-
self.runner.upkeep().await?;
259+
self.runner.upkeep(new_time_point.epoch).await?;
260260
self.runner
261261
.open_signer_registration_round(&new_time_point)
262262
.await?;
@@ -450,7 +450,11 @@ mod tests {
450450
.expect_precompute_epoch_data()
451451
.once()
452452
.returning(|| Ok(()));
453-
runner.expect_upkeep().once().returning(|| Ok(()));
453+
runner
454+
.expect_upkeep()
455+
.with(predicate::eq(TimePoint::dummy().epoch))
456+
.once()
457+
.returning(|_| Ok(()));
454458
runner
455459
.expect_increment_runtime_cycle_total_since_startup_counter()
456460
.once()
@@ -514,7 +518,11 @@ mod tests {
514518
.expect_precompute_epoch_data()
515519
.once()
516520
.returning(|| Ok(()));
517-
runner.expect_upkeep().once().returning(|| Ok(()));
521+
runner
522+
.expect_upkeep()
523+
.with(predicate::eq(TimePoint::dummy().epoch))
524+
.once()
525+
.returning(|_| Ok(()));
518526
runner
519527
.expect_increment_runtime_cycle_total_since_startup_counter()
520528
.once()

mithril-aggregator/src/services/upkeep.rs

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,32 @@ use std::sync::Arc;
99

1010
use anyhow::Context;
1111
use async_trait::async_trait;
12-
use slog::{info, Logger};
13-
12+
use mithril_common::entities::Epoch;
1413
use mithril_common::logging::LoggerExtensions;
1514
use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
1615
use mithril_common::StdResult;
1716
use mithril_persistence::sqlite::{
1817
SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
1918
};
19+
use slog::{info, Logger};
2020

2121
/// Define the service responsible for the upkeep of the application.
2222
#[cfg_attr(test, mockall::automock)]
2323
#[async_trait]
2424
pub trait UpkeepService: Send + Sync {
2525
/// Run the upkeep service.
26-
async fn run(&self) -> StdResult<()>;
26+
async fn run(&self, epoch: Epoch) -> StdResult<()>;
27+
}
28+
29+
/// Define the task responsible for pruning a datasource below a certain epoch threshold.
30+
#[cfg_attr(test, mockall::automock)]
31+
#[async_trait]
32+
pub trait EpochPruningTask: Send + Sync {
33+
/// Get the name of the data that will be pruned.
34+
fn pruned_data(&self) -> &'static str;
35+
36+
/// Prune the datasource based on the given current epoch.
37+
async fn prune(&self, current_epoch: Epoch) -> StdResult<()>;
2738
}
2839

2940
/// Implementation of the upkeep service for the aggregator.
@@ -35,6 +46,7 @@ pub struct AggregatorUpkeepService {
3546
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
3647
event_store_connection: Arc<SqliteConnection>,
3748
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
49+
pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
3850
logger: Logger,
3951
}
4052

@@ -45,17 +57,31 @@ impl AggregatorUpkeepService {
4557
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
4658
event_store_connection: Arc<SqliteConnection>,
4759
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
60+
pruning_tasks: Vec<Arc<dyn EpochPruningTask>>,
4861
logger: Logger,
4962
) -> Self {
5063
Self {
5164
main_db_connection,
5265
cardano_tx_connection_pool,
5366
event_store_connection,
5467
signed_entity_type_lock,
68+
pruning_tasks,
5569
logger: logger.new_with_component_name::<Self>(),
5670
}
5771
}
5872

73+
async fn execute_pruning_tasks(&self, current_epoch: Epoch) -> StdResult<()> {
74+
for task in &self.pruning_tasks {
75+
info!(
76+
self.logger, "Pruning stale data";
77+
"pruned_data" => task.pruned_data(), "current_epoch" => ?current_epoch
78+
);
79+
task.prune(current_epoch).await?;
80+
}
81+
82+
Ok(())
83+
}
84+
5985
async fn upkeep_all_databases(&self) -> StdResult<()> {
6086
if self.signed_entity_type_lock.has_locked_entities().await {
6187
info!(
@@ -105,9 +131,13 @@ impl AggregatorUpkeepService {
105131

106132
#[async_trait]
107133
impl UpkeepService for AggregatorUpkeepService {
108-
async fn run(&self) -> StdResult<()> {
134+
async fn run(&self, current_epoch: Epoch) -> StdResult<()> {
109135
info!(self.logger, "Start upkeep of the application");
110136

137+
self.execute_pruning_tasks(current_epoch)
138+
.await
139+
.with_context(|| "Pruning tasks failed")?;
140+
111141
self.upkeep_all_databases()
112142
.await
113143
.with_context(|| "Database upkeep failed")?;
@@ -121,6 +151,7 @@ impl UpkeepService for AggregatorUpkeepService {
121151
mod tests {
122152
use mithril_common::entities::SignedEntityTypeDiscriminants;
123153
use mithril_common::test_utils::TempDir;
154+
use mockall::predicate::eq;
124155

125156
use crate::database::test_helper::{
126157
cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
@@ -133,6 +164,26 @@ mod tests {
133164

134165
use super::*;
135166

167+
fn mock_epoch_pruning_task(
168+
mock_config: impl FnOnce(&mut MockEpochPruningTask),
169+
) -> Arc<dyn EpochPruningTask> {
170+
let mut task_mock = MockEpochPruningTask::new();
171+
task_mock.expect_pruned_data().return_const("mock_data");
172+
mock_config(&mut task_mock);
173+
Arc::new(task_mock)
174+
}
175+
176+
fn default_upkeep_service() -> AggregatorUpkeepService {
177+
AggregatorUpkeepService::new(
178+
Arc::new(main_db_connection().unwrap()),
179+
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
180+
Arc::new(event_store_db_connection().unwrap()),
181+
Arc::new(SignedEntityTypeLock::default()),
182+
vec![],
183+
TestLogger::stdout(),
184+
)
185+
}
186+
136187
#[tokio::test]
137188
async fn test_cleanup_database() {
138189
let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
@@ -158,10 +209,11 @@ mod tests {
158209
)),
159210
Arc::new(event_store_connection),
160211
Arc::new(SignedEntityTypeLock::default()),
212+
vec![],
161213
TestLogger::file(&log_path),
162214
);
163215

164-
service.run().await.expect("Upkeep service failed");
216+
service.run(Epoch(5)).await.expect("Upkeep service failed");
165217
}
166218

167219
let logs = std::fs::read_to_string(&log_path).unwrap();
@@ -195,14 +247,12 @@ mod tests {
195247

196248
// Separate block to force log flushing by dropping the service that owns the logger
197249
{
198-
let service = AggregatorUpkeepService::new(
199-
Arc::new(main_db_connection().unwrap()),
200-
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
201-
Arc::new(event_store_db_connection().unwrap()),
202-
signed_entity_type_lock.clone(),
203-
TestLogger::file(&log_path),
204-
);
205-
service.run().await.expect("Upkeep service failed");
250+
let service = AggregatorUpkeepService {
251+
signed_entity_type_lock: signed_entity_type_lock.clone(),
252+
logger: TestLogger::file(&log_path),
253+
..default_upkeep_service()
254+
};
255+
service.run(Epoch(5)).await.expect("Upkeep service failed");
206256
}
207257

208258
let logs = std::fs::read_to_string(&log_path).unwrap();
@@ -218,4 +268,26 @@ mod tests {
218268
0,
219269
);
220270
}
271+
#[tokio::test]
272+
async fn test_execute_all_pruning_tasks() {
273+
let task1 = mock_epoch_pruning_task(|mock| {
274+
mock.expect_prune()
275+
.once()
276+
.with(eq(Epoch(14)))
277+
.returning(|_| Ok(()));
278+
});
279+
let task2 = mock_epoch_pruning_task(|mock| {
280+
mock.expect_prune()
281+
.once()
282+
.with(eq(Epoch(14)))
283+
.returning(|_| Ok(()));
284+
});
285+
286+
let service = AggregatorUpkeepService {
287+
pruning_tasks: vec![task1, task2],
288+
..default_upkeep_service()
289+
};
290+
291+
service.run(Epoch(14)).await.expect("Upkeep service failed");
292+
}
221293
}

0 commit comments

Comments
 (0)