diff --git a/CHANGELOG.md b/CHANGELOG.md index b9396942682..6a3735cc08c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ As a minor extension, we have adopted a slightly different versioning convention - **UNSTABLE** : - Support for DMQ signature publisher in the signer and signature consumer in the aggregator. + - Implement automatic certificates chain synchronization between leader/follower aggregators. + - Crates versions: | Crate | Version | diff --git a/Cargo.lock b/Cargo.lock index af75e05e994..f28604c31c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3910,10 +3910,12 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.7.72" +version = "0.7.73" dependencies = [ "anyhow", "async-trait", + "axum", + "axum-test", "chrono", "clap", "config", @@ -4160,7 +4162,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-trait", @@ -4238,7 +4240,7 @@ dependencies = [ [[package]] name = "mithril-end-to-end" -version = "0.4.95" +version = "0.4.96" dependencies = [ "anyhow", "async-recursion", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 5446f43aef0..713c5f5023d 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.7.72" +version = "0.7.73" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } @@ -76,6 +76,8 @@ zstd = { version = "0.13.3", features = ["zstdmt"] } tikv-jemallocator = { version = "0.6.0", optional = true } [dev-dependencies] +axum = { version = "0.8.4", features = ["json"] } +axum-test = "17.3.0" criterion = { version = "0.6.0", features = ["html_reports", "async_tokio"] } http = "1.3.1" httpmock = "0.7.0" diff --git a/mithril-aggregator/src/database/query/certificate/conditions.rs b/mithril-aggregator/src/database/query/certificate/conditions.rs new file mode 100644 index 00000000000..c6687ffc7f5 --- /dev/null +++ b/mithril-aggregator/src/database/query/certificate/conditions.rs @@ -0,0 +1,65 @@ +//! Shared `WhereCondition` across certificates queries + +use sqlite::Value; +use std::iter::repeat_n; + +use mithril_persistence::sqlite::WhereCondition; + +use crate::database::record::CertificateRecord; + +pub(super) fn insert_many(certificates_records: Vec) -> WhereCondition { + let columns = "(\ + certificate_id, \ + parent_certificate_id, \ + message, \ + signature, \ + aggregate_verification_key, \ + epoch, \ + network, \ + signed_entity_type_id, \ + signed_entity_beacon, \ + protocol_version, \ + protocol_parameters, \ + protocol_message, \ + signers, \ + initiated_at, \ + sealed_at)"; + let values_columns: Vec<&str> = repeat_n( + "(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)", + certificates_records.len(), + ) + .collect(); + + let values: Vec = certificates_records + .into_iter() + .flat_map(|certificate_record| { + vec![ + Value::String(certificate_record.certificate_id), + match certificate_record.parent_certificate_id { + Some(parent_certificate_id) => Value::String(parent_certificate_id), + None => Value::Null, + }, + Value::String(certificate_record.message), + Value::String(certificate_record.signature), + Value::String(certificate_record.aggregate_verification_key), + Value::Integer(certificate_record.epoch.try_into().unwrap()), + Value::String(certificate_record.network), + Value::Integer(certificate_record.signed_entity_type.index() as i64), + Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()), + Value::String(certificate_record.protocol_version), + Value::String( + serde_json::to_string(&certificate_record.protocol_parameters).unwrap(), + ), + Value::String(serde_json::to_string(&certificate_record.protocol_message).unwrap()), + Value::String(serde_json::to_string(&certificate_record.signers).unwrap()), + Value::String(certificate_record.initiated_at.to_rfc3339()), + Value::String(certificate_record.sealed_at.to_rfc3339()), + ] + }) + .collect(); + + WhereCondition::new( + format!("{columns} values {}", values_columns.join(", ")).as_str(), + values, + ) +} diff --git a/mithril-aggregator/src/database/query/certificate/insert_certificate.rs b/mithril-aggregator/src/database/query/certificate/insert_certificate.rs index 82ea9b34364..381e0321e23 100644 --- a/mithril-aggregator/src/database/query/certificate/insert_certificate.rs +++ b/mithril-aggregator/src/database/query/certificate/insert_certificate.rs @@ -1,11 +1,9 @@ -use std::iter::repeat_n; - -use sqlite::Value; - use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; use crate::database::record::CertificateRecord; +use super::conditions; + /// Query to insert [CertificateRecord] in the sqlite database pub struct InsertCertificateRecordQuery { condition: WhereCondition, @@ -17,64 +15,9 @@ impl InsertCertificateRecordQuery { } pub fn many(certificates_records: Vec) -> Self { - let columns = "(\ - certificate_id, \ - parent_certificate_id, \ - message, \ - signature, \ - aggregate_verification_key, \ - epoch, \ - network, \ - signed_entity_type_id, \ - signed_entity_beacon, \ - protocol_version, \ - protocol_parameters, \ - protocol_message, \ - signers, \ - initiated_at, \ - sealed_at)"; - let values_columns: Vec<&str> = repeat_n( - "(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)", - certificates_records.len(), - ) - .collect(); - - let values: Vec = certificates_records - .into_iter() - .flat_map(|certificate_record| { - vec![ - Value::String(certificate_record.certificate_id), - match certificate_record.parent_certificate_id { - Some(parent_certificate_id) => Value::String(parent_certificate_id), - None => Value::Null, - }, - Value::String(certificate_record.message), - Value::String(certificate_record.signature), - Value::String(certificate_record.aggregate_verification_key), - Value::Integer(certificate_record.epoch.try_into().unwrap()), - Value::String(certificate_record.network), - Value::Integer(certificate_record.signed_entity_type.index() as i64), - Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()), - Value::String(certificate_record.protocol_version), - Value::String( - serde_json::to_string(&certificate_record.protocol_parameters).unwrap(), - ), - Value::String( - serde_json::to_string(&certificate_record.protocol_message).unwrap(), - ), - Value::String(serde_json::to_string(&certificate_record.signers).unwrap()), - Value::String(certificate_record.initiated_at.to_rfc3339()), - Value::String(certificate_record.sealed_at.to_rfc3339()), - ] - }) - .collect(); - - let condition = WhereCondition::new( - format!("{columns} values {}", values_columns.join(", ")).as_str(), - values, - ); - - Self { condition } + Self { + condition: conditions::insert_many(certificates_records), + } } } diff --git a/mithril-aggregator/src/database/query/certificate/insert_or_replace_certificate.rs b/mithril-aggregator/src/database/query/certificate/insert_or_replace_certificate.rs new file mode 100644 index 00000000000..fa34bb91588 --- /dev/null +++ b/mithril-aggregator/src/database/query/certificate/insert_or_replace_certificate.rs @@ -0,0 +1,169 @@ +use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; + +use crate::database::record::CertificateRecord; + +use super::conditions; + +/// Query to insert or replace [CertificateRecord] in the sqlite database +pub struct InsertOrReplaceCertificateRecordQuery { + condition: WhereCondition, +} + +impl InsertOrReplaceCertificateRecordQuery { + pub fn many(certificates_records: Vec) -> Self { + Self { + condition: conditions::insert_many(certificates_records), + } + } +} + +impl Query for InsertOrReplaceCertificateRecordQuery { + type Entity = CertificateRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection() + .expand(SourceAlias::new(&[("{:certificate:}", "certificate")])); + + format!("insert or replace into certificate {condition} returning {projection}") + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use mithril_common::crypto_helper::tests_setup::setup_certificate_chain; + use mithril_common::entities::Epoch; + use mithril_common::test_utils::fake_data; + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::query::{GetCertificateRecordQuery, InsertCertificateRecordQuery}; + use crate::database::test_helper::main_db_connection; + + use super::*; + + #[test] + fn test_insert_many_certificates_records_in_empty_db() { + let certificates = setup_certificate_chain(5, 2); + let certificates_records: Vec = certificates.into(); + + let connection = main_db_connection().unwrap(); + + let certificates_records_saved: Vec = connection + .fetch_collect(InsertOrReplaceCertificateRecordQuery::many( + certificates_records.clone(), + )) + .expect("saving many records should not fail"); + + assert_eq!(certificates_records, certificates_records_saved); + + // Check insertion order + let all_records: Vec = + connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap(); + assert_eq!( + certificates_records.into_iter().rev().collect::>(), + all_records + ); + } + + #[test] + fn test_replace_one_certificate_record() { + let certificate_record = CertificateRecord { + epoch: Epoch(12), + ..fake_data::certificate("hash").into() + }; + + let connection = main_db_connection().unwrap(); + let certificate_record_saved = connection + .fetch_first(InsertCertificateRecordQuery::one( + certificate_record.clone(), + )) + .unwrap(); + assert_eq!(Some(Epoch(12)), certificate_record_saved.map(|r| r.epoch)); + + let modified_certificate_record = CertificateRecord { + epoch: Epoch(23), + ..certificate_record + }; + let certificate_record_saved = connection + .fetch_first(InsertOrReplaceCertificateRecordQuery::many(vec![ + modified_certificate_record.clone(), + ])) + .unwrap(); + assert_eq!(Some(Epoch(23)), certificate_record_saved.map(|r| r.epoch)); + + let all_records_cursor = connection.fetch(GetCertificateRecordQuery::all()).unwrap(); + assert_eq!(1, all_records_cursor.count()); + } + + #[test] + fn test_insert_and_replace_many_certificate_record() { + let tested_records: HashMap<_, CertificateRecord> = HashMap::from([ + ( + "cert1-genesis", + fake_data::genesis_certificate("genesis").into(), + ), + ("cert2", fake_data::certificate("cert2").into()), + ( + "cert2-modified", + CertificateRecord { + epoch: Epoch(14), + ..fake_data::certificate("cert2").into() + }, + ), + ("cert3", fake_data::certificate("cert3").into()), + ("cert4", fake_data::certificate("cert4").into()), + ( + "cert4-modified", + CertificateRecord { + epoch: Epoch(32), + ..fake_data::certificate("cert4").into() + }, + ), + ("cert5", fake_data::certificate("cert5").into()), + ]); + let connection = main_db_connection().unwrap(); + + let cursor = connection + .fetch(InsertCertificateRecordQuery::many(vec![ + tested_records["cert1-genesis"].clone(), + tested_records["cert2"].clone(), + tested_records["cert3"].clone(), + tested_records["cert4"].clone(), + tested_records["cert5"].clone(), + ])) + .unwrap(); + assert_eq!(5, cursor.count()); + + let cursor = connection + .fetch(InsertOrReplaceCertificateRecordQuery::many(vec![ + tested_records["cert1-genesis"].clone(), + tested_records["cert2-modified"].clone(), + tested_records["cert3"].clone(), + tested_records["cert4-modified"].clone(), + ])) + .unwrap(); + assert_eq!(4, cursor.count()); + + let all_records: Vec = + connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap(); + assert_eq!(5, all_records.len()); + assert_eq!( + all_records, + vec![ + tested_records["cert4-modified"].clone(), + tested_records["cert3"].clone(), + tested_records["cert2-modified"].clone(), + tested_records["cert1-genesis"].clone(), + // Since the cert5 was not in the Insert/replace query, it now has a lower rowid and shows first + tested_records["cert5"].clone(), + ] + ); + } +} diff --git a/mithril-aggregator/src/database/query/certificate/mod.rs b/mithril-aggregator/src/database/query/certificate/mod.rs index 05f5ed5e073..9d1cc2c8c98 100644 --- a/mithril-aggregator/src/database/query/certificate/mod.rs +++ b/mithril-aggregator/src/database/query/certificate/mod.rs @@ -1,9 +1,12 @@ +mod conditions; mod delete_certificate; mod get_certificate; mod get_master_certificate; mod insert_certificate; +mod insert_or_replace_certificate; pub use delete_certificate::*; pub use get_certificate::*; pub use get_master_certificate::*; pub use insert_certificate::*; +pub use insert_or_replace_certificate::*; diff --git a/mithril-aggregator/src/database/query/open_message/conditions.rs b/mithril-aggregator/src/database/query/open_message/conditions.rs new file mode 100644 index 00000000000..393c8a1a36f --- /dev/null +++ b/mithril-aggregator/src/database/query/open_message/conditions.rs @@ -0,0 +1,32 @@ +//! Shared `WhereCondition` across open message queries + +use sqlite::Value; + +use mithril_common::StdResult; +use mithril_persistence::sqlite::WhereCondition; + +use crate::database::record::OpenMessageRecord; + +pub(crate) fn insert_one(record: OpenMessageRecord) -> StdResult { + let expression = "(\ + open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, \ + expires_at, created_at, is_certified, is_expired\ + ) values (?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)"; + let beacon_str = record.signed_entity_type.get_json_beacon()?; + let parameters = vec![ + Value::String(record.open_message_id.to_string()), + Value::Integer(record.epoch.try_into()?), + Value::String(beacon_str), + Value::Integer(record.signed_entity_type.index() as i64), + Value::String(serde_json::to_string(&record.protocol_message)?), + record + .expires_at + .map(|t| Value::String(t.to_rfc3339())) + .unwrap_or(Value::Null), + Value::String(record.created_at.to_rfc3339()), + Value::Integer(record.is_certified as i64), + Value::Integer(record.is_expired as i64), + ]; + + Ok(WhereCondition::new(expression, parameters)) +} diff --git a/mithril-aggregator/src/database/query/open_message/get_open_message.rs b/mithril-aggregator/src/database/query/open_message/get_open_message.rs index 85cbe1a4f5a..3a16b52fd00 100644 --- a/mithril-aggregator/src/database/query/open_message/get_open_message.rs +++ b/mithril-aggregator/src/database/query/open_message/get_open_message.rs @@ -15,6 +15,13 @@ pub struct GetOpenMessageQuery { } impl GetOpenMessageQuery { + #[cfg(test)] + pub fn all() -> Self { + Self { + condition: WhereCondition::default(), + } + } + pub fn by_epoch_and_signed_entity_type( epoch: Epoch, signed_entity_type: &SignedEntityType, diff --git a/mithril-aggregator/src/database/query/open_message/insert_open_message.rs b/mithril-aggregator/src/database/query/open_message/insert_open_message.rs index a3663ce3c65..df60e6a29ec 100644 --- a/mithril-aggregator/src/database/query/open_message/insert_open_message.rs +++ b/mithril-aggregator/src/database/query/open_message/insert_open_message.rs @@ -1,11 +1,10 @@ use chrono::Utc; -use sqlite::Value; -use uuid::Uuid; use mithril_common::StdResult; use mithril_common::entities::{Epoch, ProtocolMessage, SignedEntityType}; use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; +use crate::database::query::open_message::conditions; use crate::database::record::OpenMessageRecord; /// Query to insert [OpenMessageRecord] in the sqlite database @@ -19,23 +18,20 @@ impl InsertOpenMessageQuery { signed_entity_type: &SignedEntityType, protocol_message: &ProtocolMessage, ) -> StdResult { - let expression = "(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, expires_at, created_at) values (?*, ?*, ?*, ?*, ?*, ?*, ?*)"; - let beacon_str = signed_entity_type.get_json_beacon()?; - let parameters = vec![ - Value::String(Uuid::new_v4().to_string()), - Value::Integer(epoch.try_into()?), - Value::String(beacon_str), - Value::Integer(signed_entity_type.index() as i64), - Value::String(serde_json::to_string(protocol_message)?), - signed_entity_type - .get_open_message_timeout() - .map(|t| Value::String((Utc::now() + t).to_rfc3339())) - .unwrap_or(Value::Null), - Value::String(Utc::now().to_rfc3339()), - ]; + let now = Utc::now(); + let record = OpenMessageRecord { + open_message_id: OpenMessageRecord::new_id(), + epoch, + signed_entity_type: signed_entity_type.clone(), + protocol_message: protocol_message.clone(), + is_certified: false, + is_expired: false, + created_at: now, + expires_at: signed_entity_type.get_open_message_timeout().map(|t| now + t), + }; Ok(Self { - condition: WhereCondition::new(expression, parameters), + condition: conditions::insert_one(record)?, }) } } @@ -54,3 +50,69 @@ impl Query for InsertOpenMessageQuery { format!("insert into open_message {condition} returning {projection}") } } + +#[cfg(test)] +mod tests { + use mithril_common::entities::ProtocolMessagePartKey; + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::query::GetOpenMessageQuery; + use crate::database::test_helper::main_db_connection; + + use super::*; + + #[test] + fn test_insert_one() { + let connection = main_db_connection().unwrap(); + let epoch = Epoch(5); + let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(10)); + let mut protocol_message = ProtocolMessage::new(); + protocol_message.set_message_part( + ProtocolMessagePartKey::CardanoStakeDistributionEpoch, + "value".to_string(), + ); + + connection + .fetch_first( + InsertOpenMessageQuery::one(epoch, &signed_entity_type, &protocol_message).unwrap(), + ) + .unwrap(); + let records: Vec = + connection.fetch_collect(GetOpenMessageQuery::all()).unwrap(); + + assert_eq!(1, records.len()); + assert_eq!( + OpenMessageRecord { + open_message_id: records[0].open_message_id, + epoch, + signed_entity_type, + protocol_message, + is_certified: false, + is_expired: false, + created_at: records[0].created_at, + expires_at: records[0].expires_at, + }, + records[0] + ); + } + + #[should_panic] + #[test] + fn test_insert_two_entries_with_same_signed_entity_violate_unique_constraint() { + let connection = main_db_connection().unwrap(); + let epoch = Epoch(5); + let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(10)); + + connection + .fetch_first( + InsertOpenMessageQuery::one(epoch, &signed_entity_type, &ProtocolMessage::new()) + .unwrap(), + ) + .unwrap(); + + let _ = connection.fetch_first( + InsertOpenMessageQuery::one(epoch + 10, &signed_entity_type, &ProtocolMessage::new()) + .unwrap(), + ); + } +} diff --git a/mithril-aggregator/src/database/query/open_message/insert_or_replace_open_message.rs b/mithril-aggregator/src/database/query/open_message/insert_or_replace_open_message.rs new file mode 100644 index 00000000000..358fed71d42 --- /dev/null +++ b/mithril-aggregator/src/database/query/open_message/insert_or_replace_open_message.rs @@ -0,0 +1,92 @@ +use mithril_common::StdResult; +use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; + +use crate::database::query::open_message::conditions; +use crate::database::record::OpenMessageRecord; + +/// Query to insert [OpenMessageRecord] in the sqlite database +pub struct InsertOrReplaceOpenMessageQuery { + condition: WhereCondition, +} + +impl InsertOrReplaceOpenMessageQuery { + pub fn one(record: OpenMessageRecord) -> StdResult { + Ok(Self { + condition: conditions::insert_one(record)?, + }) + } +} + +impl Query for InsertOrReplaceOpenMessageQuery { + type Entity = OpenMessageRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]); + let projection = Self::Entity::get_projection().expand(aliases); + + format!("insert or replace into open_message {condition} returning {projection}") + } +} + +#[cfg(test)] +mod tests { + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::query::GetOpenMessageQuery; + use crate::database::test_helper::main_db_connection; + + use super::*; + + #[test] + fn test_insert_one() { + let connection = main_db_connection().unwrap(); + let record = OpenMessageRecord::dummy(); + + connection + .fetch_first(InsertOrReplaceOpenMessageQuery::one(record.clone()).unwrap()) + .unwrap(); + let records: Vec = connection + .fetch_collect( + GetOpenMessageQuery::by_epoch_and_signed_entity_type( + record.epoch, + &record.signed_entity_type, + ) + .unwrap(), + ) + .unwrap(); + + assert_eq!(1, records.len()); + assert_eq!(record, records[0]); + } + + #[test] + fn test_insert_record_for_existing_signed_entity_type_replaces_it() { + let connection = main_db_connection().unwrap(); + let record = OpenMessageRecord { + is_expired: false, + ..OpenMessageRecord::dummy() + }; + + connection + .fetch_first(InsertOrReplaceOpenMessageQuery::one(record.clone()).unwrap()) + .unwrap(); + + let replaced_record = connection + .fetch_first( + InsertOrReplaceOpenMessageQuery::one(OpenMessageRecord { + is_expired: true, + ..record.clone() + }) + .unwrap(), + ) + .unwrap(); + let count = connection.fetch(GetOpenMessageQuery::all()).unwrap().count(); + + assert_eq!(1, count); + assert_eq!(Some(true), replaced_record.map(|r| r.is_expired)); + } +} diff --git a/mithril-aggregator/src/database/query/open_message/mod.rs b/mithril-aggregator/src/database/query/open_message/mod.rs index f964c018855..9580befce44 100644 --- a/mithril-aggregator/src/database/query/open_message/mod.rs +++ b/mithril-aggregator/src/database/query/open_message/mod.rs @@ -1,11 +1,14 @@ +mod conditions; mod delete_open_message; mod get_open_message; mod get_open_message_with_single_signatures; mod insert_open_message; +mod insert_or_replace_open_message; mod update_open_message; pub use delete_open_message::*; pub use get_open_message::*; pub use get_open_message_with_single_signatures::*; pub use insert_open_message::*; +pub use insert_or_replace_open_message::*; pub use update_open_message::*; diff --git a/mithril-aggregator/src/database/record/open_message.rs b/mithril-aggregator/src/database/record/open_message.rs index 993ead563e5..40b0159dc75 100644 --- a/mithril-aggregator/src/database/record/open_message.rs +++ b/mithril-aggregator/src/database/record/open_message.rs @@ -39,6 +39,11 @@ pub struct OpenMessageRecord { } impl OpenMessageRecord { + /// Creates a new random id that can be used for a new record + pub fn new_id() -> Uuid { + Uuid::new_v4() + } + #[cfg(test)] /// Create a dumb OpenMessage instance mainly for test purposes pub fn dummy() -> Self { diff --git a/mithril-aggregator/src/database/repository/certificate_repository.rs b/mithril-aggregator/src/database/repository/certificate_repository.rs index 18ecd0a27b1..ebb985ee5a3 100644 --- a/mithril-aggregator/src/database/repository/certificate_repository.rs +++ b/mithril-aggregator/src/database/repository/certificate_repository.rs @@ -11,9 +11,10 @@ use mithril_persistence::sqlite::ConnectionExtensions; use crate::database::query::{ DeleteCertificateQuery, GetCertificateRecordQuery, InsertCertificateRecordQuery, - MasterCertificateQuery, + InsertOrReplaceCertificateRecordQuery, MasterCertificateQuery, }; use crate::database::record::CertificateRecord; +use crate::services::SynchronizedCertificateStorer; /// Database frontend API for Certificate queries. pub struct CertificateRepository { @@ -105,6 +106,24 @@ impl CertificateRepository { Ok(new_certificates.map(|cert| cert.into()).collect()) } + /// Create, or replace if they already exist, many certificates at once in the database. + pub async fn create_or_replace_many_certificates( + &self, + certificates: Vec, + ) -> StdResult> { + if certificates.is_empty() { + return Ok(vec![]); + } + + let records: Vec = + certificates.into_iter().map(|cert| cert.into()).collect(); + let new_certificates = self + .connection + .fetch(InsertOrReplaceCertificateRecordQuery::many(records))?; + + Ok(new_certificates.map(|cert| cert.into()).collect()) + } + /// Delete all the given certificates from the database pub async fn delete_certificates(&self, certificates: &[&Certificate]) -> StdResult<()> { let ids = certificates.iter().map(|c| c.hash.as_str()).collect::>(); @@ -131,6 +150,18 @@ impl CertificateRetriever for CertificateRepository { } } +#[async_trait] +impl SynchronizedCertificateStorer for CertificateRepository { + async fn insert_or_replace_many(&self, certificates: Vec) -> StdResult<()> { + self.create_or_replace_many_certificates(certificates).await?; + Ok(()) + } + + async fn get_latest_genesis(&self) -> StdResult> { + self.get_latest_genesis_certificate().await + } +} + #[cfg(test)] mod tests { use mithril_common::crypto_helper::tests_setup::setup_certificate_chain; diff --git a/mithril-aggregator/src/database/repository/open_message_repository.rs b/mithril-aggregator/src/database/repository/open_message_repository.rs index 66edbdcba85..57f36635ee8 100644 --- a/mithril-aggregator/src/database/repository/open_message_repository.rs +++ b/mithril-aggregator/src/database/repository/open_message_repository.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; - +use async_trait::async_trait; use chrono::Utc; +use std::sync::Arc; use mithril_common::StdResult; use mithril_common::entities::{Epoch, ProtocolMessage, SignedEntityType}; @@ -8,9 +8,11 @@ use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; use crate::database::query::{ DeleteOpenMessageQuery, GetOpenMessageQuery, GetOpenMessageWithSingleSignaturesQuery, - InsertOpenMessageQuery, UpdateOpenMessageQuery, + InsertOpenMessageQuery, InsertOrReplaceOpenMessageQuery, UpdateOpenMessageQuery, }; use crate::database::record::{OpenMessageRecord, OpenMessageWithSingleSignaturesRecord}; +use crate::entities::OpenMessage; +use crate::services::OpenMessageStorer; /// ## Open message repository /// @@ -79,6 +81,21 @@ impl OpenMessageRepository { message.ok_or_else(|| panic!("Inserting an open_message should not return nothing.")) } + /// Create, or replace if one with the same [SignedEntityType] they already exist, a + /// [OpenMessageRecord] in the database. + pub async fn create_or_replace_open_message( + &self, + record: OpenMessageRecord, + ) -> StdResult { + let message = self + .connection + .fetch_first(InsertOrReplaceOpenMessageQuery::one(record)?)?; + + message.ok_or_else(|| { + panic!("Inserting or replacing an open_message should not return nothing.") + }) + } + /// Updates an [OpenMessageRecord] in the database. pub async fn update_open_message( &self, @@ -102,6 +119,24 @@ impl OpenMessageRepository { } } +#[async_trait] +impl OpenMessageStorer for OpenMessageRepository { + async fn insert_or_replace_open_message(&self, open_message: OpenMessage) -> StdResult<()> { + let record = OpenMessageRecord { + open_message_id: OpenMessageRecord::new_id(), + epoch: open_message.epoch, + signed_entity_type: open_message.signed_entity_type, + protocol_message: open_message.protocol_message, + is_certified: open_message.is_certified, + is_expired: open_message.is_expired, + created_at: open_message.created_at, + expires_at: open_message.expires_at, + }; + self.create_or_replace_open_message(record).await?; + Ok(()) + } +} + #[cfg(test)] mod tests { use mithril_common::entities::{BlockNumber, CardanoDbBeacon}; @@ -261,6 +296,28 @@ mod tests { assert_eq!(open_message.epoch, message.epoch); } + #[tokio::test] + async fn repository_create_or_replace_open_message() { + let connection = get_connection().await; + let repository = OpenMessageRepository::new(connection.clone()); + let mut inserted_record = repository + .create_or_replace_open_message(OpenMessageRecord { + epoch: Epoch(5), + signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(6)), + ..OpenMessageRecord::dummy() + }) + .await + .unwrap(); + assert_eq!(Epoch(5), inserted_record.epoch); + + inserted_record.epoch = Epoch(32); + let replaced_record = repository + .create_or_replace_open_message(inserted_record) + .await + .unwrap(); + assert_eq!(Epoch(32), replaced_record.epoch); + } + #[tokio::test] async fn repository_update_open_message() { let connection = get_connection().await; diff --git a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs index e4874a3131d..497f3262ecb 100644 --- a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs +++ b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs @@ -4,6 +4,8 @@ //! - group these enablers into more logical categories //! - redefine the actual categories so those miscellaneous enablers fit into them +use anyhow::{Context, anyhow}; +use reqwest::Url; use std::sync::Arc; use std::time::Duration; @@ -19,8 +21,8 @@ use crate::get_dependency; #[cfg(feature = "future_dmq")] use crate::services::SignatureConsumerDmq; use crate::services::{ - AggregatorClient, AggregatorHTTPClient, MessageService, MithrilMessageService, - SequentialSignatureProcessor, SignatureConsumer, SignatureConsumerNoop, SignatureProcessor, + AggregatorHTTPClient, MessageService, MithrilMessageService, SequentialSignatureProcessor, + SignatureConsumer, SignatureConsumerNoop, SignatureProcessor, }; impl DependenciesBuilder { async fn build_signed_entity_type_lock(&mut self) -> Result> { @@ -56,12 +58,18 @@ impl DependenciesBuilder { get_dependency!(self.message_service) } - /// Builds an [AggregatorClient] - pub async fn build_leader_aggregator_client(&mut self) -> Result> { - let leader_aggregator_endpoint = - self.configuration.leader_aggregator_endpoint().unwrap_or_default(); + /// Builds an [AggregatorHTTPClient] + pub async fn build_leader_aggregator_client(&mut self) -> Result> { + let leader_aggregator_endpoint = self.configuration.leader_aggregator_endpoint().ok_or( + anyhow!("Leader Aggregator endpoint is mandatory for follower Aggregator"), + )?; + let aggregator_client = AggregatorHTTPClient::new( - leader_aggregator_endpoint, + Url::parse(&leader_aggregator_endpoint).with_context(|| { + format!( + "Failed to parse leader aggregator endpoint: '{leader_aggregator_endpoint}'" + ) + })?, None, self.get_api_version_provider().await?, Some(Duration::from_secs(30)), @@ -71,8 +79,8 @@ impl DependenciesBuilder { Ok(Arc::new(aggregator_client)) } - /// Returns a leader [AggregatorClient] - pub async fn get_leader_aggregator_client(&mut self) -> Result> { + /// Returns a leader [AggregatorHTTPClient] + pub async fn get_leader_aggregator_client(&mut self) -> Result> { get_dependency!(self.leader_aggregator_client) } diff --git a/mithril-aggregator/src/dependency_injection/builder/mod.rs b/mithril-aggregator/src/dependency_injection/builder/mod.rs index 9befa7d5472..d14398c7d1d 100644 --- a/mithril-aggregator/src/dependency_injection/builder/mod.rs +++ b/mithril-aggregator/src/dependency_injection/builder/mod.rs @@ -56,9 +56,9 @@ use crate::{ file_uploaders::FileUploader, http_server::routes::router::{self, RouterConfig, RouterState}, services::{ - AggregatorClient, CertifierService, MessageService, MithrilSignerRegistrationFollower, - ProverService, SignedEntityService, SignerSynchronizer, Snapshotter, - StakeDistributionService, UpkeepService, + AggregatorHTTPClient, CertificateChainSynchronizer, CertifierService, MessageService, + MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer, + Snapshotter, StakeDistributionService, UpkeepService, }, tools::file_archiver::FileArchiver, }; @@ -185,6 +185,9 @@ pub struct DependenciesBuilder { /// Genesis signature verifier service. pub genesis_verifier: Option>, + /// Certificate chain synchronizer service + pub certificate_chain_synchronizer: Option>, + /// Mithril signer registration leader service pub mithril_signer_registration_leader: Option>, @@ -273,7 +276,7 @@ pub struct DependenciesBuilder { pub metrics_service: Option>, /// Leader aggregator client - pub leader_aggregator_client: Option>, + pub leader_aggregator_client: Option>, /// Protocol parameters retriever pub protocol_parameters_retriever: Option>, @@ -312,6 +315,7 @@ impl DependenciesBuilder { snapshotter: None, certificate_verifier: None, genesis_verifier: None, + certificate_chain_synchronizer: None, mithril_signer_registration_leader: None, mithril_signer_registration_follower: None, signer_registerer: None, @@ -374,6 +378,7 @@ impl DependenciesBuilder { verification_key_store: self.get_verification_key_store().await?, epoch_settings_storer: self.get_epoch_settings_store().await?, chain_observer: self.get_chain_observer().await?, + certificate_chain_synchronizer: self.get_certificate_chain_synchronizer().await?, signer_registerer: self.get_signer_registerer().await?, signer_synchronizer: self.get_signer_synchronizer().await?, signer_registration_round_opener: self.get_signer_registration_round_opener().await?, diff --git a/mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs b/mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs index 1bbb754cb91..aa8ec7851a4 100644 --- a/mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs +++ b/mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs @@ -10,8 +10,9 @@ use crate::database::repository::{BufferedSingleSignatureRepository, SingleSigna use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result}; use crate::get_dependency; use crate::services::{ - BufferedCertifierService, CertifierService, MithrilCertifierService, - MithrilSignerRegistrationFollower, SignerSynchronizer, + BufferedCertifierService, CertificateChainSynchronizer, CertifierService, + MithrilCertificateChainSynchronizer, MithrilCertificateChainSynchronizerNoop, + MithrilCertifierService, MithrilSignerRegistrationFollower, SignerSynchronizer, }; use crate::{ ExecutionEnvironment, MithrilSignerRegistrationLeader, MithrilSignerRegistrationVerifier, @@ -84,6 +85,39 @@ impl DependenciesBuilder { get_dependency!(self.multi_signer) } + async fn build_certificate_chain_synchronizer( + &mut self, + ) -> Result> { + let synchronizer: Arc = + if self.configuration.is_follower_aggregator() { + let leader_aggregator_client = self.get_leader_aggregator_client().await?; + let verifier = Arc::new(MithrilCertificateVerifier::new( + self.root_logger(), + leader_aggregator_client.clone(), + )); + + Arc::new(MithrilCertificateChainSynchronizer::new( + leader_aggregator_client, + self.get_certificate_repository().await?, + verifier, + self.get_genesis_verifier().await?, + self.get_open_message_repository().await?, + self.root_logger(), + )) + } else { + Arc::new(MithrilCertificateChainSynchronizerNoop) + }; + + Ok(synchronizer) + } + + /// [CertificateChainSynchronizer] service + pub async fn get_certificate_chain_synchronizer( + &mut self, + ) -> Result> { + get_dependency!(self.certificate_chain_synchronizer) + } + async fn build_certificate_verifier(&mut self) -> Result> { let verifier = Arc::new(MithrilCertificateVerifier::new( self.root_logger(), diff --git a/mithril-aggregator/src/dependency_injection/containers/serve.rs b/mithril-aggregator/src/dependency_injection/containers/serve.rs index ecf68d4dc78..000902c84ca 100644 --- a/mithril-aggregator/src/dependency_injection/containers/serve.rs +++ b/mithril-aggregator/src/dependency_injection/containers/serve.rs @@ -27,8 +27,9 @@ use crate::{ entities::AggregatorEpochSettings, event_store::{EventMessage, TransmitterService}, services::{ - CertifierService, EpochService, MessageService, ProverService, SignedEntityService, - SignerRecorder, SignerSynchronizer, StakeDistributionService, UpkeepService, + CertificateChainSynchronizer, CertifierService, EpochService, MessageService, + ProverService, SignedEntityService, SignerRecorder, SignerSynchronizer, + StakeDistributionService, UpkeepService, }, }; @@ -57,6 +58,9 @@ pub struct ServeCommandDependenciesContainer { /// Chain observer service. pub(crate) chain_observer: Arc, + /// Certificate chain synchronizer service + pub(crate) certificate_chain_synchronizer: Arc, + /// Signer registerer service pub signer_registerer: Arc, diff --git a/mithril-aggregator/src/entities/open_message.rs b/mithril-aggregator/src/entities/open_message.rs index e3b8f98e1cb..813ba0648b3 100644 --- a/mithril-aggregator/src/entities/open_message.rs +++ b/mithril-aggregator/src/entities/open_message.rs @@ -120,7 +120,7 @@ mod test { fn test_from_record() { let created_at = Utc::now(); let record = OpenMessageRecord { - open_message_id: Uuid::new_v4(), + open_message_id: OpenMessageRecord::new_id(), epoch: Epoch(1), signed_entity_type: SignedEntityType::dummy(), protocol_message: ProtocolMessage::default(), diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 933e68ac1bf..affffc98395 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -72,6 +72,12 @@ pub trait AggregatorRunnerTrait: Sync + Send { /// Synchronize the follower aggregator signer registration. async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>; + /// Synchronize the follower aggregator certificate chain + async fn synchronize_follower_aggregator_certificate_chain( + &self, + force_sync: bool, + ) -> StdResult<()>; + /// Ask the EpochService to update the epoch settings. async fn update_epoch_settings(&self) -> StdResult<()>; @@ -506,6 +512,20 @@ impl AggregatorRunnerTrait for AggregatorRunner { .get_runtime_cycle_total_since_startup() .increment(); } + + async fn synchronize_follower_aggregator_certificate_chain( + &self, + force_sync: bool, + ) -> StdResult<()> { + debug!( + self.logger, + ">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})" + ); + self.dependencies + .certificate_chain_synchronizer + .synchronize_certificate_chain(force_sync) + .await + } } #[cfg(test)] diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index 6add3afab29..3b5453011d4 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -277,13 +277,21 @@ impl AggregatorRuntime { self.runner.precompute_epoch_data().await?; } - self.runner + let chain_validity_result = self + .runner .is_certificate_chain_valid(&new_time_point) .await .map_err(|e| RuntimeError::KeepState { message: "certificate chain is invalid".to_string(), nested_error: e.into(), - })?; + }); + if self.config.is_follower { + let force_sync = chain_validity_result.is_err(); + self.runner + .synchronize_follower_aggregator_certificate_chain(force_sync) + .await?; + } + chain_validity_result?; Ok(()) } @@ -834,6 +842,8 @@ mod tests { } mod follower { + use mockall::predicate::eq; + use super::*; #[tokio::test] @@ -911,6 +921,11 @@ mod tests { .expect_is_certificate_chain_valid() .once() .returning(|_| Ok(())); + runner + .expect_synchronize_follower_aggregator_certificate_chain() + .once() + .with(eq(false)) // Certificate chain valid so force_sync must be false + .returning(|_| Ok(())); runner .expect_update_era_checker() .with(predicate::eq(new_time_point_clone.clone().epoch)) diff --git a/mithril-aggregator/src/services/aggregator_client.rs b/mithril-aggregator/src/services/aggregator_client.rs index 6e3b657226c..a81f42928cf 100644 --- a/mithril-aggregator/src/services/aggregator_client.rs +++ b/mithril-aggregator/src/services/aggregator_client.rs @@ -1,23 +1,27 @@ -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use async_trait::async_trait; -use mithril_common::messages::TryFromMessageAdapter; use reqwest::header::{self, HeaderValue}; -use reqwest::{self, Client, Proxy, RequestBuilder, Response, StatusCode}; +use reqwest::{self, Client, Proxy, RequestBuilder, Response, StatusCode, Url}; + use semver::Version; use slog::{Logger, debug, error, warn}; use std::{io, sync::Arc, time::Duration}; use thiserror::Error; use mithril_common::{ - MITHRIL_AGGREGATOR_VERSION_HEADER, MITHRIL_API_VERSION_HEADER, StdError, + MITHRIL_AGGREGATOR_VERSION_HEADER, MITHRIL_API_VERSION_HEADER, StdError, StdResult, api_version::APIVersionProvider, - entities::{ClientError, ServerError}, + certificate_chain::{CertificateRetriever, CertificateRetrieverError}, + entities::{Certificate, ClientError, ServerError}, logging::LoggerExtensions, - messages::EpochSettingsMessage, + messages::{ + CertificateListMessage, CertificateMessage, EpochSettingsMessage, TryFromMessageAdapter, + }, }; use crate::entities::LeaderAggregatorEpochSettings; use crate::message_adapters::FromEpochSettingsAdapter; +use crate::services::{LeaderAggregatorClient, RemoteCertificateRetriever}; const JSON_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/json"); @@ -117,19 +121,9 @@ impl AggregatorClientError { } } -/// Trait for mocking and testing a `AggregatorClient` -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait AggregatorClient: Sync + Send { - /// Retrieves epoch settings from the aggregator - async fn retrieve_epoch_settings( - &self, - ) -> Result, AggregatorClientError>; -} - /// AggregatorHTTPClient is a http client for an aggregator pub struct AggregatorHTTPClient { - aggregator_endpoint: String, + aggregator_endpoint: Url, relay_endpoint: Option, api_version_provider: Arc, timeout_duration: Option, @@ -139,7 +133,7 @@ pub struct AggregatorHTTPClient { impl AggregatorHTTPClient { /// AggregatorHTTPClient factory pub fn new( - aggregator_endpoint: String, + aggregator_endpoint: Url, relay_endpoint: Option, api_version_provider: Arc, timeout_duration: Option, @@ -147,6 +141,18 @@ impl AggregatorHTTPClient { ) -> Self { let logger = logger.new_with_component_name::(); debug!(logger, "New AggregatorHTTPClient created"); + + // Trailing slash is significant because url::join + // (https://docs.rs/url/latest/url/struct.Url.html#method.join) will remove + // the 'path' part of the url if it doesn't end with a trailing slash. + let aggregator_endpoint = if aggregator_endpoint.as_str().ends_with('/') { + aggregator_endpoint + } else { + let mut url = aggregator_endpoint.clone(); + url.set_path(&format!("{}/", aggregator_endpoint.path())); + url + }; + Self { aggregator_endpoint, relay_endpoint, @@ -156,6 +162,18 @@ impl AggregatorHTTPClient { } } + fn join_aggregator_endpoint(&self, endpoint: &str) -> Result { + self.aggregator_endpoint + .join(endpoint) + .with_context(|| { + format!( + "Invalid url when joining given endpoint, '{endpoint}', to aggregator url '{}'", + self.aggregator_endpoint + ) + }) + .map_err(AggregatorClientError::HTTPClientCreation) + } + fn prepare_http_client(&self) -> Result { let client = match &self.relay_endpoint { Some(relay_endpoint) => Client::builder() @@ -219,15 +237,15 @@ impl AggregatorHTTPClient { } } -#[async_trait] -impl AggregatorClient for AggregatorHTTPClient { - async fn retrieve_epoch_settings( +// Route specifics methods +impl AggregatorHTTPClient { + async fn epoch_settings( &self, ) -> Result, AggregatorClientError> { debug!(self.logger, "Retrieve epoch settings"); - let url = format!("{}/epoch-settings", self.aggregator_endpoint); + let url = self.join_aggregator_endpoint("epoch-settings")?; let response = self - .prepare_request_builder(self.prepare_http_client()?.get(url.clone())) + .prepare_request_builder(self.prepare_http_client()?.get(url)) .send() .await; @@ -249,6 +267,116 @@ impl AggregatorClient for AggregatorHTTPClient { Err(err) => Err(AggregatorClientError::RemoteServerUnreachable(anyhow!(err))), } } + + async fn latest_certificates_list( + &self, + ) -> Result { + debug!(self.logger, "Retrieve latest certificates list"); + let url = self.join_aggregator_endpoint("certificates")?; + let response = self + .prepare_request_builder(self.prepare_http_client()?.get(url)) + .send() + .await; + + match response { + Ok(response) => match response.status() { + StatusCode::OK => { + self.warn_if_api_version_mismatch(&response); + match response.json::().await { + Ok(message) => Ok(message), + Err(err) => Err(AggregatorClientError::JsonParseFailed(anyhow!(err))), + } + } + _ => Err(AggregatorClientError::from_response(response).await), + }, + Err(err) => Err(AggregatorClientError::RemoteServerUnreachable(anyhow!(err))), + } + } + + async fn certificate_details( + &self, + certificate_hash: &str, + ) -> Result, AggregatorClientError> { + debug!(self.logger, "Retrieve certificate details"; "certificate_hash" => %certificate_hash); + let url = self.join_aggregator_endpoint(&format!("certificate/{certificate_hash}"))?; + let response = self + .prepare_request_builder(self.prepare_http_client()?.get(url)) + .send() + .await; + + match response { + Ok(response) => match response.status() { + StatusCode::OK => { + self.warn_if_api_version_mismatch(&response); + match response.json::().await { + Ok(message) => Ok(Some(message)), + Err(err) => Err(AggregatorClientError::JsonParseFailed(anyhow!(err))), + } + } + StatusCode::NOT_FOUND => Ok(None), + _ => Err(AggregatorClientError::from_response(response).await), + }, + Err(err) => Err(AggregatorClientError::RemoteServerUnreachable(anyhow!(err))), + } + } + + async fn latest_genesis_certificate( + &self, + ) -> Result, AggregatorClientError> { + self.certificate_details("genesis").await + } +} + +#[async_trait] +impl LeaderAggregatorClient for AggregatorHTTPClient { + async fn retrieve_epoch_settings(&self) -> StdResult> { + let epoch_settings = self.epoch_settings().await?; + Ok(epoch_settings) + } +} + +#[async_trait] +impl CertificateRetriever for AggregatorHTTPClient { + async fn get_certificate_details( + &self, + certificate_hash: &str, + ) -> Result { + let message = self + .certificate_details(certificate_hash) + .await + .with_context(|| { + format!("Failed to retrieve certificate with hash: '{certificate_hash}'") + }) + .map_err(CertificateRetrieverError)? + .ok_or(CertificateRetrieverError(anyhow!( + "Certificate does not exist: '{certificate_hash}'" + )))?; + + message.try_into().map_err(CertificateRetrieverError) + } +} + +#[async_trait] +impl RemoteCertificateRetriever for AggregatorHTTPClient { + async fn get_latest_certificate_details(&self) -> StdResult> { + let latest_certificates_list = self.latest_certificates_list().await?; + + match latest_certificates_list.first() { + None => Ok(None), + Some(latest_certificate_list_item) => { + let latest_certificate_message = + self.certificate_details(&latest_certificate_list_item.hash).await?; + latest_certificate_message.map(TryInto::try_into).transpose() + } + } + } + + async fn get_genesis_certificate_details(&self) -> StdResult> { + match self.latest_genesis_certificate().await? { + Some(message) => Ok(Some(message.try_into()?)), + None => Ok(None), + } + } } #[cfg(test)] @@ -273,10 +401,10 @@ pub(crate) mod dumb { } #[async_trait] - impl AggregatorClient for DumbAggregatorClient { + impl LeaderAggregatorClient for DumbAggregatorClient { async fn retrieve_epoch_settings( &self, - ) -> Result, AggregatorClientError> { + ) -> StdResult> { let epoch_settings = self.epoch_settings.read().await.clone(); Ok(epoch_settings) @@ -288,20 +416,22 @@ pub(crate) mod dumb { mod tests { use http::response::Builder as HttpResponseBuilder; use httpmock::prelude::*; + use reqwest::IntoUrl; use serde_json::json; use mithril_common::api_version::DummyApiVersionDiscriminantSource; + use mithril_common::messages::CertificateListItemMessage; use crate::test_tools::TestLogger; use super::*; - fn setup_client>(server_url: U) -> AggregatorHTTPClient { + fn setup_client(server_url: U) -> AggregatorHTTPClient { let discriminant_source = DummyApiVersionDiscriminantSource::default(); let api_version_provider = APIVersionProvider::new(Arc::new(discriminant_source)); AggregatorHTTPClient::new( - server_url.into(), + server_url.into_url().unwrap(), None, Arc::new(api_version_provider), None, @@ -370,7 +500,7 @@ mod tests { then.status(500).body("an error occurred"); }); - match client.retrieve_epoch_settings().await.unwrap_err() { + match client.epoch_settings().await.unwrap_err() { AggregatorClientError::RemoteServerTechnical(_) => (), e => panic!("Expected Aggregator::RemoteServerTechnical error, got '{e:?}'."), }; @@ -386,7 +516,119 @@ mod tests { }); let error = client - .retrieve_epoch_settings() + .epoch_settings() + .await + .expect_err("retrieve_epoch_settings should fail"); + + assert!( + matches!(error, AggregatorClientError::RemoteServerUnreachable(_)), + "unexpected error type: {error:?}" + ); + } + + #[tokio::test] + async fn test_latest_certificates_list_ok_200() { + let (server, client) = setup_server_and_client(); + let expected_list = vec![ + CertificateListItemMessage::dummy(), + CertificateListItemMessage::dummy(), + ]; + let _server_mock = server.mock(|when, then| { + when.path("/certificates"); + then.status(200).body(json!(expected_list).to_string()); + }); + + let fetched_list = client.latest_certificates_list().await.unwrap(); + + assert_eq!(expected_list, fetched_list); + } + + #[tokio::test] + async fn test_latest_certificates_list_ko_500() { + let (server, client) = setup_server_and_client(); + let _server_mock = server.mock(|when, then| { + when.path("/certificates"); + then.status(500).body("an error occurred"); + }); + + match client.latest_certificates_list().await.unwrap_err() { + AggregatorClientError::RemoteServerTechnical(_) => (), + e => panic!("Expected Aggregator::RemoteServerTechnical error, got '{e:?}'."), + }; + } + + #[tokio::test] + async fn test_latest_certificates_list_timeout() { + let (server, mut client) = setup_server_and_client(); + client.timeout_duration = Some(Duration::from_millis(10)); + let _server_mock = server.mock(|when, then| { + when.path("/certificates"); + then.delay(Duration::from_millis(100)); + }); + + let error = client + .latest_certificates_list() + .await + .expect_err("retrieve_epoch_settings should fail"); + + assert!( + matches!(error, AggregatorClientError::RemoteServerUnreachable(_)), + "unexpected error type: {error:?}" + ); + } + + #[tokio::test] + async fn test_certificates_details_ok_200() { + let (server, client) = setup_server_and_client(); + let expected_message = CertificateMessage::dummy(); + let _server_mock = server.mock(|when, then| { + when.path(format!("/certificate/{}", expected_message.hash)); + then.status(200).body(json!(expected_message).to_string()); + }); + + let fetched_message = client.certificate_details(&expected_message.hash).await.unwrap(); + + assert_eq!(Some(expected_message), fetched_message); + } + + #[tokio::test] + async fn test_certificates_details_ok_404() { + let (server, client) = setup_server_and_client(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/not-found"); + then.status(404); + }); + + let fetched_message = client.latest_genesis_certificate().await.unwrap(); + + assert_eq!(None, fetched_message); + } + + #[tokio::test] + async fn test_certificates_details_ko_500() { + let (server, client) = setup_server_and_client(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/whatever"); + then.status(500).body("an error occurred"); + }); + + match client.certificate_details("whatever").await.unwrap_err() { + AggregatorClientError::RemoteServerTechnical(_) => (), + e => panic!("Expected Aggregator::RemoteServerTechnical error, got '{e:?}'."), + }; + } + + #[tokio::test] + async fn test_certificates_details_timeout() { + let (server, mut client) = setup_server_and_client(); + client.timeout_duration = Some(Duration::from_millis(10)); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/whatever"); + then.delay(Duration::from_millis(100)); + }); + + let error = client + .certificate_details("whatever") .await .expect_err("retrieve_epoch_settings should fail"); @@ -396,6 +638,66 @@ mod tests { ); } + #[tokio::test] + async fn test_latest_genesis_ok_200() { + let (server, client) = setup_server_and_client(); + let genesis_message = CertificateMessage::dummy(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/genesis"); + then.status(200).body(json!(genesis_message).to_string()); + }); + + let fetched = client.latest_genesis_certificate().await.unwrap(); + + assert_eq!(Some(genesis_message), fetched); + } + + #[tokio::test] + async fn test_latest_genesis_ok_404() { + let (server, client) = setup_server_and_client(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/genesis"); + then.status(404); + }); + + let fetched = client.latest_genesis_certificate().await.unwrap(); + + assert_eq!(None, fetched); + } + + #[tokio::test] + async fn test_latest_genesis_ko_500() { + let (server, client) = setup_server_and_client(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/genesis"); + then.status(500).body("an error occurred"); + }); + + let error = client.latest_genesis_certificate().await.unwrap_err(); + + assert!( + matches!(error, AggregatorClientError::RemoteServerTechnical(_)), + "Expected Aggregator::RemoteServerTechnical error, got {error:?}" + ); + } + + #[tokio::test] + async fn test_latest_genesis_timeout() { + let (server, mut client) = setup_server_and_client(); + client.timeout_duration = Some(Duration::from_millis(10)); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/genesis"); + then.delay(Duration::from_millis(100)); + }); + + let error = client.latest_genesis_certificate().await.unwrap_err(); + + assert!( + matches!(error, AggregatorClientError::RemoteServerUnreachable(_)), + "unexpected error type: {error:?}" + ); + } + #[tokio::test] async fn test_4xx_errors_are_handled_as_remote_server_logical() { let response = build_text_response(StatusCode::BAD_REQUEST, "error text"); @@ -567,7 +869,7 @@ mod tests { let aggregator_version = "1.0.0"; let (logger, log_inspector) = TestLogger::memory(); let version_provider = version_provider_with_open_api_version(aggregator_version); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.api_version_provider = Arc::new(version_provider); client.logger = logger; let response = build_fake_response_with_header( @@ -594,7 +896,7 @@ mod tests { let version = "1.0.0"; let (logger, log_inspector) = TestLogger::memory(); let version_provider = version_provider_with_open_api_version(version); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.api_version_provider = Arc::new(version_provider); client.logger = logger; let response = build_fake_response_with_header(MITHRIL_API_VERSION_HEADER, version); @@ -610,7 +912,7 @@ mod tests { let aggregator_version = "2.0.0"; let (logger, log_inspector) = TestLogger::memory(); let version_provider = version_provider_with_open_api_version(aggregator_version); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.api_version_provider = Arc::new(version_provider); client.logger = logger; let response = build_fake_response_with_header( @@ -631,7 +933,7 @@ mod tests { #[test] fn test_does_not_log_or_fail_when_header_is_missing() { let (logger, log_inspector) = TestLogger::memory(); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.logger = logger; let response = build_fake_response_with_header("NotMithrilAPIVersionHeader", "whatever"); @@ -644,7 +946,7 @@ mod tests { #[test] fn test_does_not_log_or_fail_when_header_is_not_a_version() { let (logger, log_inspector) = TestLogger::memory(); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.logger = logger; let response = build_fake_response_with_header(MITHRIL_API_VERSION_HEADER, "not_a_version"); @@ -658,7 +960,7 @@ mod tests { fn test_logs_error_when_aggregator_version_cannot_be_computed() { let (logger, log_inspector) = TestLogger::memory(); let version_provider = version_provider_without_open_api_version(); - let mut client = setup_client("whatever"); + let mut client = setup_client("http://whatever"); client.api_version_provider = Arc::new(version_provider); client.logger = logger; let response = build_fake_response_with_header(MITHRIL_API_VERSION_HEADER, "1.0.0"); @@ -699,4 +1001,53 @@ mod tests { ); } } + + mod remote_certificate_retriever { + use mithril_common::test_utils::fake_data; + + use super::*; + + #[tokio::test] + async fn test_get_latest_certificate_details() { + let (server, client) = setup_server_and_client(); + let expected_certificate = fake_data::certificate("expected"); + let latest_message: CertificateMessage = + expected_certificate.clone().try_into().unwrap(); + let latest_certificates = vec![ + CertificateListItemMessage { + hash: expected_certificate.hash.clone(), + ..CertificateListItemMessage::dummy() + }, + CertificateListItemMessage::dummy(), + CertificateListItemMessage::dummy(), + ]; + let _server_mock = server.mock(|when, then| { + when.path("/certificates"); + then.status(200).body(json!(latest_certificates).to_string()); + }); + let _server_mock = server.mock(|when, then| { + when.path(format!("/certificate/{}", latest_message.hash)); + then.status(200).body(json!(latest_message).to_string()); + }); + + let fetched_certificate = client.get_latest_certificate_details().await.unwrap(); + + assert_eq!(Some(expected_certificate), fetched_certificate); + } + + #[tokio::test] + async fn test_get_latest_genesis_certificate() { + let (server, client) = setup_server_and_client(); + let genesis_message = CertificateMessage::dummy(); + let expected_genesis: Certificate = genesis_message.clone().try_into().unwrap(); + let _server_mock = server.mock(|when, then| { + when.path("/certificate/genesis"); + then.status(200).body(json!(genesis_message).to_string()); + }); + + let fetched = client.get_genesis_certificate_details().await.unwrap(); + + assert_eq!(Some(expected_genesis), fetched); + } + } } diff --git a/mithril-aggregator/src/services/certificate_chain_synchronizer/interface.rs b/mithril-aggregator/src/services/certificate_chain_synchronizer/interface.rs new file mode 100644 index 00000000000..3f0ca2a50cf --- /dev/null +++ b/mithril-aggregator/src/services/certificate_chain_synchronizer/interface.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; + +use mithril_common::StdResult; +use mithril_common::entities::Certificate; + +use crate::entities::OpenMessage; + +/// Define how to synchronize the certificate chain with a remote source +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait CertificateChainSynchronizer: Send + Sync { + /// Synchronize the certificate chain with a remote source + /// + /// If `force` is true, the chain will always be synchronized, else it will only synchronize + /// if the remote source has started a new chain with a new Genesis. + async fn synchronize_certificate_chain(&self, force: bool) -> StdResult<()>; +} + +/// Define how to retrieve remote certificate details +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait RemoteCertificateRetriever: Sync + Send { + /// Get latest certificate + async fn get_latest_certificate_details(&self) -> StdResult>; + + /// Get genesis certificate + async fn get_genesis_certificate_details(&self) -> StdResult>; +} + +/// Define how to store the synchronized certificate and retrieve details about the actual local chain +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait SynchronizedCertificateStorer: Send + Sync { + /// Insert a list of Certificates in the database, if some already exists, they will be deleted before inserting + async fn insert_or_replace_many(&self, certificates: Vec) -> StdResult<()>; + + /// Get the latest genesis Certificate + async fn get_latest_genesis(&self) -> StdResult>; +} + +/// Define how to store the open message created at the end of the synchronization process +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait OpenMessageStorer: Send + Sync { + /// Store an open_message in the database + async fn insert_or_replace_open_message(&self, open_message: OpenMessage) -> StdResult<()>; +} diff --git a/mithril-aggregator/src/services/certificate_chain_synchronizer/mod.rs b/mithril-aggregator/src/services/certificate_chain_synchronizer/mod.rs new file mode 100644 index 00000000000..83562263901 --- /dev/null +++ b/mithril-aggregator/src/services/certificate_chain_synchronizer/mod.rs @@ -0,0 +1,7 @@ +mod interface; +mod noop; +mod synchronizer_service; + +pub use interface::*; +pub use noop::*; +pub use synchronizer_service::*; diff --git a/mithril-aggregator/src/services/certificate_chain_synchronizer/noop.rs b/mithril-aggregator/src/services/certificate_chain_synchronizer/noop.rs new file mode 100644 index 00000000000..2d02ba51b19 --- /dev/null +++ b/mithril-aggregator/src/services/certificate_chain_synchronizer/noop.rs @@ -0,0 +1,13 @@ +use mithril_common::StdResult; + +use crate::services::CertificateChainSynchronizer; + +/// A noop [CertificateChainSynchronizer] for leader aggregators +pub struct MithrilCertificateChainSynchronizerNoop; + +#[async_trait::async_trait] +impl CertificateChainSynchronizer for MithrilCertificateChainSynchronizerNoop { + async fn synchronize_certificate_chain(&self, _force: bool) -> StdResult<()> { + Ok(()) + } +} diff --git a/mithril-aggregator/src/services/certificate_chain_synchronizer/synchronizer_service.rs b/mithril-aggregator/src/services/certificate_chain_synchronizer/synchronizer_service.rs new file mode 100644 index 00000000000..7342e9c3f6b --- /dev/null +++ b/mithril-aggregator/src/services/certificate_chain_synchronizer/synchronizer_service.rs @@ -0,0 +1,709 @@ +//! # Certificate chain synchronizer +//! +//! Behavior: +//! 1. Check force: +//! - If false, fetch the latest local genesis certificate in database +//! - If it's found, fetch the remote Genesis certificate +//! - If it's different from the local genesis, continue synchronization +//! - If it's the same, abort with an `Ok` +//! - If it's not found, continue synchronization +//! - If true, skip the remote Genesis certificate check and synchronize +//! 2. Fetch then validate the latest remote certificate +//! - if valid, store it in an in-memory FIFO list +//! - if invalid, abort with an `Err` +//! 3. Repeat step 2. with each parent of the certificate until the genesis certificate is reached +//! 4. Store the fetched certificates in the database, from genesis to latest, for each certificate: +//! - if it exists in the database, it is replaced +//! - if it doesn't exist, it is inserted +//! 5. Create a certified open message in the database, based on the latest certificate (the first +//! of the last epoch synchronized) +//! 6. End +//! +use anyhow::{Context, anyhow}; +use async_trait::async_trait; +use chrono::Utc; +use slog::{Logger, debug, info}; +use std::collections::VecDeque; +use std::sync::Arc; + +use mithril_common::StdResult; +use mithril_common::certificate_chain::CertificateVerifier; +use mithril_common::crypto_helper::ProtocolGenesisVerifier; +use mithril_common::entities::{Certificate, SignedEntityType}; +use mithril_common::logging::LoggerExtensions; + +use crate::entities::OpenMessage; + +use super::{ + CertificateChainSynchronizer, OpenMessageStorer, RemoteCertificateRetriever, + SynchronizedCertificateStorer, +}; + +/// Service that synchronizes the certificate chain with a remote aggregator +pub struct MithrilCertificateChainSynchronizer { + remote_certificate_retriever: Arc, + certificate_storer: Arc, + certificate_verifier: Arc, + genesis_verifier: Arc, + open_message_storer: Arc, + logger: Logger, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum SyncStatus { + Forced, + NoLocalGenesis, + RemoteGenesisMatchesLocalGenesis, + RemoteGenesisDoesntMatchLocalGenesis, +} + +impl SyncStatus { + fn should_sync(&self) -> bool { + match self { + SyncStatus::Forced => true, + SyncStatus::NoLocalGenesis => true, + SyncStatus::RemoteGenesisMatchesLocalGenesis => false, + SyncStatus::RemoteGenesisDoesntMatchLocalGenesis => true, + } + } +} + +impl MithrilCertificateChainSynchronizer { + /// Create a new `MithrilCertificateChainSynchronizer` instance + pub fn new( + remote_certificate_retriever: Arc, + certificate_storer: Arc, + certificate_verifier: Arc, + genesis_verifier: Arc, + open_message_storer: Arc, + logger: Logger, + ) -> Self { + Self { + remote_certificate_retriever, + certificate_storer, + certificate_verifier, + genesis_verifier, + open_message_storer, + logger: logger.new_with_component_name::(), + } + } + + async fn check_sync_state(&self, force: bool) -> StdResult { + if force { + return Ok(SyncStatus::Forced); + } + + match self.certificate_storer.get_latest_genesis().await? { + Some(local_genesis) => { + match self + .remote_certificate_retriever + .get_genesis_certificate_details() + .await? + { + Some(remote_genesis) if (local_genesis == remote_genesis) => { + Ok(SyncStatus::RemoteGenesisMatchesLocalGenesis) + } + Some(_) => Ok(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis), + // The remote aggregator doesn't have a chain yet, we can't sync + None => Err(anyhow!("Remote aggregator doesn't have a chain yet")), + } + } + None => Ok(SyncStatus::NoLocalGenesis), + } + } + + async fn retrieve_and_validate_remote_certificate_chain( + &self, + starting_point: Certificate, + ) -> StdResult> { + // IMPORTANT: Order matters, returned certificates must be ordered from genesis to latest + // (fetched database data is returned from last inserted to oldest) + let mut validated_certificates = VecDeque::new(); + let mut certificate = starting_point; + + loop { + let parent_certificate = self + .certificate_verifier + .verify_certificate(&certificate, &self.genesis_verifier.to_verification_key()) + .await + .with_context( + || format!("Failed to verify certificate: `{}`", certificate.hash,), + )?; + + match parent_certificate { + None => { + validated_certificates.push_front(certificate); + break; + } + Some(parent) => { + // At the start of the retrieval the first certificate may not be the first of + // its epoch, filter them out since we only need one certificate per epoch + if !validated_certificates.is_empty() || parent.epoch != certificate.epoch { + validated_certificates.push_front(certificate); + } + + certificate = parent; + } + } + } + + Ok(validated_certificates.into()) + } + + async fn store_certificate_chain(&self, certificate_chain: Vec) -> StdResult<()> { + self.certificate_storer + .insert_or_replace_many(certificate_chain) + .await?; + Ok(()) + } +} + +#[async_trait] +impl CertificateChainSynchronizer for MithrilCertificateChainSynchronizer { + async fn synchronize_certificate_chain(&self, force: bool) -> StdResult<()> { + debug!(self.logger, ">> synchronize_certificate_chain"; "force" => force); + + let sync_state = self.check_sync_state(force).await.with_context(|| { + format!("Failed to check if certificate chain should be sync (force: `{force}`)") + })?; + if sync_state.should_sync() { + info!(self.logger, "Start synchronizing certificate chain"; "sync_state" => ?sync_state); + } else { + info!(self.logger, "No need to synchronize certificate chain"; "sync_state" => ?sync_state); + return Ok(()); + } + + let starting_point = self + .remote_certificate_retriever + .get_latest_certificate_details() + .await? + .ok_or( + anyhow!("Remote aggregator doesn't have a chain yet") + .context("Failed to retrieve latest remote certificate details"), + )?; + let remote_certificate_chain = self + .retrieve_and_validate_remote_certificate_chain(starting_point) + .await + .with_context(|| "Failed to retrieve and validate remote certificate chain")?; + let open_message = prepare_open_message_to_store( + remote_certificate_chain + .last() + .ok_or(anyhow!("Retrieved certificate chain is empty"))?, + ); + self.store_certificate_chain(remote_certificate_chain) + .await + .with_context(|| "Failed to store remote retrieved certificate chain")?; + self.open_message_storer + .insert_or_replace_open_message(open_message) + .await + .with_context(|| "Failed to store open message when synchronizing certificate chain")?; + + info!( + self.logger, + "Certificate chain synchronized with remote source" + ); + Ok(()) + } +} + +fn prepare_open_message_to_store(latest_certificate: &Certificate) -> OpenMessage { + OpenMessage { + epoch: latest_certificate.epoch, + signed_entity_type: SignedEntityType::MithrilStakeDistribution(latest_certificate.epoch), + protocol_message: latest_certificate.protocol_message.clone(), + is_certified: true, + is_expired: false, + single_signatures: Vec::new(), + created_at: Utc::now(), + expires_at: None, + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use std::sync::RwLock; + + use mithril_common::certificate_chain::{ + FakeCertificaterRetriever, MithrilCertificateVerifier, + }; + use mithril_common::test_utils::{ + CertificateChainBuilder, CertificateChainFixture, fake_data, fake_keys, + mock_extensions::MockBuilder, + }; + + use crate::services::{ + MockOpenMessageStorer, MockRemoteCertificateRetriever, MockSynchronizedCertificateStorer, + }; + use crate::test_tools::TestLogger; + use crate::tools::mocks::MockCertificateVerifier; + + use super::*; + + impl MithrilCertificateChainSynchronizer { + fn default_for_test() -> Self { + let genesis_verification_key = + fake_keys::genesis_verification_key()[0].try_into().unwrap(); + Self::new( + Arc::new(MockRemoteCertificateRetriever::new()), + Arc::new(MockSynchronizedCertificateStorer::new()), + Arc::new(MockCertificateVerifier::new()), + Arc::new(ProtocolGenesisVerifier::from_verification_key( + genesis_verification_key, + )), + Arc::new(MockOpenMessageStorer::new()), + TestLogger::stdout(), + ) + } + } + + macro_rules! mocked_synchronizer { + (with_remote_genesis: $remote_genesis_result:expr) => { + MithrilCertificateChainSynchronizer { + remote_certificate_retriever: + MockBuilder::::configure(|retriever| { + retriever + .expect_get_genesis_certificate_details() + .return_once(move || $remote_genesis_result); + }), + ..MithrilCertificateChainSynchronizer::default_for_test() + } + }; + (with_local_genesis: $local_genesis_result:expr) => { + MithrilCertificateChainSynchronizer { + certificate_storer: MockBuilder::::configure( + |storer| { + storer + .expect_get_latest_genesis() + .return_once(move || $local_genesis_result); + }, + ), + ..MithrilCertificateChainSynchronizer::default_for_test() + } + }; + (with_remote_genesis: $remote_genesis_result:expr, with_local_genesis: $local_genesis_result:expr) => { + MithrilCertificateChainSynchronizer { + remote_certificate_retriever: + MockBuilder::::configure(|retriever| { + retriever + .expect_get_genesis_certificate_details() + .return_once(move || $remote_genesis_result); + }), + certificate_storer: MockBuilder::::configure( + |storer| { + storer + .expect_get_latest_genesis() + .return_once(move || $local_genesis_result); + }, + ), + ..MithrilCertificateChainSynchronizer::default_for_test() + } + }; + (with_verify_certificate_result: $verify_certificate_result:expr) => { + MithrilCertificateChainSynchronizer { + certificate_verifier: MockBuilder::::configure( + |verifier| { + verifier + .expect_verify_certificate() + .return_once(move |_, _| $verify_certificate_result); + }, + ), + ..MithrilCertificateChainSynchronizer::default_for_test() + } + }; + } + + fn fake_verifier(remote_certificate_chain: &[Certificate]) -> Arc { + let verifier = MithrilCertificateVerifier::new( + TestLogger::stdout(), + Arc::new(FakeCertificaterRetriever::from_certificates( + remote_certificate_chain, + )), + ); + Arc::new(verifier) + } + + #[derive(Default)] + struct DumbCertificateStorer { + certificates: RwLock>, + genesis_certificate: Option, + } + + impl DumbCertificateStorer { + fn new(genesis: Certificate, already_stored: Vec) -> Self { + Self { + certificates: RwLock::new(already_stored), + genesis_certificate: Some(genesis), + } + } + + fn stored_certificates(&self) -> Vec { + self.certificates.read().unwrap().clone() + } + } + + #[async_trait] + impl SynchronizedCertificateStorer for DumbCertificateStorer { + async fn insert_or_replace_many( + &self, + certificates_chain: Vec, + ) -> StdResult<()> { + let mut certificates = self.certificates.write().unwrap(); + *certificates = certificates_chain; + Ok(()) + } + + async fn get_latest_genesis(&self) -> StdResult> { + Ok(self.genesis_certificate.clone()) + } + } + + mod check_sync_state { + use super::*; + + #[test] + fn sync_state_should_sync() { + assert!(SyncStatus::Forced.should_sync()); + assert!(!SyncStatus::RemoteGenesisMatchesLocalGenesis.should_sync()); + assert!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis.should_sync()); + assert!(SyncStatus::NoLocalGenesis.should_sync()); + } + + #[tokio::test] + async fn state_when_force_true() { + let synchronizer = MithrilCertificateChainSynchronizer::default_for_test(); + + let sync_state = synchronizer.check_sync_state(true).await.unwrap(); + assert_eq!(SyncStatus::Forced, sync_state); + } + + #[tokio::test] + async fn state_when_force_false_and_no_local_genesis_certificate_found() { + let synchronizer = mocked_synchronizer!(with_local_genesis: Ok(None)); + + let sync_state = synchronizer.check_sync_state(false).await.unwrap(); + assert_eq!(SyncStatus::NoLocalGenesis, sync_state); + } + + #[tokio::test] + async fn state_when_force_false_and_remote_genesis_dont_matches_local_genesis() { + let synchronizer = mocked_synchronizer!( + with_remote_genesis: Ok(Some(fake_data::genesis_certificate("remote_genesis"))), + with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis"))) + ); + + let sync_state = synchronizer.check_sync_state(false).await.unwrap(); + assert_eq!(SyncStatus::RemoteGenesisDoesntMatchLocalGenesis, sync_state); + } + + #[tokio::test] + async fn state_when_force_false_and_remote_genesis_matches_local_genesis() { + let remote_genesis = fake_data::genesis_certificate("genesis"); + let local_genesis = remote_genesis.clone(); + let synchronizer = mocked_synchronizer!( + with_remote_genesis: Ok(Some(remote_genesis)), + with_local_genesis: Ok(Some(local_genesis)) + ); + + let sync_state = synchronizer.check_sync_state(false).await.unwrap(); + assert_eq!(SyncStatus::RemoteGenesisMatchesLocalGenesis, sync_state); + } + + #[tokio::test] + async fn if_force_true_it_should_not_fetch_remote_genesis_certificate() { + let synchronizer = mocked_synchronizer!(with_remote_genesis: Err(anyhow!( + "should not fetch genesis" + ))); + + synchronizer.check_sync_state(true).await.unwrap(); + } + + #[tokio::test] + async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_local_genesis() { + let synchronizer = mocked_synchronizer!(with_local_genesis: Err(anyhow!("failure"))); + synchronizer + .check_sync_state(false) + .await + .expect_err("Expected an error but was:"); + } + + #[tokio::test] + async fn should_abort_with_error_if_force_false_and_fails_to_retrieve_remote_genesis() { + let synchronizer = mocked_synchronizer!( + with_remote_genesis: Err(anyhow!("failure")), + with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis"))) + ); + synchronizer + .check_sync_state(false) + .await + .expect_err("Expected an error but was:"); + } + + #[tokio::test] + async fn should_abort_with_error_if_force_false_and_remote_genesis_is_none() { + let synchronizer = mocked_synchronizer!( + with_remote_genesis: Ok(None), + with_local_genesis: Ok(Some(fake_data::genesis_certificate("local_genesis"))) + ); + let error = synchronizer + .check_sync_state(false) + .await + .expect_err("Expected an error but was:"); + + assert!( + error + .to_string() + .contains("Remote aggregator doesn't have a chain yet"), + "Unexpected error:\n{error:?}" + ); + } + } + + mod retrieve_validate_remote_certificate_chain { + use mockall::predicate::{always, eq}; + + use mithril_common::entities::Epoch; + + use super::*; + + #[tokio::test] + async fn succeed_if_the_remote_chain_only_contains_a_genesis_certificate() { + let chain = CertificateChainBuilder::new().with_total_certificates(1).build(); + let synchronizer = MithrilCertificateChainSynchronizer { + certificate_verifier: fake_verifier(&chain), + genesis_verifier: Arc::new(chain.genesis_verifier.clone()), + ..MithrilCertificateChainSynchronizer::default_for_test() + }; + + let starting_point = chain[0].clone(); + let remote_certificate_chain = synchronizer + .retrieve_and_validate_remote_certificate_chain(starting_point) + .await + .unwrap(); + + assert_eq!(remote_certificate_chain, chain.certificates_chained); + } + + #[tokio::test] + async fn abort_with_error_if_a_certificate_is_invalid() { + let synchronizer = mocked_synchronizer!(with_verify_certificate_result: Err(anyhow!("invalid certificate"))); + + let starting_point = fake_data::certificate("certificate"); + synchronizer + .retrieve_and_validate_remote_certificate_chain(starting_point) + .await + .expect_err("Expected an error but was:"); + } + + #[tokio::test] + async fn succeed_with_a_valid_certificate_chain_and_only_get_first_certificate_of_each_epoch_plus_genesis() + { + // Note: the `CertificateChainBuilder` use one epoch for the genesis only, so in order + // for the last epoch to have two certificates when `certificates_per_epoch` is an *even* + // number, we need to set `total_certificates` to an *odd* number + let chain = CertificateChainBuilder::new() + .with_total_certificates(9) + .with_certificates_per_epoch(2) + .build(); + let synchronizer = MithrilCertificateChainSynchronizer { + certificate_verifier: fake_verifier(&chain), + genesis_verifier: Arc::new(chain.genesis_verifier.clone()), + ..MithrilCertificateChainSynchronizer::default_for_test() + }; + + let starting_point = chain[0].clone(); + let remote_certificate_chain = synchronizer + .retrieve_and_validate_remote_certificate_chain(starting_point.clone()) + .await + .unwrap(); + + let mut expected = chain.certificate_path_to_genesis(&starting_point.hash); + // Remote certificate chain is returned ordered from genesis to latest + expected.reverse(); + // Remove the latest certificate has it's not the first of its epoch + expected.pop(); + assert_eq!(remote_certificate_chain, expected); + } + + #[tokio::test] + async fn return_chain_ordered_from_genesis_to_latest() { + let base_certificate = fake_data::certificate("whatever"); + let chain = vec![ + Certificate { + epoch: Epoch(2), + ..fake_data::genesis_certificate("genesis") + }, + Certificate { + epoch: Epoch(3), + hash: "hash1".to_string(), + previous_hash: "genesis".to_string(), + ..base_certificate.clone() + }, + Certificate { + epoch: Epoch(4), + hash: "hash2".to_string(), + previous_hash: "hash1".to_string(), + ..base_certificate + }, + ]; + let synchronizer = MithrilCertificateChainSynchronizer { + certificate_verifier: MockBuilder::::configure(|mock| { + let cert_1 = chain[1].clone(); + mock.expect_verify_certificate() + .with(eq(chain[2].clone()), always()) + .return_once(move |_, _| Ok(Some(cert_1))); + let genesis = chain[0].clone(); + mock.expect_verify_certificate() + .with(eq(chain[1].clone()), always()) + .return_once(move |_, _| Ok(Some(genesis))); + mock.expect_verify_certificate() + .with(eq(chain[0].clone()), always()) + .return_once(move |_, _| Ok(None)); + }), + ..MithrilCertificateChainSynchronizer::default_for_test() + }; + + let starting_point = chain[2].clone(); + let remote_certificate_chain = synchronizer + .retrieve_and_validate_remote_certificate_chain(starting_point.clone()) + .await + .unwrap(); + + assert_eq!( + remote_certificate_chain + .into_iter() + .map(|c| c.hash) + .collect::>(), + vec!["genesis".to_string(), "hash1".to_string(), "hash2".to_string()] + ); + } + } + + mod store_remote_certificate_chain { + use super::*; + + #[tokio::test] + async fn do_store_given_certificates() { + let certificates_chain = vec![ + fake_data::genesis_certificate("genesis"), + fake_data::certificate("certificate1"), + fake_data::certificate("certificate2"), + ]; + let storer = Arc::new(DumbCertificateStorer::default()); + let synchronizer = MithrilCertificateChainSynchronizer { + certificate_storer: storer.clone(), + ..MithrilCertificateChainSynchronizer::default_for_test() + }; + + assert_eq!(Vec::::new(), storer.stored_certificates()); + + synchronizer + .store_certificate_chain(certificates_chain.clone()) + .await + .unwrap(); + + assert_eq!(certificates_chain, storer.stored_certificates()); + } + + #[tokio::test] + async fn fail_on_storer_error() { + let synchronizer = MithrilCertificateChainSynchronizer { + certificate_storer: MockBuilder::::configure( + |mock| { + mock.expect_insert_or_replace_many() + .return_once(move |_| Err(anyhow!("failure"))); + }, + ), + ..MithrilCertificateChainSynchronizer::default_for_test() + }; + + synchronizer + .store_certificate_chain(vec![fake_data::certificate("certificate")]) + .await + .unwrap_err(); + } + } + + mod synchronize_certificate_chain { + use mockall::predicate::function; + + use super::*; + + fn build_synchronizer( + remote_chain: &CertificateChainFixture, + storer: Arc, + ) -> MithrilCertificateChainSynchronizer { + MithrilCertificateChainSynchronizer { + certificate_storer: storer.clone(), + remote_certificate_retriever: + MockBuilder::::configure(|mock| { + let genesis = remote_chain.genesis_certificate().clone(); + mock.expect_get_genesis_certificate_details() + .return_once(move || Ok(Some(genesis))); + let latest = remote_chain.latest_certificate().clone(); + mock.expect_get_latest_certificate_details() + .return_once(move || Ok(Some(latest))); + }), + certificate_verifier: fake_verifier(remote_chain), + open_message_storer: MockBuilder::::configure(|mock| { + // Ensure that `store_open_message` is called + let expected_msd_epoch = remote_chain.latest_certificate().epoch; + mock.expect_insert_or_replace_open_message() + .with(function(move |open_message: &OpenMessage| { + open_message.signed_entity_type + == SignedEntityType::MithrilStakeDistribution(expected_msd_epoch) + })) + .times(1..) + .returning(|_| Ok(())); + }), + ..MithrilCertificateChainSynchronizer::default_for_test() + } + } + + #[tokio::test] + async fn store_all() { + let remote_chain = CertificateChainBuilder::default() + .with_certificates_per_epoch(3) + .with_total_certificates(8) + .build(); + let storer = Arc::new(DumbCertificateStorer::default()); + let synchronizer = build_synchronizer(&remote_chain, storer.clone()); + + // Will sync even if force is false + synchronizer.synchronize_certificate_chain(false).await.unwrap(); + + let mut expected = + remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash); + expected.reverse(); + assert_eq!(expected, storer.stored_certificates()); + } + + #[tokio::test] + async fn store_partial() { + let remote_chain = CertificateChainBuilder::default() + .with_certificates_per_epoch(1) + .with_total_certificates(8) + .build(); + let existing_certificates = + remote_chain.certificate_path_to_genesis(&remote_chain[5].hash); + let storer = Arc::new(DumbCertificateStorer::new( + remote_chain.genesis_certificate().clone(), + existing_certificates.clone(), + )); + let synchronizer = build_synchronizer(&remote_chain, storer.clone()); + + // Force false - won't sync + synchronizer.synchronize_certificate_chain(false).await.unwrap(); + + assert_eq!(&existing_certificates, &storer.stored_certificates()); + + // Force true - will sync + synchronizer.synchronize_certificate_chain(true).await.unwrap(); + + let mut expected = + remote_chain.certificate_path_to_genesis(&remote_chain.latest_certificate().hash); + expected.reverse(); + assert_eq!(expected, storer.stored_certificates()); + } + } +} diff --git a/mithril-aggregator/src/services/mod.rs b/mithril-aggregator/src/services/mod.rs index d4649849fde..d8443da5f65 100644 --- a/mithril-aggregator/src/services/mod.rs +++ b/mithril-aggregator/src/services/mod.rs @@ -11,6 +11,7 @@ mod aggregator_client; mod cardano_transactions_importer; +mod certificate_chain_synchronizer; mod certifier; mod epoch_service; mod message; @@ -27,6 +28,7 @@ mod usage_reporter; pub use aggregator_client::*; pub use cardano_transactions_importer::*; +pub use certificate_chain_synchronizer::*; pub use certifier::*; pub use epoch_service::*; pub use message::*; diff --git a/mithril-aggregator/src/services/signer_registration/api.rs b/mithril-aggregator/src/services/signer_registration/api.rs index f5b88959080..a2697455b81 100644 --- a/mithril-aggregator/src/services/signer_registration/api.rs +++ b/mithril-aggregator/src/services/signer_registration/api.rs @@ -5,6 +5,8 @@ use mithril_common::{ entities::{Epoch, Signer, SignerWithStake, StakeDistribution}, }; +use crate::entities::LeaderAggregatorEpochSettings; + use super::SignerRegistrationError; /// Represents the information needed to handle a signer registration round @@ -87,3 +89,11 @@ pub trait SignerRegistrationVerifier: Send + Sync { stake_distribution: &StakeDistribution, ) -> StdResult; } + +/// Define how data are retrieved from a leader aggregator +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait LeaderAggregatorClient: Sync + Send { + /// Retrieves epoch settings from the aggregator + async fn retrieve_epoch_settings(&self) -> StdResult>; +} diff --git a/mithril-aggregator/src/services/signer_registration/follower.rs b/mithril-aggregator/src/services/signer_registration/follower.rs index 6daaa821b14..78f4a0b6ce5 100644 --- a/mithril-aggregator/src/services/signer_registration/follower.rs +++ b/mithril-aggregator/src/services/signer_registration/follower.rs @@ -11,12 +11,11 @@ use mithril_persistence::store::StakeStorer; use crate::{ SignerRegistrationVerifier, VerificationKeyStorer, dependency_injection::EpochServiceWrapper, - services::AggregatorClient, }; use super::{ - SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound, - SignerRegistrationRoundOpener, SignerSynchronizer, + LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationError, + SignerRegistrationRound, SignerRegistrationRoundOpener, SignerSynchronizer, }; /// A [MithrilSignerRegistrationFollower] supports signer registrations in a follower aggregator @@ -34,7 +33,7 @@ pub struct MithrilSignerRegistrationFollower { signer_registration_verifier: Arc, /// Leader aggregator client - leader_aggregator_client: Arc, + leader_aggregator_client: Arc, /// Stake store stake_store: Arc, @@ -47,7 +46,7 @@ impl MithrilSignerRegistrationFollower { verification_key_store: Arc, signer_recorder: Arc, signer_registration_verifier: Arc, - leader_aggregator_client: Arc, + leader_aggregator_client: Arc, stake_store: Arc, ) -> Self { Self { @@ -178,36 +177,28 @@ impl SignerRegistrationRoundOpener for MithrilSignerRegistrationFollower { #[cfg(test)] mod tests { - use std::sync::Arc; - - use anyhow::anyhow; - use mithril_persistence::store::StakeStorer; - - use mithril_common::{ - entities::{Epoch, Signer, SignerWithStake}, - messages::{EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter}, - test_utils::MithrilFixtureBuilder, + use mithril_common::messages::{ + EpochSettingsMessage, SignerMessagePart, TryFromMessageAdapter, }; + use mithril_common::test_utils::MithrilFixtureBuilder; use crate::{ - MithrilSignerRegistrationFollower, SignerRecorder, SignerRegisterer, - SignerRegistrationRoundOpener, SignerRegistrationVerifier, VerificationKeyStorer, database::{repository::SignerRegistrationStore, test_helper::main_db_connection}, message_adapters::FromEpochSettingsAdapter, services::{ - AggregatorClient, AggregatorClientError, FakeEpochService, MockAggregatorClient, - MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer, + FakeEpochService, MockLeaderAggregatorClient, MockSignerRecorder, + MockSignerRegistrationVerifier, }, tools::mocks::MockStakeStore, }; + use super::*; + use test_utils::*; mod test_utils { use tokio::sync::RwLock; - use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService}; - use super::*; /// MithrilSignerRegistrationFollowerBuilder is a test builder for [MithrilSignerRegistrationFollower] @@ -215,7 +206,7 @@ mod tests { epoch_service: EpochServiceWrapper, signer_recorder: Arc, signer_registration_verifier: Arc, - leader_aggregator_client: Arc, + leader_aggregator_client: Arc, stake_store: Arc, verification_key_store: Arc, } @@ -226,7 +217,7 @@ mod tests { epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())), signer_recorder: Arc::new(MockSignerRecorder::new()), signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()), - leader_aggregator_client: Arc::new(MockAggregatorClient::new()), + leader_aggregator_client: Arc::new(MockLeaderAggregatorClient::new()), stake_store: Arc::new(MockStakeStore::new()), verification_key_store: Arc::new(SignerRegistrationStore::new( Arc::new(main_db_connection().unwrap()), @@ -263,7 +254,7 @@ mod tests { pub fn with_leader_aggregator_client( self, - leader_aggregator_client: Arc, + leader_aggregator_client: Arc, ) -> Self { Self { leader_aggregator_client, @@ -359,7 +350,7 @@ mod tests { Arc::new(signer_registration_verifier) }) .with_leader_aggregator_client({ - let mut aggregator_client = MockAggregatorClient::new(); + let mut aggregator_client = MockLeaderAggregatorClient::new(); aggregator_client .expect_retrieve_epoch_settings() .returning(move || Ok(Some(epoch_settings_message.clone()))) @@ -421,7 +412,7 @@ mod tests { Arc::new(signer_registration_verifier) }) .with_leader_aggregator_client({ - let mut aggregator_client = MockAggregatorClient::new(); + let mut aggregator_client = MockLeaderAggregatorClient::new(); aggregator_client .expect_retrieve_epoch_settings() .returning(move || Ok(Some(epoch_settings_message.clone()))) @@ -488,7 +479,7 @@ mod tests { Arc::new(signer_registration_verifier) }) .with_leader_aggregator_client({ - let mut aggregator_client = MockAggregatorClient::new(); + let mut aggregator_client = MockLeaderAggregatorClient::new(); aggregator_client .expect_retrieve_epoch_settings() .returning(move || Ok(Some(epoch_settings_message.clone()))) @@ -517,14 +508,10 @@ mod tests { async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() { let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default() .with_leader_aggregator_client({ - let mut aggregator_client = MockAggregatorClient::new(); + let mut aggregator_client = MockLeaderAggregatorClient::new(); aggregator_client .expect_retrieve_epoch_settings() - .returning(move || { - Err(AggregatorClientError::RemoteServerTechnical(anyhow!( - "an error" - ))) - }) + .returning(move || Err(anyhow!("an error"))) .times(1); Arc::new(aggregator_client) @@ -553,7 +540,7 @@ mod tests { .unwrap(); let signer_registration_follower = MithrilSignerRegistrationFollowerBuilder::default() .with_leader_aggregator_client({ - let mut aggregator_client = MockAggregatorClient::new(); + let mut aggregator_client = MockLeaderAggregatorClient::new(); aggregator_client .expect_retrieve_epoch_settings() .returning(move || Ok(Some(epoch_settings_message.clone()))) diff --git a/mithril-aggregator/src/services/signer_registration/mod.rs b/mithril-aggregator/src/services/signer_registration/mod.rs index ac7aa7aa442..8dfa1ac55c7 100644 --- a/mithril-aggregator/src/services/signer_registration/mod.rs +++ b/mithril-aggregator/src/services/signer_registration/mod.rs @@ -5,8 +5,8 @@ mod leader; mod verifier; pub use api::{ - SignerRecorder, SignerRegisterer, SignerRegistrationRound, SignerRegistrationRoundOpener, - SignerRegistrationVerifier, SignerSynchronizer, + LeaderAggregatorClient, SignerRecorder, SignerRegisterer, SignerRegistrationRound, + SignerRegistrationRoundOpener, SignerRegistrationVerifier, SignerSynchronizer, }; pub use error::SignerRegistrationError; pub use follower::MithrilSignerRegistrationFollower; @@ -14,4 +14,7 @@ pub use leader::MithrilSignerRegistrationLeader; pub use verifier::MithrilSignerRegistrationVerifier; #[cfg(test)] -pub use api::{MockSignerRecorder, MockSignerRegisterer, MockSignerRegistrationVerifier}; +pub use api::{ + MockLeaderAggregatorClient, MockSignerRecorder, MockSignerRegisterer, + MockSignerRegistrationVerifier, +}; diff --git a/mithril-aggregator/src/tools/mocks.rs b/mithril-aggregator/src/tools/mocks.rs index f3b315de936..ceac43a6d2c 100644 --- a/mithril-aggregator/src/tools/mocks.rs +++ b/mithril-aggregator/src/tools/mocks.rs @@ -2,13 +2,45 @@ use async_trait::async_trait; use mithril_cardano_node_chain::chain_observer::{ChainObserver, ChainObserverError}; use mithril_cardano_node_chain::entities::{ChainAddress, TxDatum}; -use mithril_common::crypto_helper::{KesPeriod, OpCert}; -use mithril_common::entities::{ChainPoint, Epoch, StakeDistribution}; +use mithril_common::certificate_chain::CertificateVerifier; +use mithril_common::crypto_helper::{KesPeriod, OpCert, ProtocolGenesisVerificationKey}; +use mithril_common::entities::{Certificate, ChainPoint, Epoch, StakeDistribution}; use mithril_persistence::store::StakeStorer; use mithril_common::StdResult; use mockall::mock; +mock! { + pub CertificateVerifier {} + + #[async_trait] + impl CertificateVerifier for CertificateVerifier { + async fn verify_genesis_certificate( + &self, + genesis_certificate: &Certificate, + genesis_verification_key: &ProtocolGenesisVerificationKey, + ) -> StdResult<()>; + + async fn verify_standard_certificate( + &self, + certificate: &Certificate, + previous_certificate: &Certificate, + ) -> StdResult<()>; + + async fn verify_certificate( + &self, + certificate: &Certificate, + genesis_verification_key: &ProtocolGenesisVerificationKey, + ) -> StdResult>; + + async fn verify_certificate_chain( + &self, + certificate: Certificate, + genesis_verification_key: &ProtocolGenesisVerificationKey, + ) -> StdResult<()>; + } +} + mock! { pub ChainObserver {} diff --git a/mithril-aggregator/tests/create_certificate_follower.rs b/mithril-aggregator/tests/create_certificate_follower.rs index 4148519e10a..3ded9df4bed 100644 --- a/mithril-aggregator/tests/create_certificate_follower.rs +++ b/mithril-aggregator/tests/create_certificate_follower.rs @@ -5,9 +5,8 @@ use std::{collections::HashMap, ops::Range}; use mithril_aggregator::ServeCommandConfiguration; use mithril_common::{ entities::{ - BlockNumber, CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolParameters, - SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, StakeDistributionParty, - TimePoint, + BlockNumber, ChainPoint, Epoch, ProtocolParameters, SignedEntityType, + SignedEntityTypeDiscriminants, SlotNumber, StakeDistributionParty, TimePoint, }, temp_dir, test_utils::{ @@ -86,7 +85,7 @@ async fn create_certificate_follower() { phi_f: 0.95, }; let fixtures = - EpochFixturesMapBuilder::build_fixtures_sequence(1..10, protocol_parameters.clone()); + EpochFixturesMapBuilder::build_fixtures_sequence(1..6, protocol_parameters.clone()); let epoch_fixtures_map = EpochFixturesMapBuilder::build_epoch_fixtures_map(&fixtures); let start_time_point = TimePoint { epoch: Epoch(1), @@ -100,16 +99,15 @@ async fn create_certificate_follower() { let leader_configuration = ServeCommandConfiguration { protocol_parameters: protocol_parameters.clone(), data_stores_directory: get_test_dir("create_certificate_leader"), - cardano_transactions_signing_config: CardanoTransactionsSigningConfig { - security_parameter: BlockNumber(0), - step: BlockNumber(30), - }, - signed_entity_types: Some(SignedEntityTypeDiscriminants::CardanoDatabase.to_string()), + signed_entity_types: Some( + SignedEntityTypeDiscriminants::CardanoStakeDistribution.to_string(), + ), ..ServeCommandConfiguration::new_sample(temp_dir!()) }; let mut leader_tester = RuntimeTester::build(start_time_point.clone(), leader_configuration.clone()).await; - let leader_aggregator_http_server = leader_tester.expose_epoch_settings().await.unwrap(); + let leader_aggregator_http_server = + leader_tester.spawn_leader_aggregator_http_server().await.unwrap(); let follower_configuration = ServeCommandConfiguration { data_stores_directory: get_test_dir("create_certificate_follower"), @@ -117,7 +115,7 @@ async fn create_certificate_follower() { "aggregator-integration", "create_certificate_follower", ), - leader_aggregator_endpoint: Some(leader_aggregator_http_server.url()), + leader_aggregator_endpoint: Some(leader_aggregator_http_server.url().to_string()), ..leader_configuration }; let mut follower_tester = RuntimeTester::build(start_time_point, follower_configuration).await; @@ -126,6 +124,7 @@ async fn create_certificate_follower() { "Epoch 1: - the leader aggregator registers the first signers - the leader aggregator can't transition from 'Idle' to 'Ready' + - the follower can't synchronize its chain with the leader aggregator - the follower aggregator can't transition from 'Idle' to 'Ready' " ); @@ -210,7 +209,10 @@ async fn create_certificate_follower() { "Epoch 3: - the leader aggregator produces a new certificate - the follower aggregator synchronizes signers from the leader aggregator - - the follower aggregator can't transition from 'Idle' to 'Ready' + - the follower synchronizes its chain with the leader aggregator + - the follower aggregator can transition from 'Idle' to 'Ready' + - the follower aggregator transition from 'Ready' to 'Signing(CardanoStakeDistribution)', skipping + MithrilStakeDistribution " ); let epoch_fixture = &epoch_fixtures_map[&Epoch(3)]; @@ -232,10 +234,6 @@ async fn create_certificate_follower() { cycle!(leader_tester, "ready"); cycle!(leader_tester, "signing"); - comment!("Follower: change the epoch after leader"); - follower_tester.increase_epoch().await.unwrap(); - cycle_err!(follower_tester, "idle"); - comment!("Leader: register signers"); leader_tester .register_signers(&epoch_fixture.registering.signers_fixture()) @@ -255,98 +253,65 @@ async fn create_certificate_follower() { comment!("Leader: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(leader_tester, "signing"); - assert_last_certificate_eq!( - leader_tester, - ExpectedCertificate::new( - Epoch(3), - StakeDistributionParty::from_signers( - epoch_fixture.current_signing.unwrap().signers_with_stake() - ) - .as_slice(), - epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(3)), - ExpectedCertificate::genesis_identifier(Epoch(2)), + let expected_certificate_on_both_aggregator = ExpectedCertificate::new( + Epoch(3), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake(), ) + .as_slice(), + epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(3)), + ExpectedCertificate::genesis_identifier(Epoch(2)), ); + assert_last_certificate_eq!(leader_tester, expected_certificate_on_both_aggregator); cycle_err!(leader_tester, "signing"); comment!( - "Epoch 4: - - the leader aggregator produces a new certificate - - the follower aggregator synchronizes signers from the leader aggregator - - the follower aggregator bootstraps its genesis certificate - - the follower aggregator can't transition from 'Idle' to 'Ready'" + "Follower: change the epoch after leader created a certificate and synchronize certificate chain" ); - let epoch_fixture = &epoch_fixtures_map[&Epoch(4)]; - - comment!("Leader: update stake distribution source"); - leader_tester - .update_stake_distribution(epoch_fixture.registering.stake_distribution()) - .await - .unwrap(); - - comment!("Follower: update stake distribution source"); - follower_tester - .update_stake_distribution(epoch_fixture.registering.stake_distribution()) - .await - .unwrap(); - - comment!("Leader: change the epoch"); - leader_tester.increase_epoch().await.unwrap(); - cycle!(leader_tester, "idle"); - cycle!(leader_tester, "ready"); - - comment!("Follower: change the epoch after leader"); follower_tester.increase_epoch().await.unwrap(); cycle_err!(follower_tester, "idle"); - - comment!("Follower: bootstrap the genesis certificate"); - follower_tester - .register_genesis_certificate(epoch_fixture.next_signing.unwrap()) - .await - .unwrap(); - - comment!("Leader: register signers"); - leader_tester - .register_signers(&epoch_fixture.registering.signers_fixture()) + assert_last_certificate_eq!( + follower_tester, synchronized_from_leader => expected_certificate_on_both_aggregator + ); + let current_msd_open_message = follower_tester + .observer + .get_current_open_message(SignedEntityTypeDiscriminants::MithrilStakeDistribution) .await .unwrap(); - cycle!(leader_tester, "signing"); + assert!( + current_msd_open_message + .as_ref() + .is_some_and(|m| m.epoch == 3 && m.is_certified), + "Expected a certified OpenMessage for MithrilStakeDistribution, got:\n{current_msd_open_message:#?}" + ); - comment!("Leader: signers send their single signature"); - leader_tester - .send_single_signatures( - SignedEntityTypeDiscriminants::MithrilStakeDistribution, - &epoch_fixture.current_signing.unwrap().signers_fixture(), - ) + comment!( + "Follower: transition to 'Signing(CardanoStakeDistribution)' directly since a\ + OpenMessage for MithrilStakeDistribution was stored by the synchronizer" + ); + cycle!(follower_tester, "ready"); + cycle!(follower_tester, "signing"); + let current_csd_open_message = follower_tester + .observer + .get_current_open_message(SignedEntityTypeDiscriminants::CardanoStakeDistribution) .await .unwrap(); - - comment!("Leader: state machine should issue a certificate for the MithrilStakeDistribution"); - cycle!(leader_tester, "ready"); - - assert_last_certificate_eq!( - leader_tester, - ExpectedCertificate::new( - Epoch(4), - StakeDistributionParty::from_signers( - epoch_fixture.current_signing.unwrap().signers_with_stake() - ) - .as_slice(), - epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(4)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), - ) + assert!( + current_csd_open_message + .as_ref() + .is_some_and(|m| m.epoch == 3 && !m.is_certified), + "Expected a non-certified OpenMessage for CardanoStakeDistribution, got:\n{current_csd_open_message:#?}" ); - cycle!(leader_tester, "signing"); comment!( - "Epoch 5: + "Epoch 4: - the leader aggregator produces a new certificate + - the follower aggregator synchronizes signers from the leader aggregator - the follower aggregator produces a new certificate - - the follower aggregator new certificate uses the same avk as the leader aggregator's new certificate - "); - let epoch_fixture = &epoch_fixtures_map[&Epoch(5)]; + - the follower aggregator new certificate uses the same avk as the leader aggregator's new certificate" + ); + let epoch_fixture = &epoch_fixtures_map[&Epoch(4)]; comment!("Leader: update stake distribution source"); leader_tester @@ -360,16 +325,14 @@ async fn create_certificate_follower() { .await .unwrap(); - comment!("Follower: change the epoch before leader"); - follower_tester.increase_epoch().await.unwrap(); - cycle!(follower_tester, "idle"); - comment!("Leader: change the epoch"); leader_tester.increase_epoch().await.unwrap(); cycle!(leader_tester, "idle"); cycle!(leader_tester, "ready"); comment!("Follower: change the epoch after leader"); + follower_tester.increase_epoch().await.unwrap(); + cycle!(follower_tester, "idle"); cycle!(follower_tester, "ready"); comment!("Leader: register signers"); @@ -388,41 +351,43 @@ async fn create_certificate_follower() { .await .unwrap(); - comment!("Follower: signers send their single signature"); - cycle!(follower_tester, "signing"); - follower_tester - .send_single_signatures( - SignedEntityTypeDiscriminants::MithrilStakeDistribution, - &epoch_fixture.current_signing.unwrap().signers_fixture(), - ) - .await - .unwrap(); - comment!("Leader: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(leader_tester, "ready"); + let leader_expected_certificate = ExpectedCertificate::new( - Epoch(5), + Epoch(4), StakeDistributionParty::from_signers( epoch_fixture.current_signing.unwrap().signers_with_stake(), ) .as_slice(), epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(5)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), + SignedEntityType::MithrilStakeDistribution(Epoch(4)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), ); assert_last_certificate_eq!(leader_tester, leader_expected_certificate); + cycle!(leader_tester, "signing"); + + comment!("Follower: signers send their single signature"); + cycle!(follower_tester, "signing"); + follower_tester + .send_single_signatures( + SignedEntityTypeDiscriminants::MithrilStakeDistribution, + &epoch_fixture.current_signing.unwrap().signers_fixture(), + ) + .await + .unwrap(); comment!("Follower: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(follower_tester, "ready"); let follower_expected_certificate = ExpectedCertificate::new( - Epoch(5), + Epoch(4), StakeDistributionParty::from_signers( epoch_fixture.current_signing.unwrap().signers_with_stake(), ) .as_slice(), epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(5)), - ExpectedCertificate::genesis_identifier(Epoch(4)), + SignedEntityType::MithrilStakeDistribution(Epoch(4)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), ); assert_last_certificate_eq!(follower_tester, follower_expected_certificate); let expected_avk = epoch_fixture @@ -434,12 +399,12 @@ async fn create_certificate_follower() { assert_eq!(expected_avk, follower_expected_certificate.avk()); comment!( - "Epoch 6: + "Epoch 5: - the leader aggregator produces a new certificate - the follower aggregator produces a new certificate - the follower aggregator new certificate uses the same avk as the leader aggregator's new certificate "); - let epoch_fixture = &epoch_fixtures_map[&Epoch(6)]; + let epoch_fixture = &epoch_fixtures_map[&Epoch(5)]; comment!("Leader: update stake distribution source"); leader_tester @@ -492,28 +457,28 @@ async fn create_certificate_follower() { comment!("Leader: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(leader_tester, "ready"); let leader_expected_certificate = ExpectedCertificate::new( - Epoch(6), + Epoch(5), StakeDistributionParty::from_signers( epoch_fixture.current_signing.unwrap().signers_with_stake(), ) .as_slice(), epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(6)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(5))), + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), ); assert_last_certificate_eq!(leader_tester, leader_expected_certificate); comment!("Follower: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(follower_tester, "ready"); let follower_expected_certificate = ExpectedCertificate::new( - Epoch(6), + Epoch(5), StakeDistributionParty::from_signers( epoch_fixture.current_signing.unwrap().signers_with_stake(), ) .as_slice(), epoch_fixture.current_signing.unwrap().compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(6)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(5))), + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), ); assert_last_certificate_eq!(follower_tester, follower_expected_certificate); let expected_avk = epoch_fixture diff --git a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs index a2b2a2980df..407f9687ee7 100644 --- a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs +++ b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs @@ -1,10 +1,10 @@ use anyhow::{Context, anyhow}; -use std::{collections::BTreeSet, sync::Arc}; +use std::sync::Arc; use mithril_aggregator::{ dependency_injection::{DependenciesBuilder, EpochServiceWrapper}, entities::OpenMessage, - services::{CertifierService, MessageService, SignedEntityService}, + services::{CertifierService, SignedEntityService}, }; use mithril_common::{ StdResult, @@ -12,7 +12,6 @@ use mithril_common::{ CardanoTransactionsSnapshot, Certificate, Epoch, SignedEntityType, SignedEntityTypeDiscriminants, TimePoint, }, - messages::EpochSettingsMessage, signable_builder::SignedEntity, }; use mithril_ticker::TickerService; @@ -23,7 +22,6 @@ pub struct AggregatorObserver { signed_entity_service: Arc, ticker_service: Arc, epoch_service: EpochServiceWrapper, - message_service: Arc, } impl AggregatorObserver { @@ -34,7 +32,6 @@ impl AggregatorObserver { signed_entity_service: deps_builder.get_signed_entity_service().await.unwrap(), ticker_service: deps_builder.get_ticker_service().await.unwrap(), epoch_service: deps_builder.get_epoch_service().await.unwrap(), - message_service: deps_builder.get_message_service().await.unwrap(), } } @@ -84,17 +81,6 @@ impl AggregatorObserver { .time_point_to_signed_entity(discriminant, &time_point) } - /// Get the current [EpochSettingsMessage] of the aggregator - pub async fn get_epoch_settings( - &self, - allowed_discriminants: BTreeSet, - ) -> StdResult { - self.message_service - .get_epoch_settings_message(allowed_discriminants) - .await - .with_context(|| "Querying the current epoch settings should not fail") - } - /// Get the last certificate produced by the aggregator pub async fn get_last_certificate(&self) -> StdResult { let certificate = self diff --git a/mithril-aggregator/tests/test_extensions/leader_aggregator_http_server.rs b/mithril-aggregator/tests/test_extensions/leader_aggregator_http_server.rs new file mode 100644 index 00000000000..72a387aa284 --- /dev/null +++ b/mithril-aggregator/tests/test_extensions/leader_aggregator_http_server.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use axum::{ + Json, Router, + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, +}; +use axum_test::TestServer; +use reqwest::Url; + +use mithril_aggregator::services::MessageService; +use mithril_common::entities::SignedEntityTypeDiscriminants; +use mithril_common::logging::LoggerExtensions; +use mithril_common::{StdError, StdResult}; + +use crate::test_extensions::RuntimeTester; + +pub struct LeaderAggregatorHttpServer { + server: TestServer, + url: Url, +} + +impl LeaderAggregatorHttpServer { + pub fn spawn(runtime_tester: &RuntimeTester) -> StdResult { + let state = LeaderAggregatorRoutesState { + message_service: runtime_tester.dependencies.message_service.clone(), + logger: slog_scope::logger().new_with_component_name::(), + }; + let router = Router::new() + .route("/epoch-settings", get(epoch_settings)) + .route("/certificates", get(certificates_list)) + .route("/certificate/genesis", get(certificate_last_genesis)) + .route("/certificate/{hash}", get(certificate_by_hash)) + .with_state(state); + + let server = TestServer::builder().http_transport().build(router)?; + let url = server.server_address().unwrap(); + + Ok(Self { server, url }) + } + + pub fn url(&self) -> &Url { + &self.url + } +} + +#[derive(Clone)] +struct LeaderAggregatorRoutesState { + message_service: Arc, + logger: slog::Logger, +} + +fn internal_server_error(err: StdError) -> impl IntoResponse { + (StatusCode::INTERNAL_SERVER_ERROR, Json(err.to_string())) +} + +async fn epoch_settings(state: State) -> Response { + slog::debug!(state.logger, "/epoch-settings"); + let allowed_discriminants = SignedEntityTypeDiscriminants::all(); + let epoch_settings_message = state + .message_service + .get_epoch_settings_message(allowed_discriminants) + .await; + + match epoch_settings_message { + Ok(message) => (StatusCode::OK, Json(message)).into_response(), + Err(err) => internal_server_error(err).into_response(), + } +} + +async fn certificates_list(state: State) -> Response { + slog::debug!(state.logger, "/certificates"); + match state.message_service.get_certificate_list_message(5).await { + Ok(message) => (StatusCode::OK, Json(message)).into_response(), + Err(err) => internal_server_error(err).into_response(), + } +} + +async fn certificate_last_genesis(state: State) -> Response { + slog::debug!(state.logger, "/certificate/genesis"); + match state.message_service.get_latest_genesis_certificate_message().await { + Ok(Some(message)) => (StatusCode::OK, Json(message)).into_response(), + Ok(None) => StatusCode::NOT_FOUND.into_response(), + Err(err) => internal_server_error(err).into_response(), + } +} + +async fn certificate_by_hash( + Path(hash): Path, + state: State, +) -> Response { + slog::debug!(state.logger, "/certificate/{hash}"); + match state.message_service.get_certificate_message(&hash).await { + Ok(Some(message)) => (StatusCode::OK, Json(message)).into_response(), + Ok(None) => StatusCode::NOT_FOUND.into_response(), + Err(err) => internal_server_error(err).into_response(), + } +} diff --git a/mithril-aggregator/tests/test_extensions/mod.rs b/mithril-aggregator/tests/test_extensions/mod.rs index 4ac0f630b04..7d90a61aa2f 100644 --- a/mithril-aggregator/tests/test_extensions/mod.rs +++ b/mithril-aggregator/tests/test_extensions/mod.rs @@ -8,6 +8,7 @@ pub mod runtime_tester; pub mod utilities; pub mod aggregator_observer; mod expected_certificate; +mod leader_aggregator_http_server; mod metrics_tester; pub use aggregator_observer::AggregatorObserver; diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index 49447ac5c2c..5cae914f313 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -1,13 +1,9 @@ use anyhow::{Context, anyhow}; use chrono::Utc; -use serde_json::json; use slog::Drain; use slog_scope::debug; -use std::convert::Infallible; use std::sync::Arc; use std::time::Duration; -use warp::Filter; -use warp::http::StatusCode; use mithril_aggregator::{ AggregatorRuntime, ConfigurationSource, DumbUploader, MetricsService, @@ -38,9 +34,9 @@ use mithril_common::{ }, }; use mithril_era::{EraMarker, EraReader, adapters::EraReaderDummyAdapter}; -use mithril_test_http_server::{TestHttpServer, test_http_server}; -use crate::test_extensions::utilities::tx_hash; +use crate::test_extensions::leader_aggregator_http_server::LeaderAggregatorHttpServer; +use crate::test_extensions::utilities::{async_wait, tx_hash}; use crate::test_extensions::{AggregatorObserver, ExpectedCertificate, MetricsVerifier}; #[macro_export] @@ -48,14 +44,15 @@ macro_rules! cycle { ( $tester:expr, $expected_state:expr ) => {{ use $crate::test_extensions::ExpectedMetrics; + let runtime_tester: &mut RuntimeTester = &mut $tester; let (runtime_cycle_success, runtime_cycle_total) = - $tester.get_runtime_cycle_success_and_total_since_startup_metrics(); + runtime_tester.get_runtime_cycle_success_and_total_since_startup_metrics(); - RuntimeTester::cycle(&mut $tester).await.unwrap(); - assert_eq!($expected_state, $tester.runtime.get_state()); + runtime_tester.cycle().await.unwrap(); + assert_eq!($expected_state, runtime_tester.runtime.get_state()); assert_metrics_eq!( - $tester.metrics_verifier, + runtime_tester.metrics_verifier, ExpectedMetrics::new() .runtime_cycle_success(runtime_cycle_success + 1) .runtime_cycle_total(runtime_cycle_total + 1) @@ -68,16 +65,19 @@ macro_rules! cycle_err { ( $tester:expr, $expected_state:expr ) => {{ use $crate::test_extensions::ExpectedMetrics; + let runtime_tester: &mut RuntimeTester = &mut $tester; let (runtime_cycle_success, runtime_cycle_total) = - $tester.get_runtime_cycle_success_and_total_since_startup_metrics(); + runtime_tester.get_runtime_cycle_success_and_total_since_startup_metrics(); - RuntimeTester::cycle(&mut $tester) + let err = runtime_tester + .cycle() .await .expect_err("cycle tick should have returned an error"); - assert_eq!($expected_state, $tester.runtime.get_state()); + slog_scope::info!("cycle_err result: {err:?}"); + assert_eq!($expected_state, runtime_tester.runtime.get_state()); assert_metrics_eq!( - $tester.metrics_verifier, + runtime_tester.metrics_verifier, ExpectedMetrics::new() .runtime_cycle_success(runtime_cycle_success) .runtime_cycle_total(runtime_cycle_total + 1) @@ -88,11 +88,27 @@ macro_rules! cycle_err { #[macro_export] macro_rules! assert_last_certificate_eq { ( $tester:expr, $expected_certificate:expr ) => {{ + let runtime_tester: &mut RuntimeTester = &mut $tester; if let Some(signed_type) = $expected_certificate.get_signed_type() { - $tester.wait_until_signed_entity(&signed_type).await.unwrap(); + runtime_tester.wait_until_signed_entity(&signed_type).await.unwrap(); } - let last_certificate = RuntimeTester::get_last_expected_certificate(&mut $tester) + let is_synchronized_from_leader = false; + let last_certificate = runtime_tester + .get_last_expected_certificate(is_synchronized_from_leader) + .await + .unwrap(); + assert_eq!($expected_certificate, last_certificate); + }}; + ( $tester:expr, synchronized_from_leader => $expected_certificate:expr ) => {{ + let runtime_tester: &mut RuntimeTester = &mut $tester; + if let Some(signed_type) = $expected_certificate.get_signed_type() { + runtime_tester.wait_until_certificate(&signed_type).await.unwrap(); + } + + let is_synchronized_from_leader = true; + let last_certificate = runtime_tester + .get_last_expected_certificate(is_synchronized_from_leader) .await .unwrap(); assert_eq!($expected_certificate, last_certificate); @@ -240,37 +256,10 @@ impl RuntimeTester { Ok(()) } - pub async fn expose_epoch_settings(&mut self) -> StdResult { - fn with_observer( - runtime_tester: &RuntimeTester, - ) -> impl Filter,), Error = Infallible> + Clone + use<> - { - let observer = runtime_tester.observer.clone(); - warp::any().map(move || observer.clone()) - } - - async fn epoch_settings_handler( - observer: Arc, - ) -> Result { - let allowed_discriminants = SignedEntityTypeDiscriminants::all(); - let epoch_settings_message = observer.get_epoch_settings(allowed_discriminants).await; - match epoch_settings_message { - Ok(message) => Ok(Box::new(warp::reply::with_status( - warp::reply::json(&message), - StatusCode::OK, - ))), - Err(err) => Ok(Box::new(warp::reply::with_status( - warp::reply::json(&json!(err.to_string())), - StatusCode::INTERNAL_SERVER_ERROR, - ))), - } - } - - let routes = warp::path("epoch-settings") - .and(with_observer(self)) - .and_then(epoch_settings_handler); - - Ok(test_http_server(routes)) + pub async fn spawn_leader_aggregator_http_server( + &self, + ) -> StdResult { + LeaderAggregatorHttpServer::spawn(self) } /// Increase the immutable file number of the simulated db, returns the new number. @@ -569,12 +558,11 @@ impl RuntimeTester { Ok(()) } - /// Get the last produced certificate with its signed entity if it's not a genesis certificate - pub async fn get_last_certificate_with_signed_entity( + /// Get the last produced signed entity + async fn get_last_signed_entity( &mut self, - ) -> StdResult<(Certificate, Option)> { - let certificate = self.observer.get_last_certificate().await?; - + certificate: &Certificate, + ) -> StdResult> { let signed_entity = match &certificate.signature { CertificateSignature::GenesisSignature(..) => None, CertificateSignature::MultiSignature(..) => { @@ -591,37 +579,40 @@ impl RuntimeTester { } }; - Ok((certificate, signed_entity)) + Ok(signed_entity) } /// Get the last produced certificate and transform it to a [ExpectedCertificate] - pub async fn get_last_expected_certificate(&mut self) -> StdResult { - let (certificate, signed_entity_record) = - self.get_last_certificate_with_signed_entity().await?; + pub async fn get_last_expected_certificate( + &mut self, + is_synchronized_from_leader: bool, + ) -> StdResult { + let certificate = self.observer.get_last_certificate().await?; - let expected_certificate = match signed_entity_record { - None if certificate.is_genesis() => ExpectedCertificate::new_genesis( + let expected_certificate = if certificate.is_genesis() { + ExpectedCertificate::new_genesis( certificate.epoch, certificate.aggregate_verification_key.try_into().unwrap(), - ), - None => { - panic!( + ) + } else { + let signed_entity_type = certificate.signed_entity_type(); + let previous_cert_identifier = self + .get_expected_certificate_identifier(&certificate.previous_hash) + .await?; + + if !is_synchronized_from_leader && !certificate.is_genesis() { + self.get_last_signed_entity(&certificate).await?.ok_or(anyhow!( "A certificate should always have a SignedEntity if it's not a genesis certificate" - ); - } - Some(record) => { - let previous_cert_identifier = self - .get_expected_certificate_identifier(&certificate.previous_hash) - .await?; - - ExpectedCertificate::new( - certificate.epoch, - certificate.metadata.signers.as_slice(), - certificate.aggregate_verification_key.try_into().unwrap(), - record.signed_entity_type, - previous_cert_identifier, - ) + ))?; } + + ExpectedCertificate::new( + certificate.epoch, + certificate.metadata.signers.as_slice(), + certificate.aggregate_verification_key.try_into().unwrap(), + signed_entity_type, + previous_cert_identifier, + ) }; Ok(expected_certificate) @@ -632,30 +623,22 @@ impl RuntimeTester { &mut self, certificate_hash: &str, ) -> StdResult { - let cert_identifier = match self + let certificate = self .dependencies - .signed_entity_storer - .get_signed_entity_by_certificate_id(certificate_hash) + .certificate_repository + .get_certificate::(certificate_hash) .await - .with_context(|| "Querying signed entity should not fail")? - { - Some(record) => ExpectedCertificate::identifier(&record.signed_entity_type), - None => { - // Certificate is a genesis certificate - let genesis_certificate = self - .dependencies - .certifier_service - .get_certificate_by_hash(certificate_hash) - .await - .with_context(|| "Querying genesis certificate should not fail")? - .ok_or(anyhow!( - "A genesis certificate should exist with hash {}", - certificate_hash - ))?; - ExpectedCertificate::genesis_identifier(genesis_certificate.epoch) - } - }; + .with_context(|| format!("Failed to query certificate with hash {certificate_hash}"))? + .ok_or(anyhow!( + "A certificate should exist with hash {}", + certificate_hash + ))?; + let cert_identifier = if certificate.is_genesis() { + ExpectedCertificate::genesis_identifier(certificate.epoch) + } else { + ExpectedCertificate::identifier(&certificate.signed_entity_type()) + }; Ok(cert_identifier) } @@ -665,22 +648,25 @@ impl RuntimeTester { &self, signed_entity_type_expected: &SignedEntityType, ) -> StdResult<()> { - let mut max_iteration = 100; - while !self - .observer - .is_last_signed_entity(signed_entity_type_expected) - .await? - { - max_iteration -= 1; - if max_iteration <= 0 { - return Err(anyhow!( - "Signed entity not found: {signed_entity_type_expected}" - )); - } - tokio::time::sleep(Duration::from_millis(1)).await; - } + async_wait!( + max_iter:100, sleep_ms:1, + condition: !self.observer.is_last_signed_entity(signed_entity_type_expected).await?, + error_msg: "Signed entity not found: {signed_entity_type_expected}" + ) + } - Ok(()) + /// Wait until the last stored certificate of the given signed entity type + /// corresponds to the expected signed entity type + pub async fn wait_until_certificate( + &self, + certificate_signed_entity_type: &SignedEntityType, + ) -> StdResult<()> { + async_wait!( + max_iter:100, sleep_ms:1, + condition: self.observer.get_last_certificate().await?.signed_entity_type() + != *certificate_signed_entity_type, + error_msg: "Certificate not found for signed entity: {certificate_signed_entity_type}" + ) } /// Returns the runtime cycle success and total metrics since startup diff --git a/mithril-aggregator/tests/test_extensions/utilities.rs b/mithril-aggregator/tests/test_extensions/utilities.rs index 3ac8c00b95d..c6921470b6a 100644 --- a/mithril-aggregator/tests/test_extensions/utilities.rs +++ b/mithril-aggregator/tests/test_extensions/utilities.rs @@ -29,3 +29,21 @@ macro_rules! comment { test_extensions::utilities::comment(format!($($comment)*)); }}; } + +#[macro_export] +macro_rules! async_wait { + ( max_iter:$max_iter:expr, sleep_ms:$sleep_ms:expr, condition:$condition:expr, error_msg:$($error_msg:tt)* + ) => {{ + let mut max_iteration: usize = $max_iter; + while $condition { + max_iteration -= 1; + if max_iteration == 0 { + return Err(anyhow::anyhow!($($error_msg)*)); + } + tokio::time::sleep(Duration::from_millis($sleep_ms)).await; + } + + Ok(()) + }}; +} +pub use async_wait; diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index fcc1e647c44..0f5a23f20cb 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.6.7" +version = "0.6.8" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/test_utils/certificate_chain_builder.rs b/mithril-common/src/test_utils/certificate_chain_builder.rs index 6bf1beea035..9e121a84538 100644 --- a/mithril-common/src/test_utils/certificate_chain_builder.rs +++ b/mithril-common/src/test_utils/certificate_chain_builder.rs @@ -132,6 +132,11 @@ impl CertificateChainFixture { &self.certificates_chained[self.certificates_chained.len() - 1] } + /// Return the latest certificate of this chain + pub fn latest_certificate(&self) -> &Certificate { + &self.certificates_chained[0] + } + /// Return a copy of the chain but with reversed order (from genesis to last) pub fn reversed_chain(&self) -> Vec { self.certificates_chained.iter().rev().cloned().collect() @@ -1065,6 +1070,22 @@ mod test { assert!(chain_with_multiple_certificates.genesis_certificate().is_genesis()); } + #[test] + fn get_latest_certificate() { + let chain_with_only_a_genesis = CertificateChainBuilder::new() + .with_total_certificates(1) + .build(); + assert!(chain_with_only_a_genesis.latest_certificate().is_genesis()); + + let chain_with_multiple_certificates = CertificateChainBuilder::new() + .with_total_certificates(10) + .build(); + assert_eq!( + chain_with_multiple_certificates.latest_certificate(), + chain_with_multiple_certificates.first().unwrap() + ); + } + #[test] fn path_to_genesis_from_a_chain_with_one_certificate_per_epoch() { let chain = CertificateChainBuilder::new() diff --git a/mithril-test-lab/mithril-end-to-end/Cargo.toml b/mithril-test-lab/mithril-end-to-end/Cargo.toml index 19198097cde..54802e4d462 100644 --- a/mithril-test-lab/mithril-end-to-end/Cargo.toml +++ b/mithril-test-lab/mithril-end-to-end/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-end-to-end" -version = "0.4.95" +version = "0.4.96" authors = { workspace = true } edition = { workspace = true } documentation = { workspace = true } diff --git a/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs b/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs index e0a2c4eced4..3b78bee7ab2 100644 --- a/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs @@ -7,17 +7,6 @@ use slog_scope::info; pub async fn bootstrap_genesis_certificate(aggregator: &Aggregator) -> StdResult<()> { info!("Bootstrap genesis certificate"; "aggregator" => &aggregator.name()); - - // A follower aggregator needs to wait few cycles of the state machine to be able to bootstrap - // This should be removed when the aggregator is able to synchronize its certificate chain from another aggregator - if !aggregator.is_first() { - const CYCLES_WAIT_FOLLOWER: u64 = 3; - tokio::time::sleep(std::time::Duration::from_millis( - CYCLES_WAIT_FOLLOWER * aggregator.mithril_run_interval() as u64, - )) - .await; - } - info!("> stopping aggregator"; "aggregator" => &aggregator.name()); aggregator.stop().await?; info!("> bootstrapping genesis using signers registered two epochs ago..."; "aggregator" => &aggregator.name()); diff --git a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs index bbc65aab9c6..1a3aa4078a0 100644 --- a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use slog_scope::info; use tokio::task::JoinSet; use mithril_common::{ @@ -54,6 +55,15 @@ impl Spec { // As we get closer to the tip of the chain when signing, we'll be able to relax this constraint. assertions::transfer_funds(spec.infrastructure.devnet()).await?; + info!("Bootstrapping leader aggregator"); + spec.bootstrap_leader_aggregator(&spec.infrastructure).await?; + + info!("Starting followers"); + for follower_aggregator in spec.infrastructure.follower_aggregators() { + follower_aggregator.serve().await?; + } + + info!("Running scenarios"); for index in 0..spec.infrastructure.aggregators().len() { let spec_clone = spec.clone(); join_set.spawn(async move { @@ -72,44 +82,54 @@ impl Spec { Ok(()) } - pub async fn run_scenario( + pub async fn bootstrap_leader_aggregator( &self, - aggregator: &Aggregator, infrastructure: &MithrilInfrastructure, ) -> StdResult<()> { - assertions::wait_for_enough_immutable(aggregator).await?; - let chain_observer = aggregator.chain_observer(); + let leader_aggregator = infrastructure.leader_aggregator(); + + assertions::wait_for_enough_immutable(leader_aggregator).await?; + let chain_observer = leader_aggregator.chain_observer(); let start_epoch = chain_observer.get_current_epoch().await?.unwrap_or_default(); // Wait 4 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate let mut target_epoch = start_epoch + 4; assertions::wait_for_aggregator_at_target_epoch( - aggregator, + leader_aggregator, target_epoch, "minimal epoch for the aggregator to be able to bootstrap genesis certificate" .to_string(), ) .await?; - assertions::bootstrap_genesis_certificate(aggregator).await?; - assertions::wait_for_epoch_settings(aggregator).await?; + assertions::bootstrap_genesis_certificate(leader_aggregator).await?; + assertions::wait_for_epoch_settings(leader_aggregator).await?; // Wait 2 epochs before changing stake distribution, so that we use at least one original stake distribution target_epoch += 2; assertions::wait_for_aggregator_at_target_epoch( - aggregator, + leader_aggregator, target_epoch, "epoch after which the stake distribution will change".to_string(), ) .await?; - if aggregator.is_first() { - // Delegate some stakes to pools - let delegation_round = 1; - assertions::delegate_stakes_to_pools(infrastructure.devnet(), delegation_round).await?; - } + // Delegate some stakes to pools + let delegation_round = 1; + assertions::delegate_stakes_to_pools(infrastructure.devnet(), delegation_round).await?; + + Ok(()) + } + + pub async fn run_scenario( + &self, + aggregator: &Aggregator, + infrastructure: &MithrilInfrastructure, + ) -> StdResult<()> { + let chain_observer = aggregator.chain_observer(); + let start_epoch = chain_observer.get_current_epoch().await?.unwrap_or_default(); // Wait 2 epochs before changing protocol parameters - target_epoch += 2; + let mut target_epoch = start_epoch + 2; assertions::wait_for_aggregator_at_target_epoch( aggregator, target_epoch, diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs index f8d5df5ec86..5db25573fe2 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs @@ -103,17 +103,22 @@ impl MithrilInfrastructure { let relay_signer_registration_mode = &config.relay_signer_registration_mode; let relay_signature_registration_mode = &config.relay_signature_registration_mode; - let aggregators = - Self::start_aggregators(config, aggregator_cardano_nodes, chain_observer_type).await?; - let aggregator_endpoints = aggregators + let (leader_aggregator, follower_aggregators) = + Self::prepare_aggregators(config, aggregator_cardano_nodes, chain_observer_type) + .await?; + + Self::register_startup_era(&leader_aggregator, config).await?; + leader_aggregator.serve().await?; + + let follower_aggregator_endpoints = follower_aggregators .iter() .map(|aggregator| aggregator.endpoint()) .collect::>(); - let leader_aggregator_endpoint = aggregator_endpoints[0].to_owned(); let (relay_aggregators, relay_signers, relay_passives) = Self::start_relays( config, - &aggregator_endpoints, + leader_aggregator.endpoint(), + &follower_aggregator_endpoints, &signer_party_ids, relay_signer_registration_mode.to_owned(), relay_signature_registration_mode.to_owned(), @@ -121,7 +126,7 @@ impl MithrilInfrastructure { let signers = Self::start_signers( config, - leader_aggregator_endpoint, + leader_aggregator.endpoint(), signer_cardano_nodes, &relay_signers, ) @@ -132,11 +137,14 @@ impl MithrilInfrastructure { CardanoNetwork::DevNet(DEVNET_MAGIC_ID), )); + let mut all_aggregators = vec![leader_aggregator]; + all_aggregators.extend(follower_aggregators); + Ok(Self { bin_dir: config.bin_dir.to_path_buf(), artifacts_dir: config.artifacts_dir.to_path_buf(), devnet: config.devnet.clone(), - aggregators, + aggregators: all_aggregators, signers, relay_aggregators, relay_signers, @@ -176,8 +184,13 @@ impl MithrilInfrastructure { + 1; if self.era_reader_adapter == "cardano-chain" { let devnet = self.devnet.clone(); - assertions::register_era_marker(self.aggregator(0), &devnet, next_era, next_era_epoch) - .await?; + assertions::register_era_marker( + self.leader_aggregator(), + &devnet, + next_era, + next_era_epoch, + ) + .await?; } let mut current_era = self.current_era.write().await; *current_era = next_era.to_owned(); @@ -185,69 +198,80 @@ impl MithrilInfrastructure { Ok(()) } - async fn start_aggregators( + async fn prepare_aggregators( config: &MithrilInfrastructureConfig, - pool_nodes: &[FullNode], + full_nodes: &[FullNode], chain_observer_type: &str, - ) -> StdResult> { - let mut aggregators = vec![]; - let mut leader_aggregator_endpoint: Option = None; - for (index, full_node) in pool_nodes.iter().enumerate() { - let aggregator_name = Aggregator::name_suffix(index); - let aggregator_artifacts_dir = config - .artifacts_dir - .join(format!("mithril-aggregator-{aggregator_name}")); - let aggregator_store_dir = - config.store_dir.join(format!("aggregator-{aggregator_name}")); - let aggregator = Aggregator::new(&AggregatorConfig { - index, - name: &aggregator_name, - server_port: config.server_port + index as u64, + ) -> StdResult<(Aggregator, Vec)> { + let [leader_node, follower_nodes @ ..] = full_nodes else { + panic!("Can't prepare Aggregators: No full nodes found"); + }; + let leader_aggregator = + Self::prepare_aggregator(0, leader_node, config, chain_observer_type, None).await?; + + let mut follower_aggregators = vec![]; + for (index, full_node) in follower_nodes.iter().enumerate() { + let aggregator = Self::prepare_aggregator( + index + 1, full_node, - cardano_cli_path: &config.devnet.cardano_cli_path(), - work_dir: &config.work_dir, - store_dir: &aggregator_store_dir, - artifacts_dir: &aggregator_artifacts_dir, - bin_dir: &config.bin_dir, - cardano_node_version: &config.cardano_node_version, - mithril_run_interval: config.mithril_run_interval, - mithril_era: &config.mithril_era, - mithril_era_reader_adapter: &config.mithril_era_reader_adapter, - mithril_era_marker_address: &config.devnet.mithril_era_marker_address()?, - signed_entity_types: &config.signed_entity_types, + config, chain_observer_type, - leader_aggregator_endpoint: &leader_aggregator_endpoint.clone(), - })?; - - aggregator - .set_protocol_parameters(&ProtocolParameters { - k: 70, - m: 105, - phi_f: 0.95, - }) - .await; - - if leader_aggregator_endpoint.is_none() - && config.has_leader_follower_signer_registration() - { - leader_aggregator_endpoint = Some(aggregator.endpoint()); - } - - aggregators.push(aggregator); + Some(leader_aggregator.endpoint()), + ) + .await?; + follower_aggregators.push(aggregator); } - Self::register_startup_era(&aggregators[0], config).await?; - - for aggregator in &aggregators { - aggregator.serve().await?; - } + Ok((leader_aggregator, follower_aggregators)) + } - Ok(aggregators) + async fn prepare_aggregator( + index: usize, + full_node: &FullNode, + config: &MithrilInfrastructureConfig, + chain_observer_type: &str, + leader_aggregator_endpoint: Option, + ) -> StdResult { + let aggregator_name = Aggregator::name_suffix(index); + let aggregator_artifacts_dir = config + .artifacts_dir + .join(format!("mithril-aggregator-{aggregator_name}")); + let aggregator_store_dir = config.store_dir.join(format!("aggregator-{aggregator_name}")); + let aggregator = Aggregator::new(&AggregatorConfig { + index, + name: &aggregator_name, + server_port: config.server_port + index as u64, + full_node, + cardano_cli_path: &config.devnet.cardano_cli_path(), + work_dir: &config.work_dir, + store_dir: &aggregator_store_dir, + artifacts_dir: &aggregator_artifacts_dir, + bin_dir: &config.bin_dir, + cardano_node_version: &config.cardano_node_version, + mithril_run_interval: config.mithril_run_interval, + mithril_era: &config.mithril_era, + mithril_era_reader_adapter: &config.mithril_era_reader_adapter, + mithril_era_marker_address: &config.devnet.mithril_era_marker_address()?, + signed_entity_types: &config.signed_entity_types, + chain_observer_type, + leader_aggregator_endpoint: &leader_aggregator_endpoint, + })?; + + aggregator + .set_protocol_parameters(&ProtocolParameters { + k: 70, + m: 105, + phi_f: 0.95, + }) + .await; + + Ok(aggregator) } fn start_relays( config: &MithrilInfrastructureConfig, - aggregator_endpoints: &[String], + leader_aggregator_endpoint: String, + follower_aggregator_endpoints: &[String], signers_party_ids: &[PartyId], relay_signer_registration_mode: String, relay_signature_registration_mode: String, @@ -255,11 +279,15 @@ impl MithrilInfrastructure { if !config.use_relays { return Ok((vec![], vec![], vec![])); } + let aggregator_endpoints = [ + vec![leader_aggregator_endpoint.clone()], + follower_aggregator_endpoints.to_vec(), + ] + .concat(); let mut relay_aggregators: Vec = vec![]; let mut relay_signers: Vec = vec![]; let mut relay_passives: Vec = vec![]; - let leader_aggregator_endpoint = &aggregator_endpoints[0]; info!("Starting the Mithril infrastructure in P2P mode (experimental)"); @@ -287,7 +315,7 @@ impl MithrilInfrastructure { dial_to: bootstrap_peer_addr.clone(), relay_signer_registration_mode: relay_signer_registration_mode.clone(), relay_signature_registration_mode: relay_signature_registration_mode.clone(), - aggregator_endpoint: leader_aggregator_endpoint, + aggregator_endpoint: &leader_aggregator_endpoint, party_id: party_id.clone(), work_dir: &config.work_dir, bin_dir: &config.bin_dir, @@ -377,7 +405,7 @@ impl MithrilInfrastructure { signer.stop().await?; } - for aggregator in &self.aggregators { + for aggregator in self.aggregators() { aggregator.stop().await?; } @@ -396,6 +424,18 @@ impl MithrilInfrastructure { &self.aggregators[index] } + pub fn leader_aggregator(&self) -> &Aggregator { + &self.aggregators[0] + } + + pub fn follower_aggregators(&self) -> &[Aggregator] { + &self.aggregators[1..] + } + + pub fn follower_aggregator(&self, index: usize) -> &Aggregator { + &self.aggregators[index + 1] + } + pub fn signers(&self) -> &[Signer] { &self.signers } diff --git a/mithril-test-lab/mithril-end-to-end/src/run_only.rs b/mithril-test-lab/mithril-end-to-end/src/run_only.rs index 3774a7f8739..abcedfa59c2 100644 --- a/mithril-test-lab/mithril-end-to-end/src/run_only.rs +++ b/mithril-test-lab/mithril-end-to-end/src/run_only.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use tokio::task::JoinSet; +use slog_scope::info; use mithril_common::StdResult; -use crate::{Aggregator, MithrilInfrastructure, assertions}; +use crate::{MithrilInfrastructure, assertions}; pub struct RunOnly { pub infrastructure: Arc, @@ -17,51 +17,41 @@ impl RunOnly { pub async fn run(self) -> StdResult<()> { let run_only = Arc::new(self); - let mut join_set = JoinSet::new(); + info!("Bootstrapping leader aggregator"); + run_only.bootstrap_leader_aggregator(&run_only.infrastructure).await?; - for index in 0..run_only.infrastructure.aggregators().len() { - let run_only_clone = run_only.clone(); - join_set.spawn(async move { - let infrastructure = &run_only_clone.infrastructure; - - run_only_clone - .bootstrap_aggregator(infrastructure.aggregator(index), infrastructure) - .await - }); - } - - while let Some(res) = join_set.join_next().await { - res??; + info!("Starting followers"); + for follower_aggregator in run_only.infrastructure.follower_aggregators() { + follower_aggregator.serve().await?; } Ok(()) } - pub async fn bootstrap_aggregator( + pub async fn bootstrap_leader_aggregator( &self, - aggregator: &Aggregator, infrastructure: &MithrilInfrastructure, ) -> StdResult<()> { - assertions::wait_for_enough_immutable(aggregator).await?; - let chain_observer = aggregator.chain_observer(); + let leader_aggregator = infrastructure.leader_aggregator(); + + assertions::wait_for_enough_immutable(leader_aggregator).await?; + let chain_observer = leader_aggregator.chain_observer(); let start_epoch = chain_observer.get_current_epoch().await?.unwrap_or_default(); // Wait 3 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate let target_epoch = start_epoch + 3; assertions::wait_for_aggregator_at_target_epoch( - aggregator, + leader_aggregator, target_epoch, "minimal epoch for the aggregator to be able to bootstrap genesis certificate" .to_string(), ) .await?; - assertions::bootstrap_genesis_certificate(aggregator).await?; - assertions::wait_for_epoch_settings(aggregator).await?; + assertions::bootstrap_genesis_certificate(leader_aggregator).await?; + assertions::wait_for_epoch_settings(leader_aggregator).await?; - if aggregator.is_first() { - // Transfer some funds on the devnet to have some Cardano transactions to sign - assertions::transfer_funds(infrastructure.devnet()).await?; - } + // Transfer some funds on the devnet to have some Cardano transactions to sign + assertions::transfer_funds(infrastructure.devnet()).await?; Ok(()) }