Skip to content

Commit 9f5f1a7

Browse files
committed
add beacon provider
1 parent c4e3bcc commit 9f5f1a7

File tree

6 files changed

+158
-31
lines changed

6 files changed

+158
-31
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use async_trait::async_trait;
2+
use mithril_common::{chain_observer::ChainObserver, digesters::ImmutableFile, entities::Beacon};
3+
use std::{error::Error, path::PathBuf, sync::Arc};
4+
use tokio::sync::RwLock;
5+
6+
use crate::runtime::RuntimeError;
7+
8+
#[async_trait]
9+
pub trait BeaconProvider
10+
where
11+
Self: Sync + Send,
12+
{
13+
async fn get_current_beacon(&self) -> Result<Beacon, Box<dyn Error + Sync + Send>>;
14+
}
15+
pub struct BeaconProviderImpl {
16+
observer: Arc<RwLock<dyn ChainObserver>>,
17+
db_path: PathBuf,
18+
network: String,
19+
}
20+
21+
impl BeaconProviderImpl {
22+
pub fn new(observer: Arc<RwLock<dyn ChainObserver>>, db_path: PathBuf, network: &str) -> Self {
23+
let network = network.to_string();
24+
25+
Self {
26+
observer,
27+
db_path,
28+
network,
29+
}
30+
}
31+
}
32+
33+
#[async_trait]
34+
impl BeaconProvider for BeaconProviderImpl {
35+
async fn get_current_beacon(&self) -> Result<Beacon, Box<dyn Error + Sync + Send>> {
36+
let epoch = self
37+
.observer
38+
.read()
39+
.await
40+
.get_current_epoch()
41+
.await?
42+
.ok_or_else(|| RuntimeError::General("could not get Epoch".to_string().into()))?;
43+
let immutable_file_number = ImmutableFile::list_completed_in_dir(&self.db_path)
44+
.map_err(RuntimeError::ImmutableFile)?
45+
.into_iter()
46+
.last()
47+
.ok_or_else(|| {
48+
RuntimeError::General("no immutable file was returned".to_string().into())
49+
})?
50+
.number;
51+
let beacon = Beacon {
52+
network: self.network.clone(),
53+
epoch,
54+
immutable_file_number,
55+
};
56+
57+
Ok(beacon)
58+
}
59+
}
60+
61+
#[cfg(test)]
62+
mod tests {
63+
use super::*;
64+
65+
use mithril_common::chain_observer::{ChainObserver, ChainObserverError};
66+
use mithril_common::entities::{Epoch, StakeDistribution};
67+
68+
struct TestChainObserver {}
69+
70+
#[async_trait]
71+
impl ChainObserver for TestChainObserver {
72+
async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
73+
Ok(Some(42))
74+
}
75+
76+
async fn get_current_stake_distribution(
77+
&self,
78+
) -> Result<Option<StakeDistribution>, ChainObserverError> {
79+
let stake_distribution: StakeDistribution = [
80+
(
81+
"pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg".to_string(),
82+
2_493_000 as u64,
83+
),
84+
(
85+
"pool1qqfnw2fwajdnam7xsqhhrje5cgd8jcltzfrx655rd23eqlxjfef".to_string(),
86+
21_640,
87+
),
88+
(
89+
"pool1qqnjh80kudcjphrxftj74x22q3a4uvw8wknlxptgs7gdqtstqad".to_string(),
90+
80,
91+
),
92+
(
93+
"pool1qquwwu6680fr72y4779r2kpc7mxtch8rp2uhuqcc7v9p6q4f7ph".to_string(),
94+
70,
95+
),
96+
(
97+
"pool1qptl80vq84xm28pt3t2lhpfzqag28csjhktxz5k6a74n260clmt".to_string(),
98+
56,
99+
),
100+
(
101+
"pool1qpuckgzxwgdru9vvq3ydmuqa077ur783yn2uywz7zq2c29p506e".to_string(),
102+
51_610,
103+
),
104+
(
105+
"pool1qz2vzszautc2c8mljnqre2857dpmheq7kgt6vav0s38tvvhxm6w".to_string(),
106+
1_051,
107+
),
108+
]
109+
.into_iter()
110+
.collect();
111+
112+
Ok(Some(stake_distribution))
113+
}
114+
}
115+
116+
#[tokio::test]
117+
async fn test() {
118+
let beacon_provider = BeaconProviderImpl::new(
119+
Arc::new(RwLock::new(TestChainObserver {})),
120+
PathBuf::new().join("/tmp/db"),
121+
"whatever",
122+
);
123+
let res = beacon_provider.get_current_beacon().await;
124+
assert!(res.is_err());
125+
}
126+
}

mithril-aggregator/src/dependency.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::multi_signer::MultiSigner;
99
use super::snapshot_stores::SnapshotStore;
1010
use crate::beacon_store::BeaconStore;
1111
use crate::snapshot_uploaders::SnapshotUploader;
12-
use crate::{CertificatePendingStore, CertificateStore, VerificationKeyStore};
12+
use crate::{BeaconProvider, CertificatePendingStore, CertificateStore, VerificationKeyStore};
1313

1414
/// BeaconStoreWrapper wraps a BeaconStore
1515
pub type BeaconStoreWrapper = Arc<RwLock<dyn BeaconStore>>;
@@ -38,6 +38,9 @@ pub type StakeStoreWrapper = Arc<RwLock<StakeStore>>;
3838
/// ChainObserverWrapper wraps a ChainObserver
3939
pub type ChainObserverWrapper = Arc<RwLock<dyn ChainObserver>>;
4040

41+
/// BeaconProviderWrapper wraps a BeaconProvider
42+
pub type BeaconProviderWrapper = Arc<RwLock<dyn BeaconProvider>>;
43+
4144
/// DependencyManager handles the dependencies
4245
pub struct DependencyManager {
4346
pub config: Config,
@@ -50,6 +53,7 @@ pub struct DependencyManager {
5053
pub verification_key_store: Option<VerificationKeyStoreWrapper>,
5154
pub stake_store: Option<StakeStoreWrapper>,
5255
pub chain_observer: Option<ChainObserverWrapper>,
56+
pub beacon_provider: Option<BeaconProviderWrapper>,
5357
}
5458

5559
impl DependencyManager {
@@ -66,9 +70,16 @@ impl DependencyManager {
6670
verification_key_store: None,
6771
stake_store: None,
6872
chain_observer: None,
73+
beacon_provider: None,
6974
}
7075
}
7176

77+
/// With BeaconProvider middleware
78+
pub fn with_beacon_provider(&mut self, beacon_provider: BeaconProviderWrapper) -> &mut Self {
79+
self.beacon_provider = Some(beacon_provider);
80+
self
81+
}
82+
7283
/// With SnapshotStore middleware
7384
pub fn with_snapshot_store(&mut self, snapshot_store: SnapshotStoreWrapper) -> &mut Self {
7485
self.snapshot_store = Some(snapshot_store);

mithril-aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod beacon_provider;
12
mod beacon_store;
23
mod dependency;
34
mod entities;
@@ -13,6 +14,7 @@ mod tools;
1314
pub use crate::entities::Config;
1415
pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError};
1516
pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore};
17+
pub use beacon_provider::{BeaconProvider, BeaconProviderImpl};
1618
pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore};
1719
pub use dependency::DependencyManager;
1820
pub use http_server::Server;

mithril-aggregator/src/main.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use clap::Parser;
44

55
use config::{Map, Source, Value, ValueKind};
66
use mithril_aggregator::{
7-
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
8-
CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl,
9-
Server, VerificationKeyStore,
7+
AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconProviderImpl,
8+
CertificatePendingStore, CertificateStore, Config, DependencyManager, MemoryBeaconStore,
9+
MultiSigner, MultiSignerImpl, Server, VerificationKeyStore,
1010
};
1111
use mithril_common::chain_observer::FakeObserver;
1212
use mithril_common::fake_data;
@@ -164,7 +164,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
164164
.with_certificate_store(certificate_store.clone())
165165
.with_verification_key_store(verification_key_store.clone())
166166
.with_stake_store(stake_store.clone())
167-
.with_chain_observer(chain_observer.clone());
167+
.with_chain_observer(chain_observer.clone())
168+
.with_beacon_provider(Arc::new(RwLock::new(BeaconProviderImpl::new(
169+
chain_observer.clone(),
170+
config.db_directory.to_path_buf(),
171+
&config.network,
172+
))));
168173
let dependency_manager = Arc::new(dependency_manager);
169174

170175
// Start snapshot uploader

mithril-aggregator/src/runtime/runner.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::snapshot_uploaders::SnapshotLocation;
44
use crate::{DependencyManager, SnapshotError, Snapshotter};
55
use async_trait::async_trait;
66
use chrono::Utc;
7-
use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile};
7+
use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester};
88
use mithril_common::entities::{
99
Beacon, Certificate, CertificatePending, SignerWithStake, Snapshot,
1010
};
@@ -124,37 +124,20 @@ impl AggregatorRunnerTrait for AggregatorRunner {
124124
maybe_beacon: Option<Beacon>,
125125
) -> Result<Option<Beacon>, RuntimeError> {
126126
info!("checking if there is a new beacon");
127-
debug!(
128-
"checking immutables in directory {}",
129-
self.config.db_directory.to_string_lossy()
130-
);
131-
let db_path: &Path = self.config.db_directory.as_path();
132-
let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path)
133-
.map_err(RuntimeError::ImmutableFile)?
134-
.into_iter()
135-
.last()
136-
.ok_or_else(|| {
137-
RuntimeError::General("no immutable file was returned".to_string().into())
138-
})?
139-
.number;
140-
let epoch = self
127+
let current_beacon = self
141128
.config
142129
.dependencies
143-
.chain_observer
130+
.beacon_provider
144131
.as_ref()
145132
.ok_or_else(|| {
146-
RuntimeError::General("no chain observer registered".to_string().into())
133+
RuntimeError::General("no beacon provider registered".to_string().into())
147134
})?
148135
.read()
149136
.await
150-
.get_current_epoch()
151-
.await?
152-
.ok_or_else(|| RuntimeError::General("no epoch was returned".to_string().into()))?;
153-
let current_beacon = Beacon {
154-
network: self.config.network.clone(),
155-
epoch,
156-
immutable_file_number,
157-
};
137+
.get_current_beacon()
138+
.await
139+
.map_err(RuntimeError::General)?;
140+
158141
debug!("checking if there is a new beacon: {:?}", current_beacon);
159142

160143
match maybe_beacon {

mithril-common/src/chain_observer/cli_observer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl ChainObserver for CardanoCliChainObserver {
130130
let stake_distribution: HashMap<String, u64> = [
131131
(
132132
"pool1qqyjr9pcrv97gwrueunug829fs5znw6p2wxft3fvqkgu5f4qlrg".to_string(),
133-
2_493_000 as u64,
133+
2_493_000_u64,
134134
),
135135
(
136136
"pool1qqfnw2fwajdnam7xsqhhrje5cgd8jcltzfrx655rd23eqlxjfef".to_string(),

0 commit comments

Comments
 (0)