From 242e887e448fd328738952c8c7d02aaca37480d4 Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Sat, 18 Sep 2021 12:40:43 +0200 Subject: [PATCH 1/2] client: Add retry logic on Pubsub 429s (#19990) (cherry picked from commit e9b066d4973960b073a1b9c3386e35bd85df43a6) --- client/src/pubsub_client.rs | 45 ++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index d3dd63bcc30e7e..843365a0dca1d1 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -22,7 +22,8 @@ use { mpsc::{channel, Receiver}, Arc, RwLock, }, - thread::JoinHandle, + thread::{sleep, JoinHandle}, + time::Duration, }, thiserror::Error, tungstenite::{client::AutoStream, connect, Message, WebSocket}, @@ -164,6 +165,40 @@ pub type SignatureSubscription = ( pub struct PubsubClient {} +fn connect_with_retry( + url: Url, +) -> Result>, tungstenite::Error> { + let mut connection_retries = 5; + loop { + let result = connect(url.clone()).map(|(socket, _)| socket); + if let Err(tungstenite::Error::Http(response)) = &result { + if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 + { + let mut duration = Duration::from_millis(500); + if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) { + if let Ok(retry_after) = retry_after.to_str() { + if let Ok(retry_after) = retry_after.parse::() { + if retry_after < 120 { + duration = Duration::from_secs(retry_after); + } + } + } + } + + connection_retries -= 1; + debug!( + "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}", + response, connection_retries, duration + ); + + sleep(duration); + continue; + } + } + return result; + } +} + impl PubsubClient { pub fn logs_subscribe( url: &str, @@ -171,7 +206,7 @@ impl PubsubClient { config: RpcTransactionLogsConfig, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -226,7 +261,7 @@ impl PubsubClient { pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel::(); let socket = Arc::new(RwLock::new(socket)); @@ -282,7 +317,7 @@ impl PubsubClient { config: Option, ) -> Result { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); @@ -348,7 +383,7 @@ impl PubsubClient { handler: impl Fn(SlotUpdate) + Send + 'static, ) -> Result, PubsubClientError> { let url = Url::parse(url)?; - let (socket, _response) = connect(url)?; + let socket = connect_with_retry(url)?; let socket = Arc::new(RwLock::new(socket)); let exit = Arc::new(AtomicBool::new(false)); From d8fd760e2311b18c56a999de6d27a4713677f531 Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Mon, 20 Sep 2021 12:36:30 +0200 Subject: [PATCH 2/2] Use exponential backoff for older version of tungstenite --- client/src/pubsub_client.rs | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 843365a0dca1d1..725ee901f0e577 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -165,30 +165,18 @@ pub type SignatureSubscription = ( pub struct PubsubClient {} -fn connect_with_retry( - url: Url, -) -> Result>, tungstenite::Error> { +fn connect_with_retry(url: Url) -> Result, tungstenite::Error> { let mut connection_retries = 5; loop { let result = connect(url.clone()).map(|(socket, _)| socket); - if let Err(tungstenite::Error::Http(response)) = &result { - if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 - { - let mut duration = Duration::from_millis(500); - if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) { - if let Ok(retry_after) = retry_after.to_str() { - if let Ok(retry_after) = retry_after.parse::() { - if retry_after < 120 { - duration = Duration::from_secs(retry_after); - } - } - } - } + if let Err(tungstenite::Error::Http(status_code)) = &result { + if *status_code == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 { + let duration = Duration::from_millis(500) * 2u32.pow(5 - connection_retries); connection_retries -= 1; debug!( "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}", - response, connection_retries, duration + status_code, connection_retries, duration ); sleep(duration);