Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/repair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ pub mod request_response;
pub mod result;
pub mod serve_repair;
pub mod serve_repair_service;
pub mod shred_resolver_service;
pub(crate) mod standard_repair_handler;
51 changes: 51 additions & 0 deletions core/src/repair/shred_resolver_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! The shred resolver service listens to events emitted by
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe duplicate_shred_resolver_service

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea was that should be a replacement to repair_service.rs so it sends out normal repair as well as duplicate scenarios.

wanted to avoid calling it ag_repair_service 😅

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can just drag and drop it into repair_service when the time comes then

//!
//! Blockstore:
//! - FEC set has been ingested or successfully completed
//! - A conflicting shred has been ingested
//!
//! Votor:
//! - A block has reached "canonical" status, where canonical
//! means that it is the unique block of interest in the slot.
//! This is a result of the block or a descendant receiving
//! a Notarize, FastFinalizeation, or SlowFinalization certificate
//! - An alternate version of a block has been requested in order to
//! progress replay or check safe to notar conditions. We must fetch
//! these shreds, but it is not yet clear if this is the canonical block.
//!
//! Using these events it plans how to fetch the correct shreds and resolves
//! any incorrect shreds ingested. It interfaces with repair to send out
//! the appropriate requests.

use {
super::repair_service::OutstandingShredRepairs,
solana_ledger::{blockstore::Blockstore, shred_event::ShredEventReceiver},
std::{
sync::{Arc, RwLock},
thread::{self, JoinHandle},
},
};

pub struct ShredResolverService {
t_listen: JoinHandle<()>,
}

impl ShredResolverService {
pub fn new(
_blockstore: Arc<Blockstore>,
event_receiver: ShredEventReceiver,
_outstanding_repairs: Arc<RwLock<OutstandingShredRepairs>>,
) -> Self {
let t_listen = thread::spawn(move || loop {
let Ok(_events) = event_receiver.recv() else {
break;
};
});
Comment on lines +40 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to be consistent with the error handling in block_creation_loop and event_handler:

match record_receiver.recv_timeout(Duration::from_millis(400)) {
Ok(record) => {
if record
.sender
.send(poh_recorder.write().unwrap().record(
record.slot,
record.mixins,
record.transaction_batches,
))
.is_err()
{
panic!("Error returning mixin hashes");
}
}
Err(RecvTimeoutError::Disconnected) => {
info!("Record receiver disconnected");
return;
}
Err(RecvTimeoutError::Timeout) => (),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah definitely, i'll add proper error handling when i fill in the impl in the next PR


Self { t_listen }
}

pub fn join(self) -> thread::Result<()> {
self.t_listen.join()
}
}
23 changes: 20 additions & 3 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ use {
repair_service::{
OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
},
shred_resolver_service::ShredResolverService,
},
result::{Error, Result},
},
agave_feature_set as feature_set,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError, Sender},
rayon::{prelude::*, ThreadPool},
solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
blockstore_meta::BlockLocation,
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ReedSolomonCache, Shred},
shred::{self, ReedSolomonCache, Shred, MAX_DATA_SHREDS_PER_SLOT},
shred_event::ShredEventSender,
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
Expand Down Expand Up @@ -199,6 +201,7 @@ fn run_insert<F>(
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: Option<&CompletedDataSetsSender>,
retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
shred_event_sender: &ShredEventSender,
reed_solomon_cache: &ReedSolomonCache,
accept_repairs_only: bool,
) -> Result<()>
Expand Down Expand Up @@ -238,6 +241,7 @@ where
false, // is_trusted
retransmit_sender,
&handle_duplicate,
Some(shred_event_sender),
reed_solomon_cache,
metrics,
)?;
Expand Down Expand Up @@ -280,6 +284,7 @@ pub(crate) struct WindowService {
t_check_duplicate: JoinHandle<()>,
repair_service: RepairService,
certificate_service: CertificateService,
shred_resolver_service: ShredResolverService,
}

impl WindowService {
Expand Down Expand Up @@ -322,6 +327,13 @@ impl WindowService {
let certificate_service =
CertificateService::new(exit.clone(), blockstore.clone(), certificate_receiver);

let (shred_event_sender, shred_event_receiver) = bounded(MAX_DATA_SHREDS_PER_SLOT);
let shred_resolver_service = ShredResolverService::new(
blockstore.clone(),
shred_event_receiver,
outstanding_repair_requests.clone(),
);

let (duplicate_sender, duplicate_receiver) = unbounded();

let t_check_duplicate = Self::start_check_duplicate_thread(
Expand All @@ -341,6 +353,7 @@ impl WindowService {
duplicate_sender,
completed_data_sets_sender,
retransmit_sender,
shred_event_sender,
accept_repairs_only,
);

Expand All @@ -349,6 +362,7 @@ impl WindowService {
t_check_duplicate,
repair_service,
certificate_service,
shred_resolver_service,
}
}

Expand Down Expand Up @@ -391,6 +405,7 @@ impl WindowService {
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
retransmit_sender: EvictingSender<Vec<shred::Payload>>,
shred_event_sender: ShredEventSender,
accept_repairs_only: bool,
) -> JoinHandle<()> {
let handle_error = || {
Expand Down Expand Up @@ -426,6 +441,7 @@ impl WindowService {
&mut ws_metrics,
completed_data_sets_sender.as_ref(),
&retransmit_sender,
&shred_event_sender,
&reed_solomon_cache,
accept_repairs_only,
) {
Expand Down Expand Up @@ -467,7 +483,8 @@ impl WindowService {
self.t_insert.join()?;
self.t_check_duplicate.join()?;
self.repair_service.join()?;
self.certificate_service.join()
self.certificate_service.join()?;
self.shred_resolver_service.join()
}
}

Expand Down
Loading