diff --git a/Cargo.lock b/Cargo.lock index e6ac4cc8cf2c57..ed979bdaf0b783 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5970,6 +5970,7 @@ dependencies = [ "solana-stake-program", "solana-streamer", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-version", @@ -7213,9 +7214,11 @@ dependencies = [ "rcgen", "rustls", "solana-logger", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -7361,6 +7364,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.17.30" +dependencies = [ + "Inflector", + "base64 0.21.4", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.17.30" diff --git a/Cargo.toml b/Cargo.toml index c13ee3763605d7..ff9725b1bce30b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -364,6 +364,7 @@ solana-system-program = { path = "programs/system", version = "=1.17.30" } solana-test-validator = { path = "test-validator", version = "=1.17.30" } solana-thin-client = { path = "thin-client", version = "=1.17.30" } solana-tpu-client = { path = "tpu-client", version = "=1.17.30", default-features = false } +solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.17.30" } solana-transaction-status = { path = "transaction-status", version = "=1.17.30" } solana-turbine = { path = "turbine", version = "=1.17.30" } solana-udp-client = { path = "udp-client", version = "=1.17.30" } diff --git a/core/Cargo.toml b/core/Cargo.toml index fcab8ff8775912..c70697941ca012 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,6 +66,7 @@ solana-sdk = { workspace = true } solana-send-transaction-service = { workspace = true } solana-streamer = { workspace = true } solana-tpu-client = { workspace = true } 
+solana-transaction-metrics-tracker = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-version = { workspace = true } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index c8624a96aad7d4..92f6a855f84efc 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -201,6 +201,32 @@ impl Consumer { .slot_metrics_tracker .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + // Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes + // We assume the retryable_transaction_indexes is already sorted. + let mut retryable_idx = 0; + for (index, packet) in packets_to_process.iter().enumerate() { + if packet.original_packet().meta().is_perf_track_packet() { + if let Some(start_time) = packet.start_time() { + if retryable_idx >= retryable_transaction_indexes.len() + || retryable_transaction_indexes[retryable_idx] != index + { + let duration = Instant::now().duration_since(*start_time); + + debug!( + "Banking stage processing took {duration:?} for transaction {:?}", + packet.transaction().get_signatures().first() + ); + payload + .slot_metrics_tracker + .increment_process_sampled_packets_us(duration.as_micros() as u64); + } else { + // This packet is retried, advance the retry index to the next, as the next packet's index will + // certainly be > than this. 
+ retryable_idx += 1; + } + } + } + } Some(retryable_transaction_indexes) } diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index 0fd6a3f16e16a3..f25ed939d2b386 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -17,7 +17,7 @@ use { VersionedTransaction, }, }, - std::{cmp::Ordering, mem::size_of, sync::Arc}, + std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant}, thiserror::Error, }; @@ -45,10 +45,16 @@ pub struct ImmutableDeserializedPacket { message_hash: Hash, is_simple_vote: bool, priority_details: TransactionPriorityDetails, + banking_stage_start_time: Option, } impl ImmutableDeserializedPacket { pub fn new(packet: Packet) -> Result { + let banking_stage_start_time = packet + .meta() + .is_perf_track_packet() + .then_some(Instant::now()); + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; let message_bytes = packet_message(&packet)?; @@ -71,6 +77,7 @@ impl ImmutableDeserializedPacket { message_hash, is_simple_vote, priority_details, + banking_stage_start_time, }) } @@ -114,6 +121,10 @@ impl ImmutableDeserializedPacket { self.compute_unit_limit() >= static_builtin_cost_sum } + pub fn start_time(&self) -> &Option { + &self.banking_stage_start_time + } + // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. 
pub fn build_sanitized_transaction( diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index b36200d86e12d6..eb22dd9d2bdb62 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -839,6 +839,17 @@ impl LeaderSlotMetricsTracker { ); } } + + pub(crate) fn increment_process_sampled_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + leader_slot_metrics + .timing_metrics + .process_packets_timings + .process_sampled_packets_us_hist + .increment(us) + .unwrap(); + } + } } #[cfg(test)] diff --git a/core/src/banking_stage/leader_slot_timing_metrics.rs b/core/src/banking_stage/leader_slot_timing_metrics.rs index 7727b6cf6c6563..34ce64b31c34f3 100644 --- a/core/src/banking_stage/leader_slot_timing_metrics.rs +++ b/core/src/banking_stage/leader_slot_timing_metrics.rs @@ -244,6 +244,9 @@ pub(crate) struct ProcessPacketsTimings { // Time spent running the cost model in processing transactions before executing // transactions pub cost_model_us: u64, + + // banking stage processing time histogram for sampled packets + pub process_sampled_packets_us_hist: histogram::Histogram, } impl ProcessPacketsTimings { @@ -264,6 +267,28 @@ impl ProcessPacketsTimings { i64 ), ("cost_model_us", self.cost_model_us, i64), + ( + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ); } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 
80ce0875323819..0ea5f2bd8b086c 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -876,6 +876,7 @@ impl ThreadLocalUnprocessedPackets { .iter() .map(|p| (*p).clone()) .collect_vec(); + let retryable_packets = if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process, payload) { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index e5e06a3bc701c9..f41d2b1d192f16 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -18,8 +18,9 @@ use { count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches, }, }, - solana_sdk::timing, + solana_sdk::{signature::Signature, timing}, solana_streamer::streamer::{self, StreamerError}, + solana_transaction_metrics_tracker::get_signature_from_packet, std::{ thread::{self, Builder, JoinHandle}, time::Instant, @@ -78,8 +79,9 @@ struct SigVerifierStats { verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - batches_hist: histogram::Histogram, // number of packet batches per verify call - packets_hist: histogram::Histogram, // number of packets per verify call + process_sampled_packets_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets + batches_hist: histogram::Histogram, // number of packet batches per verify call + packets_hist: histogram::Histogram, // number of packets per verify call num_deduper_saturations: usize, total_batches: usize, total_packets: usize, @@ -93,6 +95,7 @@ struct SigVerifierStats { total_discard_random_time_us: usize, total_verify_time_us: usize, total_shrink_time_us: usize, + perf_track_overhead_us: usize, } impl SigVerifierStats { @@ -181,6 +184,28 @@ impl SigVerifierStats { 
self.dedup_packets_pp_us_hist.mean().unwrap_or(0), i64 ), + ( + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ( "batches_90pct", self.batches_hist.percentile(90.0).unwrap_or(0), @@ -214,6 +239,7 @@ impl SigVerifierStats { ), ("total_verify_time_us", self.total_verify_time_us, i64), ("total_shrink_time_us", self.total_shrink_time_us, i64), + ("perf_track_overhead_us", self.perf_track_overhead_us, i64), ); } } @@ -296,8 +322,26 @@ impl SigVerifyStage { verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let mut start_perf_track_measure = Measure::start("start_perf_track"); + // track sigverify start time for interested packets + for batch in &batches { + for packet in batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + packet_perf_measure.push((*signature, Instant::now())); + } + } + } + } + start_perf_track_measure.stop(); + + stats.perf_track_overhead_us = start_perf_track_measure.as_us() as usize; + let batches_len = batches.len(); debug!( "@{:?} verifier: verifying: {}", @@ -370,6 +414,22 @@ impl SigVerifyStage { (num_packets as f32 / verify_time.as_s()) ); + let mut perf_track_end_measure = Measure::start("perf_track_end"); + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + 
"Sigverify took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + stats + .process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); + } + + perf_track_end_measure.stop(); + stats.perf_track_overhead_us += perf_track_end_measure.as_us() as usize; + + stats .recv_batches_us_hist .increment(recv_duration.as_micros() as u64) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 5a3a8994d77019..40f3c0d2f443e3 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4901,6 +4901,7 @@ dependencies = [ "solana-send-transaction-service", "solana-streamer", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-version", @@ -6241,9 +6242,11 @@ dependencies = [ "rand 0.8.5", "rcgen", "rustls", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -6326,6 +6329,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.17.30" +dependencies = [ + "Inflector", + "base64 0.21.4", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.17.30" diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab4753c67..8300b57218c696 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. 
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For tracking performance + const PERF_TRACK_PACKET = 0b0100_0000; } } @@ -228,6 +230,12 @@ impl Meta { self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); } + #[inline] + pub fn set_track_performance(&mut self, is_performance_track: bool) { + self.flags + .set(PacketFlags::PERF_TRACK_PACKET, is_performance_track); + } + #[inline] pub fn set_simple_vote(&mut self, is_simple_vote: bool) { self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote); @@ -261,6 +269,11 @@ impl Meta { self.flags.contains(PacketFlags::TRACER_PACKET) } + #[inline] + pub fn is_perf_track_packet(&self) -> bool { + self.flags.contains(PacketFlags::PERF_TRACK_PACKET) + } + #[inline] pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) diff --git a/sdk/src/transaction/versioned/sanitized.rs b/sdk/src/transaction/versioned/sanitized.rs index 68d0581e23d4ba..14c49db847eebd 100644 --- a/sdk/src/transaction/versioned/sanitized.rs +++ b/sdk/src/transaction/versioned/sanitized.rs @@ -32,6 +32,10 @@ impl SanitizedVersionedTransaction { pub fn get_message(&self) -> &SanitizedVersionedMessage { &self.message } + + pub fn get_signatures(&self) -> &Vec { + &self.signatures + } } #[cfg(test)] diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 21ae96d11fd9a4..860f759ef20b52 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -28,9 +28,11 @@ quinn-proto = { workspace = true } rand = { workspace = true } rcgen = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } +solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } x509-parser = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs 
b/streamer/src/nonblocking/quic.rs index 0c278ea818e814..290fe154134d56 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,6 +1,8 @@ use { crate::{ - quic::{configure_server, QuicServerError, StreamStats, MAX_UNSTAKED_CONNECTIONS}, + quic::{ + configure_server, PeerStats, QuicServerError, StreamStats, MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, tls_certificates::get_pubkey_from_tls_certificate, }, @@ -14,6 +16,7 @@ use { quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, + solana_measure::measure::Measure, solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH}, solana_sdk::{ packet::{Meta, PACKET_DATA_SIZE}, @@ -24,9 +27,10 @@ use { QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, - signature::Keypair, + signature::{Keypair, Signature}, timing, }, + solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, @@ -96,6 +100,7 @@ struct PacketChunk { struct PacketAccumulator { pub meta: Meta, pub chunks: Vec, + pub start_time: Instant, } #[allow(clippy::too_many_arguments)] @@ -174,7 +179,7 @@ async fn run_server( let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; if last_datapoint.elapsed().as_secs() >= 5 { - stats.report(name); + stats.report(name).await; last_datapoint = Instant::now(); } @@ -631,6 +636,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -650,6 +656,9 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = 
packet_batch.len(); + + track_streamer_fetch_packet_performance(&packet_perf_measure, &stats).await; + if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -695,6 +704,14 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; + if let Some(signature) = signature_if_should_track_packet(&packet_batch[i]) + .ok() + .flatten() + { + packet_perf_measure.push((*signature, packet_accumulator.start_time)); + // we set the PERF_TRACK_PACKET on + packet_batch[i].meta_mut().set_track_performance(true); + } stats .total_chunks_processed_by_batcher .fetch_add(num_chunks, Ordering::Relaxed); @@ -732,6 +749,32 @@ fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> } } +async fn track_streamer_fetch_packet_performance( + packet_perf_measure: &[([u8; 64], Instant)], + stats: &Arc, +) { + if packet_perf_measure.is_empty() { + return; + } + let mut measure = Measure::start("track_perf"); + let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().await; + + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); + } + measure.stop(); + stats + .perf_track_overhead_us + .fetch_add(measure.as_us(), Ordering::Relaxed); +} + async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -755,17 +798,53 @@ async fn handle_connection( max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake); let mut last_throttling_instant = tokio::time::Instant::now(); let mut streams_in_current_interval = 0; + let peer = ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey); + let connection_count = { connection_table.lock().await.get_connection_count(&peer) }; + let peer_stat 
= Arc::new(PeerStats::new(peer_type, params.stake, connection_count)); + while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { + stats.received_streams.fetch_add(1, Ordering::Relaxed); + + match peer_type { + ConnectionPeerType::Unstaked => { + stats + .received_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked => { + stats + .received_staked_streams + .fetch_add(1, Ordering::Relaxed); + } + } + peer_stat.received_streams.fetch_add(1, Ordering::Relaxed); + if reset_throttling_params_if_needed(&mut last_throttling_instant) { streams_in_current_interval = 0; } else if streams_in_current_interval >= max_streams_per_100ms { stats.throttled_streams.fetch_add(1, Ordering::Relaxed); + match peer_type { + ConnectionPeerType::Unstaked => { + stats + .throttled_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked => { + stats + .throttled_staked_streams + .fetch_add(1, Ordering::Relaxed); + } + } + peer_stat.throttled_streams.fetch_add(1, Ordering::Relaxed); + info!("Throttled stream from {remote_addr:?}, peer type: {peer_type:?}, stake: {}, total stake: {}", + params.stake, params.total_stake); let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); + update_peer_stats(¶ms.remote_pubkey, &stats, &peer_stat).await; continue; } streams_in_current_interval = streams_in_current_interval.saturating_add(1); @@ -773,6 +852,7 @@ async fn handle_connection( stats.total_new_streams.fetch_add(1, Ordering::Relaxed); let stream_exit = stream_exit.clone(); let stats = stats.clone(); + let peer_stat = peer_stat.clone(); let packet_sender = params.packet_sender.clone(); let last_update = last_update.clone(); tokio::spawn(async move { @@ -799,10 +879,13 @@ async fn handle_connection( &packet_sender, stats.clone(), peer_type, + peer_stat.clone(), ) .await { 
last_update.store(timing::timestamp(), Ordering::Relaxed); + update_peer_stats(¶ms.remote_pubkey, &stats, &peer_stat) + .await; break; } start = Instant::now(); @@ -811,6 +894,10 @@ async fn handle_connection( stats .total_stream_read_timeouts .fetch_add(1, Ordering::Relaxed); + peer_stat + .stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + update_peer_stats(¶ms.remote_pubkey, &stats, &peer_stat).await; break; } } @@ -842,6 +929,16 @@ async fn handle_connection( stats.total_connections.fetch_sub(1, Ordering::Relaxed); } +async fn update_peer_stats( + pubkey: &Option, + stats: &Arc, + peer_stat: &Arc, +) { + if let Some(pubkey) = pubkey { + stats.update_peer_stats(pubkey, peer_stat).await; + } +} + // Return true if the server should drop the stream async fn handle_chunk( chunk: Result, quinn::ReadError>, @@ -850,6 +947,7 @@ async fn handle_chunk( packet_sender: &AsyncSender, stats: Arc, peer_type: ConnectionPeerType, + peer_stat: Arc, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -879,6 +977,7 @@ async fn handle_chunk( *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), + start_time: Instant::now(), }); } @@ -932,6 +1031,24 @@ async fn handle_chunk( .total_chunks_sent_for_batching .fetch_add(chunks_sent, Ordering::Relaxed); + match peer_type { + ConnectionPeerType::Unstaked => { + stats + .total_unstaked_packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked => { + stats + .total_staked_packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); + } + } + peer_stat + .bytes_sent_for_batching + .fetch_add(bytes_sent, Ordering::Relaxed); + peer_stat + .packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); trace!("sent {} byte packet for batching", bytes_sent); } } else { @@ -947,6 +1064,7 @@ async fn handle_chunk( stats .total_stream_read_errors .fetch_add(1, Ordering::Relaxed); + peer_stat.stream_read_errors.fetch_add(1, Ordering::Relaxed); return true; } } @@ -996,8 +1114,9 @@ impl Drop 
for ConnectionEntry { } } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Default)] pub enum ConnectionPeerType { + #[default] Unstaked, Staked, } @@ -1146,6 +1265,13 @@ impl ConnectionTable { 0 } } + + /// return the connection count of the peer in the cache + fn get_connection_count(&self, key: &ConnectionTableKey) -> usize { + self.table + .get(key) + .map_or_else(|| 0, |connections| connections.len()) + } } #[cfg(test)] @@ -1468,6 +1594,7 @@ pub mod test { offset, end_of_chunk: size, }], + start_time: Instant::now(), }; ptk_sender.send(packet_accum).await.unwrap(); } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 8b2b4f9d852cac..ff8deb4019a027 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -1,6 +1,7 @@ use { crate::{ - nonblocking::quic::ALPN_TPU_PROTOCOL_ID, streamer::StakedNodes, + nonblocking::quic::{ConnectionPeerType, ALPN_TPU_PROTOCOL_ID}, + streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, crossbeam_channel::Sender, @@ -10,19 +11,21 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::PACKET_DATA_SIZE, + pubkey::Pubkey, quic::{QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, }, std::{ + collections::HashMap, net::{IpAddr, UdpSocket}, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, thread, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }, - tokio::runtime::Runtime, + tokio::{runtime::Runtime, sync::Mutex}, }; pub const MAX_STAKED_CONNECTIONS: usize = 2000; @@ -157,10 +160,94 @@ pub struct StreamStats { pub(crate) connection_removed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize, pub(crate) throttled_streams: AtomicUsize, + pub(crate) process_sampled_packets_us_hist: Mutex, + pub(crate) perf_track_overhead_us: AtomicU64, + pub(crate) total_staked_packets_sent_for_batching: AtomicUsize, + pub(crate) 
total_unstaked_packets_sent_for_batching: AtomicUsize, + pub(crate) throttled_staked_streams: AtomicUsize, + pub(crate) throttled_unstaked_streams: AtomicUsize, + pub(crate) received_streams: AtomicUsize, + pub(crate) received_unstaked_streams: AtomicUsize, + pub(crate) received_staked_streams: AtomicUsize, + pub(crate) peer_stats: PeerStatsRecorder, +} + +/// Stats per peer +pub struct PeerStats { + pub(crate) peer_type: ConnectionPeerType, + pub(crate) stakes: AtomicUsize, + pub(crate) received_streams: AtomicUsize, + pub(crate) throttled_streams: AtomicUsize, + pub(crate) packets_sent_for_batching: AtomicUsize, + pub(crate) bytes_sent_for_batching: AtomicUsize, + pub(crate) stream_read_errors: AtomicUsize, + pub(crate) stream_read_timeouts: AtomicUsize, + pub(crate) connection_count: AtomicUsize, + pub(crate) start_time: Instant, + pub(crate) end_time: Instant, +} + +impl Default for PeerStats { + fn default() -> Self { + Self { + peer_type: ConnectionPeerType::default(), + stakes: AtomicUsize::default(), + received_streams: AtomicUsize::default(), + throttled_streams: AtomicUsize::default(), + packets_sent_for_batching: AtomicUsize::default(), + bytes_sent_for_batching: AtomicUsize::default(), + stream_read_errors: AtomicUsize::default(), + stream_read_timeouts: AtomicUsize::default(), + connection_count: AtomicUsize::default(), + start_time: Instant::now(), + end_time: Instant::now(), + } + } +} + +impl Clone for PeerStats { + fn clone(&self) -> Self { + Self { + received_streams: AtomicUsize::new(self.received_streams.load(Ordering::Relaxed)), + throttled_streams: AtomicUsize::new(self.throttled_streams.load(Ordering::Relaxed)), + peer_type: self.peer_type, + connection_count: AtomicUsize::new(self.connection_count.load(Ordering::Relaxed)), + stakes: AtomicUsize::new(self.stakes.load(Ordering::Relaxed)), + stream_read_errors: AtomicUsize::new(self.stream_read_errors.load(Ordering::Relaxed)), + stream_read_timeouts: AtomicUsize::new( + 
self.stream_read_timeouts.load(Ordering::Relaxed), + ), + packets_sent_for_batching: AtomicUsize::new( + self.packets_sent_for_batching.load(Ordering::Relaxed), + ), + bytes_sent_for_batching: AtomicUsize::new( + self.bytes_sent_for_batching.load(Ordering::Relaxed), + ), + start_time: self.start_time, + end_time: self.end_time, + } + } +} + +impl PeerStats { + pub fn new(peer_type: ConnectionPeerType, stakes: u64, connection_count: usize) -> Self { + Self { + peer_type, + stakes: AtomicUsize::new(stakes as usize), + connection_count: AtomicUsize::new(connection_count), + ..Default::default() + } + } +} + +#[derive(Default)] +pub struct PeerStatsRecorder { + pub(crate) stats: Mutex>, } impl StreamStats { - pub fn report(&self, name: &'static str) { + pub async fn report(&self, name: &'static str) { + let mut process_sampled_packets_us_hist = self.process_sampled_packets_us_hist.lock().await; datapoint_info!( name, ( @@ -311,6 +398,18 @@ impl StreamStats { .swap(0, Ordering::Relaxed), i64 ), + ( + "staked_packets_sent_for_batching", + self.total_staked_packets_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "unstaked_packets_sent_for_batching", + self.total_unstaked_packets_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), ( "bytes_sent_for_batching", self.total_bytes_sent_for_batching @@ -392,7 +491,133 @@ impl StreamStats { self.throttled_streams.swap(0, Ordering::Relaxed), i64 ), + ( + "throttled_unstaked_streams", + self.throttled_unstaked_streams.swap(0, Ordering::Relaxed), + i64 + ), + ( + "throttled_staked_streams", + self.throttled_staked_streams.swap(0, Ordering::Relaxed), + i64 + ), + ( + "process_sampled_packets_us_90pct", + process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + 
"process_sampled_packets_us_mean", + process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), + ( + "perf_track_overhead_us", + self.perf_track_overhead_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "received_streams", + self.received_streams.swap(0, Ordering::Relaxed), + i64 + ), + ( + "received_unstaked_streams", + self.received_unstaked_streams.swap(0, Ordering::Relaxed), + i64 + ), + ( + "received_staked_streams", + self.received_staked_streams.swap(0, Ordering::Relaxed), + i64 + ), ); + process_sampled_packets_us_hist.clear(); + self.report_peer_stats().await; + } + + async fn report_peer_stats(&self) { + let map = { + let mut stats = self.peer_stats.stats.lock().await; + let new_map = HashMap::::default(); + std::mem::replace(&mut *stats, new_map) + }; + // now we can report the stats without holding the lock + for (key, stats) in map { + info!( + "STREAMER_PEER_STATS: node={key}, type={:?}, stakes={}, received_streams={}, throttled_streams={}, packets_sent={}, bytes_sent={}, read_errors={}, read_timeouts={}, connection_count={} duration_us={}", + stats.peer_type, + stats.stakes.load(Ordering::Relaxed), + stats.received_streams.load(Ordering::Relaxed), + stats.throttled_streams.load(Ordering::Relaxed), + stats.packets_sent_for_batching.load(Ordering::Relaxed), + stats.bytes_sent_for_batching.load(Ordering::Relaxed), + stats.stream_read_errors.load(Ordering::Relaxed), + stats.stream_read_timeouts.load(Ordering::Relaxed), + stats.connection_count.load(Ordering::Relaxed), + stats.end_time.duration_since(stats.start_time).as_micros() + ); + } + } + + /// Update the peer stat for a peer given its key + pub async fn update_peer_stats(&self, pubkey: &Pubkey, peer_stat: &PeerStats) { + let mut stats = self.peer_stats.stats.lock().await; + stats + .entry(*pubkey) + .and_modify(|stat| { + stat.peer_type = peer_stat.peer_type; + stat.stakes + .store(peer_stat.stakes.load(Ordering::Relaxed), Ordering::Relaxed); + stat.received_streams.fetch_add( + 
peer_stat.received_streams.swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.throttled_streams.fetch_add( + peer_stat.throttled_streams.swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.packets_sent_for_batching.fetch_add( + peer_stat + .packets_sent_for_batching + .swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.bytes_sent_for_batching.fetch_add( + peer_stat.bytes_sent_for_batching.swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.stream_read_errors.fetch_add( + peer_stat.stream_read_errors.swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.stream_read_timeouts.fetch_add( + peer_stat.stream_read_timeouts.swap(0, Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.connection_count.store( + peer_stat.connection_count.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + stat.end_time = Instant::now(); + }) + .or_insert({ + let mut stat = peer_stat.clone(); + stat.start_time = Instant::now(); + stat + }); } } diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml new file mode 100644 index 00000000000000..9bd82702a3ebb4 --- /dev/null +++ b/transaction-metrics-tracker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "solana-transaction-metrics-tracker" +description = "Solana transaction metrics tracker" +documentation = "https://docs.rs/solana-transaction-metrics-tracker" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +Inflector = { workspace = true } +base64 = { workspace = true } +bincode = { workspace = true } +# Update this borsh dependency to the workspace version once +lazy_static = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } + +[package.metadata.docs.rs] +targets = 
["x86_64-unknown-linux-gnu"] diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs new file mode 100644 index 00000000000000..2baec195de9b84 --- /dev/null +++ b/transaction-metrics-tracker/src/lib.rs @@ -0,0 +1,157 @@ +use { + lazy_static::lazy_static, + log::*, + rand::Rng, + solana_perf::sigverify::PacketError, + solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES}, +}; + +// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching +// the transaction is 1/4096 assuming the portion being matched is random. +lazy_static! { + static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096); +} + +/// Check if a transaction given its signature matches the randomly selected mask. +/// The signaure should be from the reference of Signature +pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { + // We do not use the highest signature byte as it is not really random + let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; + trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); + *TXN_MASK == match_portion +} + +/// Check if a transaction packet's signature matches the mask. +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn signature_if_should_track_packet( + packet: &Packet, +) -> Result, PacketError> { + let signature = get_signature_from_packet(packet)?; + Ok(should_track_transaction(signature).then_some(signature)) +} + +/// Get the signature of the transaction packet +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> { + let (sig_len_untrusted, sig_start) = packet + .data(..) 
+ .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(PacketError::InvalidShortVec)?; + + if sig_len_untrusted < 1 { + return Err(PacketError::InvalidSignatureLen); + } + + let signature = packet + .data(sig_start..sig_start.saturating_add(SIGNATURE_BYTES)) + .ok_or(PacketError::InvalidSignatureLen)?; + let signature = signature + .try_into() + .map_err(|_| PacketError::InvalidSignatureLen)?; + Ok(signature) +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signature}, + system_transaction, + }, + }; + + #[test] + fn test_get_signature_from_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction, it should succeed + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, tx).unwrap(); + + let sig = get_signature_from_packet(&packet); + assert!(sig.is_ok()); + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } + + #[test] + fn test_should_track_transaction() { + let mut sig = [0x0; SIGNATURE_BYTES]; + let track = should_track_transaction(&sig); + assert!(!track); + + // Intentionally matching the randomly generated mask + // The lower four bits are ignored as only 12 highest bits from + // signature's 61 and 62 u8 are used for matching. 
+ // We generate a random one + let mut rng = rand::thread_rng(); + let random_number: u8 = rng.gen_range(0..=15); + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8 | random_number; + sig[62] = (*TXN_MASK >> 4) as u8; + + let track = should_track_transaction(&sig); + assert!(track); + } + + #[test] + fn test_signature_if_should_track_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction which is not matched + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(Ok(None), sig); + + // Now simulate a txn matching the signature mask + let mut tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut sig = [0x0; SIGNATURE_BYTES]; + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8; + sig[62] = (*TXN_MASK >> 4) as u8; + + let sig = Signature::from(sig); + tx.signatures[0] = sig; + let mut packet = Packet::from_data(None, tx).unwrap(); + let sig2 = signature_if_should_track_packet(&packet); + + match sig2 { + Ok(sig) => { + assert!(sig.is_some()); + } + Err(_) => panic!("Expected to get a matching signature!"), + } + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } +}