From 0f30ca23bd171e374fa5af46ab2d2ea7e0fed3ca Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 19:35:09 +0200 Subject: [PATCH 1/9] Fix typo in comments --- src/resp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/resp.rs b/src/resp.rs index e646fcc..97e4c32 100644 --- a/src/resp.rs +++ b/src/resp.rs @@ -85,7 +85,7 @@ impl RespValue { } } -/// A trait to be implemented for every time which can be read from a RESP value. +/// A trait to be implemented for every type which can be read from a RESP value. /// /// Implementing this trait on a type means that type becomes a valid return type for calls such as `send` on /// `client::PairedConnection` From 3166c8e6eaabdf4bda2281c33710693a16560c09 Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 19:42:51 +0200 Subject: [PATCH 2/9] Refactor conversion to HashMap --- src/resp.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/resp.rs b/src/resp.rs index 97e4c32..c38d665 100644 --- a/src/resp.rs +++ b/src/resp.rs @@ -207,22 +207,23 @@ impl FromResp fo fn from_resp_int(resp: RespValue) -> Result, Error> { match resp { RespValue::Array(ary) => { - let mut map = HashMap::with_capacity_and_hasher(ary.len(), S::default()); - let mut items = ary.into_iter(); - - while let Some(k) = items.next() { - let key = K::from_resp(k)?; - let value = T::from_resp(items.next().ok_or_else(|| { - error::resp( + let len = ary.len(); + if len % 2 != 0 { + return Err(error::resp( "Cannot convert an odd number of elements into a hashmap", - "".into(), - ) - })?)?; - - map.insert(key, value); + RespValue::Array(ary), + )); } - Ok(map) + let mut items = ary.into_iter(); + (0..(len / 2)) + .map(|_| { + // It's safe to unwrap, because we checked the length before + let key = K::from_resp(items.next().unwrap())?; + let value = T::from_resp(items.next().unwrap())?; + Ok((key, value)) + }) + .collect() } _ => Err(error::resp("Cannot be converted into a hashmap", resp)), } From cb4b6c1ba56f9dfdb6ea06a6ee116b55630bb09e Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 19:43:39 +0200 Subject: [PATCH 3/9] Refactor string_into_resp macro --- src/resp.rs | 57 +++++++++++++---------------------------------------- 1 file changed, 14 insertions(+), 43 deletions(-) diff --git a/src/resp.rs b/src/resp.rs index c38d665..e70fb83 100644 --- a/src/resp.rs +++ b/src/resp.rs @@ -210,7 +210,7 @@ impl FromResp fo let len = ary.len(); if len % 2 != 0 { return Err(error::resp( - "Cannot convert an odd number of elements into a hashmap", + "Cannot convert an odd number of elements into a hashmap", RespValue::Array(ary), )); } @@ -366,52 +366,23 @@ pub trait IntoRespString { } macro_rules! string_into_resp { - ($t:ty) => { + ($(<$l:lifetime>)?|$i:ident : $t:ty| $e:expr) => { + impl $(<$l>)? IntoRespString for $t { + fn into_resp_string(self) -> RespValue { + let $i = self; + RespValue::BulkString($e) + } + } into_resp!($t, into_resp_string); - }; -} - -impl IntoRespString for String { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.into_bytes()) - } -} -string_into_resp!(String); - -impl<'a> IntoRespString for &'a String { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(&'a String); - -impl<'a> IntoRespString for &'a str { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(&'a str); - -impl<'a> IntoRespString for &'a [u8] { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.to_vec()) } } -string_into_resp!(&'a [u8]); -impl IntoRespString for Vec { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self) - } -} -string_into_resp!(Vec); - -impl IntoRespString for Arc { - fn into_resp_string(self) -> RespValue { - RespValue::BulkString(self.as_bytes().into()) - } -} -string_into_resp!(Arc); +string_into_resp!(|it: String| it.into_bytes()); +string_into_resp!(<'a>|it: &'a String| it.as_bytes().into()); +string_into_resp!(<'a>|it: &'a str| it.as_bytes().into()); +string_into_resp!(<'a>|it: &'a [u8]| it.to_vec()); +string_into_resp!(|it: Vec| it); +string_into_resp!(|it: Arc| it.as_bytes().into()); pub trait IntoRespInteger { fn into_resp_integer(self) -> RespValue; From f2056c5d06a37638478e8752b265f08fe38b49bf Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 19:45:24 +0200 Subject: [PATCH 4/9] Compact message pattern matching --- src/client/pubsub.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 4a91574..93ab88f 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -139,15 +139,10 @@ impl PubsubConnectionInner { messages.pop(), messages.pop(), ) { - (Some(msg), Some(topic), Some(message_type), None) => { - match (msg, String::from_resp(topic), message_type) { - (msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), - _ => return Err(error::unexpected("Incorrect format of a PUBSUB message")), - } - } - (Some(msg), Some(_), Some(topic), Some(message_type)) => { - match (msg, String::from_resp(topic), message_type) { - (msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), + (Some(msg), Some(topic), Some(message_type), None) + | (Some(msg), Some(_), Some(topic), Some(message_type)) => { + match (String::from_resp(topic), message_type) { + (Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg), _ => return Err(error::unexpected("Incorrect format of a PUBSUB message")), } } From f9038259487a37a76f6078c268ad51f8592caa7c Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 20:02:15 +0200 Subject: [PATCH 5/9] Refactor handle_messages into smaller functions --- src/client/pubsub.rs | 164 +++++++++++++++++++------------------------ 1 file changed, 74 insertions(+), 90 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 93ab88f..f120236 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -160,96 +160,16 @@ impl PubsubConnectionInner { }; match message_type.as_slice() { - b"subscribe" => match self.pending_subs.remove(&topic) { - Some((sender, signal)) => { - self.subscriptions.insert(topic, sender); - signal - .send(()) - .map_err(|()| error::internal("Error confirming subscription"))? - } - None => { - return Err(error::internal(format!( - "Received unexpected subscribe notification for topic: {}", - topic - ))); - } - }, - b"psubscribe" => match self.pending_psubs.remove(&topic) { - Some((sender, signal)) => { - self.psubscriptions.insert(topic, sender); - signal - .send(()) - .map_err(|()| error::internal("Error confirming subscription"))? - } - None => { - return Err(error::internal(format!( - "Received unexpected subscribe notification for topic: {}", - topic - ))); - } - }, - b"unsubscribe" => { - match self.subscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } - } - if self.subscriptions.is_empty() { - return Ok(false); - } + b"subscribe" => { + process_subscribe(&mut self.pending_subs, &mut self.subscriptions, topic) } - b"punsubscribe" => { - match self.psubscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } - } - if self.psubscriptions.is_empty() { - return Ok(false); - } + b"psubscribe" => { + process_subscribe(&mut self.pending_psubs, &mut self.psubscriptions, topic) } - b"message" => match self.subscriptions.get(&topic) { - Some(sender) => { - if let Err(error) = sender.unbounded_send(Ok(msg)) { - if !error.is_disconnected() { - return Err(error::internal(format!("Cannot send message: {}", error))); - } - } - } - None => { - return Err(error::internal(format!( - "Unexpected message on topic: {}", - topic - ))); - } - }, - b"pmessage" => match self.psubscriptions.get(&topic) { - Some(sender) => { - if let Err(error) = sender.unbounded_send(Ok(msg)) { - if !error.is_disconnected() { - return Err(error::internal(format!("Cannot send message: {}", error))); - } - } - } - None => { - return Err(error::internal(format!( - "Unexpected message on topic: {}", - topic - ))); - } - }, + b"unsubscribe" => process_unsubscribe(&mut self.subscriptions, topic), + b"punsubscribe" => process_unsubscribe(&mut self.psubscriptions, topic), + b"message" => process_message(&self.subscriptions, &topic, msg), + b"pmessage" => process_message(&self.psubscriptions, &topic, msg), t => { return Err(error::internal(format!( "Unexpected data on Pub/Sub connection: {}", @@ -257,8 +177,6 @@ impl PubsubConnectionInner { ))); } } - - Ok(true) } /// Returns true, if there are still valid subscriptions at the end, or false if not, i.e. the whole thing can be dropped. @@ -314,6 +232,72 @@ impl PubsubConnectionInner { } } +fn process_subscribe( + pending_subs: &mut BTreeMap)>, + subscriptions: &mut BTreeMap, + topic: String, +) -> Result { + match pending_subs.remove(&topic) { + Some((sender, signal)) => { + subscriptions.insert(topic, sender); + signal + .send(()) + .map_err(|()| error::internal("Error confirming subscription"))? + } + None => { + return Err(error::internal(format!( + "Received unexpected subscribe notification for topic: {}", + topic + ))); + } + }; + Ok(true) +} + +fn process_unsubscribe( + subscriptions: &mut BTreeMap, + topic: String, +) -> Result { + match subscriptions.entry(topic) { + Entry::Occupied(entry) => { + entry.remove_entry(); + } + Entry::Vacant(vacant) => { + return Err(error::internal(format!( + "Unexpected unsubscribe message: {}", + vacant.key() + ))); + } + } + if subscriptions.is_empty() { + return Ok(false); + } + Ok(true) +} + +fn process_message( + subscriptions: &BTreeMap, + topic: &str, + msg: resp::RespValue, +) -> Result { + match subscriptions.get(topic) { + Some(sender) => { + if let Err(error) = sender.unbounded_send(Ok(msg)) { + if !error.is_disconnected() { + return Err(error::internal(format!("Cannot send message: {}", error))); + } + } + } + None => { + return Err(error::internal(format!( + "Unexpected message on topic: {}", + topic + ))); + } + }; + Ok(true) +} + impl Future for PubsubConnectionInner { type Output = Result<(), error::Error>; From 6b4b4341903f4257784a729ca0165eae4f8daaec Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 20:09:13 +0200 Subject: [PATCH 6/9] Refactor process_subscribe --- src/client/pubsub.rs | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index f120236..086a612 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -237,20 +237,14 @@ fn process_subscribe( subscriptions: &mut BTreeMap, topic: String, ) -> Result { - match pending_subs.remove(&topic) { - Some((sender, signal)) => { - subscriptions.insert(topic, sender); - signal - .send(()) - .map_err(|()| error::internal("Error confirming subscription"))? - } - None => { - return Err(error::internal(format!( - "Received unexpected subscribe notification for topic: {}", - topic - ))); - } - }; + let (sender, signal) = pending_subs.remove(&topic).ok_or(error::internal(format!( + "Received unexpected subscribe notification for topic: {}", + topic + )))?; + subscriptions.insert(topic, sender); + signal + .send(()) + .map_err(|()| error::internal("Error confirming subscription"))?; Ok(true) } From c69bfdce9d8398e5b81d22cdc193e5b757d2ae91 Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 20:10:15 +0200 Subject: [PATCH 7/9] Refactor process_unsubscribe --- src/client/pubsub.rs | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 086a612..f5f2311 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -8,7 +8,7 @@ * except according to those terms. */ -use std::collections::{btree_map::Entry, BTreeMap}; +use std::collections::BTreeMap; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; @@ -166,8 +166,8 @@ impl PubsubConnectionInner { b"psubscribe" => { process_subscribe(&mut self.pending_psubs, &mut self.psubscriptions, topic) } - b"unsubscribe" => process_unsubscribe(&mut self.subscriptions, topic), - b"punsubscribe" => process_unsubscribe(&mut self.psubscriptions, topic), + b"unsubscribe" => process_unsubscribe(&mut self.subscriptions, &topic), + b"punsubscribe" => process_unsubscribe(&mut self.psubscriptions, &topic), b"message" => process_message(&self.subscriptions, &topic, msg), b"pmessage" => process_message(&self.psubscriptions, &topic, msg), t => { @@ -250,23 +250,13 @@ fn process_subscribe( fn process_unsubscribe( subscriptions: &mut BTreeMap, - topic: String, + topic: &str, ) -> Result { - match subscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } - } - if subscriptions.is_empty() { - return Ok(false); - } - Ok(true) + subscriptions.remove(topic).ok_or(error::internal(format!( + "Unexpected unsubscribe message: {}", + topic + )))?; + Ok(!subscriptions.is_empty()) } fn process_message( From f17107170770f58b5416d7d1088b8ed0711dcd65 Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 20:11:01 +0200 Subject: [PATCH 8/9] Refactor process_message --- src/client/pubsub.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index f5f2311..b2030ac 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -264,22 +264,16 @@ fn process_message( topic: &str, msg: resp::RespValue, ) -> Result { - match subscriptions.get(topic) { - Some(sender) => { - if let Err(error) = sender.unbounded_send(Ok(msg)) { - if !error.is_disconnected() { - return Err(error::internal(format!("Cannot send message: {}", error))); - } - } - } - None => { - return Err(error::internal(format!( - "Unexpected message on topic: {}", - topic - ))); + let sender = subscriptions.get(topic).ok_or(error::internal(format!( + "Unexpected message on topic: {}", + topic + )))?; + match sender.unbounded_send(Ok(msg)) { + Err(error) if !error.is_disconnected() => { + Err(error::internal(format!("Cannot send message: {}", error))) } - }; - Ok(true) + _ => Ok(true), + } } impl Future for PubsubConnectionInner { From 45200345ab9c9a2305008e274649975232a8599e Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 20:31:41 +0200 Subject: [PATCH 9/9] Use a single loop when sending error to subscribers --- src/client/pubsub.rs | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index b2030ac..edead40 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -189,18 +189,14 @@ impl PubsubConnectionInner { return Ok(false); } else { // This can only happen if the connection is closed server-side - for sub in self.subscriptions.values() { + let subs = self.subscriptions.values(); + let psubs = self.psubscriptions.values(); + for sub in subs.chain(psubs) { sub.unbounded_send(Err(error::Error::Connection( ConnectionReason::NotConnected, ))) .unwrap(); } - for psub in self.psubscriptions.values() { - psub.unbounded_send(Err(error::Error::Connection( - ConnectionReason::NotConnected, - ))) - .unwrap(); - } return Err(error::Error::Connection(ConnectionReason::NotConnected)); } } @@ -211,20 +207,15 @@ impl PubsubConnectionInner { } } Poll::Ready(Some(Err(e))) => { - for sub in self.subscriptions.values() { + let subs = self.subscriptions.values(); + let psubs = self.psubscriptions.values(); + for sub in subs.chain(psubs) { sub.unbounded_send(Err(error::unexpected(format!( "Connection is in the process of failing due to: {}", e )))) .unwrap(); } - for psub in self.psubscriptions.values() { - psub.unbounded_send(Err(error::unexpected(format!( - "Connection is in the process of failing due to: {}", - e - )))) - .unwrap(); - } return Err(e); } }