Skip to content

Commit 3ecd834

Browse files
committed
Introduce the BeaconStore
1 parent 857708a commit 3ecd834

File tree

6 files changed

+235
-27
lines changed

6 files changed

+235
-27
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use async_trait::async_trait;
2+
use mithril_common::entities::Beacon;
3+
4+
#[cfg(test)]
5+
use mockall::automock;
6+
7+
/// BeaconStore represents a beacon store interactor
8+
#[cfg_attr(test, automock)]
9+
#[async_trait]
10+
pub trait BeaconStore: Sync + Send {
11+
/// Get the current beacon
12+
async fn get_current_beacon(&self) -> Result<Option<Beacon>, String>;
13+
14+
/// Set the current beacon
15+
async fn set_current_beacon(&mut self, beacon: Beacon) -> Result<(), String>;
16+
17+
/// Reset the current beacon
18+
async fn reset_current_beacon(&mut self) -> Result<(), String>;
19+
}
20+
21+
/// MemoryBeaconStore is in memory [`BeaconStore`]
22+
pub struct MemoryBeaconStore {
23+
current_beacon: Option<Beacon>,
24+
}
25+
26+
impl Default for MemoryBeaconStore {
27+
/// MemoryBeaconStore factory
28+
fn default() -> Self {
29+
Self {
30+
current_beacon: None,
31+
}
32+
}
33+
}
34+
35+
#[async_trait]
36+
impl BeaconStore for MemoryBeaconStore {
37+
async fn get_current_beacon(&self) -> Result<Option<Beacon>, String> {
38+
Ok(self.current_beacon.clone())
39+
}
40+
41+
async fn set_current_beacon(&mut self, beacon: Beacon) -> Result<(), String> {
42+
self.current_beacon = Some(beacon);
43+
Ok(())
44+
}
45+
46+
async fn reset_current_beacon(&mut self) -> Result<(), String> {
47+
self.current_beacon = None;
48+
Ok(())
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use super::{BeaconStore, MemoryBeaconStore};
55+
use mithril_common::fake_data;
56+
57+
#[tokio::test]
58+
async fn test_can_store_beacon() {
59+
let mut sut = MemoryBeaconStore::default();
60+
let beacon = fake_data::beacon();
61+
sut.set_current_beacon(beacon.clone())
62+
.await
63+
.expect("unexpected error in set_current_beacon");
64+
let stored_beacon = sut.get_current_beacon().await;
65+
66+
assert_eq!(Some(beacon), stored_beacon.unwrap());
67+
}
68+
69+
#[tokio::test]
70+
async fn test_reset_current_beacon_ok() {
71+
let mut sut = MemoryBeaconStore::default();
72+
sut.set_current_beacon(fake_data::beacon())
73+
.await
74+
.expect("unexpected error in set_current_beacon");
75+
sut.reset_current_beacon()
76+
.await
77+
.expect("unexpected error in set_current_beacon");
78+
let stored_beacon = sut.get_current_beacon().await;
79+
80+
assert_eq!(None, stored_beacon.unwrap());
81+
}
82+
}

mithril-aggregator/src/dependency.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
use crate::beacon_store::BeaconStore;
12
use std::sync::Arc;
23
use tokio::sync::RwLock;
34

45
use super::entities::*;
56
use super::multi_signer::MultiSigner;
67
use super::snapshot_store::SnapshotStorer;
78

9+
/// BeaconStoreWrapper wraps a BeaconStore
10+
pub type BeaconStoreWrapper = Arc<RwLock<dyn BeaconStore>>;
11+
812
/// SnapshotStorerWrapper wraps a SnapshotStorer
913
pub type SnapshotStorerWrapper = Arc<RwLock<dyn SnapshotStorer>>;
1014

@@ -16,6 +20,7 @@ pub struct DependencyManager {
1620
pub config: Config,
1721
pub snapshot_storer: Option<SnapshotStorerWrapper>,
1822
pub multi_signer: Option<MultiSignerWrapper>,
23+
pub beacon_store: Option<BeaconStoreWrapper>,
1924
}
2025

2126
impl DependencyManager {
@@ -25,6 +30,7 @@ impl DependencyManager {
2530
config,
2631
snapshot_storer: None,
2732
multi_signer: None,
33+
beacon_store: None,
2834
}
2935
}
3036

@@ -39,4 +45,10 @@ impl DependencyManager {
3945
self.multi_signer = Some(multi_signer);
4046
self
4147
}
48+
49+
/// With MultiSigner
50+
pub fn with_beacon_store(&mut self, beacon_store: BeaconStoreWrapper) -> &mut Self {
51+
self.beacon_store = Some(beacon_store);
52+
self
53+
}
4254
}

mithril-aggregator/src/http_server.rs

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use std::sync::Arc;
1010
use warp::Future;
1111
use warp::{http::Method, http::StatusCode, Filter};
1212

13-
use super::dependency::{DependencyManager, MultiSignerWrapper, SnapshotStorerWrapper};
13+
use super::dependency::{
14+
BeaconStoreWrapper, DependencyManager, MultiSignerWrapper, SnapshotStorerWrapper,
15+
};
1416
use super::multi_signer;
1517

1618
const SERVER_BASE_PATH: &str = "aggregator";
@@ -67,10 +69,11 @@ mod router {
6769

6870
/// GET /certificate-pending
6971
pub fn certificate_pending(
70-
_dependency_manager: Arc<DependencyManager>,
72+
dependency_manager: Arc<DependencyManager>,
7173
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
7274
warp::path!("certificate-pending")
7375
.and(warp::get())
76+
.and(with_beacon_store(dependency_manager))
7477
.and_then(handlers::certificate_pending)
7578
}
7679

@@ -138,19 +141,44 @@ mod router {
138141
) -> impl Filter<Extract = (MultiSignerWrapper,), Error = Infallible> + Clone {
139142
warp::any().map(move || dependency_manager.multi_signer.as_ref().unwrap().clone())
140143
}
144+
145+
/// With beacon store middleware
146+
fn with_beacon_store(
147+
dependency_manager: Arc<DependencyManager>,
148+
) -> impl Filter<Extract = (BeaconStoreWrapper,), Error = Infallible> + Clone {
149+
warp::any().map(move || dependency_manager.beacon_store.as_ref().unwrap().clone())
150+
}
141151
}
142152

143153
mod handlers {
144154
use super::*;
145155

146156
/// Certificate Pending
147-
pub async fn certificate_pending() -> Result<impl warp::Reply, Infallible> {
157+
pub async fn certificate_pending(
158+
beacon_store: BeaconStoreWrapper,
159+
) -> Result<impl warp::Reply, Infallible> {
148160
debug!("certificate_pending");
149161

150-
// Certificate pending
151-
let certificate_pending = fake_data::certificate_pending();
162+
let beacon_store = beacon_store.read().await;
163+
match beacon_store.get_current_beacon().await {
164+
Ok(Some(beacon)) => {
165+
let mut certificate_pending = fake_data::certificate_pending();
166+
certificate_pending.beacon = beacon;
152167

153-
Ok(warp::reply::json(&certificate_pending))
168+
Ok(warp::reply::with_status(
169+
warp::reply::json(&certificate_pending),
170+
StatusCode::OK,
171+
))
172+
}
173+
Ok(None) => Ok(warp::reply::with_status(
174+
warp::reply::json(&Null),
175+
StatusCode::NO_CONTENT,
176+
)),
177+
Err(err) => Ok(warp::reply::with_status(
178+
warp::reply::json(&entities::Error::new("MITHRIL-E0006".to_string(), err)),
179+
StatusCode::INTERNAL_SERVER_ERROR,
180+
)),
181+
}
154182
}
155183

156184
/// Certificate by certificate hash
@@ -329,6 +357,7 @@ mod tests {
329357
use tokio::sync::RwLock;
330358
use warp::test::request;
331359

360+
use super::super::beacon_store::MockBeaconStore;
332361
use super::super::entities::*;
333362
use super::super::multi_signer::MockMultiSigner;
334363
use super::super::snapshot_store::MockSnapshotStorer;
@@ -345,9 +374,66 @@ mod tests {
345374

346375
#[tokio::test]
347376
async fn test_certificate_pending_get_ok() {
348-
let dependency_manager = setup_dependency_manager();
377+
let mut beacon_store = MockBeaconStore::new();
378+
beacon_store
379+
.expect_get_current_beacon()
380+
.return_once(|| Ok(Some(fake_data::beacon())));
381+
let mut dependency_manager = setup_dependency_manager();
382+
let method = Method::GET.as_str();
383+
let path = "/certificate-pending";
384+
dependency_manager.with_beacon_store(Arc::new(RwLock::new(beacon_store)));
385+
386+
let response = request()
387+
.method(method)
388+
.path(&format!("/{}{}", SERVER_BASE_PATH, path))
389+
.reply(&router::routes(Arc::new(dependency_manager)))
390+
.await;
391+
392+
APISpec::from_file(API_SPEC_FILE)
393+
.method(method)
394+
.path(path)
395+
.validate_request(&Null)
396+
.unwrap()
397+
.validate_response(&response)
398+
.expect("OpenAPI error");
399+
}
400+
401+
#[tokio::test]
402+
async fn test_certificate_pending_get_ok_204() {
403+
let mut beacon_store = MockBeaconStore::new();
404+
beacon_store
405+
.expect_get_current_beacon()
406+
.return_once(|| Ok(None));
407+
let mut dependency_manager = setup_dependency_manager();
408+
let method = Method::GET.as_str();
409+
let path = "/certificate-pending";
410+
dependency_manager.with_beacon_store(Arc::new(RwLock::new(beacon_store)));
411+
412+
let response = request()
413+
.method(method)
414+
.path(&format!("/{}{}", SERVER_BASE_PATH, path))
415+
.reply(&router::routes(Arc::new(dependency_manager)))
416+
.await;
417+
418+
APISpec::from_file(API_SPEC_FILE)
419+
.method(method)
420+
.path(path)
421+
.validate_request(&Null)
422+
.unwrap()
423+
.validate_response(&response)
424+
.expect("OpenAPI error");
425+
}
426+
427+
#[tokio::test]
428+
async fn test_certificate_pending_get_ko() {
429+
let mut beacon_store = MockBeaconStore::new();
430+
beacon_store
431+
.expect_get_current_beacon()
432+
.return_once(|| Err("an error occurred".to_string()));
433+
let mut dependency_manager = setup_dependency_manager();
349434
let method = Method::GET.as_str();
350435
let path = "/certificate-pending";
436+
dependency_manager.with_beacon_store(Arc::new(RwLock::new(beacon_store)));
351437

352438
let response = request()
353439
.method(method)

mithril-aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod beacon_store;
12
mod dependency;
23
mod entities;
34
mod http_server;
@@ -13,6 +14,7 @@ pub use crate::multi_signer::{
1314
ProtocolSignerVerificationKey, ProtocolStake,
1415
};
1516
pub use crate::snapshot_store::SnapshotStoreHTTPClient;
17+
pub use beacon_store::MemoryBeaconStore;
1618
pub use dependency::DependencyManager;
1719
pub use runtime::AggregatorRuntime;
1820
pub use snapshotter::Snapshotter;

mithril-aggregator/src/main.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
use clap::Parser;
44

55
use mithril_aggregator::{
6-
key_decode_hex, AggregatorRuntime, Config, DependencyManager, MultiSigner, MultiSignerImpl,
7-
ProtocolParameters, ProtocolPartyId, ProtocolSignerVerificationKey, ProtocolStake, Server,
8-
SnapshotStoreHTTPClient,
6+
key_decode_hex, AggregatorRuntime, Config, DependencyManager, MemoryBeaconStore, MultiSigner,
7+
MultiSignerImpl, ProtocolParameters, ProtocolPartyId, ProtocolSignerVerificationKey,
8+
ProtocolStake, Server, SnapshotStoreHTTPClient,
99
};
1010
use mithril_common::fake_data;
1111
use slog::{Drain, Level, Logger};
@@ -88,18 +88,23 @@ async fn main() {
8888
)));
8989

9090
let multi_signer = Arc::new(RwLock::new(init_multi_signer()));
91+
let beacon_store = Arc::new(RwLock::new(MemoryBeaconStore::default()));
9192

92-
// Init dependecy manager
93+
// Init dependency manager
9394
let mut dependency_manager = DependencyManager::new(config);
9495
dependency_manager
9596
.with_snapshot_storer(snapshot_storer.clone())
96-
.with_multi_signer(multi_signer.clone());
97+
.with_multi_signer(multi_signer.clone())
98+
.with_beacon_store(beacon_store.clone());
9799
let dependency_manager = Arc::new(dependency_manager);
98100

99101
// Start snapshot uploader
100102
let handle = tokio::spawn(async move {
101-
let runtime =
102-
AggregatorRuntime::new(args.snapshot_interval * 1000, args.db_directory.clone());
103+
let runtime = AggregatorRuntime::new(
104+
args.snapshot_interval * 1000,
105+
args.db_directory.clone(),
106+
beacon_store.clone(),
107+
);
103108
runtime.run().await
104109
});
105110

0 commit comments

Comments
 (0)