Skip to content

Commit e50d080

Browse files
committed
Split snapshoter & digester
1 parent 6886e90 commit e50d080

File tree

12 files changed

+401
-971
lines changed

12 files changed

+401
-971
lines changed

demo/protocol-demo/Cargo.lock

Lines changed: 85 additions & 222 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.lock

Lines changed: 17 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@ slog-term = "2.9.0"
1717
clap = { version = "3.1.6", features = ["derive"] }
1818
clap-verbosity-flag = "1.0.0"
1919
chrono = "0.4"
20+
cloud-storage = "0.11.0"
21+
flate2 = "1.0.23"
2022
warp = "0.3"
2123
tokio = { version = "1.17.0", features = ["full"] }
2224
tokio-util = { version = "0.7.1", features = ["codec"] }
2325
serde = { version = "1.0", features = ["derive"] }
2426
serde_json = "1.0"
2527
serde_yaml = "0.8"
28+
tar = "0.4.38"
29+
thiserror = "1.0.31"
2630
async-trait = "0.1.52"
2731
openapiv3 = "1.0.1"
2832
hex = "0.4.3"

mithril-aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod entities;
33
mod http_server;
44
mod multi_signer;
55
mod snapshot_store;
6+
mod snapshotter;
67

78
pub use crate::entities::Config;
89
pub use crate::http_server::Server;
@@ -12,3 +13,4 @@ pub use crate::multi_signer::{
1213
};
1314
pub use crate::snapshot_store::SnapshotStoreHTTPClient;
1415
pub use dependency::DependencyManager;
16+
pub use snapshotter::Snapshotter;

mithril-aggregator/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ use clap::Parser;
55
use mithril_aggregator::{
66
key_decode_hex, Config, DependencyManager, MultiSigner, MultiSignerImpl, ProtocolParameters,
77
ProtocolPartyId, ProtocolSignerVerificationKey, ProtocolStake, Server, SnapshotStoreHTTPClient,
8+
Snapshotter,
89
};
910
use mithril_common::fake_data;
10-
use mithril_common::snapshotter::Snapshotter;
1111
use slog::{Drain, Level, Logger};
1212
use slog_scope::debug;
1313
use std::env;
1414
use std::sync::Arc;
1515
use tokio::sync::RwLock;
16+
1617
/// Node args
1718
#[derive(Parser, Debug, Clone)]
1819
pub struct Args {
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
use mithril_common::entities::Snapshot;
2+
use mithril_common::immutable_digester::ImmutableDigester;
3+
4+
use chrono::prelude::*;
5+
use cloud_storage::bucket::Entity;
6+
use cloud_storage::bucket_access_control::Role;
7+
use cloud_storage::object_access_control::NewObjectAccessControl;
8+
use cloud_storage::Client;
9+
use flate2::write::GzEncoder;
10+
use flate2::Compression;
11+
use slog::{error, info, Logger};
12+
use std::env;
13+
use std::fs::File;
14+
use std::io;
15+
use std::io::{Seek, SeekFrom};
16+
use std::path::Path;
17+
use thiserror::Error;
18+
use tokio::time::{sleep, Duration};
19+
use tokio_util::codec::BytesCodec;
20+
use tokio_util::codec::FramedRead;
21+
22+
/// Snapshotter
23+
pub struct Snapshotter {
24+
/// Interval between each snapshot, in seconds
25+
interval: u32,
26+
27+
/// DB directory to snapshot
28+
db_directory: String,
29+
30+
/// The logger where the logs should be written
31+
logger: Logger,
32+
}
33+
34+
#[derive(Error, Debug)]
35+
enum SnapshotError {
36+
#[error("Create archive error: ")]
37+
CreateArchiveError(#[from] io::Error),
38+
39+
#[error("Upload file error: `{0}`")]
40+
UploadFileError(String),
41+
}
42+
43+
impl Snapshotter {
44+
/// Snapshotter factory
45+
pub fn new(interval: u32, db_directory: String, logger: Logger) -> Self {
46+
Self {
47+
interval,
48+
db_directory,
49+
logger,
50+
}
51+
}
52+
53+
/// Run snapshotter loop
54+
pub async fn run(&self) {
55+
info!(self.logger, "Starting Snapshotter");
56+
loop {
57+
info!(self.logger, "Snapshotting");
58+
59+
let digester = ImmutableDigester::new(self.db_directory.clone(), self.logger.clone());
60+
match digester.compute_digest() {
61+
Ok(digest_result) => {
62+
if let Err(e) = self.snapshot(digest_result.digest).await {
63+
error!(self.logger, "{:?}", e)
64+
}
65+
}
66+
Err(e) => {
67+
error!(self.logger, "{:?}", e)
68+
}
69+
};
70+
71+
info!(self.logger, "Sleeping for {}", self.interval);
72+
sleep(Duration::from_millis(self.interval.into())).await;
73+
}
74+
}
75+
76+
async fn snapshot(&self, immutable_digest: String) -> Result<(), SnapshotError> {
77+
let archive_name = "testnet.tar.gz";
78+
info!(self.logger, "snapshot hash: {}", immutable_digest);
79+
80+
let size = self.create_archive(archive_name)?;
81+
82+
let timestamp: DateTime<Utc> = Utc::now();
83+
let created_at = format!("{:?}", timestamp);
84+
85+
let snapshots = vec![Snapshot {
86+
digest: immutable_digest,
87+
certificate_hash: "".to_string(),
88+
size,
89+
created_at,
90+
locations: vec![format!(
91+
"https://storage.googleapis.com/cardano-testnet/{}",
92+
archive_name
93+
)],
94+
}];
95+
96+
info!(
97+
self.logger,
98+
"snapshot: {}",
99+
serde_json::to_string(&snapshots).unwrap()
100+
);
101+
serde_json::to_writer(&File::create("snapshots.json").unwrap(), &snapshots).unwrap();
102+
103+
self.upload_file(archive_name).await?;
104+
self.upload_file("snapshots.json").await?;
105+
106+
Ok(())
107+
}
108+
109+
fn create_archive(&self, archive_name: &str) -> Result<u64, SnapshotError> {
110+
let path = Path::new(".").join(archive_name);
111+
let tar_gz = File::create(&path).map_err(SnapshotError::CreateArchiveError)?;
112+
let enc = GzEncoder::new(tar_gz, Compression::default());
113+
let mut tar = tar::Builder::new(enc);
114+
115+
info!(
116+
self.logger,
117+
"compressing {} into {}",
118+
&self.db_directory,
119+
&path.to_str().unwrap()
120+
);
121+
122+
tar.append_dir_all(".", &self.db_directory)
123+
.map_err(SnapshotError::CreateArchiveError)?;
124+
125+
// complete gz encoding and retrieve underlying file to compute size accurately
126+
let mut gz = tar
127+
.into_inner()
128+
.map_err(SnapshotError::CreateArchiveError)?;
129+
gz.try_finish().map_err(SnapshotError::CreateArchiveError)?;
130+
let mut f = gz.finish().map_err(SnapshotError::CreateArchiveError)?;
131+
let size: u64 = f
132+
.seek(SeekFrom::End(0))
133+
.map_err(SnapshotError::CreateArchiveError)?;
134+
135+
Ok(size)
136+
}
137+
138+
async fn upload_file(&self, filename: &str) -> Result<(), SnapshotError> {
139+
if env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_err() {
140+
return Err(SnapshotError::UploadFileError(
141+
"Missing GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable".to_string(),
142+
));
143+
};
144+
145+
info!(self.logger, "uploading {}", filename);
146+
let client = Client::default();
147+
let file = tokio::fs::File::open(filename).await.unwrap();
148+
let stream = FramedRead::new(file, BytesCodec::new());
149+
client
150+
.object()
151+
.create_streamed(
152+
"cardano-testnet",
153+
stream,
154+
None,
155+
filename,
156+
"application/octet-stream",
157+
)
158+
.await
159+
.map_err(|e| SnapshotError::UploadFileError(e.to_string()))?;
160+
161+
info!(self.logger, "uploaded {}", filename);
162+
163+
// ensure the uploaded file as public read access
164+
// when a file is uploaded to gcloud storage its permissions are overwritten so
165+
// we need to put them back
166+
let new_bucket_access_control = NewObjectAccessControl {
167+
entity: Entity::AllUsers,
168+
role: Role::Reader,
169+
};
170+
171+
info!(
172+
self.logger,
173+
"updating acl for {}: {:?}", filename, new_bucket_access_control
174+
);
175+
176+
client
177+
.object_access_control()
178+
.create("cardano-testnet", filename, &new_bucket_access_control)
179+
.await
180+
.map_err(|e| SnapshotError::UploadFileError(e.to_string()))?;
181+
182+
info!(self.logger, "updated acl for {} ", filename);
183+
184+
Ok(())
185+
}
186+
}
187+
188+
#[cfg(test)]
189+
mod tests {}

0 commit comments

Comments
 (0)