Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 103 additions & 13 deletions tpu-client-next/src/connection_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
transaction_batch::TransactionBatch,
QuicError,
},
quinn::{ConnectError, Connection, Endpoint},
quinn::{ConnectError, Connection, ConnectionError, Endpoint},
solana_clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_measure::measure::Measure,
solana_time_utils::timestamp,
Expand Down Expand Up @@ -70,6 +70,9 @@ impl Drop for ConnectionState {

/// [`ConnectionWorker`] holds connection to the validator with address `peer`.
///
/// The worker proactively monitors connection health while processing
/// transactions, detecting connection closures immediately rather than waiting
/// for send failures.
/// If connection has been closed, [`ConnectionWorker`] tries to reconnect
/// `max_reconnect_attempts` times. If connection is in `Active` state, it sends
/// transactions received from `transactions_receiver`. Additionally, it
Expand Down Expand Up @@ -126,7 +129,8 @@ impl ConnectionWorker {
///
/// This method manages the connection to the peer and handles state
/// transitions. It runs indefinitely until the connection is closed or an
/// unrecoverable error occurs.
/// unrecoverable error occurs. The worker monitors both incoming transactions
/// and connection health simultaneously when in the Active state.
pub async fn run(&mut self) {
let cancel = self.cancel.clone();

Expand All @@ -140,16 +144,29 @@ impl ConnectionWorker {
self.create_connection(0).await;
}
ConnectionState::Active(connection) => {
let Some(transactions) = self.transactions_receiver.recv().await else {
debug!(
"Transactions sender has been dropped for peer: {}",
self.peer
);
self.connection = ConnectionState::Closing;
continue;
};
self.send_transactions(connection.clone(), transactions)
.await;
tokio::select! {
// Process incoming transactions
transactions = self.transactions_receiver.recv() => {
match transactions {
Some(batch) => {
self.send_transactions(connection.clone(), batch).await;
}
None => {
debug!(
"Transactions sender has been dropped for peer: {}",
self.peer
);
self.connection = ConnectionState::Closing;
}
}
}

// Monitor connection health proactively
close_reason = connection.closed() => {
self.handle_connection_closed(close_reason).await;
continue;
}
}
}
ConnectionState::Retry(num_reconnects) => {
if *num_reconnects > self.max_reconnect_attempts {
Expand All @@ -170,6 +187,67 @@ impl ConnectionWorker {
}
}

/// Handles connection closure events detected by the connection monitor.
///
/// This method logs the close reason with appropriate severity based on
/// the type of closure, records statistics, and determines whether to
/// attempt reconnection based on the error type.
async fn handle_connection_closed(&mut self, close_reason: ConnectionError) {
match &close_reason {
ConnectionError::ConnectionClosed(close) => {
debug!(
"Connection to {} closed by peer: code={} reason={:?}",
self.peer,
close.error_code,
String::from_utf8_lossy(&close.reason)
);
}
ConnectionError::ApplicationClosed(close) => {
debug!(
"Connection to {} closed by application: code={} reason={:?}",
self.peer,
close.error_code,
String::from_utf8_lossy(&close.reason)
);
}
ConnectionError::LocallyClosed => {
debug!("Connection to {} closed locally", self.peer);
}
ConnectionError::TimedOut => {
warn!("Connection to {} timed out", self.peer);
}
ConnectionError::Reset => {
warn!("Connection to {} reset", self.peer);
}
ConnectionError::TransportError(e) => {
warn!(
"Connection to {} encountered transport error: {}",
self.peer, e
);
}
ConnectionError::VersionMismatch => {
error!("Connection to {} failed: version mismatch", self.peer);
}
ConnectionError::CidsExhausted => {
warn!(
"Connection to {} closed: connection IDs exhausted",
self.peer
);
}
}

record_error(close_reason.clone().into(), &self.send_txs_stats);
Comment thread
KirillLykov marked this conversation as resolved.

// Determine next state based on close reason
// Fatal errors transition to Closing, recoverable errors transition to Retry
self.connection = match close_reason {
ConnectionError::VersionMismatch | ConnectionError::LocallyClosed => {
ConnectionState::Closing
}
_ => ConnectionState::Retry(0),
};
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
Expand All @@ -178,7 +256,9 @@ impl ConnectionWorker {
/// outdated and flag `skip_check_transaction_age` is unset, it will be
/// dropped without being sent.
///
/// In case of error, it doesn't retry to send the same transactions again.
/// The method checks connection health before sending each transaction to
/// avoid operations on a closed connection. In case of error, it doesn't
/// retry to send the same transactions again but transitions to retry state.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
let now = timestamp();
if !self.skip_check_transaction_age
Expand All @@ -187,8 +267,16 @@ impl ConnectionWorker {
debug!("Drop outdated transaction batch for peer: {}", self.peer);
return;
}

let mut measure_send = Measure::start("send transaction batch");
for data in transactions.into_iter() {
// Check connection health before each send
if connection.close_reason().is_some() {
debug!("Connection closed during transaction batch sending");
self.connection = ConnectionState::Retry(0);
break;
}

let result = send_data_over_stream(&connection, &data).await;

if let Err(error) = result {
Expand All @@ -198,6 +286,8 @@ impl ConnectionWorker {
);
record_error(error, &self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
// Exit early since connection is likely broken
break;
} else {
self.send_txs_stats
.successfully_sent
Expand Down
109 changes: 92 additions & 17 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ async fn test_connection_denied_until_allowed() {
// Wait for the exchange to finish.
tx_sender_shutdown.await;
let stats = join_scheduler(scheduler_handle).await;
// in case of pruning, server closes the connection with code 1 and error
// message b"dropped". This might lead to connection error
// (ApplicationClosed::ApplicationClose) or to stream error
// (ConnectionLost::ApplicationClosed::ApplicationClose).
assert_eq!(
stats.write_error_connection_lost + stats.connection_error_application_closed,
1
// With proactive detection, we detect rejection immediately and retry within test duration.
// Expect at least 2 errors: initial rejection + retry attempts.
assert!(
stats.write_error_connection_lost + stats.connection_error_application_closed >= 2,
"Expected at least 2 connection errors, got write_error_connection_lost: {}, connection_error_application_closed: {}",
stats.write_error_connection_lost,
stats.connection_error_application_closed
);

drop(throttling_connection);
Expand All @@ -336,8 +336,8 @@ async fn test_connection_denied_until_allowed() {
}

// Check that if the client connection has been pruned, client manages to
// reestablish it. Pruning will lead to 1 packet loss, because when we send the
// next packet we will reestablish connection.
// reestablish it. With more packets, we can observe the impact of pruning
// even with proactive detection.
#[tokio::test]
async fn test_connection_pruned_and_reopened() {
let SpawnTestServerResult {
Expand All @@ -357,7 +357,7 @@ async fn test_connection_pruned_and_reopened() {

// Setup sending txs
let tx_size = 1;
let expected_num_txs: usize = 16;
let expected_num_txs: usize = 48;
let SpawnTxGenerator {
tx_receiver,
tx_sender_shutdown,
Expand All @@ -377,13 +377,11 @@ async fn test_connection_pruned_and_reopened() {
// Wait for the exchange to finish.
tx_sender_shutdown.await;
let stats = join_scheduler(scheduler_handle).await;
// in case of pruning, server closes the connection with code 1 and error
// message b"dropped". This might lead to connection error
// (ApplicationClosed::ApplicationClose) or to stream error
// (ConnectionLost::ApplicationClosed::ApplicationClose).
assert_eq!(
stats.connection_error_application_closed + stats.write_error_connection_lost,
1,
// Proactive detection catches pruning immediately, expect multiple retries.
assert!(
stats.connection_error_application_closed + stats.write_error_connection_lost >= 1,
"Expected at least 1 connection error from pruning and retries. Stats: {:?}",
stats
);

// Exit server
Expand Down Expand Up @@ -743,3 +741,80 @@ async fn test_update_identity() {
exit.store(true, Ordering::Relaxed);
server_handle.await.unwrap();
}

// Verify that a server-initiated connection close is noticed through the
// `connection.closed()` monitor rather than only surfacing when a send fails.
#[tokio::test]
async fn test_proactive_connection_close_detection() {
    // Spin up a QUIC server that admits a single unstaked connection, so a
    // second client connecting later forces the first one to be dropped.
    let SpawnTestServerResult {
        join_handle: server_handle,
        exit,
        receiver,
        server_address,
        stats: _stats,
    } = setup_quic_server(
        None,
        QuicServerParams {
            max_connections_per_peer: 1,
            max_unstaked_connections: 1,
            ..QuicServerParams::default_for_tests()
        },
    );

    // Drive transaction delivery by hand rather than via a generator task.
    let tx_size = 1;
    let (tx_sender, tx_receiver) = channel(10);

    let sender_task = tokio::spawn(async move {
        // One batch up front so the worker establishes its connection.
        tx_sender
            .send(TransactionBatch::new(vec![vec![1u8; tx_size]]))
            .await
            .expect("Send first batch");

        // Stay idle long enough for the server to close the connection.
        sleep(Duration::from_millis(500)).await;

        // NOTE: the send future is dropped without being awaited, so this
        // second batch is never actually polled/transmitted; only the first
        // batch matters for this test.
        drop(tx_sender.send(TransactionBatch::new(vec![vec![2u8; tx_size]])));
    });

    let (scheduler_handle, _update_identity_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;

    // Poll until the server observes the first packet, with a 1 s budget.
    let started = Instant::now();
    let mut got_first_packet = false;
    while started.elapsed() < Duration::from_secs(1) {
        match receiver.try_recv() {
            Ok(packets) if !packets.is_empty() => {
                got_first_packet = true;
                break;
            }
            // An empty batch: retry immediately, mirroring a busy channel.
            Ok(_) => {}
            // Nothing queued yet: back off briefly before polling again.
            Err(_) => sleep(Duration::from_millis(10)).await,
        }
    }
    assert!(got_first_packet, "First packet should be received");

    // Open a second client connection; with `max_connections_per_peer: 1` the
    // server closes the worker's connection to make room for it.
    let _pruning_connection = make_client_endpoint(&server_address, None).await;

    // Give the worker's `connection.closed()` monitor time to fire.
    sleep(Duration::from_millis(200)).await;

    // Shut everything down and collect the scheduler's statistics.
    scheduler_cancel.cancel();
    let _ = sender_task.await;
    let stats = join_scheduler(scheduler_handle).await;

    // The closure must be reflected in the error counters even though no send
    // was necessarily in flight when the connection went away.
    assert!(
        stats.connection_error_application_closed > 0 || stats.write_error_connection_lost > 0,
        "Should detect connection close proactively. Stats: {:?}",
        stats
    );

    // Exit server
    exit.store(true, Ordering::Relaxed);
    server_handle.await.unwrap();
}
Loading