Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
BytesPacket, BytesPacketBatch, PacketBatch, PacketBatchRecycler, PacketFlags, PacketRef,
},
solana_pubkey::Pubkey,
solana_runtime::bank_forks::BankForks,
solana_runtime::bank_forks::{BankForks, SharableBanks},
solana_streamer::{
evicting_sender::EvictingSender,
streamer::{self, ChannelSend, PacketBatchReceiver, StreamerReceiveStats},
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ShredFetchStage {
recvr: PacketBatchReceiver,
recvr_stats: Option<Arc<StreamerReceiveStats>>,
sendr: EvictingSender<PacketBatch>,
bank_forks: &RwLock<BankForks>,
sharable_banks: &SharableBanks,
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was here anyway 🪓

shred_version: u16,
name: &'static str,
flags: PacketFlags,
Expand All @@ -82,32 +82,28 @@ impl ShredFetchStage {
let (
mut last_root,
mut slots_per_epoch,
mut _feature_set,
mut _epoch_schedule,
mut feature_set,
mut epoch_schedule,
mut last_slot,
) = {
let bank_forks_r = bank_forks.read().unwrap();
let root_bank = bank_forks_r.root_bank();
let root_bank = sharable_banks.root();
(
root_bank.slot(),
root_bank.get_slots_in_epoch(root_bank.epoch()),
root_bank.feature_set.clone(),
root_bank.epoch_schedule().clone(),
bank_forks_r.highest_slot(),
sharable_banks.working().slot(),
)
};
let mut stats = ShredFetchStats::default();

for mut packet_batch in recvr {
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
let root_bank = {
let bank_forks_r = bank_forks.read().unwrap();
last_slot = bank_forks_r.highest_slot();
bank_forks_r.root_bank()
};
_feature_set = root_bank.feature_set.clone();
_epoch_schedule = root_bank.epoch_schedule().clone();
last_slot = sharable_banks.working().slot();
let root_bank = sharable_banks.root();
feature_set = root_bank.feature_set.clone();
epoch_schedule = root_bank.epoch_schedule().clone();
last_root = root_bank.slot();
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
keypair = repair_context.as_ref().copied().map(RepairContext::keypair);
Expand Down Expand Up @@ -149,6 +145,14 @@ impl ShredFetchStage {
// Filter out shreds that are way too far in the future to avoid the
// overhead of having to hold onto them.
let max_slot = last_slot + MAX_SHRED_DISTANCE_MINIMUM.max(2 * slots_per_epoch);
let enforce_fixed_fec_set = |shred_slot| {
check_feature_activation(
&agave_feature_set::enforce_fixed_fec_set::id(),
shred_slot,
&feature_set,
&epoch_schedule,
)
};
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for mut packet in packet_batch.iter_mut().filter(|p| !p.meta().discard()) {
if turbine_disabled
Expand All @@ -157,6 +161,7 @@ impl ShredFetchStage {
last_root,
max_slot,
shred_version,
enforce_fixed_fec_set,
Comment thread
bw-solana marked this conversation as resolved.
&mut stats,
)
{
Expand Down Expand Up @@ -197,6 +202,7 @@ impl ShredFetchStage {
repair_context: Option<RepairContext>,
turbine_disabled: Arc<AtomicBool>,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
let (packet_sender, packet_receiver) =
EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE);
let receiver_stats = Arc::new(StreamerReceiveStats::new(receiver_name));
Expand Down Expand Up @@ -225,7 +231,7 @@ impl ShredFetchStage {
packet_receiver,
Some(receiver_stats),
sender,
&bank_forks,
&sharable_banks,
shred_version,
name,
flags,
Expand Down Expand Up @@ -315,11 +321,12 @@ impl ShredFetchStage {
Builder::new()
.name("solTvuFetchRpr".to_string())
.spawn(move || {
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
Self::modify_packets(
packet_receiver,
None,
sender,
&bank_forks,
&sharable_banks,
shred_version,
"shred_fetch_repair_quic",
PacketFlags::REPAIR,
Expand Down Expand Up @@ -348,11 +355,12 @@ impl ShredFetchStage {
Builder::new()
.name("solTvuFetchQuic".to_string())
.spawn(move || {
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
Self::modify_packets(
packet_receiver,
None,
sender,
&bank_forks,
&sharable_banks,
shred_version,
"shred_fetch_quic",
PacketFlags::empty(),
Expand Down Expand Up @@ -436,7 +444,6 @@ pub(crate) fn receive_quic_datagrams(

// Returns true if the feature is effective for the shred slot.
#[must_use]
#[allow(dead_code)]
fn check_feature_activation(
feature: &Pubkey,
shred_slot: Slot,
Expand Down
5 changes: 5 additions & 0 deletions feature-set/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,10 @@ pub mod raise_cpi_nesting_limit_to_8 {
solana_pubkey::declare_id!("6TkHkRmP7JZy1fdM6fg5uXn76wChQBWGokHBJzrLB3mj");
}

// Feature gate for SIMD-0317: enforce that every FEC set carries a fixed
// 32 data + 32 coding shred layout (see the FEATURE_NAMES entry below).
// Shreds in slots where this feature is active are expected to conform.
pub mod enforce_fixed_fec_set {
    solana_pubkey::declare_id!("fixfecLZYMfkGzwq6NJA11Yw6KYztzXiK9QcL3K78in");
}

pub static FEATURE_NAMES: LazyLock<AHashMap<Pubkey, &'static str>> = LazyLock::new(|| {
[
(secp256k1_program_enabled::id(), "secp256k1 program"),
Expand Down Expand Up @@ -1363,6 +1367,7 @@ pub static FEATURE_NAMES: LazyLock<AHashMap<Pubkey, &'static str>> = LazyLock::n
(raise_block_limits_to_100m::id(), "SIMD-0286: Raise block limit to 100M"),
(raise_account_cu_limit::id(), "SIMD-0306: Raise account CU limit to 40% max"),
(raise_cpi_nesting_limit_to_8::id(), "SIMD-0296: Raise CPI nesting limit from 4 to 8"),
(enforce_fixed_fec_set::id(), "SIMD-0317: Enforce 32 data + 32 coding shreds"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down
12 changes: 9 additions & 3 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
bit_vec::BitVec,
shred::{self, Shred, ShredType, MAX_DATA_SHREDS_PER_SLOT},
shred::{self, Shred, ShredType, DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT},
},
bitflags::bitflags,
serde::{Deserialize, Deserializer, Serialize, Serializer},
Expand Down Expand Up @@ -341,8 +341,14 @@ mod serde_compat_cast {

// Erasure-coding layout of one FEC set: how many data shreds and how many
// coding (parity) shreds it contains. Fields are crate-visible so sibling
// modules can inspect the configuration directly.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub(crate) struct ErasureConfig {
    // Number of data shreds in the FEC set.
    pub(crate) num_data: usize,
    // Number of coding (parity) shreds in the FEC set.
    pub(crate) num_coding: usize,
}

impl ErasureConfig {
    // Returns true when this FEC set uses the fixed layout mandated by
    // SIMD-0317 ("Enforce 32 data + 32 coding shreds"): both counts equal
    // DATA_SHREDS_PER_FEC_BLOCK. Note the coding count is deliberately
    // compared against the *data* constant — the fixed layout is symmetric
    // (presumably DATA_SHREDS_PER_FEC_BLOCK == 32; the constant's value is
    // declared in the shred module, not visible here — confirm there).
    pub(crate) fn is_fixed(&self) -> bool {
        self.num_data == DATA_SHREDS_PER_FEC_BLOCK && self.num_coding == DATA_SHREDS_PER_FEC_BLOCK
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
Loading
Loading