2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 6 additions & 4 deletions core/src/retransmit_stage.rs
@@ -18,7 +18,9 @@ use {
     solana_client::rpc_response::SlotUpdate,
     solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
     solana_ledger::{
-        blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, shred::Shred,
+        blockstore::Blockstore,
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::{Shred, ShredType},
     },
     solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
@@ -137,14 +139,14 @@ impl RetransmitStats {
     }
 }
 
-// Map of shred (slot, index, is_data) => list of hash values seen for that key.
-type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
+// Map of shred (slot, index, type) => list of hash values seen for that key.
+type ShredFilter = LruCache<(Slot, u32, ShredType), Vec<u64>>;
 
 type ShredFilterAndHasher = (ShredFilter, PacketHasher);
 
 // Returns true if shred is already received and should skip retransmit.
 fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
-    let key = (shred.slot(), shred.index(), shred.is_data());
+    let key = (shred.slot(), shred.index(), shred.shred_type());
     let mut shreds_received = shreds_received.lock().unwrap();
     let (cache, hasher) = shreds_received.deref_mut();
     match cache.get_mut(&key) {
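With the type in the key, data and coding shreds that share a slot and index keep distinct dedup entries, and the intent is explicit rather than encoded in a bool. A minimal sketch of the keying scheme, using a std HashMap in place of the LruCache and a bare u64 in place of the PacketHasher output (the helper names here are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

type Slot = u64;

// Two-variant shred type, as introduced by this change.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum ShredType {
    Data,
    Code,
}

// Same shape as ShredFilter above, minus the LRU eviction:
// (slot, index, type) => hash values seen for that key.
type ShredFilter = HashMap<(Slot, u32, ShredType), Vec<u64>>;

// Returns true if this exact (key, hash) pair was already seen,
// i.e. the shred should skip retransmit; otherwise records it.
fn should_skip(filter: &mut ShredFilter, key: (Slot, u32, ShredType), hash: u64) -> bool {
    let hashes = filter.entry(key).or_default();
    if hashes.contains(&hash) {
        return true;
    }
    hashes.push(hash);
    false
}

fn main() {
    let mut filter = ShredFilter::new();
    // Data and coding shreds with the same slot and index get distinct
    // entries, just as with the old bool key, but the key is now typed.
    assert!(!should_skip(&mut filter, (3, 7, ShredType::Data), 0xdead));
    assert!(!should_skip(&mut filter, (3, 7, ShredType::Code), 0xdead));
    // The same shred seen twice is skipped.
    assert!(should_skip(&mut filter, (3, 7, ShredType::Data), 0xdead));
}
```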
13 changes: 6 additions & 7 deletions core/src/window_service.rs
@@ -18,7 +18,7 @@ use {
     solana_ledger::{
         blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
         leader_schedule_cache::LeaderScheduleCache,
-        shred::{Nonce, Shred},
+        shred::{Nonce, Shred, ShredType},
     },
     solana_measure::measure::Measure,
     solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
@@ -161,12 +161,11 @@ impl ReceiveWindowStats {
 }
 
 fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
-    if shred.is_data() {
+    match shred.shred_type() {
         // Only data shreds have parent information
-        blockstore::verify_shred_slots(shred.slot(), shred.parent(), root)
-    } else {
+        ShredType::Data => blockstore::verify_shred_slots(shred.slot(), shred.parent(), root),
         // Filter out outdated coding shreds
-        shred.slot() >= root
+        ShredType::Code => shred.slot() >= root,
     }
 }
 
@@ -218,8 +217,8 @@ fn run_check_duplicate(
         if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(
             shred_slot,
             shred.index(),
-            &shred.payload,
-            shred.is_data(),
+            shred.payload.clone(),
+            shred.shred_type(),
         ) {
             cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
             blockstore.store_duplicate_slot(
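Switching from if shred.is_data() to a match on ShredType makes the dispatch exhaustive: adding a third variant would turn every such match into a compile error instead of a silent fallthrough (or the panic!("There should be no other case") removed from blockstore.rs below). A self-contained sketch of the pattern; the body of the stub verify_shred_slots is a simplification, not the real blockstore check:

```rust
type Slot = u64;

#[derive(Clone, Copy)]
enum ShredType {
    Data,
    Code,
}

// Stand-in for solana_ledger::blockstore::verify_shred_slots; the real
// predicate is stricter, this just captures "parent is sane and rooted".
fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool {
    root <= parent && parent < slot
}

fn verify_shred_slot(shred_type: ShredType, slot: Slot, parent: Slot, root: Slot) -> bool {
    match shred_type {
        // Only data shreds have parent information.
        ShredType::Data => verify_shred_slots(slot, parent, root),
        // Coding shreds only need to be no older than the root.
        ShredType::Code => slot >= root,
    }
}

fn main() {
    assert!(verify_shred_slot(ShredType::Data, 10, 9, 5));
    assert!(!verify_shred_slot(ShredType::Data, 10, 4, 5)); // parent below root
    assert!(verify_shred_slot(ShredType::Code, 5, 0, 5));
    assert!(!verify_shred_slot(ShredType::Code, 3, 0, 5)); // outdated coding shred
}
```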
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
@@ -257,7 +257,7 @@ pub fn make_accounts_hashes_message(
 pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 
 // TODO These messages should go through the gpu pipeline for spam filtering
-#[frozen_abi(digest = "D2ebKKmm6EQ8JJjYc3xUpzpBTJguqgEzShhj9fiUcP6F")]
+#[frozen_abi(digest = "7cgH6JHdpxMSuPs6LEZzV5ShLXQMcZftb95s5PZKR5qB")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
 pub(crate) enum Protocol {
17 changes: 5 additions & 12 deletions gossip/src/duplicate_shred.rs
@@ -89,7 +89,7 @@ fn check_shreds(
         // TODO: Should also allow two coding shreds with different indices but
         // same fec-set-index and mismatching erasure-config.
         Err(Error::ShredIndexMismatch)
-    } else if shred1.common_header.shred_type != shred2.common_header.shred_type {
+    } else if shred1.shred_type() != shred2.shred_type() {
         Err(Error::ShredTypeMismatch)
     } else if shred1.payload == shred2.payload {
         Err(Error::InvalidDuplicateShreds)
@@ -119,11 +119,7 @@ pub fn from_duplicate_slot_proof(
     let shred1 = Shred::new_from_serialized_shred(proof.shred1.clone())?;
     let shred2 = Shred::new_from_serialized_shred(proof.shred2.clone())?;
     check_shreds(leader_schedule, &shred1, &shred2)?;
-    let (slot, shred_index, shred_type) = (
-        shred1.slot(),
-        shred1.index(),
-        shred1.common_header.shred_type,
-    );
+    let (slot, shred_index, shred_type) = (shred1.slot(), shred1.index(), shred1.shred_type());
     let data = bincode::serialize(proof)?;
     let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
         max_size - DUPLICATE_SHRED_HEADER_SIZE
@@ -161,8 +157,7 @@ pub(crate) fn from_shred(
     }
     let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
     check_shreds(leader_schedule, &shred, &other_shred)?;
-    let (slot, shred_index, shred_type) =
-        (shred.slot(), shred.index(), shred.common_header.shred_type);
+    let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type());
     let proof = DuplicateSlotProof {
         shred1: shred.payload,
         shred2: other_payload,
@@ -262,9 +257,7 @@ pub fn into_shreds(
         Err(Error::SlotMismatch)
     } else if shred1.index() != shred_index || shred2.index() != shred_index {
         Err(Error::ShredIndexMismatch)
-    } else if shred1.common_header.shred_type != shred_type
-        || shred2.common_header.shred_type != shred_type
-    {
+    } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
         Err(Error::ShredTypeMismatch)
     } else if shred1.payload == shred2.payload {
         Err(Error::InvalidDuplicateShreds)
@@ -306,7 +299,7 @@ pub(crate) mod tests {
             wallclock: u64::MAX,
             slot: Slot::MAX,
             shred_index: u32::MAX,
-            shred_type: ShredType(u8::MAX),
+            shred_type: ShredType::Data,
             num_chunks: u8::MAX,
             chunk_index: u8::MAX,
             chunk: Vec::default(),
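Every hunk above is the same mechanical change: direct reads of shred.common_header.shred_type go through the shred_type() accessor, so callers stop depending on Shred's internal layout, and the test fixture can no longer fabricate an out-of-range type like ShredType(u8::MAX) now that ShredType is a proper enum. The accessor presumably amounts to something like this sketch (the struct layout is a guess for illustration):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ShredType {
    Data,
    Code,
}

// Assumed shape of the common header; the real struct has more fields.
pub struct ShredCommonHeader {
    pub shred_type: ShredType,
}

pub struct Shred {
    common_header: ShredCommonHeader,
}

impl Shred {
    // Accessor that replaces direct common_header.shred_type reads.
    pub fn shred_type(&self) -> ShredType {
        self.common_header.shred_type
    }
}

fn main() {
    let shred = Shred {
        common_header: ShredCommonHeader {
            shred_type: ShredType::Code,
        },
    };
    assert_eq!(shred.shred_type(), ShredType::Code);
}
```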
2 changes: 2 additions & 0 deletions ledger/Cargo.toml
@@ -25,6 +25,8 @@ itertools = "0.9.0"
 lazy_static = "1.4.0"
 libc = "0.2.81"
 log = { version = "0.4.11" }
+num-derive = "0.3"
+num-traits = "0.2"
 num_cpus = "1.13.0"
 prost = "0.8.0"
 rand = "0.7.0"
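num-derive and num-traits are the usual pair for deriving checked integer-to-enum conversions, which would fit decoding the shred-type byte from the wire into the new enum. A hedged sketch of that pattern; the discriminant values and the exact derives are assumptions, not necessarily what ledger/src/shred.rs does:

```rust
use num_derive::FromPrimitive;
use num_traits::FromPrimitive as _;

// Hypothetical wire discriminants, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromPrimitive)]
#[repr(u8)]
enum ShredType {
    Data = 0b1010_0101,
    Code = 0b0101_1010,
}

fn main() {
    // A known wire byte decodes to a variant...
    assert_eq!(ShredType::from_u8(0b1010_0101), Some(ShredType::Data));
    // ...and anything else is rejected up front, instead of floating
    // around as ShredType(u8::MAX) the way the old tuple struct allowed.
    assert_eq!(ShredType::from_u8(u8::MAX), None);
}
```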
145 changes: 68 additions & 77 deletions ledger/src/blockstore.rs
@@ -14,7 +14,10 @@ use {
         erasure::ErasureConfig,
         leader_schedule_cache::LeaderScheduleCache,
         next_slots_iterator::NextSlotsIterator,
-        shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
+        shred::{
+            Result as ShredResult, Shred, ShredType, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+            SHRED_PAYLOAD_SIZE,
+        },
     },
     bincode::deserialize,
     log::*,
@@ -827,51 +830,54 @@ impl Blockstore {
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
         for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
-            if shred.is_data() {
-                let shred_source = if is_repaired {
-                    ShredSource::Repaired
-                } else {
-                    ShredSource::Turbine
-                };
-                match self.check_insert_data_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut slot_meta_working_set,
-                    &mut write_batch,
-                    &mut just_inserted_data_shreds,
-                    &mut index_meta_time,
-                    is_trusted,
-                    handle_duplicate,
-                    leader_schedule,
-                    shred_source,
-                ) {
-                    Err(InsertDataShredError::Exists) => metrics.num_data_shreds_exists += 1,
-                    Err(InsertDataShredError::InvalidShred) => metrics.num_data_shreds_invalid += 1,
-                    Err(InsertDataShredError::BlockstoreError(_)) => {
-                        metrics.num_data_shreds_blockstore_error += 1;
-                    }
-                    Ok(completed_data_sets) => {
-                        newly_completed_data_sets.extend(completed_data_sets);
-                        inserted_indices.push(i);
-                        metrics.num_inserted += 1;
-                    }
-                };
-            } else if shred.is_code() {
-                self.check_cache_coding_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut just_inserted_coding_shreds,
-                    &mut index_meta_time,
-                    handle_duplicate,
-                    is_trusted,
-                    is_repaired,
-                    metrics,
-                );
-            } else {
-                panic!("There should be no other case");
-            }
+            match shred.shred_type() {
+                ShredType::Data => {
+                    let shred_source = if is_repaired {
+                        ShredSource::Repaired
+                    } else {
+                        ShredSource::Turbine
+                    };
+                    match self.check_insert_data_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                        &mut index_meta_time,
+                        is_trusted,
+                        handle_duplicate,
+                        leader_schedule,
+                        shred_source,
+                    ) {
+                        Err(InsertDataShredError::Exists) => metrics.num_data_shreds_exists += 1,
+                        Err(InsertDataShredError::InvalidShred) => {
+                            metrics.num_data_shreds_invalid += 1
+                        }
+                        Err(InsertDataShredError::BlockstoreError(_)) => {
+                            metrics.num_data_shreds_blockstore_error += 1;
+                        }
+                        Ok(completed_data_sets) => {
+                            newly_completed_data_sets.extend(completed_data_sets);
+                            inserted_indices.push(i);
+                            metrics.num_inserted += 1;
+                        }
+                    };
+                }
+                ShredType::Code => {
+                    self.check_cache_coding_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut just_inserted_coding_shreds,
+                        &mut index_meta_time,
+                        handle_duplicate,
+                        is_trusted,
+                        is_repaired,
+                        metrics,
+                    );
+                }
+            };
         }
         start.stop();
 
@@ -1345,7 +1351,6 @@ impl Blockstore {
         leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> bool {
-        use crate::shred::SHRED_PAYLOAD_SIZE;
         let shred_index = u64::from(shred.index());
         let slot = shred.slot();
         let last_in_slot = if shred.last_in_slot() {
@@ -1574,7 +1579,6 @@
     }
 
     pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
-        use crate::shred::SHRED_PAYLOAD_SIZE;
         self.data_shred_cf.get_bytes((slot, index)).map(|data| {
             data.map(|mut d| {
                 // Only data_header.size bytes stored in the blockstore so
@@ -3032,31 +3036,18 @@ impl Blockstore {
         &self,
         slot: u64,
         index: u32,
-        new_shred_raw: &[u8],
-        is_data: bool,
+        mut payload: Vec<u8>,
+        shred_type: ShredType,
     ) -> Option<Vec<u8>> {
-        let res = if is_data {
-            self.get_data_shred(slot, index as u64)
-                .expect("fetch from DuplicateSlots column family failed")
-        } else {
-            self.get_coding_shred(slot, index as u64)
-                .expect("fetch from DuplicateSlots column family failed")
-        };
-
-        let mut payload = new_shred_raw.to_vec();
-        payload.resize(
-            std::cmp::max(new_shred_raw.len(), crate::shred::SHRED_PAYLOAD_SIZE),
-            0,
-        );
+        let existing_shred = match shred_type {
+            ShredType::Data => self.get_data_shred(slot, index as u64),
+            ShredType::Code => self.get_coding_shred(slot, index as u64),
+        }
+        .expect("fetch from DuplicateSlots column family failed")?;
+        let size = payload.len().max(SHRED_PAYLOAD_SIZE);
+        payload.resize(size, 0u8);
         let new_shred = Shred::new_from_serialized_shred(payload).unwrap();
-        res.map(|existing_shred| {
-            if existing_shred != new_shred.payload {
-                Some(existing_shred)
-            } else {
-                None
-            }
-        })
-        .unwrap_or(None)
+        (existing_shred != new_shred.payload).then(|| existing_shred)
     }
 
     pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
@@ -8228,17 +8219,17 @@ pub mod tests {
             blockstore.is_shred_duplicate(
                 slot,
                 0,
-                &duplicate_shred.payload,
-                duplicate_shred.is_data()
+                duplicate_shred.payload.clone(),
+                duplicate_shred.shred_type()
             ),
             Some(shred.payload.to_vec())
         );
         assert!(blockstore
             .is_shred_duplicate(
                 slot,
                 0,
-                &non_duplicate_shred.payload,
-                duplicate_shred.is_data()
+                non_duplicate_shred.payload,
+                duplicate_shred.shred_type()
             )
             .is_none());
 
@@ -8726,8 +8717,8 @@ pub mod tests {
             .is_shred_duplicate(
                 slot,
                 even_smaller_last_shred_duplicate.index(),
-                &even_smaller_last_shred_duplicate.payload,
-                true
+                even_smaller_last_shred_duplicate.payload.clone(),
+                ShredType::Data,
             )
             .is_some());
         blockstore
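The rewritten is_shred_duplicate takes the payload by value (the old version copied it with to_vec), bails out with ? as soon as no shred is stored at the key, and collapses the map/unwrap_or chain into bool::then. The same control flow in miniature, with plain byte vectors standing in for the blockstore lookup and shred reconstruction:

```rust
// Returns the stored payload only if one exists at this key and it
// differs from the incoming payload, i.e. a duplicate conflict.
fn is_duplicate(stored: Option<Vec<u8>>, mut payload: Vec<u8>, min_size: usize) -> Option<Vec<u8>> {
    let existing = stored?; // nothing stored => not a duplicate
    // Pad the incoming payload up to the full shred size, as the
    // blockstore code does with SHRED_PAYLOAD_SIZE.
    let size = payload.len().max(min_size);
    payload.resize(size, 0u8);
    // bool::then maps true => Some(value), false => None.
    (existing != payload).then(|| existing)
}

fn main() {
    // No stored shred: not a duplicate.
    assert_eq!(is_duplicate(None, vec![1, 2], 4), None);
    // Stored shred matches after padding: not a duplicate.
    assert_eq!(is_duplicate(Some(vec![1, 2, 0, 0]), vec![1, 2], 4), None);
    // Stored shred differs: return the conflicting payload.
    assert_eq!(
        is_duplicate(Some(vec![9, 9, 9, 9]), vec![1, 2], 4),
        Some(vec![9, 9, 9, 9])
    );
}
```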