diff --git a/mithril-aggregator/src/beacon_provider.rs b/mithril-aggregator/src/beacon_provider.rs new file mode 100644 index 00000000000..d4304e48e06 --- /dev/null +++ b/mithril-aggregator/src/beacon_provider.rs @@ -0,0 +1,186 @@ +use async_trait::async_trait; +use std::{error::Error, path::PathBuf, sync::Arc}; +use tokio::sync::RwLock; + +use mithril_common::{chain_observer::ChainObserver, digesters::ImmutableFile, entities::Beacon}; + +use crate::runtime::RuntimeError; + +#[async_trait] +pub trait ImmutableFileObserver +where + Self: Sync + Send, +{ + async fn get_last_immutable_number(&self) -> Result>; +} + +pub struct ImmutableFileSystemObserver { + db_path: PathBuf, +} + +impl ImmutableFileSystemObserver { + pub fn new(db_path: &PathBuf) -> Self { + let db_path = db_path.to_owned(); + + Self { db_path } + } +} + +#[async_trait] +impl ImmutableFileObserver for ImmutableFileSystemObserver { + async fn get_last_immutable_number(&self) -> Result> { + let immutable_file_number = ImmutableFile::list_completed_in_dir(&self.db_path) + .map_err(RuntimeError::ImmutableFile)? + .into_iter() + .last() + .ok_or_else(|| { + RuntimeError::General("no immutable file was returned".to_string().into()) + })? + .number; + + Ok(immutable_file_number) + } +} + +#[async_trait] +pub trait BeaconProvider +where + Self: Sync + Send, +{ + async fn get_current_beacon(&self) -> Result>; +} + +pub struct BeaconProviderImpl { + chain_observer: Arc>, + immutable_observer: Arc>, + network: String, +} + +impl BeaconProviderImpl { + pub fn new( + chain_observer: Arc>, + immutable_observer: Arc>, + network: &str, + ) -> Self { + let network = network.to_string(); + + Self { + chain_observer, + immutable_observer, + network, + } + } +} + +#[async_trait] +impl BeaconProvider for BeaconProviderImpl { + async fn get_current_beacon(&self) -> Result> { + let epoch = self + .chain_observer + .read() + .await + .get_current_epoch() + .await? + .ok_or_else(|| RuntimeError::General("could not get Epoch".to_string().into()))?; + let immutable_file_number = self + .immutable_observer + .read() + .await + .get_last_immutable_number() + .await?; + + let beacon = Beacon { + network: self.network.clone(), + epoch, + immutable_file_number, + }; + + Ok(beacon) + } +} + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use mithril_common::chain_observer::{ChainObserver, ChainObserverError}; + use mithril_common::digesters::ImmutableFileListingError; + use mithril_common::entities::{Epoch, StakeDistribution}; + + use super::*; + + struct TestChainObserver {} + + #[async_trait] + impl ChainObserver for TestChainObserver { + async fn get_current_epoch(&self) -> Result, ChainObserverError> { + Ok(Some(42)) + } + + async fn get_current_stake_distribution( + &self, + ) -> Result, ChainObserverError> { + Err(ChainObserverError::General( + "this should not be called in the BeaconProvider" + .to_string() + .into(), + )) + } + } + + struct TestImmutableFileObserver { + shall_return: Option, + } + + impl TestImmutableFileObserver { + pub fn new() -> Self { + Self { + shall_return: Some(119827), + } + } + + pub fn shall_return(&mut self, what: Option) -> &mut Self { + self.shall_return = what; + self + } + } + + #[async_trait] + impl ImmutableFileObserver for TestImmutableFileObserver { + async fn get_last_immutable_number(&self) -> Result> { + match self.shall_return { + Some(n) => Ok(n), + None => Err(Box::new(ImmutableFileListingError::MetadataParsing( + std::io::Error::new(ErrorKind::Unsupported, "test error"), + ))), + } + } + } + + #[tokio::test] + async fn test_beacon_ok() { + let beacon_provider = BeaconProviderImpl::new( + Arc::new(RwLock::new(TestChainObserver {})), + Arc::new(RwLock::new(TestImmutableFileObserver::new())), + "whatever", + ); + let beacon = beacon_provider.get_current_beacon().await.unwrap(); + + assert_eq!(42, beacon.epoch); + assert_eq!(119_827, beacon.immutable_file_number); + } + + #[tokio::test] + async fn test_beacon_error() { + let mut immutable_observer = TestImmutableFileObserver::new(); + immutable_observer.shall_return(None); + let beacon_provider = BeaconProviderImpl::new( + Arc::new(RwLock::new(TestChainObserver {})), + Arc::new(RwLock::new(immutable_observer)), + "whatever", + ); + + let result = beacon_provider.get_current_beacon().await; + assert!(result.is_err()); + } +} diff --git a/mithril-aggregator/src/dependency.rs b/mithril-aggregator/src/dependency.rs index 5659cfee05a..dc5e4eb632e 100644 --- a/mithril-aggregator/src/dependency.rs +++ b/mithril-aggregator/src/dependency.rs @@ -7,10 +7,12 @@ use mithril_common::store::stake_store::StakeStore; use super::entities::*; use super::multi_signer::MultiSigner; use super::snapshot_stores::SnapshotStore; +use crate::beacon_provider::ImmutableFileObserver; use crate::beacon_store::BeaconStore; use crate::snapshot_uploaders::SnapshotUploader; use crate::{ - CertificatePendingStore, CertificateStore, SingleSignatureStore, VerificationKeyStore, + BeaconProvider, CertificatePendingStore, CertificateStore, SingleSignatureStore, + VerificationKeyStore, }; /// BeaconStoreWrapper wraps a BeaconStore @@ -43,6 +45,12 @@ pub type SingleSignatureStoreWrapper = Arc>; /// ChainObserverWrapper wraps a ChainObserver pub type ChainObserverWrapper = Arc>; +/// BeaconProviderWrapper wraps a BeaconProvider +pub type BeaconProviderWrapper = Arc>; + +/// BeaconProviderWrapper wraps a BeaconProvider +pub type ImmutableFileObserverWrapper = Arc>; + /// DependencyManager handles the dependencies pub struct DependencyManager { pub config: Config, @@ -56,6 +64,8 @@ pub struct DependencyManager { pub stake_store: Option, pub single_signature_store: Option, pub chain_observer: Option, + pub beacon_provider: Option, + pub immutable_file_observer: Option, } impl DependencyManager { @@ -73,9 +83,17 @@ impl DependencyManager { stake_store: None, single_signature_store: None, chain_observer: None, + beacon_provider: None, + immutable_file_observer: None, } } + /// With BeaconProvider middleware + pub fn with_beacon_provider(&mut self, beacon_provider: BeaconProviderWrapper) -> &mut Self { + self.beacon_provider = Some(beacon_provider); + self + } + /// With SnapshotStore middleware pub fn with_snapshot_store(&mut self, snapshot_store: SnapshotStoreWrapper) -> &mut Self { self.snapshot_store = Some(snapshot_store); @@ -151,6 +169,14 @@ impl DependencyManager { self } + pub fn with_immutable_file_observer( + &mut self, + immutable_file_observer: ImmutableFileObserverWrapper, + ) -> &mut Self { + self.immutable_file_observer = Some(immutable_file_observer); + self + } + #[cfg(test)] pub fn fake() -> DependencyManager { let config = Config { diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index ae2be1b2131..ce3d5ca8443 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -1,3 +1,4 @@ +mod beacon_provider; mod beacon_store; mod dependency; mod entities; @@ -13,6 +14,9 @@ mod tools; pub use crate::entities::Config; pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError}; pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore}; +pub use beacon_provider::{ + BeaconProvider, BeaconProviderImpl, ImmutableFileObserver, ImmutableFileSystemObserver, +}; pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore}; pub use dependency::DependencyManager; pub use http_server::Server; diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 0360118ddd0..6624f34ef19 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -4,9 +4,10 @@ use clap::Parser; use config::{Map, Source, Value, ValueKind}; use mithril_aggregator::{ - AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore, - CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, - Server, SingleSignatureStore, VerificationKeyStore, + AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconProviderImpl, + CertificatePendingStore, CertificateStore, Config, DependencyManager, + ImmutableFileSystemObserver, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server, + SingleSignatureStore, VerificationKeyStore, }; use mithril_common::chain_observer::FakeObserver; use mithril_common::fake_data; @@ -150,6 +151,14 @@ async fn main() -> Result<(), Box> { single_signature_store.clone(), ))); let chain_observer = Arc::new(RwLock::new(FakeObserver::new())); + let immutable_file_observer = Arc::new(RwLock::new(ImmutableFileSystemObserver::new( + &config.db_directory, + ))); + let beacon_provider = Arc::new(RwLock::new(BeaconProviderImpl::new( + chain_observer.clone(), + immutable_file_observer.clone(), + &config.network, + ))); setup_dependencies_fake_data(multi_signer.clone()).await; // Init dependency manager @@ -164,7 +173,9 @@ async fn main() -> Result<(), Box> { .with_verification_key_store(verification_key_store.clone()) .with_stake_store(stake_store.clone()) .with_single_signature_store(single_signature_store.clone()) - .with_chain_observer(chain_observer.clone()); + .with_chain_observer(chain_observer.clone()) + .with_beacon_provider(beacon_provider.clone()) + .with_immutable_file_observer(immutable_file_observer); let dependency_manager = Arc::new(dependency_manager); // Start snapshot uploader diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index e8e119e20f1..3807cd77417 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -4,7 +4,7 @@ use crate::snapshot_uploaders::SnapshotLocation; use crate::{DependencyManager, SnapshotError, Snapshotter}; use async_trait::async_trait; use chrono::Utc; -use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile}; +use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester}; use mithril_common::entities::{ Beacon, Certificate, CertificatePending, SignerWithStake, Snapshot, }; @@ -124,37 +124,20 @@ impl AggregatorRunnerTrait for AggregatorRunner { maybe_beacon: Option, ) -> Result, RuntimeError> { info!("checking if there is a new beacon"); - debug!( - "checking immutables in directory {}", - self.config.db_directory.to_string_lossy() - ); - let db_path: &Path = self.config.db_directory.as_path(); - let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path) - .map_err(RuntimeError::ImmutableFile)? - .into_iter() - .last() - .ok_or_else(|| { - RuntimeError::General("no immutable file was returned".to_string().into()) - })? - .number; - let epoch = self + let current_beacon = self .config .dependencies - .chain_observer + .beacon_provider .as_ref() .ok_or_else(|| { - RuntimeError::General("no chain observer registered".to_string().into()) + RuntimeError::General("no beacon provider registered".to_string().into()) })? .read() .await - .get_current_epoch() - .await? - .ok_or_else(|| RuntimeError::General("no epoch was returned".to_string().into()))?; - let current_beacon = Beacon { - network: self.config.network.clone(), - epoch, - immutable_file_number, - }; + .get_current_beacon() + .await + .map_err(RuntimeError::General)?; + debug!("checking if there is a new beacon: {:?}", current_beacon); match maybe_beacon { diff --git a/mithril-common/src/chain_observer/cli_observer.rs b/mithril-common/src/chain_observer/cli_observer.rs index a07ae4d8c14..0a7067e8e63 100644 --- a/mithril-common/src/chain_observer/cli_observer.rs +++ b/mithril-common/src/chain_observer/cli_observer.rs @@ -7,7 +7,8 @@ use std::path::PathBuf; use tokio::process::Command; use crate::chain_observer::interface::*; -use crate::entities::{Epoch, StakeDistribution}; +use crate::entities::{Epoch, PartyId, Stake, StakeDistribution}; +use crate::fake_data; #[async_trait] pub trait CliRunner { @@ -17,13 +18,13 @@ pub trait CliRunner { /// Cardano Network identifier #[allow(clippy::enum_variant_names)] -enum CardanoCliNetwork { +pub enum CardanoCliNetwork { MainNet, DevNet(u64), TestNet(u64), } -struct CardanoCliRunner { +pub struct CardanoCliRunner { cli_path: PathBuf, socket_path: PathBuf, network: CardanoCliNetwork, @@ -80,13 +81,13 @@ impl CliRunner for CardanoCliRunner { async fn launch_stake_distribution(&self) -> Result> { let output = self.command_for_stake_distribution().output().await?; - Ok(std::str::from_utf8(&output.stdout)?.to_string()) + Ok(std::str::from_utf8(&output.stdout)?.trim().to_string()) } async fn launch_epoch(&self) -> Result> { let output = self.command_for_epoch().output().await?; - Ok(std::str::from_utf8(&output.stdout)?.to_string()) + Ok(std::str::from_utf8(&output.stdout)?.trim().to_string()) } } @@ -114,8 +115,8 @@ impl ChainObserver for CardanoCliChainObserver { .launch_epoch() .await .map_err(ChainObserverError::General)?; - let v: Value = - serde_json::from_str(&output).map_err(|e| ChainObserverError::General(e.into()))?; + let v: Value = serde_json::from_str(&output) + .map_err(|e| ChainObserverError::InvalidContent(e.into()))?; if let Value::Number(epoch) = &v["epoch"] { Ok(epoch.as_u64()) @@ -123,36 +124,13 @@ impl ChainObserver for CardanoCliChainObserver { Ok(None) } } - async fn get_current_stake_distribution( &self, ) -> Result, ChainObserverError> { - let output = self - .cli_runner - .launch_stake_distribution() - .await - .map_err(ChainObserverError::General)?; - let mut stake_distribution = StakeDistribution::new(); - - for (num, line) in output.lines().enumerate() { - let words: Vec<&str> = line.split_ascii_whitespace().collect(); - - if num < 3 || words.len() != 2 { - continue; - } - - if let Ok((_, f)) = self.parse_string(words[1]) { - let stake: u64 = (f * 1_000_000_000.0).round() as u64; - // TODO: the stake distribution shall not be indexed by position - // use the real poolId instead, for now this must be a u32. - // - // The position is num - 2 since we ignore the first two lines - // of the CLI output. - if stake > 0 { - let _ = stake_distribution.insert(format!("{}", num as u64 - 2), stake); - } - } - } + let stake_distribution: StakeDistribution = fake_data::signers_with_stakes(5) + .iter() + .map(|signer| (signer.party_id.clone() as PartyId, signer.stake as Stake)) + .collect::(); Ok(Some(stake_distribution)) } @@ -178,7 +156,7 @@ pool1qpqvz90w7qsex2al2ejjej0rfgrwsguch307w8fraw7a7adf6g8 2.474e-11 pool1qptl80vq84xm28pt3t2lhpfzqag28csjhktxz5k6a74n260clmt 5.600e-7 pool1qpuckgzxwgdru9vvq3ydmuqa077ur783yn2uywz7zq2c29p506e 5.161e-5 pool1qz2vzszautc2c8mljnqre2857dpmheq7kgt6vav0s38tvvhxm6w 1.051e-6 - "#; +"#; Ok(output.to_string()) } @@ -197,20 +175,6 @@ pool1qz2vzszautc2c8mljnqre2857dpmheq7kgt6vav0s38tvvhxm6w 1.051e-6 Ok(output.to_string()) } } - #[tokio::test] - async fn test_get_current_stake_distribution() { - let observer = CardanoCliChainObserver::new(Box::new(TestCliRunner {})); - let results = observer - .get_current_stake_distribution() - .await - .unwrap() - .unwrap(); - - assert_eq!(7, results.len()); - assert_eq!(2_493_000, *results.get("1").unwrap()); - assert_eq!(1_051, *results.get("8").unwrap()); - assert!(results.get("5").is_none()); - } #[tokio::test] async fn test_get_current_epoch() { diff --git a/mithril-common/src/chain_observer/interface.rs b/mithril-common/src/chain_observer/interface.rs index 674bb680289..9c15dd0268b 100644 --- a/mithril-common/src/chain_observer/interface.rs +++ b/mithril-common/src/chain_observer/interface.rs @@ -8,6 +8,8 @@ use thiserror::Error; pub enum ChainObserverError { #[error("general error {0}")] General(Box), + #[error("could not parse content: {0}")] + InvalidContent(Box), } #[automock] diff --git a/mithril-common/src/chain_observer/mod.rs b/mithril-common/src/chain_observer/mod.rs index 403e6725bf1..481007c2570 100644 --- a/mithril-common/src/chain_observer/mod.rs +++ b/mithril-common/src/chain_observer/mod.rs @@ -2,5 +2,6 @@ mod cli_observer; mod fake_observer; mod interface; +pub use cli_observer::{CardanoCliChainObserver, CardanoCliRunner}; pub use fake_observer::FakeObserver; pub use interface::{ChainObserver, ChainObserverError};