Skip to content

Commit 857708a

Browse files
committed
Introduce the AggregatorRuntime
1 parent 631d25d commit 857708a

File tree

4 files changed

+68
-61
lines changed

4 files changed

+68
-61
lines changed

mithril-aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod dependency;
22
mod entities;
33
mod http_server;
44
mod multi_signer;
5+
mod runtime;
56
mod snapshot_store;
67
mod snapshotter;
78

@@ -13,4 +14,5 @@ pub use crate::multi_signer::{
1314
};
1415
pub use crate::snapshot_store::SnapshotStoreHTTPClient;
1516
pub use dependency::DependencyManager;
17+
pub use runtime::AggregatorRuntime;
1618
pub use snapshotter::Snapshotter;

mithril-aggregator/src/main.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
use clap::Parser;
44

55
use mithril_aggregator::{
6-
key_decode_hex, Config, DependencyManager, MultiSigner, MultiSignerImpl, ProtocolParameters,
7-
ProtocolPartyId, ProtocolSignerVerificationKey, ProtocolStake, Server, SnapshotStoreHTTPClient,
8-
Snapshotter,
6+
key_decode_hex, AggregatorRuntime, Config, DependencyManager, MultiSigner, MultiSignerImpl,
7+
ProtocolParameters, ProtocolPartyId, ProtocolSignerVerificationKey, ProtocolStake, Server,
8+
SnapshotStoreHTTPClient,
99
};
1010
use mithril_common::fake_data;
1111
use slog::{Drain, Level, Logger};
@@ -98,12 +98,9 @@ async fn main() {
9898

9999
// Start snapshot uploader
100100
let handle = tokio::spawn(async move {
101-
let snapshotter = Snapshotter::new(
102-
args.snapshot_interval * 1000,
103-
args.db_directory.clone(),
104-
slog_scope::logger(),
105-
);
106-
snapshotter.run().await
101+
let runtime =
102+
AggregatorRuntime::new(args.snapshot_interval * 1000, args.db_directory.clone());
103+
runtime.run().await
107104
});
108105

109106
// Start REST server

mithril-aggregator/src/runtime.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::Snapshotter;
2+
use mithril_common::immutable_digester::ImmutableDigester;
3+
use slog_scope::{error, info};
4+
use tokio::time::{sleep, Duration};
5+
6+
/// AggregatorRuntime
7+
pub struct AggregatorRuntime {
8+
/// Interval between each snapshot, in seconds
9+
interval: u32,
10+
11+
/// DB directory to snapshot
12+
db_directory: String,
13+
}
14+
15+
impl AggregatorRuntime {
16+
/// AggregatorRuntime factory
17+
pub fn new(interval: u32, db_directory: String) -> Self {
18+
Self {
19+
interval,
20+
db_directory,
21+
}
22+
}
23+
24+
/// Run snapshotter loop
25+
pub async fn run(&self) {
26+
info!("Starting Snapshotter");
27+
let snapshotter = Snapshotter::new(self.db_directory.clone());
28+
let digester = ImmutableDigester::new(self.db_directory.clone(), slog_scope::logger());
29+
30+
loop {
31+
info!("Snapshotting");
32+
33+
match digester.compute_digest() {
34+
Ok(digest_result) => {
35+
if let Err(e) = snapshotter.snapshot(digest_result.digest).await {
36+
error!("{:?}", e)
37+
}
38+
}
39+
Err(e) => {
40+
error!("{:?}", e)
41+
}
42+
};
43+
44+
info!("Sleeping for {}", self.interval);
45+
sleep(Duration::from_millis(self.interval.into())).await;
46+
}
47+
}
48+
}

mithril-aggregator/src/snapshotter.rs

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use mithril_common::entities::Snapshot;
2-
use mithril_common::immutable_digester::ImmutableDigester;
32

43
use chrono::prelude::*;
54
use cloud_storage::bucket::Entity;
@@ -8,31 +7,24 @@ use cloud_storage::object_access_control::NewObjectAccessControl;
87
use cloud_storage::Client;
98
use flate2::write::GzEncoder;
109
use flate2::Compression;
11-
use slog::{error, info, Logger};
10+
use slog_scope::info;
1211
use std::env;
1312
use std::fs::File;
1413
use std::io;
1514
use std::io::{Seek, SeekFrom};
1615
use std::path::Path;
1716
use thiserror::Error;
18-
use tokio::time::{sleep, Duration};
1917
use tokio_util::codec::BytesCodec;
2018
use tokio_util::codec::FramedRead;
2119

2220
/// Snapshotter
2321
pub struct Snapshotter {
24-
/// Interval between each snapshot, in seconds
25-
interval: u32,
26-
2722
/// DB directory to snapshot
2823
db_directory: String,
29-
30-
/// The logger where the logs should be written
31-
logger: Logger,
3224
}
3325

3426
#[derive(Error, Debug)]
35-
enum SnapshotError {
27+
pub enum SnapshotError {
3628
#[error("Create archive error: ")]
3729
CreateArchiveError(#[from] io::Error),
3830

@@ -42,40 +34,13 @@ enum SnapshotError {
4234

4335
impl Snapshotter {
4436
/// Snapshotter factory
45-
pub fn new(interval: u32, db_directory: String, logger: Logger) -> Self {
46-
Self {
47-
interval,
48-
db_directory,
49-
logger,
50-
}
51-
}
52-
53-
/// Run snapshotter loop
54-
pub async fn run(&self) {
55-
info!(self.logger, "Starting Snapshotter");
56-
loop {
57-
info!(self.logger, "Snapshotting");
58-
59-
let digester = ImmutableDigester::new(self.db_directory.clone(), self.logger.clone());
60-
match digester.compute_digest() {
61-
Ok(digest_result) => {
62-
if let Err(e) = self.snapshot(digest_result.digest).await {
63-
error!(self.logger, "{:?}", e)
64-
}
65-
}
66-
Err(e) => {
67-
error!(self.logger, "{:?}", e)
68-
}
69-
};
70-
71-
info!(self.logger, "Sleeping for {}", self.interval);
72-
sleep(Duration::from_millis(self.interval.into())).await;
73-
}
37+
pub fn new(db_directory: String) -> Self {
38+
Self { db_directory }
7439
}
7540

76-
async fn snapshot(&self, immutable_digest: String) -> Result<(), SnapshotError> {
41+
pub async fn snapshot(&self, immutable_digest: String) -> Result<(), SnapshotError> {
7742
let archive_name = "testnet.tar.gz";
78-
info!(self.logger, "snapshot hash: {}", immutable_digest);
43+
info!("snapshot hash: {}", immutable_digest);
7944

8045
let size = self.create_archive(archive_name)?;
8146

@@ -93,11 +58,7 @@ impl Snapshotter {
9358
)],
9459
}];
9560

96-
info!(
97-
self.logger,
98-
"snapshot: {}",
99-
serde_json::to_string(&snapshots).unwrap()
100-
);
61+
info!("snapshot: {}", serde_json::to_string(&snapshots).unwrap());
10162
serde_json::to_writer(&File::create("snapshots.json").unwrap(), &snapshots).unwrap();
10263

10364
self.upload_file(archive_name).await?;
@@ -113,7 +74,6 @@ impl Snapshotter {
11374
let mut tar = tar::Builder::new(enc);
11475

11576
info!(
116-
self.logger,
11777
"compressing {} into {}",
11878
&self.db_directory,
11979
&path.to_str().unwrap()
@@ -142,7 +102,7 @@ impl Snapshotter {
142102
));
143103
};
144104

145-
info!(self.logger, "uploading {}", filename);
105+
info!("uploading {}", filename);
146106
let client = Client::default();
147107
let file = tokio::fs::File::open(filename).await.unwrap();
148108
let stream = FramedRead::new(file, BytesCodec::new());
@@ -158,7 +118,7 @@ impl Snapshotter {
158118
.await
159119
.map_err(|e| SnapshotError::UploadFileError(e.to_string()))?;
160120

161-
info!(self.logger, "uploaded {}", filename);
121+
info!("uploaded {}", filename);
162122

163123
// ensure the uploaded file as public read access
164124
// when a file is uploaded to gcloud storage its permissions are overwritten so
@@ -169,8 +129,8 @@ impl Snapshotter {
169129
};
170130

171131
info!(
172-
self.logger,
173-
"updating acl for {}: {:?}", filename, new_bucket_access_control
132+
"updating acl for {}: {:?}",
133+
filename, new_bucket_access_control
174134
);
175135

176136
client
@@ -179,7 +139,7 @@ impl Snapshotter {
179139
.await
180140
.map_err(|e| SnapshotError::UploadFileError(e.to_string()))?;
181141

182-
info!(self.logger, "updated acl for {} ", filename);
142+
info!("updated acl for {} ", filename);
183143

184144
Ok(())
185145
}

0 commit comments

Comments
 (0)