1- use slog_scope:: { error, info} ;
1+ use slog_scope:: { error, info, warn } ;
22use std:: collections:: HashMap ;
3+ use std:: sync:: Arc ;
34use std:: time:: Duration ;
45use thiserror:: Error ;
6+ use tokio:: sync:: RwLock ;
57use tokio:: time:: sleep;
68
7- use crate :: certificate_handler:: CertificateHandlerError ;
8- use crate :: single_signer:: SingleSignerError ;
99use mithril_common:: crypto_helper:: { key_encode_hex, Bytes } ;
1010use mithril_common:: digesters:: { Digester , DigesterError } ;
1111use mithril_common:: entities:: { self , Beacon , CertificatePending , SignerWithStake } ;
12- use mithril_common:: fake_data ;
12+ use mithril_common:: store :: stake_store :: { StakeStore , StakeStoreError , StakeStorer } ;
1313
1414use super :: certificate_handler:: CertificateHandler ;
1515use super :: single_signer:: SingleSigner ;
16+ use crate :: certificate_handler:: CertificateHandlerError ;
17+ use crate :: single_signer:: SingleSignerError ;
18+
19+ /// StakeStoreWrapper wraps a StakeStore
20+ pub type StakeStoreWrapper = Arc < RwLock < StakeStore > > ;
1621
1722pub struct Runtime {
1823 certificate_handler : Box < dyn CertificateHandler > ,
1924 single_signer : Box < dyn SingleSigner > ,
2025 digester : Box < dyn Digester > ,
2126 current_beacon : Option < Beacon > ,
27+ stake_store : StakeStoreWrapper ,
2228}
2329
2430#[ derive( Error , Debug ) ]
2531pub enum RuntimeError {
2632 #[ error( "single signatures computation failed: `{0}`" ) ]
2733 SingleSignaturesComputeFailed ( #[ from] SingleSignerError ) ,
34+
2835 #[ error( "could not retrieve pending certificate: `{0}`" ) ]
2936 RetrievePendingCertificateFailed ( #[ from] CertificateHandlerError ) ,
37+
3038 #[ error( "could not retrieve protocol initializer" ) ]
3139 RetrieveProtocolInitializerFailed ( ) ,
40+
3241 #[ error( "register signer failed: `{0}`" ) ]
3342 RegisterSignerFailed ( String ) ,
43+
3444 #[ error( "codec error:`{0}`" ) ]
3545 Codec ( String ) ,
46+
3647 #[ error( "digest computation failed: `{0}`" ) ]
3748 Digester ( #[ from] DigesterError ) ,
49+
50+ #[ error( "stake store error: '{0}'" ) ]
51+ StakeStore ( #[ from] StakeStoreError ) ,
52+
53+ #[ error( "no stakes available" ) ]
54+ UnavailableStakes ( ) ,
3855}
3956
4057impl Runtime {
4158 pub fn new (
4259 certificate_handler : Box < dyn CertificateHandler > ,
4360 single_signer : Box < dyn SingleSigner > ,
4461 digester : Box < dyn Digester > ,
62+ stake_store : StakeStoreWrapper ,
4563 ) -> Self {
4664 Self {
4765 certificate_handler,
4866 single_signer,
4967 digester,
5068 current_beacon : None ,
69+ stake_store,
5170 }
5271 }
5372
@@ -70,18 +89,18 @@ impl Runtime {
7089 {
7190 self . register_to_aggregator_if_needed ( ) . await ?;
7291
73- if self . should_register_signature ( & pending_certificate. beacon ) {
92+ if self . should_register_signatures ( & pending_certificate. beacon ) {
7493 let message = self . digester . compute_digest ( ) ?;
7594 info ! ( "Signing digest" ; "digester_result" => #?message) ;
76- self . register_signature ( message. digest . into_bytes ( ) , pending_certificate)
95+ self . register_signatures ( message. digest . into_bytes ( ) , pending_certificate)
7796 . await ?;
7897 }
7998 }
8099
81100 Ok ( ( ) )
82101 }
83102
84- fn should_register_signature ( & self , new_beacon : & Beacon ) -> bool {
103+ fn should_register_signatures ( & self , new_beacon : & Beacon ) -> bool {
85104 match & self . current_beacon {
86105 None => {
87106 info ! ( "Unknown beacon, signatures will be registered ..." ) ;
@@ -121,7 +140,7 @@ impl Runtime {
121140 Ok ( ( ) )
122141 }
123142
124- async fn register_signature (
143+ async fn register_signatures (
125144 & mut self ,
126145 message : Bytes ,
127146 pending_certificate : CertificatePending ,
@@ -132,10 +151,18 @@ impl Runtime {
132151 . map ( |signer| ( signer. party_id , signer. verification_key . as_str ( ) ) )
133152 . collect :: < HashMap < u64 , & str > > ( ) ;
134153
135- let stake_distribution = fake_data:: signers_with_stakes ( 5 ) ;
154+ #[ allow( unused_variables) ]
155+ let epoch = pending_certificate. beacon . epoch ;
156+ let epoch = 0 ; // TODO: to remove once the runtime feeds the stake distribution
157+ warn ! ( "Epoch computation is not final and needs to be fixed" ) ;
158+ let stake_store = self . stake_store . read ( ) . await ;
159+ let stake_distribution = stake_store
160+ . get_stakes ( epoch)
161+ . await ?
162+ . ok_or_else ( RuntimeError :: UnavailableStakes ) ?;
136163 let stake_distribution_extended = stake_distribution
137164 . into_iter ( )
138- . map ( |signer| {
165+ . map ( |( _ , signer) | {
139166 let verification_key = match verification_keys. get ( & signer. party_id ) {
140167 Some ( verification_key_found) => * verification_key_found,
141168 None => "" ,
@@ -169,8 +196,10 @@ mod tests {
169196 use super :: * ;
170197
171198 use mithril_common:: crypto_helper:: tests_setup:: * ;
199+ use mithril_common:: crypto_helper:: ProtocolStakeDistribution ;
172200 use mithril_common:: digesters:: { Digester , DigesterError , DigesterResult } ;
173201 use mithril_common:: fake_data;
202+ use mithril_common:: store:: adapter:: MemoryAdapter ;
174203 use mockall:: mock;
175204
176205 mock ! {
@@ -180,11 +209,31 @@ mod tests {
180209 }
181210 }
182211
212+ async fn setup_stake_store ( ) -> StakeStore {
213+ let total_signers = 5 ;
214+ let epoch = 0 ;
215+ let mut stake_store = StakeStore :: new ( Box :: new (
216+ MemoryAdapter :: < u64 , HashMap < u64 , entities:: SignerWithStake > > :: new ( None ) . unwrap ( ) ,
217+ ) ) ;
218+ let stakes: ProtocolStakeDistribution = fake_data:: signers_with_stakes ( total_signers)
219+ . into_iter ( )
220+ . map ( |signer| signer. into ( ) )
221+ . collect :: < _ > ( ) ;
222+ for stake in stakes {
223+ stake_store
224+ . save_stake ( epoch, stake. into ( ) )
225+ . await
226+ . expect ( "fake stake distribution update failed" ) ;
227+ }
228+ stake_store
229+ }
230+
183231 #[ tokio:: test]
184232 async fn signer_doesnt_sign_when_there_is_no_pending_certificate ( ) {
185233 let current_signer = & setup_signers ( 1 ) [ 0 ] ;
186234 let party_id = current_signer. clone ( ) . 0 ;
187235 let protocol_initializer = current_signer. 4 . clone ( ) ;
236+ let stake_store = setup_stake_store ( ) . await ;
188237 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
189238 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
190239 let mock_digester = MockDigesterImpl :: new ( ) ;
@@ -211,12 +260,14 @@ mod tests {
211260 Box :: new ( mock_certificate_handler) ,
212261 Box :: new ( mock_single_signer) ,
213262 Box :: new ( mock_digester) ,
263+ Arc :: new ( RwLock :: new ( stake_store) ) ,
214264 ) ;
215265 assert ! ( signer. run( ) . await . is_ok( ) ) ;
216266 }
217267
218268 #[ tokio:: test]
219269 async fn signer_fails_when_pending_certificate_fails ( ) {
270+ let stake_store = setup_stake_store ( ) . await ;
220271 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
221272 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
222273 let mock_digester = MockDigesterImpl :: new ( ) ;
@@ -235,6 +286,7 @@ mod tests {
235286 Box :: new ( mock_certificate_handler) ,
236287 Box :: new ( mock_single_signer) ,
237288 Box :: new ( mock_digester) ,
289+ Arc :: new ( RwLock :: new ( stake_store) ) ,
238290 ) ;
239291 assert_eq ! (
240292 RuntimeError :: RetrievePendingCertificateFailed (
@@ -250,6 +302,7 @@ mod tests {
250302 let current_signer = & setup_signers ( 1 ) [ 0 ] ;
251303 let party_id = current_signer. clone ( ) . 0 ;
252304 let protocol_initializer = current_signer. 4 . clone ( ) ;
305+ let stake_store = setup_stake_store ( ) . await ;
253306 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
254307 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
255308 let mut mock_digester = MockDigesterImpl :: new ( ) ;
@@ -291,6 +344,7 @@ mod tests {
291344 Box :: new ( mock_certificate_handler) ,
292345 Box :: new ( mock_single_signer) ,
293346 Box :: new ( mock_digester) ,
347+ Arc :: new ( RwLock :: new ( stake_store) ) ,
294348 ) ;
295349 assert ! ( signer. run( ) . await . is_ok( ) ) ;
296350 assert ! ( signer. run( ) . await . is_ok( ) ) ;
@@ -301,6 +355,7 @@ mod tests {
301355 let current_signer = & setup_signers ( 1 ) [ 0 ] ;
302356 let party_id = current_signer. clone ( ) . 0 ;
303357 let protocol_initializer = current_signer. 4 . clone ( ) ;
358+ let stake_store = setup_stake_store ( ) . await ;
304359 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
305360 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
306361 let mut mock_digester = MockDigesterImpl :: new ( ) ;
@@ -345,6 +400,7 @@ mod tests {
345400 Box :: new ( mock_certificate_handler) ,
346401 Box :: new ( mock_single_signer) ,
347402 Box :: new ( mock_digester) ,
403+ Arc :: new ( RwLock :: new ( stake_store) ) ,
348404 ) ;
349405 assert ! ( signer. run( ) . await . is_ok( ) ) ;
350406 assert ! ( signer. run( ) . await . is_ok( ) ) ;
@@ -355,6 +411,7 @@ mod tests {
355411 let current_signer = & setup_signers ( 1 ) [ 0 ] ;
356412 let party_id = current_signer. clone ( ) . 0 ;
357413 let protocol_initializer = current_signer. 4 . clone ( ) ;
414+ let stake_store = setup_stake_store ( ) . await ;
358415 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
359416 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
360417 let mut mock_digester = MockDigesterImpl :: new ( ) ;
@@ -391,12 +448,14 @@ mod tests {
391448 Box :: new ( mock_certificate_handler) ,
392449 Box :: new ( mock_single_signer) ,
393450 Box :: new ( mock_digester) ,
451+ Arc :: new ( RwLock :: new ( stake_store) ) ,
394452 ) ;
395453 assert ! ( signer. run( ) . await . is_ok( ) ) ;
396454 }
397455
398456 #[ tokio:: test]
399457 async fn signer_fails_if_signature_computation_fails ( ) {
458+ let stake_store = setup_stake_store ( ) . await ;
400459 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
401460 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
402461 let mut mock_digester = MockDigesterImpl :: new ( ) ;
@@ -421,6 +480,7 @@ mod tests {
421480 Box :: new ( mock_certificate_handler) ,
422481 Box :: new ( mock_single_signer) ,
423482 Box :: new ( mock_digester) ,
483+ Arc :: new ( RwLock :: new ( stake_store) ) ,
424484 ) ;
425485 assert_eq ! (
426486 RuntimeError :: SingleSignaturesComputeFailed (
@@ -437,6 +497,7 @@ mod tests {
437497 let party_id = current_signer. clone ( ) . 0 ;
438498 let protocol_initializer = current_signer. 4 . clone ( ) ;
439499 let pending_certificate = fake_data:: certificate_pending ( ) ;
500+ let stake_store = setup_stake_store ( ) . await ;
440501 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
441502 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
442503 let mock_digester = MockDigesterImpl :: new ( ) ;
@@ -467,6 +528,7 @@ mod tests {
467528 Box :: new ( mock_certificate_handler) ,
468529 Box :: new ( mock_single_signer) ,
469530 Box :: new ( mock_digester) ,
531+ Arc :: new ( RwLock :: new ( stake_store) ) ,
470532 ) ;
471533 assert_eq ! (
472534 RuntimeError :: RegisterSignerFailed (
@@ -480,6 +542,7 @@ mod tests {
480542
481543 #[ tokio:: test]
482544 async fn signer_fails_if_digest_computation_fails ( ) {
545+ let stake_store = setup_stake_store ( ) . await ;
483546 let mut mock_certificate_handler = MockCertificateHandler :: new ( ) ;
484547 let mut mock_single_signer = MockSingleSigner :: new ( ) ;
485548 let mut mock_digester = MockDigesterImpl :: new ( ) ;
@@ -504,6 +567,7 @@ mod tests {
504567 Box :: new ( mock_certificate_handler) ,
505568 Box :: new ( mock_single_signer) ,
506569 Box :: new ( mock_digester) ,
570+ Arc :: new ( RwLock :: new ( stake_store) ) ,
507571 ) ;
508572 assert_eq ! (
509573 RuntimeError :: Digester ( DigesterError :: NotEnoughImmutable ( ) ) . to_string( ) ,
0 commit comments