Skip to content

Commit d67e3fe

Browse files
committed
Add event_store to upkeep service to cleaning database
1 parent 363a7ef commit d67e3fe

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,7 @@ impl DependenciesBuilder {
13281328
self.get_sqlite_connection().await?,
13291329
self.get_sqlite_connection_cardano_transaction_pool()
13301330
.await?,
1331+
self.get_event_store_sqlite_connection().await?,
13311332
self.get_signed_entity_lock().await?,
13321333
self.root_logger(),
13331334
));

mithril-aggregator/src/event_store/database/test_helper.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
1+
use std::path::Path;
2+
13
use mithril_common::StdResult;
24
use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection};
35

6+
/// File sqlite database without foreign key support with event store migrations applied and WAL activated
7+
pub fn event_store_db_file_connection(db_path: &Path) -> StdResult<SqliteConnection> {
8+
let builder = ConnectionBuilder::open_file(db_path)
9+
.with_options(&[ConnectionOptions::EnableWriteAheadLog]);
10+
build_event_store_db_connection(builder)
11+
}
12+
413
/// In-memory sqlite database without foreign key support with migrations applied
514
pub fn event_store_db_connection() -> StdResult<SqliteConnection> {
615
let builder = ConnectionBuilder::open_memory();

mithril-aggregator/src/services/upkeep.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub trait UpkeepService: Send + Sync {
3333
pub struct AggregatorUpkeepService {
3434
main_db_connection: Arc<SqliteConnection>,
3535
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
36+
event_store_connection: Arc<SqliteConnection>,
3637
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
3738
logger: Logger,
3839
}
@@ -42,12 +43,14 @@ impl AggregatorUpkeepService {
4243
pub fn new(
4344
main_db_connection: Arc<SqliteConnection>,
4445
cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
46+
event_store_connection: Arc<SqliteConnection>,
4547
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
4648
logger: Logger,
4749
) -> Self {
4850
Self {
4951
main_db_connection,
5052
cardano_tx_connection_pool,
53+
event_store_connection,
5154
signed_entity_type_lock,
5255
logger: logger.new_with_component_name::<Self>(),
5356
}
@@ -64,6 +67,7 @@ impl AggregatorUpkeepService {
6467

6568
let main_db_connection = self.main_db_connection.clone();
6669
let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
70+
let event_store_connection = self.event_store_connection.clone();
6771
let db_upkeep_logger = self.logger.clone();
6872

6973
// Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
@@ -84,6 +88,12 @@ impl AggregatorUpkeepService {
8488
.with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
8589
.run()?;
8690

91+
info!(db_upkeep_logger, "Cleaning event database");
92+
SqliteCleaner::new(&event_store_connection)
93+
.with_logger(db_upkeep_logger.clone())
94+
.with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
95+
.run()?;
96+
8797
Ok(())
8898
});
8999

@@ -116,23 +126,28 @@ mod tests {
116126
cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
117127
main_db_file_connection,
118128
};
129+
use crate::event_store::database::test_helper::{
130+
event_store_db_connection, event_store_db_file_connection,
131+
};
119132
use crate::test_tools::TestLogger;
120133

121134
use super::*;
122135

123136
#[tokio::test]
124137
async fn test_cleanup_database() {
125-
let (main_db_path, ctx_db_path, log_path) = {
138+
let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
126139
let db_dir = TempDir::create("aggregator_upkeep", "test_cleanup_database");
127140
(
128141
db_dir.join("main.db"),
129142
db_dir.join("cardano_tx.db"),
143+
db_dir.join("event_store.db"),
130144
db_dir.join("upkeep.log"),
131145
)
132146
};
133147

134148
let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
135149
let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();
150+
let event_store_connection = event_store_db_file_connection(&event_store_db_path).unwrap();
136151

137152
// Separate block to force log flushing by dropping the service that owns the logger
138153
{
@@ -141,6 +156,7 @@ mod tests {
141156
Arc::new(SqliteConnectionPool::build_from_connection(
142157
cardano_tx_connection,
143158
)),
159+
Arc::new(event_store_connection),
144160
Arc::new(SignedEntityTypeLock::default()),
145161
TestLogger::file(&log_path),
146162
);
@@ -159,8 +175,8 @@ mod tests {
159175
assert_eq!(
160176
logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
161177
.count(),
162-
2,
163-
"Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
178+
3,
179+
"Should have run three times since the three databases have a `WalCheckpointTruncate` cleanup"
164180
);
165181
}
166182

@@ -182,6 +198,7 @@ mod tests {
182198
let service = AggregatorUpkeepService::new(
183199
Arc::new(main_db_connection().unwrap()),
184200
Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
201+
Arc::new(event_store_db_connection().unwrap()),
185202
signed_entity_type_lock.clone(),
186203
TestLogger::file(&log_path),
187204
);

0 commit comments

Comments
 (0)