diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs index ec4063f376b3fb..c84707e2e7a93d 100644 --- a/tpu-client-next/src/connection_worker.rs +++ b/tpu-client-next/src/connection_worker.rs @@ -10,7 +10,7 @@ use { transaction_batch::TransactionBatch, QuicError, }, - quinn::{ConnectError, Connection, Endpoint}, + quinn::{ConnectError, Connection, ConnectionError, Endpoint}, solana_clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, solana_measure::measure::Measure, solana_time_utils::timestamp, @@ -70,6 +70,9 @@ impl Drop for ConnectionState { /// [`ConnectionWorker`] holds connection to the validator with address `peer`. /// +/// The worker proactively monitors connection health while processing +/// transactions, detecting connection closures immediately rather than waiting +/// for send failures. /// If connection has been closed, [`ConnectionWorker`] tries to reconnect /// `max_reconnect_attempts` times. If connection is in `Active` state, it sends /// transactions received from `transactions_receiver`. Additionally, it @@ -126,7 +129,8 @@ impl ConnectionWorker { /// /// This method manages the connection to the peer and handles state /// transitions. It runs indefinitely until the connection is closed or an - /// unrecoverable error occurs. + /// unrecoverable error occurs. The worker monitors both incoming transactions + /// and connection health simultaneously when in the Active state. pub async fn run(&mut self) { let cancel = self.cancel.clone(); @@ -140,16 +144,29 @@ impl ConnectionWorker { self.create_connection(0).await; } ConnectionState::Active(connection) => { - let Some(transactions) = self.transactions_receiver.recv().await else { - debug!( - "Transactions sender has been dropped for peer: {}", - self.peer - ); - self.connection = ConnectionState::Closing; - continue; - }; - self.send_transactions(connection.clone(), transactions) - .await; + tokio::select! 
{ + // Process incoming transactions + transactions = self.transactions_receiver.recv() => { + match transactions { + Some(batch) => { + self.send_transactions(connection.clone(), batch).await; + } + None => { + debug!( + "Transactions sender has been dropped for peer: {}", + self.peer + ); + self.connection = ConnectionState::Closing; + } + } + } + + // Monitor connection health proactively + close_reason = connection.closed() => { + self.handle_connection_closed(close_reason).await; + continue; + } + } } ConnectionState::Retry(num_reconnects) => { if *num_reconnects > self.max_reconnect_attempts { @@ -170,6 +187,67 @@ impl ConnectionWorker { } } + /// Handles connection closure events detected by the connection monitor. + /// + /// This method logs the close reason with appropriate severity based on + /// the type of closure, records statistics, and determines whether to + /// attempt reconnection based on the error type. + async fn handle_connection_closed(&mut self, close_reason: ConnectionError) { + match &close_reason { + ConnectionError::ConnectionClosed(close) => { + debug!( + "Connection to {} closed by peer: code={} reason={:?}", + self.peer, + close.error_code, + String::from_utf8_lossy(&close.reason) + ); + } + ConnectionError::ApplicationClosed(close) => { + debug!( + "Connection to {} closed by application: code={} reason={:?}", + self.peer, + close.error_code, + String::from_utf8_lossy(&close.reason) + ); + } + ConnectionError::LocallyClosed => { + debug!("Connection to {} closed locally", self.peer); + } + ConnectionError::TimedOut => { + warn!("Connection to {} timed out", self.peer); + } + ConnectionError::Reset => { + warn!("Connection to {} reset", self.peer); + } + ConnectionError::TransportError(e) => { + warn!( + "Connection to {} encountered transport error: {}", + self.peer, e + ); + } + ConnectionError::VersionMismatch => { + error!("Connection to {} failed: version mismatch", self.peer); + } + ConnectionError::CidsExhausted => { + warn!( 
+ "Connection to {} closed: connection IDs exhausted", + self.peer + ); + } + } + + record_error(close_reason.clone().into(), &self.send_txs_stats); + + // Determine next state based on close reason + // Fatal errors transition to Closing, recoverable errors transition to Retry + self.connection = match close_reason { + ConnectionError::VersionMismatch | ConnectionError::LocallyClosed => { + ConnectionState::Closing + } + _ => ConnectionState::Retry(0), + }; + } + /// Sends a batch of transactions using the provided `connection`. /// /// Each transaction in the batch is sent over the QUIC streams one at the @@ -178,7 +256,9 @@ impl ConnectionWorker { /// outdated and flag `skip_check_transaction_age` is unset, it will be /// dropped without being sent. /// - /// In case of error, it doesn't retry to send the same transactions again. + /// The method checks connection health before sending each transaction to + /// avoid operations on a closed connection. In case of error, it doesn't + /// retry to send the same transactions again but transitions to retry state. 
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) { let now = timestamp(); if !self.skip_check_transaction_age @@ -187,8 +267,16 @@ impl ConnectionWorker { debug!("Drop outdated transaction batch for peer: {}", self.peer); return; } + let mut measure_send = Measure::start("send transaction batch"); for data in transactions.into_iter() { + // Check connection health before each send + if connection.close_reason().is_some() { + debug!("Connection closed during transaction batch sending"); + self.connection = ConnectionState::Retry(0); + break; + } + let result = send_data_over_stream(&connection, &data).await; if let Err(error) = result { @@ -198,6 +286,8 @@ impl ConnectionWorker { ); record_error(error, &self.send_txs_stats); self.connection = ConnectionState::Retry(0); + // Exit early since connection is likely broken + break; } else { self.send_txs_stats .successfully_sent diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 2cc808ab88ef85..0a832ea47f151d 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -319,13 +319,13 @@ async fn test_connection_denied_until_allowed() { // Wait for the exchange to finish. tx_sender_shutdown.await; let stats = join_scheduler(scheduler_handle).await; - // in case of pruning, server closes the connection with code 1 and error - // message b"dropped". This might lead to connection error - // (ApplicationClosed::ApplicationClose) or to stream error - // (ConnectionLost::ApplicationClosed::ApplicationClose). - assert_eq!( - stats.write_error_connection_lost + stats.connection_error_application_closed, - 1 + // With proactive detection, we detect rejection immediately and retry within test duration. + // Expect at least 2 errors: initial rejection + retry attempts. 
+ assert!( + stats.write_error_connection_lost + stats.connection_error_application_closed >= 2, + "Expected at least 2 connection errors, got write_error_connection_lost: {}, connection_error_application_closed: {}", + stats.write_error_connection_lost, + stats.connection_error_application_closed ); drop(throttling_connection); @@ -336,8 +336,8 @@ async fn test_connection_denied_until_allowed() { } // Check that if the client connection has been pruned, client manages to -// reestablish it. Pruning will lead to 1 packet loss, because when we send the -// next packet we will reestablish connection. +// reestablish it. With more packets, we can observe the impact of pruning +// even with proactive detection. #[tokio::test] async fn test_connection_pruned_and_reopened() { let SpawnTestServerResult { @@ -357,7 +357,7 @@ async fn test_connection_pruned_and_reopened() { // Setup sending txs let tx_size = 1; - let expected_num_txs: usize = 16; + let expected_num_txs: usize = 48; let SpawnTxGenerator { tx_receiver, tx_sender_shutdown, @@ -377,13 +377,11 @@ async fn test_connection_pruned_and_reopened() { // Wait for the exchange to finish. tx_sender_shutdown.await; let stats = join_scheduler(scheduler_handle).await; - // in case of pruning, server closes the connection with code 1 and error - // message b"dropped". This might lead to connection error - // (ApplicationClosed::ApplicationClose) or to stream error - // (ConnectionLost::ApplicationClosed::ApplicationClose). - assert_eq!( - stats.connection_error_application_closed + stats.write_error_connection_lost, - 1, + // Proactive detection catches pruning immediately, expect multiple retries. + assert!( + stats.connection_error_application_closed + stats.write_error_connection_lost >= 1, + "Expected at least 1 connection error from pruning and retries. 
+        Stats: {:?}",
+        stats
     );

     // Exit server
@@ -743,3 +741,80 @@ async fn test_update_identity() {
     exit.store(true, Ordering::Relaxed);
     server_handle.await.unwrap();
 }
+
+// Test that connection close events are detected immediately via connection.closed()
+// monitoring, not only when send operations fail.
+#[tokio::test]
+async fn test_proactive_connection_close_detection() {
+    let SpawnTestServerResult {
+        join_handle: server_handle,
+        exit,
+        receiver,
+        server_address,
+        stats: _stats,
+    } = setup_quic_server(
+        None,
+        QuicServerParams {
+            max_connections_per_peer: 1,
+            max_unstaked_connections: 1,
+            ..QuicServerParams::default_for_tests()
+        },
+    );
+
+    // Setup controlled transaction sending
+    let tx_size = 1;
+    let (tx_sender, tx_receiver) = channel(10);
+
+    let sender_task = tokio::spawn(async move {
+        // Send first transaction to establish connection
+        tx_sender
+            .send(TransactionBatch::new(vec![vec![1u8; tx_size]]))
+            .await
+            .expect("Send first batch");
+
+        // Idle period where connection might be closed
+        sleep(Duration::from_millis(500)).await;
+
+        // Attempt another send; awaited so it actually runs, result ignored (connection may be closed)
+        let _ = tx_sender.send(TransactionBatch::new(vec![vec![2u8; tx_size]])).await;
+    });
+
+    let (scheduler_handle, _update_identity_sender, scheduler_cancel) =
+        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
+
+    // Verify first packet received
+    let mut first_packet_received = false;
+    let start = Instant::now();
+    while !first_packet_received && start.elapsed() < Duration::from_secs(1) {
+        if let Ok(packets) = receiver.try_recv() {
+            if !packets.is_empty() {
+                first_packet_received = true;
+            }
+        } else {
+            sleep(Duration::from_millis(10)).await;
+        }
+    }
+    assert!(first_packet_received, "First packet should be received");
+
+    // Force connection close by exceeding max_connections_per_peer
+    let _pruning_connection = make_client_endpoint(&server_address, None).await;
+
+    // Allow time for proactive detection
+    sleep(Duration::from_millis(200)).await;
+
+    // Clean up
+    scheduler_cancel.cancel();
+    let _ = sender_task.await;
+    let stats = join_scheduler(scheduler_handle).await;
+
+    // Verify proactive close detection
+    assert!(
+        stats.connection_error_application_closed > 0 || stats.write_error_connection_lost > 0,
+        "Should detect connection close proactively. Stats: {:?}",
+        stats
+    );
+
+    // Exit server
+    exit.store(true, Ordering::Relaxed);
+    server_handle.await.unwrap();
+}