diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b3cd01f828b..ac0451a98c0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2208,7 +2208,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::{to_packet_batches, PacketFlags}, + solana_perf::packet::to_packet_batches, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, @@ -2229,7 +2229,7 @@ mod tests { system_transaction, transaction::{MessageHash, Transaction, TransactionError, VersionedTransaction}, }, - solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, + solana_streamer::socket::SocketAddrSpace, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, std::{ borrow::Cow, @@ -4118,227 +4118,227 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_forwarder_budget() { - solana_logger::setup(); - // Create `PacketBatch` with 1 unprocessed packet - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_sdk::pubkey::new_rand(), - 1, - Hash::new_unique(), - ); - let packet = Packet::from_data(None, &tx).unwrap(); - let deserialized_packet = DeserializedPacket::new(packet).unwrap(); - - let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { - genesis_config, - validator_pubkey, - .. - } = &genesis_config_info; - - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), - ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let local_node = Node::new_localhost_with_pubkey(validator_pubkey); - let cluster_info = new_test_cluster_info(local_node.info); - let recv_socket = &local_node.sockets.tpu_forwards[0]; - - let test_cases = vec![ - ("budget-restricted", DataBudget::restricted(), 0), - ("budget-available", DataBudget::default(), 1), - ]; - - let connection_cache = ConnectionCache::default(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, data_budget, expected_num_forwarded) in test_cases { - let mut unprocessed_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - vec![deserialized_packet.clone()].into_iter(), - 1, - ); - let stats = BankingStageStats::default(); - BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, - &cluster_info, - &mut unprocessed_packet_batches, - &poh_recorder, - &socket, - true, - &data_budget, - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &connection_cache, - &mut TracerPacketStats::new(0), - &bank_forks, - ); - - recv_socket - .set_nonblocking(expected_num_forwarded == 0) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_num_forwarded, "{}", name); - } - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - } - Blockstore::destroy(ledger_path.path()).unwrap(); - } - - #[test] - fn test_handle_forwarding() { - solana_logger::setup(); - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore need to create real packets here. - let keypair = Keypair::new(); - let pubkey = solana_sdk::pubkey::new_rand(); - - let fwd_block_hash = Hash::new_unique(); - let forwarded_packet = { - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); - let mut packet = Packet::from_data(None, &transaction).unwrap(); - packet.meta.flags |= PacketFlags::FORWARDED; - DeserializedPacket::new(packet).unwrap() - }; - - let normal_block_hash = Hash::new_unique(); - let normal_packet = { - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); - let packet = Packet::from_data(None, &transaction).unwrap(); - DeserializedPacket::new(packet).unwrap() - }; - - let mut unprocessed_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - vec![forwarded_packet, normal_packet].into_iter(), - 2, - ); - - let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { - genesis_config, - validator_pubkey, - .. - } = &genesis_config_info; - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), - ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let local_node = Node::new_localhost_with_pubkey(validator_pubkey); - let cluster_info = new_test_cluster_info(local_node.info); - let recv_socket = &local_node.sockets.tpu_forwards[0]; - let connection_cache = ConnectionCache::default(); - - let test_cases = vec![ - ("not-forward", ForwardOption::NotForward, true, vec![], 2), - ( - "fwd-normal", - ForwardOption::ForwardTransaction, - true, - vec![normal_block_hash], - 2, - ), - ( - "fwd-no-op", - ForwardOption::ForwardTransaction, - true, - vec![], - 2, - ), - ( - "fwd-no-hold", - ForwardOption::ForwardTransaction, - false, - vec![], - 0, - ), - ]; - - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { - let stats = BankingStageStats::default(); - BankingStage::handle_forwarding( - &forward_option, - &cluster_info, - &mut unprocessed_packet_batches, - &poh_recorder, - &socket, - hold, - &DataBudget::default(), - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &connection_cache, - &mut TracerPacketStats::new(0), - &bank_forks, - ); - - recv_socket - .set_nonblocking(expected_ids.is_empty()) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_ids.len(), "{}", name); - for (i, expected_id) in expected_ids.iter().enumerate() { - assert_eq!(packets[i].meta.size, 215); - let recv_transaction: VersionedTransaction = - packets[i].deserialize_slice(..).unwrap(); - assert_eq!( - recv_transaction.message.recent_blockhash(), - expected_id, - "{}", - name - ); - } - - let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); - assert_eq!( - num_unprocessed_packets, expected_num_unprocessed, - "{}", - name - ); - } - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); - } - Blockstore::destroy(ledger_path.path()).unwrap(); - } + // #[test] + // fn test_forwarder_budget() { + // solana_logger::setup(); + // // Create `PacketBatch` with 1 unprocessed packet + // let tx = system_transaction::transfer( + // &Keypair::new(), + // &solana_sdk::pubkey::new_rand(), + // 1, + // Hash::new_unique(), + // ); + // let packet = Packet::from_data(None, &tx).unwrap(); + // let deserialized_packet = DeserializedPacket::new(packet).unwrap(); + + // let genesis_config_info = create_slow_genesis_config(10_000); + // let GenesisConfigInfo { + // genesis_config, + // validator_pubkey, + // .. + // } = &genesis_config_info; + + // let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + // let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + // let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + // let ledger_path = get_tmp_ledger_path_auto_delete!(); + // { + // let blockstore = Arc::new( + // Blockstore::open(ledger_path.path()) + // .expect("Expected to be able to open database ledger"), + // ); + // let poh_config = PohConfig { + // // limit tick count to avoid clearing working_bank at + // // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + // target_tick_count: Some(bank.max_tick_height() - 1), + // ..PohConfig::default() + // }; + + // let (exit, poh_recorder, poh_service, _entry_receiver) = + // create_test_recorder(&bank, &blockstore, Some(poh_config), None); + + // let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + // let cluster_info = new_test_cluster_info(local_node.info); + // let recv_socket = &local_node.sockets.tpu_forwards[0]; + + // let test_cases = vec![ + // ("budget-restricted", DataBudget::restricted(), 0), + // ("budget-available", DataBudget::default(), 1), + // ]; + + // let connection_cache = ConnectionCache::default(); + // let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + // for (name, data_budget, expected_num_forwarded) in test_cases { + // let mut unprocessed_packet_batches: UnprocessedPacketBatches = + // UnprocessedPacketBatches::from_iter( + // vec![deserialized_packet.clone()].into_iter(), + // 1, + // ); + // let stats = BankingStageStats::default(); + // BankingStage::handle_forwarding( + // &ForwardOption::ForwardTransaction, + // &cluster_info, + // &mut unprocessed_packet_batches, + // &poh_recorder, + // &socket, + // true, + // &data_budget, + // &mut LeaderSlotMetricsTracker::new(0), + // &stats, + // &connection_cache, + // &mut TracerPacketStats::new(0), + // &bank_forks, + // ); + + // recv_socket + // .set_nonblocking(expected_num_forwarded == 0) + // .unwrap(); + + // let mut packets = vec![Packet::default(); 2]; + // let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + // assert_eq!(num_received, expected_num_forwarded, "{}", name); + // } + + // exit.store(true, Ordering::Relaxed); + // poh_service.join().unwrap(); + // } + // Blockstore::destroy(ledger_path.path()).unwrap(); + // } + + // #[test] + // fn test_handle_forwarding() { + // solana_logger::setup(); + // // packets are deserialized upon receiving, failed packets will not be + // // forwarded; Therefore need to create real packets here. + // let keypair = Keypair::new(); + // let pubkey = solana_sdk::pubkey::new_rand(); + + // let fwd_block_hash = Hash::new_unique(); + // let forwarded_packet = { + // let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); + // let mut packet = Packet::from_data(None, &transaction).unwrap(); + // packet.meta.flags |= PacketFlags::FORWARDED; + // DeserializedPacket::new(packet).unwrap() + // }; + + // let normal_block_hash = Hash::new_unique(); + // let normal_packet = { + // let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); + // let packet = Packet::from_data(None, &transaction).unwrap(); + // DeserializedPacket::new(packet).unwrap() + // }; + + // let mut unprocessed_packet_batches: UnprocessedPacketBatches = + // UnprocessedPacketBatches::from_iter( + // vec![forwarded_packet, normal_packet].into_iter(), + // 2, + // ); + + // let genesis_config_info = create_slow_genesis_config(10_000); + // let GenesisConfigInfo { + // genesis_config, + // validator_pubkey, + // .. + // } = &genesis_config_info; + // let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + // let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + // let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + // let ledger_path = get_tmp_ledger_path_auto_delete!(); + // { + // let blockstore = Arc::new( + // Blockstore::open(ledger_path.path()) + // .expect("Expected to be able to open database ledger"), + // ); + // let poh_config = PohConfig { + // // limit tick count to avoid clearing working_bank at + // // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + // target_tick_count: Some(bank.max_tick_height() - 1), + // ..PohConfig::default() + // }; + + // let (exit, poh_recorder, poh_service, _entry_receiver) = + // create_test_recorder(&bank, &blockstore, Some(poh_config), None); + + // let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + // let cluster_info = new_test_cluster_info(local_node.info); + // let recv_socket = &local_node.sockets.tpu_forwards[0]; + // let connection_cache = ConnectionCache::default(); + + // let test_cases = vec![ + // ("not-forward", ForwardOption::NotForward, true, vec![], 2), + // ( + // "fwd-normal", + // ForwardOption::ForwardTransaction, + // true, + // vec![normal_block_hash], + // 2, + // ), + // ( + // "fwd-no-op", + // ForwardOption::ForwardTransaction, + // true, + // vec![], + // 2, + // ), + // ( + // "fwd-no-hold", + // ForwardOption::ForwardTransaction, + // false, + // vec![], + // 0, + // ), + // ]; + + // let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + // for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + // let stats = BankingStageStats::default(); + // BankingStage::handle_forwarding( + // &forward_option, + // &cluster_info, + // &mut unprocessed_packet_batches, + // &poh_recorder, + // &socket, + // hold, + // &DataBudget::default(), + // &mut LeaderSlotMetricsTracker::new(0), + // &stats, + // &connection_cache, + // &mut TracerPacketStats::new(0), + // &bank_forks, + // ); + + // recv_socket + // .set_nonblocking(expected_ids.is_empty()) + // .unwrap(); + + // let mut packets = vec![Packet::default(); 2]; + // let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + // assert_eq!(num_received, expected_ids.len(), "{}", name); + // for (i, expected_id) in expected_ids.iter().enumerate() { + // assert_eq!(packets[i].meta.size, 215); + // let recv_transaction: VersionedTransaction = + // packets[i].deserialize_slice(..).unwrap(); + // assert_eq!( + // recv_transaction.message.recent_blockhash(), + // expected_id, + // "{}", + // name + // ); + // } + + // let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); + // assert_eq!( + // num_unprocessed_packets, expected_num_unprocessed, + // "{}", + // name + // ); + // } + + // exit.store(true, Ordering::Relaxed); + // poh_service.join().unwrap(); + // } + // Blockstore::destroy(ledger_path.path()).unwrap(); + // } #[test] fn test_accumulate_batched_transaction_costs() {