From 361d2ace411d2589ff7fa4d78d2bdce718417c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 8 Jun 2022 14:52:18 +0200 Subject: [PATCH 01/18] compare beacons --- mithril-aggregator/src/lib.rs | 2 +- mithril-aggregator/src/main.rs | 23 ++- mithril-aggregator/src/runtime.rs | 235 ++++++++++++++++++++++++------ 3 files changed, 199 insertions(+), 61 deletions(-) diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index 5ea4f5debc2..290b9a20d3b 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -16,7 +16,7 @@ pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError}; pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore}; pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore}; pub use dependency::DependencyManager; -pub use runtime::AggregatorRuntime; +pub use runtime::{AggregatorConfig, AggregatorRunner, AggregatorRuntime}; pub use snapshot_uploaders::{LocalSnapshotUploader, RemoteSnapshotUploader}; pub use snapshotter::{SnapshotError, Snapshotter}; pub use store::{ diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 760e765a3a0..282955effdc 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -4,8 +4,8 @@ use clap::Parser; use config::{Map, Source, Value, ValueKind}; use mithril_aggregator::{ - AggregatorRuntime, BeaconStore, CertificatePendingStore, CertificateStore, Config, - DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server, + AggregatorConfig, AggregatorRuntime, BeaconStore, CertificatePendingStore, CertificateStore, + Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server, VerificationKeyStore, }; use mithril_common::crypto_helper::ProtocolStakeDistribution; @@ -165,19 +165,18 @@ async fn main() -> Result<(), Box> { // Start snapshot uploader let snapshot_directory = config.snapshot_directory.clone(); + let runtime_dependencies = dependency_manager.clone(); let handle = tokio::spawn(async move { - let runtime = AggregatorRuntime::new( + let config = AggregatorConfig::new( args.runtime_interval * 1000, - config.network.clone(), - config.db_directory.clone(), - snapshot_directory, - beacon_store.clone(), - multi_signer.clone(), - snapshot_store.clone(), - snapshot_uploader, - certificate_pending_store.clone(), - certificate_store.clone(), + &config.network.clone(), + &config.db_directory.clone(), + &snapshot_directory, + runtime_dependencies, ); + let runtime = AggregatorRuntime::new(config, None, Arc::new(AggregatorRunner {})) + .await + .unwrap(); runtime.run().await }); diff --git a/mithril-aggregator/src/runtime.rs b/mithril-aggregator/src/runtime.rs index 22b0ef53be6..28f41af6774 100644 --- a/mithril-aggregator/src/runtime.rs +++ b/mithril-aggregator/src/runtime.rs @@ -1,25 +1,33 @@ +#![allow(dead_code, unused_imports)] use super::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; -use super::{BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; +use super::{BeaconStore, BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; use mithril_common::crypto_helper::Bytes; use mithril_common::digesters::{Digester, DigesterError, ImmutableDigester}; -use mithril_common::entities::{Beacon, Certificate}; +use mithril_common::entities::{Beacon, Certificate, CertificatePending}; use mithril_common::fake_data; use crate::dependency::{CertificatePendingStoreWrapper, CertificateStoreWrapper}; use crate::snapshot_stores::SnapshotStoreError; use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; +use crate::DependencyManager; + +use async_trait::async_trait; use chrono::{DateTime, Utc}; use hex::ToHex; use mithril_common::entities::Snapshot; -use slog_scope::{debug, error, info, warn}; +use slog_scope::{debug, error, info, trace, warn}; use std::fs::File; use std::io; use std::io::{Seek, SeekFrom}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use thiserror::Error; use tokio::time::{sleep, Duration}; +#[cfg(test)] +use mockall::automock; + #[derive(Error, Debug)] pub enum RuntimeError { #[error("multi signer error")] @@ -50,70 +58,169 @@ pub enum RuntimeError { General(String), } -/// AggregatorRuntime -pub struct AggregatorRuntime { +pub struct IdleState { + current_beacon: Option, +} + +pub struct SigningState { + current_beacon: Beacon, + certificate_pending: CertificatePending, +} + +pub enum AggregatorState { + Idle(IdleState), + Signing(SigningState), +} + +pub struct AggregatorConfig { /// Interval between each snapshot, in seconds - interval: u32, + pub interval: u32, /// Cardano network - network: String, + pub network: String, /// DB directory to snapshot - db_directory: PathBuf, + pub db_directory: PathBuf, /// Directory to store snapshot - snapshot_directory: PathBuf, + pub snapshot_directory: PathBuf, - /// Beacon store - beacon_store: BeaconStoreWrapper, + pub dependencies: Arc, +} - /// Multi signer - multi_signer: MultiSignerWrapper, +impl AggregatorConfig { + pub fn new( + interval: u32, + network: &str, + db_directory: &Path, + snapshot_directory: &Path, + dependencies: Arc, + ) -> Self { + Self { + interval, + network: network.to_string(), + db_directory: db_directory.to_path_buf(), + snapshot_directory: snapshot_directory.to_path_buf(), + dependencies, + } + } +} - /// Snapshot store - snapshot_store: SnapshotStoreWrapper, +pub trait AggregatorRunnerTrait: Sync + Send { + fn is_new_beacon(&self) -> bool; +} - /// Snapshot uploader - snapshot_uploader: Box, +pub struct AggregatorRunner {} - /// Pending certificate store - #[allow(dead_code)] - certificate_pending_store: CertificatePendingStoreWrapper, +impl AggregatorRunner { + pub fn new() -> Self { + Self {} + } +} - /// Certificate store - certificate_store: CertificateStoreWrapper, +#[cfg_attr(test, automock)] +#[async_trait] +impl AggregatorRunnerTrait for AggregatorRunner { + fn is_new_beacon(&self) -> bool { + todo!() + } +} + +/// AggregatorRuntime +pub struct AggregatorRuntime { + /// the internal state of the automate + state: AggregatorState, + + /// configuration handler, also owns the dependencies + config: AggregatorConfig, + + /// specific runner for this state machine + runner: Arc, } impl AggregatorRuntime { - /// AggregatorRuntime factory - // TODO: Fix this by implementing an Aggregator Config that implements the From trait for a general Config - #[allow(clippy::too_many_arguments)] - pub fn new( - interval: u32, - network: String, - db_directory: PathBuf, - snapshot_directory: PathBuf, - beacon_store: BeaconStoreWrapper, - multi_signer: MultiSignerWrapper, - snapshot_store: SnapshotStoreWrapper, - snapshot_uploader: Box, - certificate_pending_store: CertificatePendingStoreWrapper, - certificate_store: CertificateStoreWrapper, - ) -> Self { - Self { - interval, - network, - db_directory, - snapshot_directory, - beacon_store, - multi_signer, - snapshot_store, - snapshot_uploader, - certificate_pending_store, - certificate_store, + pub async fn new( + config: AggregatorConfig, + init_state: Option, + runner: Arc, + ) -> Result { + info!("initializing runtime"); + + let state = if init_state.is_none() { + trace!("no initial state given"); + if config.dependencies.beacon_store.is_none() { + trace!("idle state, no current beacon"); + AggregatorState::Idle(IdleState { + current_beacon: None, + }) + } else { + let store = config.dependencies.beacon_store.as_ref().unwrap(); + let current_beacon = store + .read() + .await + .get_current_beacon() + .await + .map_err(|e| RuntimeError::General(e.to_string()))?; + trace!("idle state, got current beacon from store"); + + AggregatorState::Idle(IdleState { current_beacon }) + } + } else { + trace!("got initial state from caller"); + init_state.unwrap() + }; + + Ok::(Self { + config, + state, + runner, + }) + } + + pub async fn run(&self) { + info!("Starting runtime"); + loop { + if let Err(e) = self.cycle().await { + error!("{:?}", e) + } + + info!("Sleeping for {}", self.config.interval); + sleep(Duration::from_millis(self.config.interval.into())).await; } } + async fn cycle(&self) -> Result<(), RuntimeError> { + Ok(()) + } +} +/// AggregatorRuntime factory +// TODO: Fix this by implementing an Aggregator Config that implements the From trait for a general Config +/* + pub fn new( + interval: u32, + network: String, + db_directory: PathBuf, + snapshot_directory: PathBuf, + beacon_store: BeaconStoreWrapper, + multi_signer: MultiSignerWrapper, + snapshot_store: SnapshotStoreWrapper, + snapshot_uploader: Box, + certificate_pending_store: CertificatePendingStoreWrapper, + certificate_store: CertificateStoreWrapper, + ) -> Self { + Self { + interval, + network, + db_directory, + snapshot_directory, + beacon_store, + multi_signer, + snapshot_store, + snapshot_uploader, + certificate_pending_store, + certificate_store, + } + } /// Run snapshotter loop pub async fn run(&self) { info!("Starting runtime"); @@ -259,6 +366,7 @@ impl AggregatorRuntime { } } } + */ fn build_new_snapshot( digest: String, @@ -280,3 +388,34 @@ fn build_new_snapshot( vec![uploaded_snapshot_location], )) } + +#[cfg(test)] +mod tests { + use super::super::Config; + use super::*; + + async fn init_runtime( + init_state: Option, + runner: MockAggregatorRunner, + ) -> AggregatorRuntime { + let config: Config = config::Config::builder() + .build() + .map_err(|e| format!("configuration build error: {}", e)) + .unwrap() + .try_deserialize() + .map_err(|e| format!("configuration deserialize error: {}", e)) + .unwrap(); + let dependencies = Arc::new(DependencyManager::new(config)); + let config = AggregatorConfig::new( + 100, + "test", + Path::new("whatever"), + Path::new("whatever"), + dependencies, + ); + + AggregatorRuntime::new(config, init_state, Arc::new(runner)) + .await + .unwrap() + } +} From 8f5553c668820e6d649909a5b65c0b73a6d4e12d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 8 Jun 2022 18:05:10 +0200 Subject: [PATCH 02/18] [wip] before implementation --- mithril-aggregator/src/main.rs | 2 +- mithril-aggregator/src/runtime.rs | 141 +++++++++++++++++++++++++++--- mithril-common/src/entities.rs | 3 +- 3 files changed, 131 insertions(+), 15 deletions(-) diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 282955effdc..002ebd2c0e1 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -174,7 +174,7 @@ async fn main() -> Result<(), Box> { &snapshot_directory, runtime_dependencies, ); - let runtime = AggregatorRuntime::new(config, None, Arc::new(AggregatorRunner {})) + let mut runtime = AggregatorRuntime::new(config, None, Arc::new(AggregatorRunner {})) .await .unwrap(); runtime.run().await diff --git a/mithril-aggregator/src/runtime.rs b/mithril-aggregator/src/runtime.rs index 28f41af6774..58af58214f3 100644 --- a/mithril-aggregator/src/runtime.rs +++ b/mithril-aggregator/src/runtime.rs @@ -17,6 +17,7 @@ use chrono::{DateTime, Utc}; use hex::ToHex; use mithril_common::entities::Snapshot; use slog_scope::{debug, error, info, trace, warn}; +use std::fmt::Display; use std::fs::File; use std::io; use std::io::{Seek, SeekFrom}; @@ -58,20 +59,31 @@ pub enum RuntimeError { General(String), } +#[derive(Clone, Debug)] pub struct IdleState { current_beacon: Option, } +#[derive(Clone, Debug)] pub struct SigningState { current_beacon: Beacon, certificate_pending: CertificatePending, } +#[derive(Clone, Debug)] pub enum AggregatorState { Idle(IdleState), Signing(SigningState), } +impl Display for AggregatorState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + AggregatorState::Idle(_) => write!(f, "idle"), + AggregatorState::Signing(_) => write!(f, "signing"), + } + } +} pub struct AggregatorConfig { /// Interval between each snapshot, in seconds pub interval: u32, @@ -106,8 +118,13 @@ impl AggregatorConfig { } } +#[async_trait] pub trait AggregatorRunnerTrait: Sync + Send { - fn is_new_beacon(&self) -> bool; + /// Return the current beacon if it is newer than the given one. + fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Option; + async fn compute_digest(&self, new_beacon: &Beacon) -> Result; + async fn create_pending_certificate(&self, message: &str) + -> Result; } pub struct AggregatorRunner {} @@ -121,7 +138,25 @@ impl AggregatorRunner { #[cfg_attr(test, automock)] #[async_trait] impl AggregatorRunnerTrait for AggregatorRunner { - fn is_new_beacon(&self) -> bool { + fn is_new_beacon<'a>(&self, beacon: Option<&'a Beacon>) -> Option { + info!("checking if there is a new beacon"); + let current_beacon = mithril_common::fake_data::beacon(); + + if beacon.is_none() || current_beacon > *beacon.unwrap() { + Some(current_beacon) + } else { + None + } + } + + async fn compute_digest(&self, new_beacon: &Beacon) -> Result { + todo!() + } + + async fn create_pending_certificate( + &self, + message: &str, + ) -> Result { todo!() } } @@ -139,6 +174,10 @@ pub struct AggregatorRuntime { } impl AggregatorRuntime { + pub fn get_state(&self) -> String { + self.state.to_string() + } + pub async fn new( config: AggregatorConfig, init_state: Option, @@ -177,7 +216,7 @@ impl AggregatorRuntime { }) } - pub async fn run(&self) { + pub async fn run(&mut self) { info!("Starting runtime"); loop { if let Err(e) = self.cycle().await { @@ -189,7 +228,31 @@ impl AggregatorRuntime { } } - async fn cycle(&self) -> Result<(), RuntimeError> { + pub async fn cycle(&mut self) -> Result<(), RuntimeError> { + match self.state.clone() { + AggregatorState::Idle(state) => { + if let Some(beacon) = self.runner.is_new_beacon(state.current_beacon.as_ref()) { + let _ = self.from_idle_to_signing(beacon).await?; + } + } + AggregatorState::Signing(_state) => {} + } + Ok(()) + } + + async fn from_idle_to_signing(&mut self, new_beacon: Beacon) -> Result<(), RuntimeError> { + info!("transiting from IDLE to SIGNING state"); + let message = self.runner.compute_digest(&new_beacon).await?; + let certificate = self + .runner + .create_pending_certificate(&message) + .await + .map_err(|e| RuntimeError::General(e))?; + let state = SigningState { + current_beacon: new_beacon, + certificate_pending: certificate, + }; + self.state = AggregatorState::Signing(state); Ok(()) } } @@ -393,22 +456,33 @@ fn build_new_snapshot( mod tests { use super::super::Config; use super::*; + use mithril_common::fake_data; async fn init_runtime( init_state: Option, runner: MockAggregatorRunner, ) -> AggregatorRuntime { - let config: Config = config::Config::builder() - .build() - .map_err(|e| format!("configuration build error: {}", e)) - .unwrap() - .try_deserialize() - .map_err(|e| format!("configuration deserialize error: {}", e)) - .unwrap(); + use crate::entities::{SnapshotStoreType, SnapshotUploaderType}; + + let config = Config { + network: "testnet".to_string(), + url_snapshot_manifest: "https://storage.googleapis.com/cardano-testnet/snapshots.json" + .to_string(), + snapshot_store_type: SnapshotStoreType::Local, + snapshot_uploader_type: SnapshotUploaderType::Local, + server_url: "http://0.0.0.0:8080".to_string(), + db_directory: Default::default(), + snapshot_directory: Default::default(), + pending_certificate_store_directory: std::env::temp_dir() + .join("mithril_test_pending_cert_db"), + certificate_store_directory: std::env::temp_dir().join("mithril_test_cert_db"), + verification_key_store_directory: std::env::temp_dir() + .join("mithril_test_verification_key_db"), + }; let dependencies = Arc::new(DependencyManager::new(config)); let config = AggregatorConfig::new( 100, - "test", + "dev", Path::new("whatever"), Path::new("whatever"), dependencies, @@ -418,4 +492,47 @@ mod tests { .await .unwrap() } + + #[tokio::test] + pub async fn idle_check_no_new_beacon_with_current_beacon() { + let mut runner = MockAggregatorRunner::new(); + runner.expect_is_new_beacon().times(1).returning(|_| None); + let mut runtime = init_runtime( + Some(AggregatorState::Idle(IdleState { + current_beacon: Some(fake_data::beacon()), + })), + runner, + ) + .await; + + let _ = runtime.cycle().await.unwrap(); + assert_eq!("idle".to_string(), runtime.get_state()); + } + + #[tokio::test] + pub async fn idle_check_no_new_beacon_with_no_current_beacon() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Some(fake_data::beacon())); + runner + .expect_compute_digest() + .times(1) + .returning(|_| Ok("whatever".to_string())); + runner + .expect_create_pending_certificate() + .times(1) + .returning(|_| Ok(fake_data::certificate_pending())); + let mut runtime = init_runtime( + Some(AggregatorState::Idle(IdleState { + current_beacon: None, + })), + runner, + ) + .await; + + let _ = runtime.cycle().await.unwrap(); + assert_eq!("signing".to_string(), runtime.get_state()); + } } diff --git a/mithril-common/src/entities.rs b/mithril-common/src/entities.rs index 07da17763a3..cf974dd4da6 100644 --- a/mithril-common/src/entities.rs +++ b/mithril-common/src/entities.rs @@ -5,7 +5,7 @@ use sha2::{Digest, Sha256}; pub type ImmutableFileNumber = u64; /// Beacon represents a point in the Cardano chain at which a Mithril certificate should be produced -#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash, PartialOrd)] pub struct Beacon { /// Cardano network #[serde(rename = "network")] @@ -39,7 +39,6 @@ impl Beacon { hex::encode(hasher.finalize()) } } - /// CertificatePending represents a pending certificate in the process of production #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] pub struct CertificatePending { From 2712bcc25ffbd1a829c264d1737b2825b1d2b039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 8 Jun 2022 18:52:44 +0200 Subject: [PATCH 03/18] [wip] refacto --- mithril-aggregator/src/runtime/mod.rs | 5 + mithril-aggregator/src/runtime/runner.rs | 99 ++++++++++++ .../src/{ => runtime}/runtime.rs | 145 ++---------------- 3 files changed, 118 insertions(+), 131 deletions(-) create mode 100644 mithril-aggregator/src/runtime/mod.rs create mode 100644 mithril-aggregator/src/runtime/runner.rs rename mithril-aggregator/src/{ => runtime}/runtime.rs (75%) diff --git a/mithril-aggregator/src/runtime/mod.rs b/mithril-aggregator/src/runtime/mod.rs new file mode 100644 index 00000000000..10c5a1e0d19 --- /dev/null +++ b/mithril-aggregator/src/runtime/mod.rs @@ -0,0 +1,5 @@ +mod runner; +mod runtime; + +pub use runner::{AggregatorConfig, AggregatorRunner}; +pub use runtime::*; diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs new file mode 100644 index 00000000000..10d793a864b --- /dev/null +++ b/mithril-aggregator/src/runtime/runner.rs @@ -0,0 +1,99 @@ +use std::path::PathBuf; + +use crate::DependencyManager; +use async_trait::async_trait; +use mithril_common::digesters::DigesterError; +use mithril_common::entities::{Beacon, CertificatePending}; +use std::path::Path; +use std::sync::Arc; + +#[cfg(test)] +use mockall::automock; +pub struct AggregatorConfig { + /// Interval between each snapshot, in seconds + pub interval: u32, + + /// Cardano network + pub network: String, + + /// DB directory to snapshot + pub db_directory: PathBuf, + + /// Directory to store snapshot + pub snapshot_directory: PathBuf, + + pub dependencies: Arc, +} + +impl AggregatorConfig { + pub fn new( + interval: u32, + network: &str, + db_directory: &Path, + snapshot_directory: &Path, + dependencies: Arc, + ) -> Self { + Self { + interval, + network: network.to_string(), + db_directory: db_directory.to_path_buf(), + snapshot_directory: snapshot_directory.to_path_buf(), + dependencies, + } + } +} + +#[async_trait] +pub trait AggregatorRunnerTrait: Sync + Send { + /// Return the current beacon if it is newer than the given one. + fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Option; + async fn compute_digest(&self, new_beacon: &Beacon) -> Result; + async fn create_pending_certificate(&self, message: &str) + -> Result; +} + +pub struct AggregatorRunner { + config: AggregatorConfig, +} + +impl AggregatorRunner { + pub fn new(config: AggregatorConfig) -> Self { + Self { config } + } +} + +#[cfg_attr(test, automock)] +#[async_trait] +impl AggregatorRunnerTrait for AggregatorRunner { + fn is_new_beacon<'a>(&self, beacon: Option<&'a Beacon>) -> Option { + info!("checking if there is a new beacon"); + let current_beacon = mithril_common::fake_data::beacon(); + + if beacon.is_none() || current_beacon > *beacon.unwrap() { + Some(current_beacon) + } else { + None + } + } + + async fn compute_digest(&self, new_beacon: &Beacon) -> Result { + /* + let snapshotter = + Snapshotter::new(self.db_directory.clone(), self.snapshot_directory.clone()); + let digester = ImmutableDigester::new(self.db_directory.clone(), slog_scope::logger()); + + info!("Computing digest"; "db_directory" => self.db_directory.display()); + let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) + .await + .map_err(|e| RuntimeError::General(e.to_string()))?; + */ + todo!() + } + + async fn create_pending_certificate( + &self, + message: &str, + ) -> Result { + todo!() + } +} diff --git a/mithril-aggregator/src/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs similarity index 75% rename from mithril-aggregator/src/runtime.rs rename to mithril-aggregator/src/runtime/runtime.rs index 58af58214f3..333c7d33ca6 100644 --- a/mithril-aggregator/src/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -1,6 +1,5 @@ -#![allow(dead_code, unused_imports)] -use super::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; -use super::{BeaconStore, BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; +use crate::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; +use crate::{BeaconStore, BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; use mithril_common::crypto_helper::Bytes; use mithril_common::digesters::{Digester, DigesterError, ImmutableDigester}; @@ -12,7 +11,6 @@ use crate::snapshot_stores::SnapshotStoreError; use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; use crate::DependencyManager; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use hex::ToHex; use mithril_common::entities::Snapshot; @@ -84,90 +82,14 @@ impl Display for AggregatorState { } } } -pub struct AggregatorConfig { - /// Interval between each snapshot, in seconds - pub interval: u32, - - /// Cardano network - pub network: String, - - /// DB directory to snapshot - pub db_directory: PathBuf, - - /// Directory to store snapshot - pub snapshot_directory: PathBuf, - - pub dependencies: Arc, -} - -impl AggregatorConfig { - pub fn new( - interval: u32, - network: &str, - db_directory: &Path, - snapshot_directory: &Path, - dependencies: Arc, - ) -> Self { - Self { - interval, - network: network.to_string(), - db_directory: db_directory.to_path_buf(), - snapshot_directory: snapshot_directory.to_path_buf(), - dependencies, - } - } -} - -#[async_trait] -pub trait AggregatorRunnerTrait: Sync + Send { - /// Return the current beacon if it is newer than the given one. - fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Option; - async fn compute_digest(&self, new_beacon: &Beacon) -> Result; - async fn create_pending_certificate(&self, message: &str) - -> Result; -} - -pub struct AggregatorRunner {} - -impl AggregatorRunner { - pub fn new() -> Self { - Self {} - } -} - -#[cfg_attr(test, automock)] -#[async_trait] -impl AggregatorRunnerTrait for AggregatorRunner { - fn is_new_beacon<'a>(&self, beacon: Option<&'a Beacon>) -> Option { - info!("checking if there is a new beacon"); - let current_beacon = mithril_common::fake_data::beacon(); - - if beacon.is_none() || current_beacon > *beacon.unwrap() { - Some(current_beacon) - } else { - None - } - } - - async fn compute_digest(&self, new_beacon: &Beacon) -> Result { - todo!() - } - - async fn create_pending_certificate( - &self, - message: &str, - ) -> Result { - todo!() - } -} /// AggregatorRuntime pub struct AggregatorRuntime { /// the internal state of the automate state: AggregatorState, - /// configuration handler, also owns the dependencies - config: AggregatorConfig, + /// time between each state machine execution + state_sleep: Duration, /// specific runner for this state machine runner: Arc, @@ -179,38 +101,24 @@ impl AggregatorRuntime { } pub async fn new( - config: AggregatorConfig, + state_sleep: Duration, init_state: Option, runner: Arc, ) -> Result { info!("initializing runtime"); let state = if init_state.is_none() { - trace!("no initial state given"); - if config.dependencies.beacon_store.is_none() { - trace!("idle state, no current beacon"); - AggregatorState::Idle(IdleState { - current_beacon: None, - }) - } else { - let store = config.dependencies.beacon_store.as_ref().unwrap(); - let current_beacon = store - .read() - .await - .get_current_beacon() - .await - .map_err(|e| RuntimeError::General(e.to_string()))?; - trace!("idle state, got current beacon from store"); - - AggregatorState::Idle(IdleState { current_beacon }) - } + trace!("idle state, no current beacon"); + AggregatorState::Idle(IdleState { + current_beacon: None, + }) } else { trace!("got initial state from caller"); init_state.unwrap() }; Ok::(Self { - config, + state_sleep, state, runner, }) @@ -223,8 +131,8 @@ impl AggregatorRuntime { error!("{:?}", e) } - info!("Sleeping for {}", self.config.interval); - sleep(Duration::from_millis(self.config.interval.into())).await; + info!("Sleeping…"); + sleep(self.state_sleep).await; } } @@ -454,7 +362,7 @@ fn build_new_snapshot( #[cfg(test)] mod tests { - use super::super::Config; + use crate::Config; use super::*; use mithril_common::fake_data; @@ -463,32 +371,7 @@ mod tests { runner: MockAggregatorRunner, ) -> AggregatorRuntime { use crate::entities::{SnapshotStoreType, SnapshotUploaderType}; - - let config = Config { - network: "testnet".to_string(), - url_snapshot_manifest: "https://storage.googleapis.com/cardano-testnet/snapshots.json" - .to_string(), - snapshot_store_type: SnapshotStoreType::Local, - snapshot_uploader_type: SnapshotUploaderType::Local, - server_url: "http://0.0.0.0:8080".to_string(), - db_directory: Default::default(), - snapshot_directory: Default::default(), - pending_certificate_store_directory: std::env::temp_dir() - .join("mithril_test_pending_cert_db"), - certificate_store_directory: std::env::temp_dir().join("mithril_test_cert_db"), - verification_key_store_directory: std::env::temp_dir() - .join("mithril_test_verification_key_db"), - }; - let dependencies = Arc::new(DependencyManager::new(config)); - let config = AggregatorConfig::new( - 100, - "dev", - Path::new("whatever"), - Path::new("whatever"), - dependencies, - ); - - AggregatorRuntime::new(config, init_state, Arc::new(runner)) + AggregatorRuntime::new(Duration::from_millis(100), init_state, Arc::new(runner)) .await .unwrap() } From d4acd66f3daee6a4703a8b92b2d719515f421114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Thu, 9 Jun 2022 14:54:50 +0200 Subject: [PATCH 04/18] refactor runner code --- mithril-aggregator/src/main.rs | 11 ++- mithril-aggregator/src/runtime/error.rs | 36 +++++++++ mithril-aggregator/src/runtime/mod.rs | 4 +- mithril-aggregator/src/runtime/runner.rs | 59 ++++++++++----- mithril-aggregator/src/runtime/runtime.rs | 90 +++++++++-------------- 5 files changed, 119 insertions(+), 81 deletions(-) create mode 100644 mithril-aggregator/src/runtime/error.rs diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 002ebd2c0e1..49eb97ee6d5 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -19,6 +19,7 @@ use std::error::Error; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; +use tokio::time::Duration; /// Node args #[derive(Parser, Debug, Clone)] @@ -174,9 +175,13 @@ async fn main() -> Result<(), Box> { &snapshot_directory, runtime_dependencies, ); - let mut runtime = AggregatorRuntime::new(config, None, Arc::new(AggregatorRunner {})) - .await - .unwrap(); + let mut runtime = AggregatorRuntime::new( + Duration::from_millis(config.interval.into()), + None, + Arc::new(AggregatorRunner::new(config)), + ) + .await + .unwrap(); runtime.run().await }); diff --git a/mithril-aggregator/src/runtime/error.rs b/mithril-aggregator/src/runtime/error.rs new file mode 100644 index 00000000000..2db39843df5 --- /dev/null +++ b/mithril-aggregator/src/runtime/error.rs @@ -0,0 +1,36 @@ +use crate::snapshot_stores::SnapshotStoreError; +use crate::{BeaconStoreError, ProtocolError, SnapshotError}; + +use mithril_common::digesters::DigesterError; +use std::io; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RuntimeError { + #[error("multi signer error")] + MultiSigner(#[from] ProtocolError), + + #[error("beacon store error")] + BeaconStore(#[from] BeaconStoreError), + + #[error("snapshotter error")] + Snapshotter(#[from] SnapshotError), + + #[error("digester error")] + Digester(#[from] DigesterError), + + #[error("snapshot store error")] + SnapshotStore(#[from] SnapshotStoreError), + + #[error("certificate store error")] + CertificateStore(String), + + #[error("snapshot uploader error: {0}")] + SnapshotUploader(String), + + #[error("snapshot build error")] + SnapshotBuild(#[from] io::Error), + + #[error("general error")] + General(String), +} diff --git a/mithril-aggregator/src/runtime/mod.rs b/mithril-aggregator/src/runtime/mod.rs index 10c5a1e0d19..11de682740c 100644 --- a/mithril-aggregator/src/runtime/mod.rs +++ b/mithril-aggregator/src/runtime/mod.rs @@ -1,5 +1,7 @@ +mod error; mod runner; mod runtime; -pub use runner::{AggregatorConfig, AggregatorRunner}; +pub use error::RuntimeError; +pub use runner::{AggregatorConfig, AggregatorRunner, AggregatorRunnerTrait}; pub use runtime::*; diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 10d793a864b..2580aa22193 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -2,13 +2,18 @@ use std::path::PathBuf; use crate::DependencyManager; use async_trait::async_trait; -use mithril_common::digesters::DigesterError; +use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; use mithril_common::entities::{Beacon, CertificatePending}; + +#[allow(unused_imports)] +use slog_scope::{debug, error, info, trace, warn}; use std::path::Path; use std::sync::Arc; #[cfg(test)] use mockall::automock; + +use super::RuntimeError; pub struct AggregatorConfig { /// Interval between each snapshot, in seconds pub interval: u32, @@ -46,10 +51,12 @@ impl AggregatorConfig { #[async_trait] pub trait AggregatorRunnerTrait: Sync + Send { /// Return the current beacon if it is newer than the given one. - fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Option; - async fn compute_digest(&self, new_beacon: &Beacon) -> Result; - async fn create_pending_certificate(&self, message: &str) - -> Result; + fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Result, RuntimeError>; + async fn compute_digest(&self, new_beacon: &Beacon) -> Result; + async fn create_pending_certificate( + &self, + digester_result: DigesterResult, + ) -> Result; } pub struct AggregatorRunner { @@ -65,35 +72,47 @@ impl AggregatorRunner { #[cfg_attr(test, automock)] #[async_trait] impl AggregatorRunnerTrait for AggregatorRunner { - fn is_new_beacon<'a>(&self, beacon: Option<&'a Beacon>) -> Option { + /// Is there a new beacon? + /// returns a new beacon if there is one more recent than the given one + fn is_new_beacon<'a>( + &self, + beacon: Option<&'a Beacon>, + ) -> Result, RuntimeError> { info!("checking if there is a new beacon"); + warn!("using fake data for the new beacon"); let current_beacon = mithril_common::fake_data::beacon(); if beacon.is_none() || current_beacon > *beacon.unwrap() { - Some(current_beacon) + Ok(Some(current_beacon)) } else { - None + Ok(None) } } - async fn compute_digest(&self, new_beacon: &Beacon) -> Result { - /* - let snapshotter = - Snapshotter::new(self.db_directory.clone(), self.snapshot_directory.clone()); - let digester = ImmutableDigester::new(self.db_directory.clone(), slog_scope::logger()); - - info!("Computing digest"; "db_directory" => self.db_directory.display()); + async fn compute_digest(&self, new_beacon: &Beacon) -> Result { + trace!("running runner::compute_digester"); + let digester = + ImmutableDigester::new(self.config.db_directory.clone(), slog_scope::logger()); + info!("Computing digest"; "db_directory" => self.config.db_directory.display()); let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) .await - .map_err(|e| RuntimeError::General(e.to_string()))?; - */ - todo!() + .map_err(|e| RuntimeError::General(e.to_string()))??; + + if digest_result.last_immutable_file_number != new_beacon.immutable_file_number { + error!("digest beacon is different than the given beacon"); + Err(RuntimeError::General( + format!("The digest has been computed for a different immutable ({}) file than the one given in the beacon ({}).", digest_result.last_immutable_file_number, new_beacon.immutable_file_number) + )) + } else { + trace!("digest last immutable file number and new beacon file number are consistent"); + Ok(digest_result) + } } async fn create_pending_certificate( &self, - message: &str, - ) -> Result { + digester_result: DigesterResult, + ) -> Result { todo!() } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 333c7d33ca6..675929f4999 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -1,16 +1,13 @@ -use crate::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; -use crate::{BeaconStore, BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; - use mithril_common::crypto_helper::Bytes; -use mithril_common::digesters::{Digester, DigesterError, ImmutableDigester}; use mithril_common::entities::{Beacon, Certificate, CertificatePending}; use mithril_common::fake_data; +use crate::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; use crate::dependency::{CertificatePendingStoreWrapper, CertificateStoreWrapper}; -use crate::snapshot_stores::SnapshotStoreError; use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; -use crate::DependencyManager; +use crate::{BeaconStore, Snapshotter}; +use super::{AggregatorRunnerTrait, RuntimeError}; use chrono::{DateTime, Utc}; use hex::ToHex; use mithril_common::entities::Snapshot; @@ -21,54 +18,23 @@ use std::io; use std::io::{Seek, SeekFrom}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use thiserror::Error; use tokio::time::{sleep, Duration}; #[cfg(test)] use mockall::automock; -#[derive(Error, Debug)] -pub enum RuntimeError { - #[error("multi signer error")] - MultiSigner(#[from] ProtocolError), - - #[error("beacon store error")] - BeaconStore(#[from] BeaconStoreError), - - #[error("snapshotter error")] - Snapshotter(#[from] SnapshotError), - - #[error("digester error")] - Digester(#[from] DigesterError), - - #[error("snapshot store error")] - SnapshotStore(#[from] SnapshotStoreError), - - #[error("certificate store error")] - CertificateStore(String), - - #[error("snapshot uploader error: {0}")] - SnapshotUploader(String), - - #[error("snapshot build error")] - SnapshotBuild(#[from] io::Error), - - #[error("general error")] - General(String), -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct IdleState { current_beacon: Option, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct SigningState { current_beacon: Beacon, certificate_pending: CertificatePending, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum AggregatorState { Idle(IdleState), Signing(SigningState), @@ -139,8 +105,9 @@ impl AggregatorRuntime { pub async fn cycle(&mut self) -> Result<(), RuntimeError> { match self.state.clone() { AggregatorState::Idle(state) => { - if let Some(beacon) = self.runner.is_new_beacon(state.current_beacon.as_ref()) { - let _ = self.from_idle_to_signing(beacon).await?; + if let Some(beacon) = self.runner.is_new_beacon(state.current_beacon.as_ref())? { + let new_state = self.from_idle_to_signing(beacon).await?; + self.state = AggregatorState::Signing(new_state); } } AggregatorState::Signing(_state) => {} @@ -148,20 +115,24 @@ impl AggregatorRuntime { Ok(()) } - async fn from_idle_to_signing(&mut self, new_beacon: Beacon) -> Result<(), RuntimeError> { + /// transition + /// from IDLE state to SIGNING + async fn from_idle_to_signing( + &mut self, + new_beacon: Beacon, + ) -> Result { info!("transiting from IDLE to SIGNING state"); - let message = self.runner.compute_digest(&new_beacon).await?; + let digester_result = self.runner.compute_digest(&new_beacon).await?; let certificate = self .runner - .create_pending_certificate(&message) - .await - .map_err(|e| RuntimeError::General(e))?; + .create_pending_certificate(digester_result) + .await?; let state = SigningState { current_beacon: new_beacon, certificate_pending: certificate, }; - self.state = AggregatorState::Signing(state); - Ok(()) + + Ok(state) } } /// AggregatorRuntime factory @@ -362,15 +333,15 @@ fn build_new_snapshot( #[cfg(test)] mod tests { - use crate::Config; + use super::super::runner::MockAggregatorRunner; use super::*; + use mithril_common::digesters::DigesterResult; use mithril_common::fake_data; async fn init_runtime( init_state: Option, runner: MockAggregatorRunner, ) -> AggregatorRuntime { - use crate::entities::{SnapshotStoreType, SnapshotUploaderType}; AggregatorRuntime::new(Duration::from_millis(100), init_state, Arc::new(runner)) .await .unwrap() @@ -379,7 +350,10 @@ mod tests { #[tokio::test] pub async fn idle_check_no_new_beacon_with_current_beacon() { let mut runner = MockAggregatorRunner::new(); - runner.expect_is_new_beacon().times(1).returning(|_| None); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); let mut runtime = init_runtime( Some(AggregatorState::Idle(IdleState { current_beacon: Some(fake_data::beacon()), @@ -398,11 +372,13 @@ mod tests { runner .expect_is_new_beacon() .times(1) - .returning(|_| Some(fake_data::beacon())); - runner - .expect_compute_digest() - .times(1) - .returning(|_| Ok("whatever".to_string())); + .returning(|_| Ok(Some(fake_data::beacon()))); + runner.expect_compute_digest().times(1).returning(|_| { + Ok(DigesterResult { + digest: "whatever".to_string(), + last_immutable_file_number: 123, + }) + }); runner .expect_create_pending_certificate() .times(1) From 403acb7cdc337aa50c27f0247836409f888ff0a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Thu, 9 Jun 2022 22:25:42 +0200 Subject: [PATCH 05/18] [wip] new runner workflow --- mithril-aggregator/src/runtime/runner.rs | 63 +++++++++++++++++++++-- mithril-aggregator/src/runtime/runtime.rs | 3 +- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 2580aa22193..59ad9b3dbe1 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -1,7 +1,8 @@ use std::path::PathBuf; -use crate::DependencyManager; +use crate::{multi_signer, DependencyManager}; use async_trait::async_trait; +use mithril::stm::StmVerificationKey; use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; use mithril_common::entities::{Beacon, CertificatePending}; @@ -53,10 +54,18 @@ pub trait AggregatorRunnerTrait: Sync + Send { /// Return the current beacon if it is newer than the given one. fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Result, RuntimeError>; async fn compute_digest(&self, new_beacon: &Beacon) -> Result; - async fn create_pending_certificate( + async fn update_message_in_multisigner( &self, - digester_result: DigesterResult, + digest_result: DigesterResult, + ) -> Result<(), RuntimeError>; + async fn create_new_pending_certificate_from_multisigner( + &self, + beacon: Beacon, ) -> Result; + async fn save_pending_certificate( + &self, + pending_certificate: CertificatePending, + ) -> Result<(), RuntimeError>; } pub struct AggregatorRunner { @@ -94,10 +103,17 @@ impl AggregatorRunnerTrait for AggregatorRunner { let digester = ImmutableDigester::new(self.config.db_directory.clone(), slog_scope::logger()); info!("Computing digest"; "db_directory" => self.config.db_directory.display()); + + // digest is done in a separate thread because it is blocking the whole task + debug!("launching digester thread"); let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) .await .map_err(|e| RuntimeError::General(e.to_string()))??; + debug!( + "last immutable file number: {}", + digest_result.last_immutable_file_number + ); if digest_result.last_immutable_file_number != new_beacon.immutable_file_number { error!("digest beacon is different than the given beacon"); Err(RuntimeError::General( @@ -109,10 +125,47 @@ impl AggregatorRunnerTrait for AggregatorRunner { } } - async fn create_pending_certificate( + async fn create_new_pending_certificate_from_multisigner( &self, - digester_result: DigesterResult, + beacon: Beacon, ) -> Result { + trace!("running runner::create_pending_certificate"); + let mut multi_signer = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .read() + .await; + + debug!("creating certificate pending using multisigner"); + warn!("pending certificate's previous hash is fake"); + let pending_certificate = CertificatePending::new( + beacon, + multi_signer + .get_protocol_parameters() + .await + .ok_or_else(|| RuntimeError::General(format!("no protocol parameters")))? + .into(), + "123".to_string(), + multi_signer.get_signers().await?, + ); + + Ok(pending_certificate) + } + + async fn save_pending_certificate( + &self, + pending_certificate: CertificatePending, + ) -> Result<(), RuntimeError> { + todo!() + } + + async fn update_message_in_multisigner( + &self, + digest_result: DigesterResult, + ) -> Result<(), RuntimeError> { todo!() } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 675929f4999..66ad0b23342 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -14,7 +14,6 @@ use mithril_common::entities::Snapshot; use slog_scope::{debug, error, info, trace, warn}; use std::fmt::Display; use std::fs::File; -use std::io; use std::io::{Seek, SeekFrom}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -308,7 +307,6 @@ impl AggregatorRuntime { } } } - */ fn build_new_snapshot( digest: String, @@ -330,6 +328,7 @@ fn build_new_snapshot( vec![uploaded_snapshot_location], )) } + */ #[cfg(test)] mod tests { From 2527012bedabad759333756a5bf119a85930f0a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Fri, 10 Jun 2022 11:24:55 +0200 Subject: [PATCH 06/18] add first working transition --- mithril-aggregator/src/main.rs | 4 +- mithril-aggregator/src/runtime/error.rs | 5 +- mithril-aggregator/src/runtime/runner.rs | 40 ++++++++++---- mithril-aggregator/src/runtime/runtime.rs | 67 +++++++++++++++-------- mithril-common/src/digesters/digester.rs | 2 +- 5 files changed, 79 insertions(+), 39 deletions(-) diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 49eb97ee6d5..31217cd6912 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -169,14 +169,14 @@ async fn main() -> Result<(), Box> { let runtime_dependencies = dependency_manager.clone(); let handle = tokio::spawn(async move { let config = AggregatorConfig::new( - args.runtime_interval * 1000, + args.runtime_interval, &config.network.clone(), &config.db_directory.clone(), &snapshot_directory, runtime_dependencies, ); let mut runtime = AggregatorRuntime::new( - Duration::from_millis(config.interval.into()), + Duration::from_secs(config.interval.into()), None, Arc::new(AggregatorRunner::new(config)), ) diff --git a/mithril-aggregator/src/runtime/error.rs b/mithril-aggregator/src/runtime/error.rs index 2db39843df5..3fd07529ebb 100644 --- a/mithril-aggregator/src/runtime/error.rs +++ b/mithril-aggregator/src/runtime/error.rs @@ -1,4 +1,5 @@ use crate::snapshot_stores::SnapshotStoreError; +use crate::store::StoreError; use crate::{BeaconStoreError, ProtocolError, SnapshotError}; use mithril_common::digesters::DigesterError; @@ -22,8 +23,8 @@ pub enum RuntimeError { #[error("snapshot store error")] SnapshotStore(#[from] SnapshotStoreError), - #[error("certificate store error")] - CertificateStore(String), + #[error("store error")] + StoreError(#[from] StoreError), #[error("snapshot uploader error: {0}")] SnapshotUploader(String), diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 59ad9b3dbe1..36146b7b8fe 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -1,8 +1,7 @@ use std::path::PathBuf; -use crate::{multi_signer, DependencyManager}; +use crate::DependencyManager; use async_trait::async_trait; -use mithril::stm::StmVerificationKey; use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; use mithril_common::entities::{Beacon, CertificatePending}; @@ -52,7 +51,7 @@ impl AggregatorConfig { #[async_trait] pub trait AggregatorRunnerTrait: Sync + Send { /// Return the current beacon if it is newer than the given one. - fn is_new_beacon(&self, beacon: Option<&Beacon>) -> Result, RuntimeError>; + async fn is_new_beacon(&self, beacon: Option) -> Result, RuntimeError>; async fn compute_digest(&self, new_beacon: &Beacon) -> Result; async fn update_message_in_multisigner( &self, @@ -83,18 +82,17 @@ impl AggregatorRunner { impl AggregatorRunnerTrait for AggregatorRunner { /// Is there a new beacon? /// returns a new beacon if there is one more recent than the given one - fn is_new_beacon<'a>( + async fn is_new_beacon( &self, - beacon: Option<&'a Beacon>, + maybe_beacon: Option, ) -> Result, RuntimeError> { info!("checking if there is a new beacon"); warn!("using fake data for the new beacon"); let current_beacon = mithril_common::fake_data::beacon(); - if beacon.is_none() || current_beacon > *beacon.unwrap() { - Ok(Some(current_beacon)) - } else { - Ok(None) + match maybe_beacon { + Some(beacon) if current_beacon > beacon => Ok(Some(current_beacon)), + _ => Ok(None), } } @@ -130,7 +128,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { beacon: Beacon, ) -> Result { trace!("running runner::create_pending_certificate"); - let mut multi_signer = self + let multi_signer = self .config .dependencies .multi_signer @@ -159,13 +157,31 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError> { - todo!() + self.config + .dependencies + .certificate_pending_store + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .write() + .await + .save(pending_certificate) + .await + .map_err(|e| e.into()) } async fn update_message_in_multisigner( &self, digest_result: DigesterResult, ) -> Result<(), RuntimeError> { - todo!() + self.config + .dependencies + .multi_signer + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .write() + .await + .update_current_message(digest_result.digest.into_bytes()) + .await + .map_err(|e| RuntimeError::MultiSigner(e)) } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 66ad0b23342..93724a9d040 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -1,27 +1,11 @@ -use mithril_common::crypto_helper::Bytes; use mithril_common::entities::{Beacon, Certificate, CertificatePending}; -use mithril_common::fake_data; - -use crate::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; -use crate::dependency::{CertificatePendingStoreWrapper, CertificateStoreWrapper}; -use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; -use crate::{BeaconStore, Snapshotter}; use super::{AggregatorRunnerTrait, RuntimeError}; -use chrono::{DateTime, Utc}; -use hex::ToHex; -use mithril_common::entities::Snapshot; use slog_scope::{debug, error, info, trace, warn}; use std::fmt::Display; -use std::fs::File; -use std::io::{Seek, SeekFrom}; -use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::time::{sleep, Duration}; -#[cfg(test)] -use mockall::automock; - #[derive(Clone, Debug, PartialEq)] pub struct IdleState { current_beacon: Option, @@ -91,25 +75,41 @@ impl AggregatorRuntime { pub async fn run(&mut self) { info!("Starting runtime"); + debug!("current state: {}", self.state); + loop { if let Err(e) = self.cycle().await { error!("{:?}", e) } - info!("Sleeping…"); + info!("Sleeping for {} seconds", self.state_sleep.as_secs()); sleep(self.state_sleep).await; } } pub async fn cycle(&mut self) -> Result<(), RuntimeError> { + info!("new cycle"); match self.state.clone() { AggregatorState::Idle(state) => { - if let Some(beacon) = self.runner.is_new_beacon(state.current_beacon.as_ref())? { + info!("state IDLE"); + if let Some(beacon) = self + .runner + .is_new_beacon(state.current_beacon.clone()) + .await? + { + trace!( + "new beacon found, immutable file number = {}", + beacon.immutable_file_number + ); let new_state = self.from_idle_to_signing(beacon).await?; self.state = AggregatorState::Signing(new_state); + } else { + trace!("no new beacon"); } } - AggregatorState::Signing(_state) => {} + AggregatorState::Signing(_state) => { + todo!() + } } Ok(()) } @@ -120,11 +120,19 @@ impl AggregatorRuntime { &mut self, new_beacon: Beacon, ) -> Result { - info!("transiting from IDLE to SIGNING state"); + debug!("launching transition from IDLE to SIGNING state"); let digester_result = self.runner.compute_digest(&new_beacon).await?; + let _ = self + .runner + .update_message_in_multisigner(digester_result) + .await?; let certificate = self .runner - .create_pending_certificate(digester_result) + .create_new_pending_certificate_from_multisigner(new_beacon.clone()) + .await?; + let _ = self + .runner + .save_pending_certificate(certificate.clone()) .await?; let state = SigningState { current_beacon: new_beacon, @@ -336,6 +344,7 @@ mod tests { use super::*; use mithril_common::digesters::DigesterResult; use mithril_common::fake_data; + use mockall::predicate; async fn init_runtime( init_state: Option, @@ -379,9 +388,23 @@ mod tests { }) }); runner - .expect_create_pending_certificate() + .expect_update_message_in_multisigner() + .with(predicate::eq(DigesterResult { + digest: "whatever".to_string(), + last_immutable_file_number: 123, + })) + .times(1) + .returning(|_| Ok(())); + runner + .expect_create_new_pending_certificate_from_multisigner() + .with(predicate::eq(fake_data::beacon())) .times(1) .returning(|_| Ok(fake_data::certificate_pending())); + runner + .expect_save_pending_certificate() + .times(1) + .returning(|_| Ok(())); + let mut runtime = init_runtime( Some(AggregatorState::Idle(IdleState { current_beacon: None, diff --git a/mithril-common/src/digesters/digester.rs b/mithril-common/src/digesters/digester.rs index 08955ebab52..4168cd42c9b 100644 --- a/mithril-common/src/digesters/digester.rs +++ b/mithril-common/src/digesters/digester.rs @@ -3,7 +3,7 @@ use crate::entities::ImmutableFileNumber; use std::io; use thiserror::Error; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DigesterResult { /// The computed digest pub digest: String, From b32864a2c7ec23fbd7ca8718fb5847399ffc716c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Fri, 10 Jun 2022 16:23:53 +0200 Subject: [PATCH 07/18] [wip] from signing to idle --- mithril-aggregator/src/runtime/runner.rs | 5 +++ mithril-aggregator/src/runtime/runtime.rs | 51 ++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 36146b7b8fe..577b01509dd 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -65,6 +65,7 @@ pub trait AggregatorRunnerTrait: Sync + Send { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError>; + async fn drop_pending_certificate(&self) -> Result<(), RuntimeError>; } pub struct AggregatorRunner { @@ -184,4 +185,8 @@ impl AggregatorRunnerTrait for AggregatorRunner { .await .map_err(|e| RuntimeError::MultiSigner(e)) } + + async fn drop_pending_certificate(&self) -> Result<(), RuntimeError> { + todo!() + } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 93724a9d040..71725ea4263 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -107,13 +107,35 @@ impl AggregatorRuntime { trace!("no new beacon"); } } - AggregatorState::Signing(_state) => { - todo!() + AggregatorState::Signing(state) => { + info!("state SIGNING"); + if let Some(beacon) = self + .runner + .is_new_beacon(Some(state.current_beacon.clone())) + .await? + { + trace!( + "new beacon found, immutable file number = {}", + beacon.immutable_file_number + ); + let new_state = self.from_signing_to_idle(state).await?; + self.state = AggregatorState::Idle(new_state); + } else { + todo!() + } } } Ok(()) } + /// transition + /// from SIGNIN to IDLE + async fn from_signing_to_idle( + &mut self, + state: SigningState, + ) -> Result { + todo!() + } /// transition /// from IDLE state to SIGNING async fn from_idle_to_signing( @@ -416,4 +438,29 @@ mod tests { let _ = runtime.cycle().await.unwrap(); assert_eq!("signing".to_string(), runtime.get_state()); } + + #[tokio::test] + async fn signing_changing_beacon_to_idle() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_drop_pending_certificate() + .times(1) + .returning(|| Ok(())); + + let state = SigningState { + // this current beacon must be outdated so the state machine will + // return to idle state + current_beacon: { + let mut beacon = fake_data::beacon(); + beacon.immutable_file_number -= 1; + + beacon + }, + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + + let _ = runtime.cycle().await.unwrap(); + assert_eq!("idle".to_string(), runtime.get_state()); + } } From fd8f900c1bee74b2ebcfce66f6e847c1c4202025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Mon, 13 Jun 2022 11:20:43 +0200 Subject: [PATCH 08/18] [wip] signing to idle --- mithril-aggregator/src/runtime/runner.rs | 37 +++++++++++++-- mithril-aggregator/src/runtime/runtime.rs | 46 +++++++++++++++++-- .../src/store/adapter/store_adapter.rs | 3 ++ 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 577b01509dd..0a2b98e60e9 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -65,7 +65,8 @@ pub trait AggregatorRunnerTrait: Sync + Send { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError>; - async fn drop_pending_certificate(&self) -> Result<(), RuntimeError>; + async fn drop_pending_certificate(&self, beacon: &Beacon) -> Result<(), RuntimeError>; + async fn is_multisig_created(&self) -> Result; } pub struct AggregatorRunner { @@ -97,6 +98,10 @@ impl AggregatorRunnerTrait for AggregatorRunner { } } + async fn is_multisig_created(&self) -> Result { + todo!() + } + async fn compute_digest(&self, new_beacon: &Beacon) -> Result { trace!("running runner::compute_digester"); let digester = @@ -158,6 +163,8 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError> { + trace!("saving pending certificate"); + self.config .dependencies .certificate_pending_store @@ -174,6 +181,8 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, digest_result: DigesterResult, ) -> Result<(), RuntimeError> { + trace!("update message in multisigner"); + self.config .dependencies .multi_signer @@ -186,7 +195,29 @@ impl AggregatorRunnerTrait for AggregatorRunner { .map_err(|e| RuntimeError::MultiSigner(e)) } - async fn drop_pending_certificate(&self) -> Result<(), RuntimeError> { - todo!() + async fn drop_pending_certificate(&self, beacon: &Beacon) -> Result<(), RuntimeError> { + trace!("drop pending certificate"); + + let maybe_pending_certificate = self + .config + .dependencies + .certificate_pending_store + .as_ref() + .ok_or(RuntimeError::General(format!( + "no certificate pending store registered" + )))? + .write() + .await + .remove(beacon) + .await?; + + if maybe_pending_certificate.is_none() { + warn!( + "no pending certificate for the given beacon {:?} this is not normal", + beacon + ); + } + + Ok(()) } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 71725ea4263..5127257ef5e 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -92,6 +92,7 @@ impl AggregatorRuntime { match self.state.clone() { AggregatorState::Idle(state) => { info!("state IDLE"); + if let Some(beacon) = self .runner .is_new_beacon(state.current_beacon.clone()) @@ -104,11 +105,12 @@ impl AggregatorRuntime { let new_state = self.from_idle_to_signing(beacon).await?; self.state = AggregatorState::Signing(new_state); } else { - trace!("no new beacon"); + trace!("nothing to do in IDLE state") } } AggregatorState::Signing(state) => { info!("state SIGNING"); + if let Some(beacon) = self .runner .is_new_beacon(Some(state.current_beacon.clone())) @@ -118,10 +120,12 @@ impl AggregatorRuntime { "new beacon found, immutable file number = {}", beacon.immutable_file_number ); - let new_state = self.from_signing_to_idle(state).await?; + let new_state = self.from_signing_to_idle(state, beacon).await?; self.state = AggregatorState::Idle(new_state); - } else { + } else if self.runner.is_multisig_created().await? { todo!() + } else { + trace!("nothing to do in SIGNING state") } } } @@ -133,8 +137,15 @@ impl AggregatorRuntime { async fn from_signing_to_idle( &mut self, state: SigningState, + new_beacon: Beacon, ) -> Result { - todo!() + self.runner + .drop_pending_certificate(&state.current_beacon) + .await?; + + Ok(IdleState { + current_beacon: Some(new_beacon), + }) } /// transition /// from IDLE state to SIGNING @@ -442,10 +453,14 @@ mod tests { #[tokio::test] async fn signing_changing_beacon_to_idle() { let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(Some(fake_data::beacon()))); runner .expect_drop_pending_certificate() .times(1) - .returning(|| Ok(())); + .returning(|_| Ok(())); let state = SigningState { // this current beacon must be outdated so the state machine will @@ -463,4 +478,25 @@ mod tests { let _ = runtime.cycle().await.unwrap(); assert_eq!("idle".to_string(), runtime.get_state()); } + + #[tokio::test] + async fn signing_same_beacon_to_signing() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); + runner + .expect_is_multisig_created() + .times(1) + .returning(|| Ok(false)); + let state = SigningState { + current_beacon: fake_data::beacon(), + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + + let _ = runtime.cycle().await.unwrap(); + assert_eq!("signing".to_string(), runtime.get_state()); + } } diff --git a/mithril-common/src/store/adapter/store_adapter.rs b/mithril-common/src/store/adapter/store_adapter.rs index 51ac5be3d14..16811963dea 100644 --- a/mithril-common/src/store/adapter/store_adapter.rs +++ b/mithril-common/src/store/adapter/store_adapter.rs @@ -35,5 +35,8 @@ pub trait StoreAdapter: Sync + Send { how_many: usize, ) -> Result, AdapterError>; + /// remove values from store + /// + /// if the value exists it is returned by the adapter otherwise None is returned async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError>; } From f3147917230e2f3ec471554a1e1a79f6c86f98b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Mon, 13 Jun 2022 14:48:49 +0200 Subject: [PATCH 09/18] add multisig check --- mithril-aggregator/src/runtime/runner.rs | 17 ++++++++++- mithril-aggregator/src/runtime/runtime.rs | 36 ++++++++++++++++++----- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 0a2b98e60e9..7abd7aa3617 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -98,8 +98,23 @@ impl AggregatorRunnerTrait for AggregatorRunner { } } + /// Is a multisignature ready? + /// returns the multisignature if the signer is ready to sign or None otherwise async fn is_multisig_created(&self) -> Result { - todo!() + trace!("running runner::is_multisig_created"); + let has_multisig = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .read() + .await + .get_multi_signature() + .await? + .is_some(); + + Ok(has_multisig) } async fn compute_digest(&self, new_beacon: &Beacon) -> Result { diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index 5127257ef5e..fc69c2aac38 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -120,7 +120,7 @@ impl AggregatorRuntime { "new beacon found, immutable file number = {}", beacon.immutable_file_number ); - let new_state = self.from_signing_to_idle(state, beacon).await?; + let new_state = self.from_signing_to_idle_new_beacon(state, beacon).await?; self.state = AggregatorState::Idle(new_state); } else if self.runner.is_multisig_created().await? { todo!() @@ -133,8 +133,9 @@ impl AggregatorRuntime { } /// transition - /// from SIGNIN to IDLE - async fn from_signing_to_idle( + /// + /// from SIGNING to IDLE because NEW BEACON + async fn from_signing_to_idle_new_beacon( &mut self, state: SigningState, new_beacon: Beacon, @@ -148,7 +149,7 @@ impl AggregatorRuntime { }) } /// transition - /// from IDLE state to SIGNING + /// from IDLE state to SIGNING because NEW BEACON async fn from_idle_to_signing( &mut self, new_beacon: Beacon, @@ -402,8 +403,8 @@ mod tests { runner, ) .await; - let _ = runtime.cycle().await.unwrap(); + assert_eq!("idle".to_string(), runtime.get_state()); } @@ -445,8 +446,8 @@ mod tests { runner, ) .await; - let _ = runtime.cycle().await.unwrap(); + assert_eq!("signing".to_string(), runtime.get_state()); } @@ -474,8 +475,8 @@ mod tests { certificate_pending: fake_data::certificate_pending(), }; let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; - let _ = runtime.cycle().await.unwrap(); + assert_eq!("idle".to_string(), runtime.get_state()); } @@ -495,8 +496,29 @@ mod tests { certificate_pending: fake_data::certificate_pending(), }; let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + let _ = runtime.cycle().await.unwrap(); + assert_eq!("signing".to_string(), runtime.get_state()); + } + + #[tokio::test] + async fn signing_multisig_ready_to_idle() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); + runner + .expect_is_multisig_created() + .times(1) + .returning(|| Ok(true)); + let state = SigningState { + current_beacon: fake_data::beacon(), + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; let _ = runtime.cycle().await.unwrap(); + assert_eq!("signing".to_string(), runtime.get_state()); } } From 50acf68b5bb1a02104be625ea28960010a58ead5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Mon, 13 Jun 2022 22:05:29 +0200 Subject: [PATCH 10/18] add save snapshot in store --- mithril-aggregator/src/runtime/runner.rs | 147 +++++++++++- mithril-aggregator/src/runtime/runtime.rs | 259 +++++----------------- 2 files changed, 192 insertions(+), 214 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 7abd7aa3617..771b85798b4 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -1,9 +1,10 @@ use std::path::PathBuf; -use crate::DependencyManager; +use crate::{DependencyManager, SnapshotError, Snapshotter}; use async_trait::async_trait; +use chrono::Utc; use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; -use mithril_common::entities::{Beacon, CertificatePending}; +use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot}; #[allow(unused_imports)] use slog_scope::{debug, error, info, trace, warn}; @@ -65,8 +66,24 @@ pub trait AggregatorRunnerTrait: Sync + Send { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError>; - async fn drop_pending_certificate(&self, beacon: &Beacon) -> Result<(), RuntimeError>; + async fn drop_pending_certificate( + &self, + beacon: &Beacon, + ) -> Result; async fn is_multisig_created(&self) -> Result; + async fn create_snapshot_archive(&self) -> Result; + async fn upload_snapshot_archive(&self, path: &PathBuf) -> Result<(), RuntimeError>; + async fn create_and_save_certificate( + &self, + beacon: &Beacon, + certificate_pending: &CertificatePending, + ) -> Result; + async fn save_snapshot( + &self, + certificate: Certificate, + file_path: &PathBuf, + remote_locations: Vec, + ) -> Result<(), RuntimeError>; } pub struct AggregatorRunner { @@ -210,10 +227,13 @@ impl AggregatorRunnerTrait for AggregatorRunner { .map_err(|e| RuntimeError::MultiSigner(e)) } - async fn drop_pending_certificate(&self, beacon: &Beacon) -> Result<(), RuntimeError> { + async fn drop_pending_certificate( + &self, + beacon: &Beacon, + ) -> Result { trace!("drop pending certificate"); - let maybe_pending_certificate = self + let certificate_pending = self .config .dependencies .certificate_pending_store @@ -224,14 +244,117 @@ impl AggregatorRunnerTrait for AggregatorRunner { .write() .await .remove(beacon) - .await?; + .await? + .ok_or(RuntimeError::General( + "no certificate pending for the given beacon".to_string(), + ))?; - if maybe_pending_certificate.is_none() { - warn!( - "no pending certificate for the given beacon {:?} this is not normal", - beacon - ); - } + Ok(certificate_pending) + } + + async fn create_snapshot_archive(&self) -> Result { + trace!("create snapshot archive"); + + let snapshotter = Snapshotter::new( + self.config.db_directory.clone(), + self.config.snapshot_directory.clone(), + ); + let message = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .read() + .await + .get_current_message() + .await + .ok_or(RuntimeError::General("no message found".to_string()))?; + let snapshot_name = format!( + "{}.{}.tar.gz", + self.config.network, + std::str::from_utf8(&message).map_err(|e| RuntimeError::General(e.to_string()))? + ); + // spawn a separate thread to prevent blocking + let snapshot_path = + tokio::task::spawn_blocking(move || -> Result { + snapshotter.snapshot(&snapshot_name) + }) + .await + .map_err(|e| RuntimeError::General(e.to_string()))??; + + debug!("snapshot created at '{}'", snapshot_path.to_string_lossy()); + + Ok(snapshot_path) + } + + async fn create_and_save_certificate( + &self, + beacon: &Beacon, + certificate_pending: &CertificatePending, + ) -> Result { + trace!("create and save certificate"); + let multisigner = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .read() + .await; + let certificate = multisigner + .create_certificate(beacon.clone(), certificate_pending.previous_hash.clone()) + .await? + .ok_or(RuntimeError::General(format!("no certificate generated")))?; + let _ = self + .config + .dependencies + .certificate_store + .as_ref() + .ok_or(RuntimeError::General(format!( + "no certificate store registered" + )))? + .write() + .await + .save(certificate.clone()) + .await; + + Ok(certificate) + } + + async fn upload_snapshot_archive(&self, path: &PathBuf) -> Result<(), RuntimeError> { + trace!("upload snapshot archive"); + todo!() + } + + async fn save_snapshot( + &self, + certificate: Certificate, + file_path: &PathBuf, + remote_locations: Vec, + ) -> Result<(), RuntimeError> { + let snapshot = Snapshot::new( + certificate.digest, + certificate.hash, + std::fs::metadata(file_path) + .map_err(|e| RuntimeError::General(e.to_string()))? + .len(), + format!("{:?}", Utc::now()), + remote_locations, + ); + + let _ = self + .config + .dependencies + .snapshot_store + .as_ref() + .ok_or(RuntimeError::General(format!( + "no snapshot store registered" + )))? + .write() + .await + .add_snapshot(snapshot) + .await?; Ok(()) } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index fc69c2aac38..bb2fbf0eb9b 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -1,7 +1,7 @@ -use mithril_common::entities::{Beacon, Certificate, CertificatePending}; - use super::{AggregatorRunnerTrait, RuntimeError}; -use slog_scope::{debug, error, info, trace, warn}; + +use mithril_common::entities::{Beacon, CertificatePending}; +use slog_scope::{debug, error, info, trace}; use std::fmt::Display; use std::sync::Arc; use tokio::time::{sleep, Duration}; @@ -123,7 +123,9 @@ impl AggregatorRuntime { let new_state = self.from_signing_to_idle_new_beacon(state, beacon).await?; self.state = AggregatorState::Idle(new_state); } else if self.runner.is_multisig_created().await? { - todo!() + trace!("new multisignature found"); + let new_state = self.from_signing_to_idle_multisignature(state).await?; + self.state = AggregatorState::Idle(new_state); } else { trace!("nothing to do in SIGNING state") } @@ -132,6 +134,32 @@ impl AggregatorRuntime { Ok(()) } + /// transition + /// + /// from SIGNING to IDLE because NEW MULTISIGNATURE + async fn from_signing_to_idle_multisignature( + &mut self, + state: SigningState, + ) -> Result { + let certificate_pending = self + .runner + .drop_pending_certificate(&state.current_beacon) + .await?; + let path = self.runner.create_snapshot_archive().await?; + let _ = self.runner.upload_snapshot_archive(&path).await?; + let certificate = self + .runner + .create_and_save_certificate(&state.current_beacon, &certificate_pending) + .await?; + let _ = self + .runner + .save_snapshot(certificate, &path, Vec::new()) + .await?; + + Ok(IdleState { + current_beacon: Some(state.current_beacon), + }) + } /// transition /// /// from SIGNING to IDLE because NEW BEACON @@ -176,204 +204,10 @@ impl AggregatorRuntime { Ok(state) } } -/// AggregatorRuntime factory -// TODO: Fix this by implementing an Aggregator Config that implements the From trait for a general Config -/* - pub fn new( - interval: u32, - network: String, - db_directory: PathBuf, - snapshot_directory: PathBuf, - beacon_store: BeaconStoreWrapper, - multi_signer: MultiSignerWrapper, - snapshot_store: SnapshotStoreWrapper, - snapshot_uploader: Box, - certificate_pending_store: CertificatePendingStoreWrapper, - certificate_store: CertificateStoreWrapper, - ) -> Self { - Self { - interval, - network, - db_directory, - snapshot_directory, - beacon_store, - multi_signer, - snapshot_store, - snapshot_uploader, - certificate_pending_store, - certificate_store, - } - } - /// Run snapshotter loop - pub async fn run(&self) { - info!("Starting runtime"); - - loop { - if let Err(e) = self.do_work().await { - error!("{:?}", e) - } - - info!("Sleeping for {}", self.interval); - sleep(Duration::from_millis(self.interval.into())).await; - } - } - - async fn do_work(&self) -> Result<(), RuntimeError> { - let snapshotter = - Snapshotter::new(self.db_directory.clone(), self.snapshot_directory.clone()); - let digester = ImmutableDigester::new(self.db_directory.clone(), slog_scope::logger()); - - info!("Computing digest"; "db_directory" => self.db_directory.display()); - let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) - .await - .map_err(|e| RuntimeError::General(e.to_string()))?; - match digest_result { - Ok(digest_result) => { - let mut beacon = fake_data::beacon(); - beacon.immutable_file_number = digest_result.last_immutable_file_number; - let message = &digest_result.digest.clone().into_bytes(); - - match self.manage_trigger_snapshot(message, &beacon).await { - Ok(Some(certificate)) => { - info!( - "Snapshotting immutables up to `{}` in an archive", - &beacon.immutable_file_number - ); - - let snapshot_name = - format!("{}.{}.tar.gz", self.network, &digest_result.digest); - - let snapshot_path = tokio::task::spawn_blocking( - move || -> Result { - snapshotter.snapshot(&snapshot_name) - }, - ) - .await - .map_err(|e| RuntimeError::General(e.to_string()))??; - - info!("Uploading snapshot archive"); - let uploaded_snapshot_location = self - .snapshot_uploader - .upload_snapshot(&snapshot_path) - .await - .map_err(RuntimeError::SnapshotUploader)?; - - info!( - "Snapshot archive uploaded, location: `{}`", - &uploaded_snapshot_location - ); - - let new_snapshot = build_new_snapshot( - digest_result.digest, - certificate.hash.to_owned(), - &snapshot_path, - uploaded_snapshot_location, - )?; - - info!("Storing snapshot data"; "snapshot" => format!("{:?}", new_snapshot)); - let mut snapshot_store = self.snapshot_store.write().await; - snapshot_store.add_snapshot(new_snapshot).await?; - - info!("Storing certificate data"; "certificate" => format!("{:?}", certificate)); - let mut certificate_store = self.certificate_store.write().await; - certificate_store - .save(certificate) - .await - .map_err(|e| RuntimeError::CertificateStore(e.to_string()))?; - - Ok(()) - } - Ok(None) => Ok(()), - Err(err) => Err(err), - } - } - Err(err) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Err(RuntimeError::Digester(err)) - } - } - } - - async fn manage_trigger_snapshot( - &self, - message: &Bytes, - beacon: &Beacon, - ) -> Result, RuntimeError> { - let mut multi_signer = self.multi_signer.write().await; - match multi_signer.get_multi_signature().await { - Ok(None) => { - { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.set_current_beacon(beacon.clone()).await?; - } - multi_signer - .update_current_message(message.to_owned()) - .await?; - match multi_signer.create_multi_signature().await { - Ok(Some(_)) => { - let message = multi_signer - .get_current_message() - .await - .unwrap() - .encode_hex::(); - debug!( - "A multi signature has been created for message: {}", - message - ); - let previous_hash = "".to_string(); - Ok(multi_signer - .create_certificate(beacon.clone(), previous_hash) - .await?) - } - Ok(None) => { - warn!("Not ready to create a multi signature: quorum is not reached yet"); - Ok(None) - } - Err(e) => { - warn!("Error while creating a multi signature: {}", e); - Err(RuntimeError::MultiSigner(e)) - } - } - } - Ok(_) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Ok(None) - } - Err(err) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Err(RuntimeError::MultiSigner(err)) - } - } - } -} - -fn build_new_snapshot( - digest: String, - certificate_hash: String, - snapshot_filepath: &Path, - uploaded_snapshot_location: SnapshotLocation, -) -> Result { - let timestamp: DateTime = Utc::now(); - let created_at = format!("{:?}", timestamp); - - let mut tar_gz = File::open(&snapshot_filepath)?; - let size: u64 = tar_gz.seek(SeekFrom::End(0))?; - - Ok(Snapshot::new( - digest, - certificate_hash, - size, - created_at, - vec![uploaded_snapshot_location], - )) -} - */ - #[cfg(test)] mod tests { + use std::path::PathBuf; + use super::super::runner::MockAggregatorRunner; use super::*; use mithril_common::digesters::DigesterResult; @@ -461,7 +295,7 @@ mod tests { runner .expect_drop_pending_certificate() .times(1) - .returning(|_| Ok(())); + .returning(|_| Ok(fake_data::certificate_pending())); let state = SigningState { // this current beacon must be outdated so the state machine will @@ -508,10 +342,31 @@ mod tests { .expect_is_new_beacon() .times(1) .returning(|_| Ok(None)); + runner + .expect_drop_pending_certificate() + .times(1) + .returning(|_| Ok(fake_data::certificate_pending())); runner .expect_is_multisig_created() .times(1) .returning(|| Ok(true)); + runner + .expect_create_snapshot_archive() + .times(1) + .returning(|| Ok(PathBuf::new().join("/tmp/archive.zip"))); + runner + .expect_upload_snapshot_archive() + .times(1) + .returning(|_path| Ok(())); + runner + .expect_create_and_save_certificate() + .times(1) + .returning(|_, _| Ok(fake_data::certificate("whatever".to_string()))); + runner + .expect_save_snapshot() + .times(1) + .returning(|_, _, _| Ok(())); + let state = SigningState { current_beacon: fake_data::beacon(), certificate_pending: fake_data::certificate_pending(), @@ -519,6 +374,6 @@ mod tests { let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; let _ = runtime.cycle().await.unwrap(); - assert_eq!("signing".to_string(), runtime.get_state()); + assert_eq!("idle".to_string(), runtime.get_state()); } } From ec0da01e57b10760cdc7f2cea57a994b9f61b816 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Tue, 14 Jun 2022 12:12:39 +0200 Subject: [PATCH 11/18] create multisignature --- mithril-aggregator/src/runtime/runner.rs | 64 +++++++++++------------ mithril-aggregator/src/runtime/runtime.rs | 38 ++++++++------ 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 771b85798b4..a9699904aa8 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -72,18 +72,18 @@ pub trait AggregatorRunnerTrait: Sync + Send { ) -> Result; async fn is_multisig_created(&self) -> Result; async fn create_snapshot_archive(&self) -> Result; - async fn upload_snapshot_archive(&self, path: &PathBuf) -> Result<(), RuntimeError>; + async fn upload_snapshot_archive(&self, path: &Path) -> Result, RuntimeError>; async fn create_and_save_certificate( &self, beacon: &Beacon, certificate_pending: &CertificatePending, ) -> Result; - async fn save_snapshot( + async fn create_and_save_snapshot( &self, certificate: Certificate, - file_path: &PathBuf, + file_path: &Path, remote_locations: Vec, - ) -> Result<(), RuntimeError>; + ) -> Result; } pub struct AggregatorRunner { @@ -124,10 +124,10 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .multi_signer .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? - .read() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .write() .await - .get_multi_signature() + .create_multi_signature() .await? .is_some(); @@ -171,7 +171,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .multi_signer .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? .read() .await; @@ -182,7 +182,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { multi_signer .get_protocol_parameters() .await - .ok_or_else(|| RuntimeError::General(format!("no protocol parameters")))? + .ok_or_else(|| RuntimeError::General("no protocol parameters".to_string()))? .into(), "123".to_string(), multi_signer.get_signers().await?, @@ -201,7 +201,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .certificate_pending_store .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? .write() .await .save(pending_certificate) @@ -219,12 +219,12 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .multi_signer .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? .write() .await .update_current_message(digest_result.digest.into_bytes()) .await - .map_err(|e| RuntimeError::MultiSigner(e)) + .map_err(RuntimeError::MultiSigner) } async fn drop_pending_certificate( @@ -238,16 +238,16 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .certificate_pending_store .as_ref() - .ok_or(RuntimeError::General(format!( - "no certificate pending store registered" - )))? + .ok_or_else(|| { + RuntimeError::General("no certificate pending store registered".to_string()) + })? .write() .await .remove(beacon) .await? - .ok_or(RuntimeError::General( - "no certificate pending for the given beacon".to_string(), - ))?; + .ok_or_else(|| { + RuntimeError::General("no certificate pending for the given beacon".to_string()) + })?; Ok(certificate_pending) } @@ -264,12 +264,12 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .multi_signer .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? .read() .await .get_current_message() .await - .ok_or(RuntimeError::General("no message found".to_string()))?; + .ok_or_else(|| RuntimeError::General("no message found".to_string()))?; let snapshot_name = format!( "{}.{}.tar.gz", self.config.network, @@ -299,21 +299,19 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .multi_signer .as_ref() - .ok_or(RuntimeError::General(format!("no multisigner registered")))? + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? .read() .await; let certificate = multisigner .create_certificate(beacon.clone(), certificate_pending.previous_hash.clone()) .await? - .ok_or(RuntimeError::General(format!("no certificate generated")))?; + .ok_or_else(|| RuntimeError::General("no certificate generated".to_string()))?; let _ = self .config .dependencies .certificate_store .as_ref() - .ok_or(RuntimeError::General(format!( - "no certificate store registered" - )))? + .ok_or_else(|| RuntimeError::General("no certificate store registered".to_string()))? .write() .await .save(certificate.clone()) @@ -322,17 +320,17 @@ impl AggregatorRunnerTrait for AggregatorRunner { Ok(certificate) } - async fn upload_snapshot_archive(&self, path: &PathBuf) -> Result<(), RuntimeError> { + async fn upload_snapshot_archive(&self, path: &Path) -> Result, RuntimeError> { trace!("upload snapshot archive"); todo!() } - async fn save_snapshot( + async fn create_and_save_snapshot( &self, certificate: Certificate, - file_path: &PathBuf, + file_path: &Path, remote_locations: Vec, - ) -> Result<(), RuntimeError> { + ) -> Result { let snapshot = Snapshot::new( certificate.digest, certificate.hash, @@ -348,14 +346,12 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .snapshot_store .as_ref() - .ok_or(RuntimeError::General(format!( - "no snapshot store registered" - )))? + .ok_or_else(|| RuntimeError::General("no snapshot store registered".to_string()))? .write() .await - .add_snapshot(snapshot) + .add_snapshot(snapshot.clone()) .await?; - Ok(()) + Ok(snapshot) } } diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/runtime.rs index bb2fbf0eb9b..4fba2bd207b 100644 --- a/mithril-aggregator/src/runtime/runtime.rs +++ b/mithril-aggregator/src/runtime/runtime.rs @@ -102,7 +102,7 @@ impl AggregatorRuntime { "new beacon found, immutable file number = {}", beacon.immutable_file_number ); - let new_state = self.from_idle_to_signing(beacon).await?; + let new_state = self.transition_from_idle_to_signing(beacon).await?; self.state = AggregatorState::Signing(new_state); } else { trace!("nothing to do in IDLE state") @@ -120,11 +120,15 @@ impl AggregatorRuntime { "new beacon found, immutable file number = {}", beacon.immutable_file_number ); - let new_state = self.from_signing_to_idle_new_beacon(state, beacon).await?; + let new_state = self + .transition_from_signing_to_idle_new_beacon(state, beacon) + .await?; self.state = AggregatorState::Idle(new_state); } else if self.runner.is_multisig_created().await? { trace!("new multisignature found"); - let new_state = self.from_signing_to_idle_multisignature(state).await?; + let new_state = self + .transition_from_signing_to_idle_multisignature(state) + .await?; self.state = AggregatorState::Idle(new_state); } else { trace!("nothing to do in SIGNING state") @@ -137,8 +141,8 @@ impl AggregatorRuntime { /// transition /// /// from SIGNING to IDLE because NEW MULTISIGNATURE - async fn from_signing_to_idle_multisignature( - &mut self, + async fn transition_from_signing_to_idle_multisignature( + &self, state: SigningState, ) -> Result { let certificate_pending = self @@ -153,7 +157,7 @@ impl AggregatorRuntime { .await?; let _ = self .runner - .save_snapshot(certificate, &path, Vec::new()) + .create_and_save_snapshot(certificate, &path, Vec::new()) .await?; Ok(IdleState { @@ -163,8 +167,8 @@ impl AggregatorRuntime { /// transition /// /// from SIGNING to IDLE because NEW BEACON - async fn from_signing_to_idle_new_beacon( - &mut self, + async fn transition_from_signing_to_idle_new_beacon( + &self, state: SigningState, new_beacon: Beacon, ) -> Result { @@ -178,8 +182,8 @@ impl AggregatorRuntime { } /// transition /// from IDLE state to SIGNING because NEW BEACON - async fn from_idle_to_signing( - &mut self, + async fn transition_from_idle_to_signing( + &self, new_beacon: Beacon, ) -> Result { debug!("launching transition from IDLE to SIGNING state"); @@ -342,14 +346,14 @@ mod tests { .expect_is_new_beacon() .times(1) .returning(|_| Ok(None)); - runner - .expect_drop_pending_certificate() - .times(1) - .returning(|_| Ok(fake_data::certificate_pending())); runner .expect_is_multisig_created() .times(1) .returning(|| Ok(true)); + runner + .expect_drop_pending_certificate() + .times(1) + .returning(|_| Ok(fake_data::certificate_pending())); runner .expect_create_snapshot_archive() .times(1) @@ -357,15 +361,15 @@ mod tests { runner .expect_upload_snapshot_archive() .times(1) - .returning(|_path| Ok(())); + .returning(|_path| Ok(vec!["locA".to_string(), "locB".to_string()])); runner .expect_create_and_save_certificate() .times(1) .returning(|_, _| Ok(fake_data::certificate("whatever".to_string()))); runner - .expect_save_snapshot() + .expect_create_and_save_snapshot() .times(1) - .returning(|_, _, _| Ok(())); + .returning(|_, _, _| Ok(fake_data::snapshots(1)[0].clone())); let state = SigningState { current_beacon: fake_data::beacon(), From 2bbcc5a8b1d65287c5109503f26e83d344b789e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Tue, 14 Jun 2022 14:02:16 +0200 Subject: [PATCH 12/18] use complete state machine --- mithril-aggregator/src/dependency.rs | 15 +++++++++++ mithril-aggregator/src/entities.rs | 15 +++++------ mithril-aggregator/src/main.rs | 7 ++--- mithril-aggregator/src/runtime/mod.rs | 4 +-- mithril-aggregator/src/runtime/runner.rs | 27 ++++++++++++++++--- .../runtime/{runtime.rs => state_machine.rs} | 0 6 files changed, 52 insertions(+), 16 deletions(-) rename mithril-aggregator/src/runtime/{runtime.rs => state_machine.rs} (100%) diff --git a/mithril-aggregator/src/dependency.rs b/mithril-aggregator/src/dependency.rs index 7247600f303..ac9ca35868d 100644 --- a/mithril-aggregator/src/dependency.rs +++ b/mithril-aggregator/src/dependency.rs @@ -7,6 +7,7 @@ use super::entities::*; use super::multi_signer::MultiSigner; use super::snapshot_stores::SnapshotStore; use crate::beacon_store::BeaconStore; +use crate::snapshot_uploaders::SnapshotUploader; use crate::{CertificatePendingStore, CertificateStore, VerificationKeyStore}; /// BeaconStoreWrapper wraps a BeaconStore @@ -30,10 +31,14 @@ pub type VerificationKeyStoreWrapper = Arc>; /// StakeStoreWrapper wraps a StakeStore pub type StakeStoreWrapper = Arc>; +/// StakeStoreWrapper wraps a StakeStore +pub type SnapshotUploaderWrapper = Arc>; + /// DependencyManager handles the dependencies pub struct DependencyManager { pub config: Config, pub snapshot_store: Option, + pub snapshot_uploader: Option, pub multi_signer: Option, pub beacon_store: Option, pub certificate_pending_store: Option, @@ -48,6 +53,7 @@ impl DependencyManager { Self { config, snapshot_store: None, + snapshot_uploader: None, multi_signer: None, beacon_store: None, certificate_pending_store: None, @@ -63,6 +69,15 @@ impl DependencyManager { self } + /// With SnapshotUploader middleware + pub fn with_snapshot_uploader( + &mut self, + snapshot_uploader: SnapshotUploaderWrapper, + ) -> &mut Self { + self.snapshot_uploader = Some(snapshot_uploader); + self + } + /// With MultiSigner middleware pub fn with_multi_signer(&mut self, multi_signer: MultiSignerWrapper) -> &mut Self { self.multi_signer = Some(multi_signer); diff --git a/mithril-aggregator/src/entities.rs b/mithril-aggregator/src/entities.rs index 56d9f9f83a3..e4c017bfca7 100644 --- a/mithril-aggregator/src/entities.rs +++ b/mithril-aggregator/src/entities.rs @@ -1,6 +1,5 @@ -use crate::dependency::SnapshotStoreWrapper; +use crate::dependency::{SnapshotStoreWrapper, SnapshotUploaderWrapper}; use crate::snapshot_stores::LocalSnapshotStore; -use crate::snapshot_uploaders::SnapshotUploader; use crate::tools::GcpFileUploader; use crate::{LocalSnapshotUploader, RemoteSnapshotStore, RemoteSnapshotUploader}; use serde::{Deserialize, Serialize}; @@ -70,14 +69,14 @@ impl Config { } } - pub fn build_snapshot_uploader(&self) -> Box { + pub fn build_snapshot_uploader(&self) -> SnapshotUploaderWrapper { match self.snapshot_uploader_type { - SnapshotUploaderType::Gcp => Box::new(RemoteSnapshotUploader::new(Box::new( - GcpFileUploader::default(), + SnapshotUploaderType::Gcp => Arc::new(RwLock::new(RemoteSnapshotUploader::new( + Box::new(GcpFileUploader::default()), + ))), + SnapshotUploaderType::Local => Arc::new(RwLock::new(LocalSnapshotUploader::new( + self.server_url.clone(), ))), - SnapshotUploaderType::Local => { - Box::new(LocalSnapshotUploader::new(self.server_url.clone())) - } } } } diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 31217cd6912..ee03badcf25 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -4,9 +4,9 @@ use clap::Parser; use config::{Map, Source, Value, ValueKind}; use mithril_aggregator::{ - AggregatorConfig, AggregatorRuntime, BeaconStore, CertificatePendingStore, CertificateStore, - Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server, - VerificationKeyStore, + AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconStore, CertificatePendingStore, + CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, + Server, VerificationKeyStore, }; use mithril_common::crypto_helper::ProtocolStakeDistribution; use mithril_common::fake_data; @@ -156,6 +156,7 @@ async fn main() -> Result<(), Box> { let mut dependency_manager = DependencyManager::new(config.clone()); dependency_manager .with_snapshot_store(snapshot_store.clone()) + .with_snapshot_uploader(snapshot_uploader.clone()) .with_multi_signer(multi_signer.clone()) .with_beacon_store(beacon_store.clone()) .with_certificate_pending_store(certificate_pending_store.clone()) diff --git a/mithril-aggregator/src/runtime/mod.rs b/mithril-aggregator/src/runtime/mod.rs index 11de682740c..5bcad8c4ed5 100644 --- a/mithril-aggregator/src/runtime/mod.rs +++ b/mithril-aggregator/src/runtime/mod.rs @@ -1,7 +1,7 @@ mod error; mod runner; -mod runtime; +mod state_machine; pub use error::RuntimeError; pub use runner::{AggregatorConfig, AggregatorRunner, AggregatorRunnerTrait}; -pub use runtime::*; +pub use state_machine::*; diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index a9699904aa8..728288d710e 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use crate::snapshot_uploaders::SnapshotLocation; use crate::{DependencyManager, SnapshotError, Snapshotter}; use async_trait::async_trait; use chrono::Utc; @@ -72,7 +73,10 @@ pub trait AggregatorRunnerTrait: Sync + Send { ) -> Result; async fn is_multisig_created(&self) -> Result; async fn create_snapshot_archive(&self) -> Result; - async fn upload_snapshot_archive(&self, path: &Path) -> Result, RuntimeError>; + async fn upload_snapshot_archive( + &self, + path: &Path, + ) -> Result, RuntimeError>; async fn create_and_save_certificate( &self, beacon: &Beacon, @@ -320,9 +324,26 @@ impl AggregatorRunnerTrait for AggregatorRunner { Ok(certificate) } - async fn upload_snapshot_archive(&self, path: &Path) -> Result, RuntimeError> { + async fn upload_snapshot_archive( + &self, + path: &Path, + ) -> Result, RuntimeError> { trace!("upload snapshot archive"); - todo!() + let location = self + .config + .dependencies + .snapshot_uploader + .as_ref() + .ok_or_else(|| { + RuntimeError::SnapshotUploader("no snapshot uploader registered".to_string()) + })? + .read() + .await + .upload_snapshot(path) + .await + .map_err(RuntimeError::SnapshotUploader)?; + + Ok(vec![location]) } async fn create_and_save_snapshot( diff --git a/mithril-aggregator/src/runtime/runtime.rs b/mithril-aggregator/src/runtime/state_machine.rs similarity index 100% rename from mithril-aggregator/src/runtime/runtime.rs rename to mithril-aggregator/src/runtime/state_machine.rs From 5128d9064cb84c0613e080af9b83c906719ca3a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Tue, 14 Jun 2022 17:03:38 +0200 Subject: [PATCH 13/18] clean code and add logs --- mithril-aggregator/src/runtime/error.rs | 5 +- mithril-aggregator/src/runtime/runner.rs | 48 +++++++++++++------ .../src/runtime/state_machine.rs | 6 +++ 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/mithril-aggregator/src/runtime/error.rs b/mithril-aggregator/src/runtime/error.rs index 3fd07529ebb..12273eb6d51 100644 --- a/mithril-aggregator/src/runtime/error.rs +++ b/mithril-aggregator/src/runtime/error.rs @@ -2,7 +2,7 @@ use crate::snapshot_stores::SnapshotStoreError; use crate::store::StoreError; use crate::{BeaconStoreError, ProtocolError, SnapshotError}; -use mithril_common::digesters::DigesterError; +use mithril_common::digesters::{DigesterError, ImmutableFileListingError}; use std::io; use thiserror::Error; @@ -32,6 +32,9 @@ pub enum RuntimeError { #[error("snapshot build error")] SnapshotBuild(#[from] io::Error), + #[error("immutable file scanning error")] + ImmutableFileError(#[from] ImmutableFileListingError), + #[error("general error")] General(String), } diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 728288d710e..c470cfa1d14 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -4,7 +4,7 @@ use crate::snapshot_uploaders::SnapshotLocation; use crate::{DependencyManager, SnapshotError, Snapshotter}; use async_trait::async_trait; use chrono::Utc; -use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; +use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile}; use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot}; #[allow(unused_imports)] @@ -110,11 +110,26 @@ impl AggregatorRunnerTrait for AggregatorRunner { maybe_beacon: Option, ) -> Result, RuntimeError> { info!("checking if there is a new beacon"); - warn!("using fake data for the new beacon"); - let current_beacon = mithril_common::fake_data::beacon(); + debug!( + "checking immutables in directory {}", + self.config.db_directory.to_string_lossy() + ); + let db_path: &Path = self.config.db_directory.as_path(); + let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path) + .map_err(RuntimeError::ImmutableFileError)? + .into_iter() + .last() + .ok_or_else(|| RuntimeError::General("no last immutable file".to_string()))? + .number; + let current_beacon = Beacon { + network: self.config.network.clone(), + epoch: 0, + immutable_file_number, + }; match maybe_beacon { Some(beacon) if current_beacon > beacon => Ok(Some(current_beacon)), + None => Ok(Some(current_beacon)), _ => Ok(None), } } @@ -122,7 +137,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { /// Is a multisignature ready? /// returns the multisignature if the signer is ready to sign or None otherwise async fn is_multisig_created(&self) -> Result { - trace!("running runner::is_multisig_created"); + info!("check if we can create a multisignature"); let has_multisig = self .config .dependencies @@ -135,25 +150,30 @@ impl AggregatorRunnerTrait for AggregatorRunner { .await? .is_some(); + if has_multisig { + debug!("new MULTISIG created"); + } else { + info!("no multisig created"); + } Ok(has_multisig) } async fn compute_digest(&self, new_beacon: &Beacon) -> Result { - trace!("running runner::compute_digester"); + info!("running runner::compute_digester"); let digester = ImmutableDigester::new(self.config.db_directory.clone(), slog_scope::logger()); - info!("Computing digest"; "db_directory" => self.config.db_directory.display()); + debug!("computing digest"; "db_directory" => self.config.db_directory.display()); // digest is done in a separate thread because it is blocking the whole task debug!("launching digester thread"); let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) .await .map_err(|e| RuntimeError::General(e.to_string()))??; - debug!( "last immutable file number: {}", digest_result.last_immutable_file_number ); + if digest_result.last_immutable_file_number != new_beacon.immutable_file_number { error!("digest beacon is different than the given beacon"); Err(RuntimeError::General( @@ -169,7 +189,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, beacon: Beacon, ) -> Result { - trace!("running runner::create_pending_certificate"); + info!("running runner::create_pending_certificate"); let multi_signer = self .config .dependencies @@ -199,7 +219,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError> { - trace!("saving pending certificate"); + info!("saving pending certificate"); self.config .dependencies @@ -217,7 +237,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, digest_result: DigesterResult, ) -> Result<(), RuntimeError> { - trace!("update message in multisigner"); + info!("update message in multisigner"); self.config .dependencies @@ -235,7 +255,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, beacon: &Beacon, ) -> Result { - trace!("drop pending certificate"); + info!("drop pending certificate"); let certificate_pending = self .config @@ -257,7 +277,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { } async fn create_snapshot_archive(&self) -> Result { - trace!("create snapshot archive"); + info!("create snapshot archive"); let snapshotter = Snapshotter::new( self.config.db_directory.clone(), @@ -297,7 +317,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { beacon: &Beacon, certificate_pending: &CertificatePending, ) -> Result { - trace!("create and save certificate"); + info!("create and save certificate"); let multisigner = self .config .dependencies @@ -328,7 +348,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { &self, path: &Path, ) -> Result, RuntimeError> { - trace!("upload snapshot archive"); + info!("upload snapshot archive"); let location = self .config .dependencies diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index 4fba2bd207b..cf7e9577584 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -73,6 +73,9 @@ impl AggregatorRuntime { }) } + /// run + /// + /// launches an infinite loop ticking the state machine pub async fn run(&mut self) { info!("Starting runtime"); debug!("current state: {}", self.state); @@ -87,6 +90,9 @@ impl AggregatorRuntime { } } + /// cycle + /// + /// one tick of the state machine pub async fn cycle(&mut self) -> Result<(), RuntimeError> { info!("new cycle"); match self.state.clone() { From 4c18c3a5663bd2c56d89b921707125478bcc6897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 15 Jun 2022 09:50:55 +0200 Subject: [PATCH 14/18] fix coding style --- mithril-aggregator/src/runtime/runner.rs | 19 +++++++++++++++---- .../src/runtime/state_machine.rs | 4 ++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index c470cfa1d14..e21af3835c9 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -7,7 +7,6 @@ use chrono::Utc; use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile}; use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot}; -#[allow(unused_imports)] use slog_scope::{debug, error, info, trace, warn}; use std::path::Path; use std::sync::Arc; @@ -54,34 +53,44 @@ impl AggregatorConfig { pub trait AggregatorRunnerTrait: Sync + Send { /// Return the current beacon if it is newer than the given one. async fn is_new_beacon(&self, beacon: Option) -> Result, RuntimeError>; + async fn compute_digest(&self, new_beacon: &Beacon) -> Result; + async fn update_message_in_multisigner( &self, digest_result: DigesterResult, ) -> Result<(), RuntimeError>; + async fn create_new_pending_certificate_from_multisigner( &self, beacon: Beacon, ) -> Result; + async fn save_pending_certificate( &self, pending_certificate: CertificatePending, ) -> Result<(), RuntimeError>; + async fn drop_pending_certificate( &self, beacon: &Beacon, ) -> Result; + async fn is_multisig_created(&self) -> Result; + async fn create_snapshot_archive(&self) -> Result; + async fn upload_snapshot_archive( &self, path: &Path, ) -> Result, RuntimeError>; + async fn create_and_save_certificate( &self, beacon: &Beacon, certificate_pending: &CertificatePending, ) -> Result; + async fn create_and_save_snapshot( &self, certificate: Certificate, @@ -119,7 +128,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { .map_err(RuntimeError::ImmutableFileError)? .into_iter() .last() - .ok_or_else(|| RuntimeError::General("no last immutable file".to_string()))? + .ok_or_else(|| RuntimeError::General("no immutable file was returned".to_string()))? .number; let current_beacon = Beacon { network: self.config.network.clone(), @@ -225,7 +234,9 @@ impl AggregatorRunnerTrait for AggregatorRunner { .dependencies .certificate_pending_store .as_ref() - .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .ok_or_else(|| { + RuntimeError::General("no certificate pending store registered".to_string()) + })? .write() .await .save(pending_certificate) @@ -339,7 +350,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { .write() .await .save(certificate.clone()) - .await; + .await?; Ok(certificate) } diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index cf7e9577584..47abed66ef1 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -170,6 +170,7 @@ impl AggregatorRuntime { current_beacon: Some(state.current_beacon), }) } + /// transition /// /// from SIGNING to IDLE because NEW BEACON @@ -186,7 +187,9 @@ impl AggregatorRuntime { current_beacon: Some(new_beacon), }) } + /// transition + /// /// from IDLE state to SIGNING because NEW BEACON async fn transition_from_idle_to_signing( &self, @@ -214,6 +217,7 @@ impl AggregatorRuntime { Ok(state) } } + #[cfg(test)] mod tests { use std::path::PathBuf; From cb101b331ec056943f07c32add2a784d9cc9d4d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 15 Jun 2022 12:19:49 +0200 Subject: [PATCH 15/18] use certificate pending store in http server --- mithril-aggregator/src/http_server.rs | 151 +++++------------- mithril-aggregator/src/runtime/runner.rs | 15 +- .../src/runtime/state_machine.rs | 19 +-- .../src/store/pending_certificate_store.rs | 97 +++++------ 4 files changed, 87 insertions(+), 195 deletions(-) diff --git a/mithril-aggregator/src/http_server.rs b/mithril-aggregator/src/http_server.rs index 6245679a96d..f99b48d6540 100644 --- a/mithril-aggregator/src/http_server.rs +++ b/mithril-aggregator/src/http_server.rs @@ -1,6 +1,5 @@ use mithril_common::crypto_helper::{key_decode_hex, ProtocolLotteryIndex, ProtocolPartyId}; use mithril_common::entities; -use mithril_common::fake_data; use serde_json::Value::Null; use slog_scope::{debug, info}; use std::convert::Infallible; @@ -10,7 +9,7 @@ use warp::Future; use warp::{http::Method, http::StatusCode, Filter}; use super::dependency::{ - BeaconStoreWrapper, CertificateStoreWrapper, DependencyManager, MultiSignerWrapper, + CertificatePendingStoreWrapper, CertificateStoreWrapper, DependencyManager, MultiSignerWrapper, SnapshotStoreWrapper, }; use super::multi_signer; @@ -76,8 +75,7 @@ mod router { ) -> impl Filter + Clone { warp::path!("certificate-pending") .and(warp::get()) - .and(with_beacon_store(dependency_manager.clone())) - .and(with_multi_signer(dependency_manager)) + .and(with_certificate_pending_store(dependency_manager.clone())) .and_then(handlers::certificate_pending) } @@ -155,13 +153,6 @@ mod router { .and_then(handlers::register_signatures) } - /// With beacon store middleware - fn with_beacon_store( - dependency_manager: Arc, - ) -> impl Filter + Clone { - warp::any().map(move || dependency_manager.beacon_store.as_ref().unwrap().clone()) - } - /// With snapshot store middleware fn with_snapshot_store( dependency_manager: Arc, @@ -182,6 +173,19 @@ mod router { }) } + /// With certificate pending store + fn with_certificate_pending_store( + dependency_manager: Arc, + ) -> impl Filter + Clone { + warp::any().map(move || { + dependency_manager + .certificate_pending_store + .as_ref() + .unwrap() + .clone() + }) + } + /// With multi signer middleware fn with_multi_signer( dependency_manager: Arc, @@ -198,77 +202,25 @@ mod router { } mod handlers { + use crate::dependency::CertificatePendingStoreWrapper; + use super::*; use std::str::FromStr; use warp::http::Uri; /// Certificate Pending pub async fn certificate_pending( - beacon_store: BeaconStoreWrapper, - multi_signer: MultiSignerWrapper, + certificate_pending_store: CertificatePendingStoreWrapper, ) -> Result { debug!("certificate_pending"); - let beacon_store = beacon_store.read().await; - match beacon_store.get_current_beacon().await { - Ok(Some(beacon)) => { - let multi_signer = multi_signer.read().await; - match multi_signer.get_multi_signature().await { - Ok(None) => { - let mut certificate_pending = fake_data::certificate_pending(); - certificate_pending.beacon = beacon.clone(); - - let protocol_parameters = multi_signer.get_protocol_parameters().await; - if protocol_parameters.is_none() { - return Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0004".to_string(), - "no protocol parameters available".to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - let protocol_parameters = protocol_parameters.unwrap().into(); - - let previous_hash = certificate_pending.previous_hash; + let certificate_pending_store = certificate_pending_store.read().await; - let signers = multi_signer.get_signers().await; - if let Err(err) = signers { - return Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0007".to_string(), - err.to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - let signers = signers.unwrap(); - - let certificate_pending = entities::CertificatePending::new( - beacon, - protocol_parameters, - previous_hash, - signers, - ); - - Ok(warp::reply::with_status( - warp::reply::json(&certificate_pending), - StatusCode::OK, - )) - } - Ok(_) => Ok(warp::reply::with_status( - warp::reply::json(&Null), - StatusCode::NO_CONTENT, - )), - Err(err) => Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0008".to_string(), - err.to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )), - } - } + match certificate_pending_store.get().await { + Ok(Some(certificate_pending)) => Ok(warp::reply::with_status( + warp::reply::json(&certificate_pending), + StatusCode::OK, + )), Ok(None) => Ok(warp::reply::with_status( warp::reply::json(&Null), StatusCode::NO_CONTENT, @@ -526,7 +478,9 @@ mod tests { use tokio::sync::RwLock; use warp::test::request; - use super::super::beacon_store::{BeaconStoreError, MockBeaconStore}; + use crate::CertificatePendingStore; + + use super::super::beacon_store::MockBeaconStore; use super::super::entities::*; use super::super::multi_signer::MockMultiSigner; use super::super::multi_signer::ProtocolError; @@ -556,29 +510,14 @@ mod tests { #[tokio::test] async fn test_certificate_pending_get_ok() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let fake_signers = fake_data::signers(5); let method = Method::GET.as_str(); let path = "/certificate-pending"; - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); - mock_multi_signer - .expect_get_signers() - .return_once(|| Ok(fake_signers)); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_beacon_store(Arc::new(RwLock::new(beacon_store))) - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let response = request() .method(method) @@ -597,19 +536,11 @@ mod tests { #[tokio::test] async fn test_certificate_pending_get_ok_204() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(None)); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let method = Method::GET.as_str(); let path = "/certificate-pending"; @@ -630,22 +561,14 @@ mod tests { } #[tokio::test] - async fn test_certificate_pending_get_ko_current_beacon_500() { - let fake_protocol_parameters = fake_data::protocol_parameters(); + async fn test_certificate_pending_get_ko_500() { let method = Method::GET.as_str(); let path = "/certificate-pending"; - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Err(BeaconStoreError::GenericError())); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let response = request() .method(method) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index e21af3835c9..2fb248a8eb6 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -28,6 +28,7 @@ pub struct AggregatorConfig { /// Directory to store snapshot pub snapshot_directory: PathBuf, + /// Services dependencies pub dependencies: Arc, } @@ -71,10 +72,7 @@ pub trait AggregatorRunnerTrait: Sync + Send { pending_certificate: CertificatePending, ) -> Result<(), RuntimeError>; - async fn drop_pending_certificate( - &self, - beacon: &Beacon, - ) -> Result; + async fn drop_pending_certificate(&self) -> Result; async fn is_multisig_created(&self) -> Result; @@ -144,7 +142,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { } /// Is a multisignature ready? - /// returns the multisignature if the signer is ready to sign or None otherwise + /// Can we create a multisignature. async fn is_multisig_created(&self) -> Result { info!("check if we can create a multisignature"); let has_multisig = self @@ -262,10 +260,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { .map_err(RuntimeError::MultiSigner) } - async fn drop_pending_certificate( - &self, - beacon: &Beacon, - ) -> Result { + async fn drop_pending_certificate(&self) -> Result { info!("drop pending certificate"); let certificate_pending = self @@ -278,7 +273,7 @@ impl AggregatorRunnerTrait for AggregatorRunner { })? .write() .await - .remove(beacon) + .remove() .await? .ok_or_else(|| { RuntimeError::General("no certificate pending for the given beacon".to_string()) diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index 47abed66ef1..77f02d110c0 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -94,6 +94,7 @@ impl AggregatorRuntime { /// /// one tick of the state machine pub async fn cycle(&mut self) -> Result<(), RuntimeError> { + info!("================================================================================"); info!("new cycle"); match self.state.clone() { AggregatorState::Idle(state) => { @@ -127,7 +128,7 @@ impl AggregatorRuntime { beacon.immutable_file_number ); let new_state = self - .transition_from_signing_to_idle_new_beacon(state, beacon) + .transition_from_signing_to_idle_new_beacon(state) .await?; self.state = AggregatorState::Idle(new_state); } else if self.runner.is_multisig_created().await? { @@ -151,10 +152,7 @@ impl AggregatorRuntime { &self, state: SigningState, ) -> Result { - let certificate_pending = self - .runner - .drop_pending_certificate(&state.current_beacon) - .await?; + let certificate_pending = self.runner.drop_pending_certificate().await?; let path = self.runner.create_snapshot_archive().await?; let _ = self.runner.upload_snapshot_archive(&path).await?; let certificate = self @@ -177,14 +175,11 @@ impl AggregatorRuntime { async fn transition_from_signing_to_idle_new_beacon( &self, state: SigningState, - new_beacon: Beacon, ) -> Result { - self.runner - .drop_pending_certificate(&state.current_beacon) - .await?; + self.runner.drop_pending_certificate().await?; Ok(IdleState { - current_beacon: Some(new_beacon), + current_beacon: Some(state.current_beacon), }) } @@ -309,7 +304,7 @@ mod tests { runner .expect_drop_pending_certificate() .times(1) - .returning(|_| Ok(fake_data::certificate_pending())); + .returning(|| Ok(fake_data::certificate_pending())); let state = SigningState { // this current beacon must be outdated so the state machine will @@ -363,7 +358,7 @@ mod tests { runner .expect_drop_pending_certificate() .times(1) - .returning(|_| Ok(fake_data::certificate_pending())); + .returning(|| Ok(fake_data::certificate_pending())); runner .expect_create_snapshot_archive() .times(1) diff --git a/mithril-aggregator/src/store/pending_certificate_store.rs b/mithril-aggregator/src/store/pending_certificate_store.rs index 4487d490407..9193f6320f6 100644 --- a/mithril-aggregator/src/store/pending_certificate_store.rs +++ b/mithril-aggregator/src/store/pending_certificate_store.rs @@ -1,9 +1,11 @@ use super::StoreError; -use mithril_common::entities::{Beacon, CertificatePending}; +use mithril_common::entities::CertificatePending; use mithril_common::store::adapter::StoreAdapter; -type Adapter = Box>; +type Adapter = Box>; + +const KEY: &'static str = "certificate_pending"; pub struct CertificatePendingStore { adapter: Adapter, @@ -14,33 +16,20 @@ impl CertificatePendingStore { Self { adapter } } - pub async fn get_from_beacon( - &self, - beacon: &Beacon, - ) -> Result, StoreError> { - Ok(self.adapter.get_record(beacon).await?) + pub async fn get(&self) -> Result, StoreError> { + Ok(self.adapter.get_record(&KEY.to_string()).await?) } pub async fn save(&mut self, certificate: CertificatePending) -> Result<(), StoreError> { Ok(self .adapter - .store_record(&certificate.beacon, &certificate) + .store_record(&KEY.to_string(), &certificate) .await?) } - pub async fn get_list(&self, last_n: usize) -> Result, StoreError> { - let vars = self.adapter.get_last_n_records(last_n).await?; - let result = vars.into_iter().map(|(_, y)| y).collect(); - - Ok(result) - } - - pub async fn remove( - &mut self, - beacon: &Beacon, - ) -> Result, StoreError> { + pub async fn remove(&mut self) -> Result, StoreError> { self.adapter - .remove(beacon) + .remove(&KEY.to_string()) .await .map_err(StoreError::AdapterError) } @@ -50,22 +39,23 @@ impl CertificatePendingStore { mod test { use super::*; + use mithril_common::entities::Beacon; use mithril_common::fake_data; use mithril_common::store::adapter::DumbStoreAdapter; - async fn get_certificate_pending_store(size: u64) -> CertificatePendingStore { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); + async fn get_certificate_pending_store(is_populated: bool) -> CertificatePendingStore { + let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - for ix in 0..size { - let beacon = Beacon::new("testnet".to_string(), ix / 3, ix); + if is_populated { + let beacon = Beacon::new("testnet".to_string(), 0, 0); let certificate_pending = CertificatePending::new( beacon.clone(), fake_data::protocol_parameters(), - ix.to_string(), + "previous_hash".to_string(), fake_data::signers(5), ); adapter - .store_record(&beacon, &certificate_pending) + .store_record(&KEY.to_string(), &certificate_pending) .await .unwrap(); } @@ -75,71 +65,60 @@ mod test { } #[tokio::test] - async fn list_is_empty() { - let store = get_certificate_pending_store(0).await; - - assert_eq!(0, store.get_list(100).await.unwrap().len()); - } - - #[tokio::test] - async fn list_has_some_members() { - let store = get_certificate_pending_store(1).await; + async fn get_certificate_pending_with_existing_certificate() { + let store = get_certificate_pending_store(true).await; + let result = store.get().await.unwrap(); - assert_eq!(1, store.get_list(100).await.unwrap().len()); - } - - #[tokio::test] - async fn get_certificate_pending_with_good_beacon() { - let beacon = Beacon::new("testnet".to_string(), 0, 0); - let store = get_certificate_pending_store(1).await; - let result = store.get_from_beacon(&beacon).await.unwrap(); assert!(result.is_some()); } #[tokio::test] - async fn get_certificate_pending_with_wrong_beacon() { - let beacon = Beacon::new("testnet".to_string(), 0, 1); - let store = get_certificate_pending_store(1).await; - let result = store.get_from_beacon(&beacon).await.unwrap(); + async fn get_certificate_pending_with_no_existing_certificate() { + let store = get_certificate_pending_store(false).await; + let result = store.get().await.unwrap(); + assert!(result.is_none()); } #[tokio::test] async fn save_certificate_pending_once() { - let mut store = get_certificate_pending_store(1).await; + let mut store = get_certificate_pending_store(false).await; let beacon = Beacon::new("testnet".to_string(), 0, 1); let certificate_pending = CertificatePending::new( beacon, fake_data::protocol_parameters(), - "0".to_string(), + "previous_hash".to_string(), fake_data::signers(1), ); assert!(store.save(certificate_pending).await.is_ok()); + assert!(store.get().await.unwrap().is_some()); } #[tokio::test] async fn update_certificate_pending() { - let mut store = get_certificate_pending_store(1).await; - let beacon = Beacon::new("testnet".to_string(), 0, 0); - let mut certificate_pending = store.get_from_beacon(&beacon).await.unwrap().unwrap(); + let mut store = get_certificate_pending_store(true).await; + let mut certificate_pending = store.get().await.unwrap().unwrap(); - assert_eq!("0".to_string(), certificate_pending.previous_hash); - certificate_pending.previous_hash = "one".to_string(); + assert_eq!( + "previous_hash".to_string(), + certificate_pending.previous_hash + ); + certificate_pending.previous_hash = "something".to_string(); assert!(store.save(certificate_pending).await.is_ok()); - let certificate_pending = store.get_from_beacon(&beacon).await.unwrap().unwrap(); + let certificate_pending = store.get().await.unwrap().unwrap(); - assert_eq!("one".to_string(), certificate_pending.previous_hash); + assert_eq!("something".to_string(), certificate_pending.previous_hash); } #[tokio::test] async fn remove_certificate_pending() { - let mut store = get_certificate_pending_store(1).await; + let mut store = get_certificate_pending_store(true).await; let beacon = Beacon::new("testnet".to_string(), 0, 0); - let certificate_pending = store.remove(&beacon).await.unwrap().unwrap(); + let certificate_pending = store.remove().await.unwrap().unwrap(); assert_eq!(beacon, certificate_pending.beacon); - assert!(store.get_from_beacon(&beacon).await.unwrap().is_none()); + assert!(store.get().await.unwrap().is_none()); } } From 15ec37321e191837cfa5c44c2567bb395b55f569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 15 Jun 2022 14:46:07 +0200 Subject: [PATCH 16/18] fix http_server tests --- mithril-aggregator/src/http_server.rs | 78 +------------------ .../src/store/pending_certificate_store.rs | 2 +- 2 files changed, 2 insertions(+), 78 deletions(-) diff --git a/mithril-aggregator/src/http_server.rs b/mithril-aggregator/src/http_server.rs index f99b48d6540..6395fdcf8ef 100644 --- a/mithril-aggregator/src/http_server.rs +++ b/mithril-aggregator/src/http_server.rs @@ -75,7 +75,7 @@ mod router { ) -> impl Filter + Clone { warp::path!("certificate-pending") .and(warp::get()) - .and(with_certificate_pending_store(dependency_manager.clone())) + .and(with_certificate_pending_store(dependency_manager)) .and_then(handlers::certificate_pending) } @@ -480,7 +480,6 @@ mod tests { use crate::CertificatePendingStore; - use super::super::beacon_store::MockBeaconStore; use super::super::entities::*; use super::super::multi_signer::MockMultiSigner; use super::super::multi_signer::ProtocolError; @@ -585,81 +584,6 @@ mod tests { .expect("OpenAPI error"); } - #[tokio::test] - async fn test_certificate_pending_get_ko_signers_500() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); - mock_multi_signer - .expect_get_signers() - .return_once(|| Err(ProtocolError::Codec("an error occurred".to_string()))); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); - let mut dependency_manager = setup_dependency_manager(); - dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); - - let method = Method::GET.as_str(); - let path = "/certificate-pending"; - - let response = request() - .method(method) - .path(&format!("/{}{}", SERVER_BASE_PATH, path)) - .reply(&router::routes(Arc::new(dependency_manager))) - .await; - - APISpec::from_file(API_SPEC_FILE) - .method(method) - .path(path) - .validate_request(&Null) - .unwrap() - .validate_response(&response) - .expect("OpenAPI error"); - } - - #[tokio::test] - async fn test_certificate_pending_get_ko_protocol_parameters_500() { - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| None); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); - let mut dependency_manager = setup_dependency_manager(); - dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); - - let method = Method::GET.as_str(); - let path = "/certificate-pending"; - - let response = request() - .method(method) - .path(&format!("/{}{}", SERVER_BASE_PATH, path)) - .reply(&router::routes(Arc::new(dependency_manager))) - .await; - APISpec::from_file(API_SPEC_FILE) - .method(method) - .path(path) - .validate_request(&Null) - .unwrap() - .validate_response(&response) - .expect("OpenAPI error"); - } - #[tokio::test] async fn test_certificate_certificate_hash_get_ok() { let mut certificate_store = CertificateStore::new(Box::new(DumbStoreAdapter::< diff --git a/mithril-aggregator/src/store/pending_certificate_store.rs b/mithril-aggregator/src/store/pending_certificate_store.rs index 9193f6320f6..e1b8da56778 100644 --- a/mithril-aggregator/src/store/pending_certificate_store.rs +++ b/mithril-aggregator/src/store/pending_certificate_store.rs @@ -5,7 +5,7 @@ use mithril_common::store::adapter::StoreAdapter; type Adapter = Box>; -const KEY: &'static str = "certificate_pending"; +const KEY: &str = "certificate_pending"; pub struct CertificatePendingStore { adapter: Adapter, From d68f3121788835ac5b54c59c48fc95165575712e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Wed, 15 Jun 2022 17:17:16 +0200 Subject: [PATCH 17/18] create immutable file to test snapshot creation --- .../mithril-end-to-end/src/end_to_end_spec.rs | 4 +++- .../mithril-end-to-end/src/mithril/client.rs | 2 +- .../src/mithril/infrastructure.rs | 20 +++++++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs index ad28dd7e97f..3e15e500d8d 100644 --- a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs @@ -19,6 +19,8 @@ impl Spec { let aggregator_endpoint = self.infrastructure.aggregator().endpoint(); wait_for_pending_certificate(&aggregator_endpoint).await?; + let _ = self.infrastructure.add_immutable(); + let digest = assert_node_producing_snapshot(&aggregator_endpoint).await?; let certificate_hash = assert_signer_is_signing_snapshot(&aggregator_endpoint, &digest).await?; @@ -82,7 +84,7 @@ async fn wait_for_pending_certificate(aggregator_endpoint: &str) -> Result<(), S async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> Result { let url = format!("{}/snapshots", aggregator_endpoint); - match attempt!(10, Duration::from_millis(1000), { + match attempt!(20, Duration::from_millis(1500), { match reqwest::get(url.clone()).await { Ok(response) => match response.status() { StatusCode::OK => match response.json::>().await.as_deref() { diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs index 65bbee1c645..322cb4c2790 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs @@ -48,7 +48,7 @@ impl Client { self.command.dump_logs_to_stdout().await?; Err(match status.code() { - Some(c) => format!("mithril-signer exited with code: {}", c), + Some(c) => format!("mithril-client exited with code: {}", c), None => "mithril-client was terminated with a signal".to_string(), }) } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs index 8ffcf182549..d6b8030249f 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; pub struct MithrilInfrastructure { work_dir: PathBuf, bin_dir: PathBuf, + db_dir: PathBuf, aggregator: Aggregator, signer: Signer, } @@ -25,6 +26,7 @@ impl MithrilInfrastructure { Ok(Self { work_dir: work_dir.to_path_buf(), bin_dir: bin_dir.to_path_buf(), + db_dir: db_dir.to_path_buf(), aggregator, signer, }) @@ -49,4 +51,22 @@ impl MithrilInfrastructure { pub fn build_client(&self) -> Result { Client::new(self.aggregator.endpoint(), &self.work_dir, &self.bin_dir) } + + pub fn add_immutable(&self) -> Result<(), Box> { + let db_path = self.db_dir.join("immutable"); + let glob_expr = format!("{}/*.chunk", db_path.to_string_lossy()); + + let mut filelist = glob::glob(&glob_expr)? + .map(|f| { + str::parse::(f.unwrap().file_stem().unwrap().to_str().unwrap()).unwrap() + }) + .collect::>(); + filelist.sort(); + let new_number = filelist.pop().unwrap() + 1; + + std::fs::File::create(db_path.join(format!("{:05}.chunk", new_number)))?; + std::fs::File::create(db_path.join(format!("{:05}.primary", new_number)))?; + std::fs::File::create(db_path.join(format!("{:05}.secondary", new_number)))?; + Ok(()) + } } From 6e8d24ab572f2b3ec855b0dca5ee6fa85ecfceed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT?= Date: Thu, 16 Jun 2022 09:49:27 +0200 Subject: [PATCH 18/18] fix snapshot digest and tests --- mithril-aggregator/src/runtime/runner.rs | 4 +++- mithril-aggregator/src/runtime/state_machine.rs | 4 ++-- .../mithril-end-to-end/src/end_to_end_spec.rs | 2 +- .../mithril-end-to-end/src/mithril/aggregator.rs | 11 +++++++---- .../mithril-end-to-end/src/mithril/signer.rs | 2 +- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 2fb248a8eb6..5ce28dd4de5 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -4,6 +4,7 @@ use crate::snapshot_uploaders::SnapshotLocation; use crate::{DependencyManager, SnapshotError, Snapshotter}; use async_trait::async_trait; use chrono::Utc; +use hex::FromHex; use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile}; use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot}; @@ -378,8 +379,9 @@ impl AggregatorRunnerTrait for AggregatorRunner { file_path: &Path, remote_locations: Vec, ) -> Result { + let digest_hex = Vec::from_hex(certificate.digest).unwrap(); let snapshot = Snapshot::new( - certificate.digest, + String::from_utf8(digest_hex).map_err(|e| RuntimeError::General(e.to_string()))?, certificate.hash, std::fs::metadata(file_path) .map_err(|e| RuntimeError::General(e.to_string()))? diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index 77f02d110c0..782703daedb 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -154,14 +154,14 @@ impl AggregatorRuntime { ) -> Result { let certificate_pending = self.runner.drop_pending_certificate().await?; let path = self.runner.create_snapshot_archive().await?; - let _ = self.runner.upload_snapshot_archive(&path).await?; + let locations = self.runner.upload_snapshot_archive(&path).await?; let certificate = self .runner .create_and_save_certificate(&state.current_beacon, &certificate_pending) .await?; let _ = self .runner - .create_and_save_snapshot(certificate, &path, Vec::new()) + .create_and_save_snapshot(certificate, &path, locations) .await?; Ok(IdleState { diff --git a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs index 3e15e500d8d..79956d87d99 100644 --- a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs @@ -56,7 +56,7 @@ impl Spec { async fn wait_for_pending_certificate(aggregator_endpoint: &str) -> Result<(), String> { let url = format!("{}/certificate-pending", aggregator_endpoint); - match attempt!(10, Duration::from_millis(100), { + match attempt!(10, Duration::from_millis(1000), { match reqwest::get(url.clone()).await { Ok(response) => match response.status() { StatusCode::OK => { diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs index d010c0d1c0d..8f14ff283fd 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs @@ -25,11 +25,14 @@ impl Aggregator { ("SNAPSHOT_UPLOADER_TYPE", "local"), ( "PENDING_CERTIFICATE_STORE_DIRECTORY", - "./store/pending-certs", + "./store/aggregator/pending-certs", ), - ("CERTIFICATE_STORE_DIRECTORY", "./store/certs"), - ("VERIFICATION_KEY_STORE_DIRECTORY", "./store/certs"), - ("STAKE_STORE_DIRECTORY", "./store/stakes"), + ("CERTIFICATE_STORE_DIRECTORY", "./store/aggregator/certs"), + ( + "VERIFICATION_KEY_STORE_DIRECTORY", + "./store/aggregator/certs", + ), + ("STAKE_STORE_DIRECTORY", "./store/aggregator/stakes"), ]); let args = vec![ "--db-directory", diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs index 0234e1805a4..7f687010c02 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs @@ -22,7 +22,7 @@ impl Signer { ("RUN_INTERVAL", "2000"), ("AGGREGATOR_ENDPOINT", &aggregator_endpoint), ("DB_DIRECTORY", db_directory.to_str().unwrap()), - ("STAKE_STORE_DIRECTORY", "./store/stakes") + ("STAKE_STORE_DIRECTORY", "./store/signer/stakes"), ]); let args = vec!["-vvv"];