diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 71782710031..f23b781846e 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -219,10 +219,10 @@ pub enum PubsubClientError { UrlParseError(#[from] url::ParseError), #[error("unable to connect to server")] - ConnectionError(tokio_tungstenite::tungstenite::Error), + ConnectionError(Box), #[error("websocket error")] - WsError(#[from] tokio_tungstenite::tungstenite::Error), + WsError(#[from] Box), #[error("connection closed (({0})")] ConnectionClosed(String), @@ -276,6 +276,7 @@ impl PubsubClient { let url = Url::parse(url)?; let (ws, _response) = connect_async(url) .await + .map_err(Box::new) .map_err(PubsubClientError::ConnectionError)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); @@ -505,20 +506,20 @@ impl PubsubClient { // Send close on shutdown signal _ = (&mut shutdown_receiver) => { let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() }; - ws.send(Message::Close(Some(frame))).await?; - ws.flush().await?; + ws.send(Message::Close(Some(frame))).await.map_err(Box::new)?; + ws.flush().await.map_err(Box::new)?; break; }, // Send `Message::Ping` each 10s if no any other communication () = sleep(Duration::from_secs(10)) => { - ws.send(Message::Ping(Vec::new())).await?; + ws.send(Message::Ping(Vec::new())).await.map_err(Box::new)?; }, // Read message for subscribe Some((operation, params, response_sender)) = subscribe_receiver.recv() => { request_id += 1; let method = format!("{operation}Subscribe"); let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); - ws.send(Message::Text(text)).await?; + ws.send(Message::Text(text)).await.map_err(Box::new)?; requests_subscribe.insert(request_id, (operation, response_sender)); }, // Read message for unsubscribe @@ -527,20 +528,20 @@ impl PubsubClient { request_id += 1; let method = format!("{operation}Unsubscribe"); let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string(); - ws.send(Message::Text(text)).await?; + ws.send(Message::Text(text)).await.map_err(Box::new)?; requests_unsubscribe.insert(request_id, response_sender); }, // Read message for other requests Some((method, params, response_sender)) = request_receiver.recv() => { request_id += 1; let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); - ws.send(Message::Text(text)).await?; + ws.send(Message::Text(text)).await.map_err(Box::new)?; other_requests.insert(request_id, response_sender); } // Read incoming WebSocket message next_msg = ws.next() => { let msg = match next_msg { - Some(msg) => msg?, + Some(msg) => msg.map_err(Box::new)?, None => break, }; trace!("ws.next(): {:?}", &msg); @@ -550,7 +551,7 @@ impl PubsubClient { Message::Text(text) => text, Message::Binary(_data) => continue, // Ignore Message::Ping(data) => { - ws.send(Message::Pong(data)).await?; + ws.send(Message::Pong(data)).await.map_err(Box::new)?; continue }, Message::Pong(_data) => continue, diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index a93dd312838..0c7d789a022 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -165,13 +165,17 @@ where writable_socket: &Arc>>>, body: String, ) -> Result { - writable_socket.write().unwrap().send(Message::Text(body))?; - let message = writable_socket.write().unwrap().read()?; + writable_socket + .write() + .unwrap() + .send(Message::Text(body)) + .map_err(Box::new)?; + let message = writable_socket.write().unwrap().read().map_err(Box::new)?; Self::extract_subscription_id(message) } fn extract_subscription_id(message: Message) -> Result { - let message_text = &message.into_text()?; + let message_text = &message.into_text().map_err(Box::new)?; if let Ok(json_msg) = serde_json::from_str::>(message_text) { if let Some(Number(x)) = json_msg.get("result") { @@ -205,17 +209,18 @@ where }) .to_string(), )) + .map_err(Box::new) .map_err(|err| err.into()) } fn read_message( writable_socket: &Arc>>>, ) -> Result, PubsubClientError> { - let message = writable_socket.write().unwrap().read()?; + let message = writable_socket.write().unwrap().read().map_err(Box::new)?; if message.is_ping() { return Ok(None); } - let message_text = &message.into_text()?; + let message_text = &message.into_text().map_err(Box::new)?; if let Ok(json_msg) = serde_json::from_str::>(message_text) { if let Some(Object(params)) = json_msg.get("params") { if let Some(result) = params.get("result") { @@ -300,7 +305,7 @@ pub struct PubsubClient {} fn connect_with_retry( url: Url, -) -> Result>, tungstenite::Error> { +) -> Result>, Box> { let mut connection_retries = 5; loop { let result = connect(url.clone()).map(|(socket, _)| socket); @@ -327,7 +332,7 @@ fn connect_with_retry( continue; } } - return result; + return result.map_err(Box::new); } }