Skip to content

Commit c99acb2

Browse files
authored
allows inserting shreds into blockstore using a reference (anza-xyz#4993)
Shreds generated during leader slots are concurrently inserted into blockstore while also broadcasted to turbine. Currently shreds are shared between the two paths using an Arc<Vec<Shred>> and sent to both BroadcastRun::record and BroadcastRun::transmit: https://github.com/anza-xyz/agave/blob/ddec7bdbc/turbine/src/broadcast_stage/standard_broadcast_run.rs#L224-L226 https://github.com/anza-xyz/agave/blob/ddec7bdbc/turbine/src/broadcast_stage.rs#L321 https://github.com/anza-xyz/agave/blob/ddec7bdbc/turbine/src/broadcast_stage.rs#L344 However, blockstore API for inserting shreds currently require an owned shred anyways: https://github.com/anza-xyz/agave/blob/ddec7bdbc/ledger/src/blockstore.rs#L1231 and we cannot get an owned shred out of Arc<Vec<Shred>> if there are other references to the Arc. As a result StandardBroadcastRun::insert is forced to do Arc::unwrap_or_clone anyways without any guarantee that the other reference (turbine broadcast path) is already dropped: https://github.com/anza-xyz/agave/blob/b59c1aee7/turbine/src/broadcast_stage/standard_broadcast_run.rs#L370 This defeats the point of using Arc<Vec<Shred>> to begin with. The commit updates blockstore insert-shreds API to instead work with Cow<'a, Shred>. This allows to insert shreds into the blockstore with either a reference or an owned value, and so Arc::unwrap_or_clone in StandardBroadcastRun::insert can be avoided.
1 parent a705c76 commit c99acb2

File tree

5 files changed

+89
-62
lines changed

5 files changed

+89
-62
lines changed

core/src/validator.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2460,7 +2460,8 @@ fn cleanup_blockstore_incorrect_shred_versions(
24602460
let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
24612461
for (slot, _meta) in slot_meta_iterator {
24622462
let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2463-
let _ = backup_blockstore.insert_shreds(shreds, None, true);
2463+
let shreds = shreds.into_iter().map(Cow::Owned);
2464+
let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
24642465
num_slots_copied += 1;
24652466

24662467
if print_timer.elapsed() > PRINT_INTERVAL {

core/src/window_service.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use {
2727
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
2828
solana_turbine::cluster_nodes,
2929
std::{
30+
borrow::Cow,
3031
net::UdpSocket,
3132
sync::{
3233
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -211,7 +212,7 @@ where
211212
debug_assert_matches!(shred, shred::Payload::Shared(_));
212213
}
213214
let shred = Shred::new_from_serialized_shred(shred).ok()?;
214-
Some((shred, repair))
215+
Some((Cow::Owned(shred), repair))
215216
};
216217
let now = Instant::now();
217218
let shreds: Vec<_> = thread_pool.install(|| {
@@ -608,9 +609,9 @@ mod test {
608609
};
609610
assert_eq!(duplicate_shred.slot(), slot);
610611
// Simulate storing both duplicate shreds in the same batch
611-
let shreds = [original_shred.clone(), duplicate_shred.clone()]
612+
let shreds = [&original_shred, &duplicate_shred]
612613
.into_iter()
613-
.map(|shred| (shred, /*is_repaired:*/ false));
614+
.map(|shred| (Cow::Borrowed(shred), /*is_repaired:*/ false));
614615
blockstore
615616
.insert_shreds_handle_duplicate(
616617
shreds,

ledger-tool/src/blockstore.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use {
3131
hash::Hash,
3232
},
3333
std::{
34+
borrow::Cow,
3435
collections::{BTreeMap, BTreeSet, HashMap},
3536
fs::File,
3637
io::{stdout, BufRead, BufReader, Write},
@@ -676,7 +677,8 @@ fn do_blockstore_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) -
676677
break;
677678
}
678679
let shreds = source.get_data_shreds_for_slot(slot, 0)?;
679-
if target.insert_shreds(shreds, None, true).is_err() {
680+
let shreds = shreds.into_iter().map(Cow::Owned);
681+
if target.insert_cow_shreds(shreds, None, true).is_err() {
680682
warn!("error inserting shreds for slot {}", slot);
681683
}
682684
}

0 commit comments

Comments
 (0)