From 62d77cfa62f168386013e8d09cdf9e876cdb7056 Mon Sep 17 00:00:00 2001 From: Steven Luscher Date: Tue, 14 Oct 2025 22:18:54 +0000 Subject: [PATCH 1/2] Apply the retry code to the async pubsub client Create a test server ```ts import http from "http"; import { WebSocketServer } from "ws"; let attemptCount = 0; const server = http.createServer(); const wss = new WebSocketServer({ noServer: true }); wss.on("connection", (ws) => { ws.send("Connection accepted."); ws.on("message", (msg) => console.log(`Received: ${msg}`)); }); server.on("upgrade", (req, socket, head) => { attemptCount += 1; if (attemptCount <= 4) { socket.write("HTTP/1.1 429 Too Many Requests\r\n\r\n"); socket.destroy(); console.log(`Rejected connection #${attemptCount} (429)`); return; } wss.handleUpgrade(req, socket, head, (ws) => { wss.emit("connection", ws, req); console.log("Connection accepted on attempt", attemptCount); }); }); server.listen(8080, () => { console.log("Server listening on port 8080"); }); ``` Run `test_slot_subscription_async`: ``` Rejected connection #1 (429) Rejected connection #2 (429) Rejected connection #3 (429) Rejected connection #4 (429) Connection accepted on attempt 5 Received: {"id":1,"jsonrpc":"2.0","method":"slotSubscribe","params":[]} ``` --- .../src/nonblocking/pubsub_client.rs | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 7e19efaf126..e915372d071 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -208,7 +208,11 @@ use { }, MaybeTlsStream, WebSocketStream, }, - tungstenite::{client::IntoClientRequest, Bytes}, + tungstenite::{ + client::IntoClientRequest, + http::{header, StatusCode}, + Bytes, + }, }; pub type PubsubClientResult = Result; @@ -271,12 +275,47 @@ pub struct PubsubClient { ws: JoinHandle, } +async fn connect_async_with_retry( + request: R, +) -> Result>, Box> { + let mut connection_retries = 5; + let client_request = request.into_client_request().map_err(Box::new)?; + loop { + let result = connect_async(client_request.clone()) + .await + .map(|(socket, _)| socket); + if let Err(tungstenite::Error::Http(response)) = &result { + if response.status() == StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 { + let mut duration = Duration::from_millis(500); + if let Some(retry_after) = response.headers().get(header::RETRY_AFTER) { + if let Ok(retry_after) = retry_after.to_str() { + if let Ok(retry_after) = retry_after.parse::() { + if retry_after < 120 { + duration = Duration::from_secs(retry_after); + } + } + } + } + + connection_retries -= 1; + debug!( + "Too many requests: server responded with {response:?}, {connection_retries} \ + retries left, pausing for {duration:?}" + ); + + sleep(duration).await; + continue; + } + } + return result.map_err(Box::new); + } +} + impl PubsubClient { pub async fn new(request: R) -> PubsubClientResult { let client_request = request.into_client_request().map_err(Box::new)?; - let (ws, _response) = connect_async(client_request) + let ws = connect_async_with_retry(client_request) .await - .map_err(Box::new) .map_err(PubsubClientError::ConnectionError)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); From 6f0e8e8256660458df86fa6318f8fc7b2c17caac Mon Sep 17 00:00:00 2001 From: Steven Luscher Date: Mon, 20 Oct 2025 17:53:15 +0000 Subject: [PATCH 2/2] `s/async_with_retry/with_retry/` --- pubsub-client/src/nonblocking/pubsub_client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index e915372d071..f7c43d6fe9d 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -275,7 +275,7 @@ pub struct PubsubClient { ws: JoinHandle, } -async fn connect_async_with_retry( +async fn connect_with_retry( request: R, ) -> Result>, Box> { let mut connection_retries = 5; @@ -314,7 +314,7 @@ async fn connect_async_with_retry( impl PubsubClient { pub async fn new(request: R) -> PubsubClientResult { let client_request = request.into_client_request().map_err(Box::new)?; - let ws = connect_async_with_retry(client_request) + let ws = connect_with_retry(client_request) .await .map_err(PubsubClientError::ConnectionError)?;