Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pubsub-client/src/nonblocking/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ pub enum PubsubClientError {
UrlParseError(#[from] url::ParseError),

#[error("unable to connect to server")]
ConnectionError(tokio_tungstenite::tungstenite::Error),
ConnectionError(Box<tokio_tungstenite::tungstenite::Error>),

#[error("websocket error")]
WsError(#[from] tokio_tungstenite::tungstenite::Error),
WsError(#[from] Box<tokio_tungstenite::tungstenite::Error>),

#[error("connection closed (({0})")]
ConnectionClosed(String),
Expand Down Expand Up @@ -276,6 +276,7 @@ impl PubsubClient {
let url = Url::parse(url)?;
let (ws, _response) = connect_async(url)
.await
.map_err(Box::new)
.map_err(PubsubClientError::ConnectionError)?;

let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -505,20 +506,20 @@ impl PubsubClient {
// Send close on shutdown signal
_ = (&mut shutdown_receiver) => {
let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
ws.send(Message::Close(Some(frame))).await?;
ws.flush().await?;
ws.send(Message::Close(Some(frame))).await.map_err(Box::new)?;
ws.flush().await.map_err(Box::new)?;
break;
},
// Send `Message::Ping` each 10s if no any other communication
() = sleep(Duration::from_secs(10)) => {
ws.send(Message::Ping(Vec::new())).await?;
ws.send(Message::Ping(Vec::new())).await.map_err(Box::new)?;
},
// Read message for subscribe
Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
request_id += 1;
let method = format!("{operation}Subscribe");
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
ws.send(Message::Text(text)).await?;
ws.send(Message::Text(text)).await.map_err(Box::new)?;
requests_subscribe.insert(request_id, (operation, response_sender));
},
// Read message for unsubscribe
Expand All @@ -527,20 +528,20 @@ impl PubsubClient {
request_id += 1;
let method = format!("{operation}Unsubscribe");
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
ws.send(Message::Text(text)).await?;
ws.send(Message::Text(text)).await.map_err(Box::new)?;
requests_unsubscribe.insert(request_id, response_sender);
},
// Read message for other requests
Some((method, params, response_sender)) = request_receiver.recv() => {
request_id += 1;
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
ws.send(Message::Text(text)).await?;
ws.send(Message::Text(text)).await.map_err(Box::new)?;
other_requests.insert(request_id, response_sender);
}
// Read incoming WebSocket message
next_msg = ws.next() => {
let msg = match next_msg {
Some(msg) => msg?,
Some(msg) => msg.map_err(Box::new)?,
None => break,
};
trace!("ws.next(): {:?}", &msg);
Expand All @@ -550,7 +551,7 @@ impl PubsubClient {
Message::Text(text) => text,
Message::Binary(_data) => continue, // Ignore
Message::Ping(data) => {
ws.send(Message::Pong(data)).await?;
ws.send(Message::Pong(data)).await.map_err(Box::new)?;
continue
},
Message::Pong(_data) => continue,
Expand Down
19 changes: 12 additions & 7 deletions pubsub-client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,17 @@ where
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
body: String,
) -> Result<u64, PubsubClientError> {
writable_socket.write().unwrap().send(Message::Text(body))?;
let message = writable_socket.write().unwrap().read()?;
writable_socket
.write()
.unwrap()
.send(Message::Text(body))
.map_err(Box::new)?;
let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
Self::extract_subscription_id(message)
}

fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
let message_text = &message.into_text()?;
let message_text = &message.into_text().map_err(Box::new)?;

if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
if let Some(Number(x)) = json_msg.get("result") {
Expand Down Expand Up @@ -205,17 +209,18 @@ where
})
.to_string(),
))
.map_err(Box::new)
.map_err(|err| err.into())
}

fn read_message(
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
) -> Result<Option<T>, PubsubClientError> {
let message = writable_socket.write().unwrap().read()?;
let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
if message.is_ping() {
return Ok(None);
}
let message_text = &message.into_text()?;
let message_text = &message.into_text().map_err(Box::new)?;
if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
if let Some(Object(params)) = json_msg.get("params") {
if let Some(result) = params.get("result") {
Expand Down Expand Up @@ -300,7 +305,7 @@ pub struct PubsubClient {}

fn connect_with_retry(
url: Url,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
let mut connection_retries = 5;
loop {
let result = connect(url.clone()).map(|(socket, _)| socket);
Expand All @@ -327,7 +332,7 @@ fn connect_with_retry(
continue;
}
}
return result;
return result.map_err(Box::new);
}
}

Expand Down
Loading