Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 166 additions & 22 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,20 +396,27 @@ impl BankingStage {
data_budget: &DataBudget,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
data_budget.update(INTERVAL_MS, |bytes| {
std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET)
std::cmp::min(
bytes.saturating_add(MAX_BYTES_PER_INTERVAL),
MAX_BYTES_BUDGET,
)
});

let mut forwarded_packet_count = 0;
for p in packets {
if data_budget.take(p.meta.size) {
if !p.meta.forwarded && data_budget.take(p.meta.size) {
forwarded_packet_count = forwarded_packet_count.saturating_add(1);
socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?;
}
}

inc_new_counter_info!("banking_stage-forwarded_packets", forwarded_packet_count);

Ok(())
}

Expand Down Expand Up @@ -1696,7 +1703,7 @@ mod tests {
system_transaction,
transaction::TransactionError,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
solana_transaction_status::TransactionWithStatusMeta,
solana_vote_program::vote_transaction,
std::{
Expand Down Expand Up @@ -2978,16 +2985,15 @@ mod tests {
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
let single_packet_batch = PacketBatch::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPacketBatches =
vec![(single_packet_batch, vec![0], false)]
.into_iter()
.collect();

let cluster_info = new_test_cluster_info(Node::new_localhost().info);
let packet = Packet::from_data(None, &[0]).unwrap();
let single_packet_batch = PacketBatch::new(vec![packet]);

let genesis_config_info = create_slow_genesis_config(10_000);
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;
let GenesisConfigInfo {
genesis_config,
validator_pubkey,
..
} = &genesis_config_info;

let bank = Arc::new(Bank::new_no_wallclock_throttle(genesis_config));
let ledger_path = get_tmp_ledger_path!();
Expand All @@ -3006,17 +3012,155 @@ mod tests {
let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, Some(poh_config));

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let data_budget = DataBudget::default();
BankingStage::handle_forwarding(
&ForwardOption::ForwardTransaction,
&cluster_info,
&mut unprocessed_packets,
&poh_recorder,
&socket,
false,
&data_budget,
let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
("budget-restricted", DataBudget::restricted(), 0),
("budget-available", DataBudget::default(), 1),
];

for (name, data_budget, expected_num_forwarded) in test_cases {
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
vec![(single_packet_batch.clone(), vec![0], false)]
.into_iter()
.collect();
BankingStage::handle_forwarding(
&ForwardOption::ForwardTransaction,
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
true,
&data_budget,
);

recv_socket
.set_nonblocking(expected_num_forwarded == 0)
.unwrap();

let mut packets = vec![Packet::default(); 2];
let (_, num_received) =
recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
assert_eq!(num_received, expected_num_forwarded, "{}", name);
}

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Blockstore::destroy(&ledger_path).unwrap();
}

#[test]
fn test_handle_forwarding() {
solana_logger::setup();

const FWD_PACKET: u8 = 1;
let forwarded_packet = {
let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap();
packet.meta.forwarded = true;
packet
};

const NORMAL_PACKET: u8 = 2;
let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap();

let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]);
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
vec![(packet_batch, vec![0, 1], false)]
.into_iter()
.collect();

let genesis_config_info = create_slow_genesis_config(10_000);
let GenesisConfigInfo {
genesis_config,
validator_pubkey,
..
} = &genesis_config_info;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config));
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
let poh_config = PohConfig {
// limit tick count to avoid clearing working_bank at
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
target_tick_count: Some(bank.max_tick_height() - 1),
..PohConfig::default()
};

let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, Some(poh_config));

let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
("not-forward", ForwardOption::NotForward, true, vec![], 2),
(
"fwd-normal",
ForwardOption::ForwardTransaction,
true,
vec![NORMAL_PACKET],
2,
),
(
"fwd-no-op",
ForwardOption::ForwardTransaction,
true,
vec![],
2,
),
(
"fwd-no-hold",
ForwardOption::ForwardTransaction,
false,
vec![],
0,
),
];

for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
BankingStage::handle_forwarding(
&forward_option,
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
hold,
&DataBudget::default(),
);

recv_socket
.set_nonblocking(expected_ids.is_empty())
.unwrap();

let mut packets = vec![Packet::default(); 2];
let (_, num_received) =
recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
assert_eq!(num_received, expected_ids.len(), "{}", name);
for (i, expected_id) in expected_ids.iter().enumerate() {
assert_eq!(packets[i].meta.size, 1);
assert_eq!(packets[i].data[0], *expected_id, "{}", name);
}

let num_unprocessed_packets: usize = unprocessed_packet_batches
.iter()
.map(|(b, ..)| b.packets.len())
.sum();
assert_eq!(
num_unprocessed_packets, expected_num_unprocessed,
"{}",
name
);
}

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,7 @@ mod tests {

let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![&validator_vote_keypairs];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![100; 1],
Expand Down
16 changes: 11 additions & 5 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::clock::DEFAULT_TICKS_PER_SLOT,
solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
std::{
net::UdpSocket,
Expand Down Expand Up @@ -83,10 +83,16 @@ impl FetchStage {
sendr: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let packet_batch = recvr.recv()?;
let mark_forwarded = |packet: &mut Packet| {
packet.meta.forwarded = true;
};

let mut packet_batch = recvr.recv()?;
let mut num_packets = packet_batch.packets.len();
packet_batch.packets.iter_mut().for_each(mark_forwarded);
let mut packet_batches = vec![packet_batch];
while let Ok(packet_batch) = recvr.try_recv() {
while let Ok(mut packet_batch) = recvr.try_recv() {
packet_batch.packets.iter_mut().for_each(mark_forwarded);
num_packets += packet_batch.packets.len();
packet_batches.push(packet_batch);
// Read at most 1K transactions in a loop
Expand Down Expand Up @@ -114,7 +120,7 @@ impl FetchStage {
}

fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
tpu_sockets: Vec<Arc<UdpSocket>>,
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
Expand All @@ -125,7 +131,7 @@ impl FetchStage {
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);

let tpu_threads = sockets.into_iter().map(|socket| {
let tpu_threads = tpu_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
exit,
Expand Down
2 changes: 1 addition & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl ShredFetchStage {
recycler.clone(),
bank_forks.clone(),
"shred_fetch_tvu_forwards",
|p| p.meta.forward = true,
|p| p.meta.forwarded = true,
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
Expand Down
Loading