Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use {
mpsc::{channel, Receiver},
Arc, RwLock,
},
thread::JoinHandle,
thread::{sleep, JoinHandle},
time::Duration,
},
thiserror::Error,
tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
Expand Down Expand Up @@ -165,14 +166,48 @@ pub type SignatureSubscription = (

pub struct PubsubClient {}

fn connect_with_retry(
url: Url,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
let mut connection_retries = 5;
loop {
let result = connect(url.clone()).map(|(socket, _)| socket);
if let Err(tungstenite::Error::Http(response)) = &result {
if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
{
let mut duration = Duration::from_millis(500);
if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
if let Ok(retry_after) = retry_after.to_str() {
if let Ok(retry_after) = retry_after.parse::<u64>() {
if retry_after < 120 {
duration = Duration::from_secs(retry_after);
}
}
}
}

connection_retries -= 1;
debug!(
"Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
response, connection_retries, duration
);

sleep(duration);
continue;
}
}
return result;
}
}

impl PubsubClient {
pub fn logs_subscribe(
url: &str,
filter: RpcTransactionLogsFilter,
config: RpcTransactionLogsConfig,
) -> Result<LogsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -227,7 +262,7 @@ impl PubsubClient {

pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel::<SlotInfo>();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -283,7 +318,7 @@ impl PubsubClient {
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SignatureSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -349,7 +384,7 @@ impl PubsubClient {
handler: impl Fn(SlotUpdate) + Send + 'static,
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = connect_with_retry(url)?;

let socket = Arc::new(RwLock::new(socket));
let exit = Arc::new(AtomicBool::new(false));
Expand Down