repair: plumb shred resolver service, and send events from blockstore #428
AshwinSekar wants to merge 5 commits into anza-xyz:master
Conversation
Force-pushed from dffe763 to b0eb8f0.
Adding @cpubot just for awareness. He was going to look into some of the shred/blockstore --> replay flow performance.
    @@ -0,0 +1,51 @@
    //! The shred resolver service listens to events emitted by
maybe duplicate_shred_resolver_service
The idea was that this should be a replacement for repair_service.rs, so it sends out normal repair as well as the duplicate scenarios.
Wanted to avoid calling it ag_repair_service 😅
we can just drag and drop it into repair_service when the time comes then
    let Ok(_events) = event_receiver.recv() else {
        break;
    };
    });
Do we want to be consistent with the error handling in block_creation_loop and event_handler? See alpenglow/core/src/block_creation_loop.rs, lines 325 to 343 in b0eb8f0.
Yeah definitely, I'll add proper error handling when I fill in the impl in the next PR.
    lower_merkle_root: Hash,
    higher_fec_set_index: u32,
    higher_chained_merkle_root: Hash,
what does lower vs higher mean in this context?
A CMR conflict occurs when 2 adjacent FEC sets don't chain together correctly: the lower (by index) FEC set's merkle root is not equal to the higher (by index) FEC set's chained merkle root.
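For context, the chaining rule above can be sketched as a simple comparison. This is an illustrative stand-in, not the PR's actual types: `Hash` is reduced to a byte array and `FecSetMeta` is invented here.

```rust
// Minimal sketch of the chaining rule described above. `Hash` and
// `FecSetMeta` are simplified stand-ins, not the PR's actual types.
type Hash = [u8; 32];

struct FecSetMeta {
    fec_set_index: u32,
    merkle_root: Hash,         // merkle root over this set's shreds
    chained_merkle_root: Hash, // claimed merkle root of the previous set
}

/// Adjacent FEC sets chain correctly when the lower set's merkle root equals
/// the higher set's chained merkle root; a mismatch is a CMR conflict.
fn chains_correctly(lower: &FecSetMeta, higher: &FecSetMeta) -> bool {
    lower.merkle_root == higher.chained_merkle_root
}
```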
Actually #437 will give us anza-xyz/agave#7728, which enforces fixed 32:32 FEC sets.
Once that lands I can remove higher_fec_set_index, as it will just be +32 (along with some other places where I specify both index and fec_set_index). Will wait to merge this until then.
Ah dope, was thinking it would be clearer to call it previous/current merkle root instead of lower/higher.
Done, refactored in 5c7c2f5; we only need to track the previous FEC set now.
    if let Some(shred_event_sender) = shred_event_sender {
        for event in shred_events {
            // TODO: handle error
probably need to resolve this on startup right?
i.e. do we need to resend these events if we shut down, and then start back up again before these events are handled
It should be fine; essentially repair will have to perform a blockstore scan initially (when receiving a certificate to initiate catchup, or a request to repair an alternate version from replay/safe-to-notar). The idea behind these events is just so we don't have to rescan to see if we've finished.
So if we restart, we don't need to resend these events.
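The "scan once, then stay event driven" idea above could look roughly like the following, where the controller folds completion events into its outstanding-repair set instead of rescanning blockstore. `ShredEvent` here is an illustrative subset of the PR's event type, and `apply_events` is a hypothetical helper.

```rust
use std::collections::HashSet;

// Illustrative subset of the PR's event type, for sketching purposes only.
enum ShredEvent {
    CompletedFecSet { slot: u64, fec_set_index: u32 },
}

/// Fold completion events into the set of outstanding repairs so the
/// controller never rescans blockstore just to learn a set has finished.
fn apply_events(outstanding: &mut HashSet<(u64, u32)>, events: &[ShredEvent]) {
    for event in events {
        match event {
            ShredEvent::CompletedFecSet { slot, fec_set_index } => {
                // A completed FEC set needs no further repair requests.
                outstanding.remove(&(*slot, *fec_set_index));
            }
        }
    }
}
```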
    .push(ShredEvent::CompletedFECSet {
        location,
        slot,
        fec_set_index,
        is_last_in_slot,
    });
This can only be sent once because each shred in an FEC set only calls into this function check_insert_data_shred() once? i.e. we filter out existing shreds before this function is called
Exactly, we have a check up above (see PossibleDuplicateShred::Exists) that makes sure we don't already have the shred. So regardless of whether we naturally complete the FEC set or complete it as a result of FEC recovery, we'll only send this event out once.
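The "sent exactly once" argument can be sketched like this: because duplicates are filtered before insertion, the completion condition can only trip on the single insert that fills the set. The helper name and the tracking structure are illustrative, not the blockstore's real bookkeeping.

```rust
use std::collections::HashSet;

// Agave's real constant; the tracking set below is a simplified stand-in.
const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

/// Returns true only when `index` is new AND it completes the FEC set,
/// mirroring the PossibleDuplicateShred::Exists filtering discussed above.
fn insert_and_check_complete(received: &mut HashSet<u32>, index: u32) -> bool {
    if !received.insert(index) {
        return false; // duplicate shred: filtered out, no event
    }
    // Fires only on the insert that brings the set to full size.
    received.len() == DATA_SHREDS_PER_FEC_BLOCK
}
```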
    shred_events.push(ShredEvent::ChainedMerkleRootConflict {
        location,
        slot,
        lower_fec_set_index: erasure_set.fec_set_index(),
        lower_merkle_root: merkle_root.unwrap_or_default(),
        higher_fec_set_index: next_erasure_set.fec_set_index(),
        higher_chained_merkle_root: chained_merkle_root.unwrap_or_default(),
    });
The repair controller will be in charge of dumping these bad shreds?
For future PRs, it might be good to organize them per event handler. Have a feeling it's going to be complicated 🫡
Yeah exactly, the MerkleRootConflict / ChainedMerkleRootConflict won't happen during normal operation, only if we're receiving duplicate blocks or getting a malicious repair response.
I wanted to avoid having to rescan blockstore after x ms and dump/re-request repair in this case. Since it's a low-probability event, figured it's better to have it event driven: the repair controller gets notified when this happens and takes action.
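The per-event-handler organization suggested above might dispatch roughly like this. The variants mirror the events quoted in this PR, but the handler actions (returned here as labels) are hypothetical placeholders for the future repair-controller logic.

```rust
// Hypothetical per-event dispatch; variants follow the PR's events, the
// handler labels are placeholders for future repair-controller actions.
enum ShredEvent {
    CompletedFecSet { slot: u64, fec_set_index: u32 },
    ChainedMerkleRootConflict { slot: u64, lower_fec_set_index: u32 },
}

/// Route each blockstore event to its handler; conflicts trigger a dump and
/// re-request instead of a periodic blockstore rescan.
fn dispatch(event: &ShredEvent) -> &'static str {
    match event {
        ShredEvent::CompletedFecSet { .. } => "mark_complete",
        ShredEvent::ChainedMerkleRootConflict { .. } => "dump_and_rerequest",
    }
}
```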
Force-pushed from b0eb8f0 to 72e1ce5.
    pub(crate) fn is_data_set_complete(fec_set_index: u32, index: &Index) -> bool {
        let data_indices =
            u64::from(fec_set_index)..u64::from(fec_set_index) + (DATA_SHREDS_PER_FEC_BLOCK as u64);
        index.data().range(data_indices).count() == DATA_SHREDS_PER_FEC_BLOCK
nit: could take the count_ones fast path here; see alpenglow/ledger/src/bit_vec.rs, line 344 in 0c0c6ff.
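The suggested count_ones fast path could look roughly like this, assuming the fixed 32-shred FEC sets from #437 (so sets start at multiples of 32) and presence tracked in a u64-word bitmap; the real code uses the bit_vec referenced above, so this is a sketch, not its API.

```rust
const DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;

/// With fixed 32-shred FEC sets starting at multiples of 32, the set's index
/// range occupies one aligned half of a u64 word, so completeness becomes a
/// shift, a truncation, and a popcount instead of an iterator walk.
/// The u64-word bitmap is an illustrative stand-in for the real bit_vec.
fn is_data_set_complete_fast(fec_set_index: u32, bitmap: &[u64]) -> bool {
    let word = bitmap[(fec_set_index / 64) as usize];
    let shift = fec_set_index % 64; // 0 or 32 for aligned sets
    ((word >> shift) as u32).count_ones() == DATA_SHREDS_PER_FEC_BLOCK
}
```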
Force-pushed from 72e1ce5 to 7bc4555.
We're going a different direction with repair, closing for now.
Problem
We need a controller to receive requests to repair blocks from the pool/replay and send requests to the network.
It needs to be blockstore aware so that it can send out additional repairs and reconcile invalid responses.
Additionally, we don't want to poll blockstore every time to see if our requests have completed. I would like to set it up so that we have the minimal blockstore interactions needed to find out which shreds to request, and can then continue on notifications from blockstore.
Summary of Changes
Plumb together a new service for this (open to naming suggestions); the impl comes in the next PR. Send events from blockstore for use as feedback in this service.
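The plumbing described above can be sketched as a channel whose sender lives in blockstore while the new service drains the receiver on its own thread; when every sender is dropped the loop exits, matching the `recv() else break` shape quoted in the review. All names here (`ShredEvent`, `spawn_resolver`) are illustrative stand-ins for the PR's types.

```rust
use std::sync::mpsc::Receiver;
use std::thread;

// Illustrative stand-in for the PR's event type.
pub enum ShredEvent {
    CompletedFecSet { slot: u64, fec_set_index: u32 },
}

/// Spawn the resolver loop; it exits once every sender is dropped. Returns
/// the number of events drained (a real impl would dispatch per variant
/// instead of counting).
pub fn spawn_resolver(event_receiver: Receiver<ShredEvent>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut handled = 0;
        while event_receiver.recv().is_ok() {
            handled += 1;
        }
        handled
    })
}
```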