Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 63 additions & 115 deletions src/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* except according to those terms.
*/

use std::collections::{btree_map::Entry, BTreeMap};
use std::collections::BTreeMap;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
Expand Down Expand Up @@ -139,15 +139,10 @@ impl PubsubConnectionInner {
messages.pop(),
messages.pop(),
) {
(Some(msg), Some(topic), Some(message_type), None) => {
match (msg, String::from_resp(topic), message_type) {
(msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg),
_ => return Err(error::unexpected("Incorrect format of a PUBSUB message")),
}
}
(Some(msg), Some(_), Some(topic), Some(message_type)) => {
match (msg, String::from_resp(topic), message_type) {
(msg, Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg),
(Some(msg), Some(topic), Some(message_type), None)
| (Some(msg), Some(_), Some(topic), Some(message_type)) => {
match (String::from_resp(topic), message_type) {
(Ok(topic), resp::RespValue::BulkString(bytes)) => (bytes, topic, msg),
_ => return Err(error::unexpected("Incorrect format of a PUBSUB message")),
}
}
Expand All @@ -165,105 +160,23 @@ impl PubsubConnectionInner {
};

match message_type.as_slice() {
b"subscribe" => match self.pending_subs.remove(&topic) {
Some((sender, signal)) => {
self.subscriptions.insert(topic, sender);
signal
.send(())
.map_err(|()| error::internal("Error confirming subscription"))?
}
None => {
return Err(error::internal(format!(
"Received unexpected subscribe notification for topic: {}",
topic
)));
}
},
b"psubscribe" => match self.pending_psubs.remove(&topic) {
Some((sender, signal)) => {
self.psubscriptions.insert(topic, sender);
signal
.send(())
.map_err(|()| error::internal("Error confirming subscription"))?
}
None => {
return Err(error::internal(format!(
"Received unexpected subscribe notification for topic: {}",
topic
)));
}
},
b"unsubscribe" => {
match self.subscriptions.entry(topic) {
Entry::Occupied(entry) => {
entry.remove_entry();
}
Entry::Vacant(vacant) => {
return Err(error::internal(format!(
"Unexpected unsubscribe message: {}",
vacant.key()
)));
}
}
if self.subscriptions.is_empty() {
return Ok(false);
}
b"subscribe" => {
process_subscribe(&mut self.pending_subs, &mut self.subscriptions, topic)
}
b"punsubscribe" => {
match self.psubscriptions.entry(topic) {
Entry::Occupied(entry) => {
entry.remove_entry();
}
Entry::Vacant(vacant) => {
return Err(error::internal(format!(
"Unexpected unsubscribe message: {}",
vacant.key()
)));
}
}
if self.psubscriptions.is_empty() {
return Ok(false);
}
b"psubscribe" => {
process_subscribe(&mut self.pending_psubs, &mut self.psubscriptions, topic)
}
b"message" => match self.subscriptions.get(&topic) {
Some(sender) => {
if let Err(error) = sender.unbounded_send(Ok(msg)) {
if !error.is_disconnected() {
return Err(error::internal(format!("Cannot send message: {}", error)));
}
}
}
None => {
return Err(error::internal(format!(
"Unexpected message on topic: {}",
topic
)));
}
},
b"pmessage" => match self.psubscriptions.get(&topic) {
Some(sender) => {
if let Err(error) = sender.unbounded_send(Ok(msg)) {
if !error.is_disconnected() {
return Err(error::internal(format!("Cannot send message: {}", error)));
}
}
}
None => {
return Err(error::internal(format!(
"Unexpected message on topic: {}",
topic
)));
}
},
b"unsubscribe" => process_unsubscribe(&mut self.subscriptions, &topic),
b"punsubscribe" => process_unsubscribe(&mut self.psubscriptions, &topic),
b"message" => process_message(&self.subscriptions, &topic, msg),
b"pmessage" => process_message(&self.psubscriptions, &topic, msg),
t => {
return Err(error::internal(format!(
"Unexpected data on Pub/Sub connection: {}",
String::from_utf8_lossy(t)
)));
}
}

Ok(true)
}

/// Returns true, if there are still valid subscriptions at the end, or false if not, i.e. the whole thing can be dropped.
Expand All @@ -276,18 +189,14 @@ impl PubsubConnectionInner {
return Ok(false);
} else {
// This can only happen if the connection is closed server-side
for sub in self.subscriptions.values() {
let subs = self.subscriptions.values();
let psubs = self.psubscriptions.values();
for sub in subs.chain(psubs) {
sub.unbounded_send(Err(error::Error::Connection(
ConnectionReason::NotConnected,
)))
.unwrap();
}
for psub in self.psubscriptions.values() {
psub.unbounded_send(Err(error::Error::Connection(
ConnectionReason::NotConnected,
)))
.unwrap();
}
return Err(error::Error::Connection(ConnectionReason::NotConnected));
}
}
Expand All @@ -298,27 +207,66 @@ impl PubsubConnectionInner {
}
}
Poll::Ready(Some(Err(e))) => {
for sub in self.subscriptions.values() {
let subs = self.subscriptions.values();
let psubs = self.psubscriptions.values();
for sub in subs.chain(psubs) {
sub.unbounded_send(Err(error::unexpected(format!(
"Connection is in the process of failing due to: {}",
e
))))
.unwrap();
}
for psub in self.psubscriptions.values() {
psub.unbounded_send(Err(error::unexpected(format!(
"Connection is in the process of failing due to: {}",
e
))))
.unwrap();
}
return Err(e);
}
}
}
}
}

fn process_subscribe(
pending_subs: &mut BTreeMap<String, (PubsubSink, oneshot::Sender<()>)>,
subscriptions: &mut BTreeMap<String, PubsubSink>,
topic: String,
) -> Result<bool, error::Error> {
let (sender, signal) = pending_subs.remove(&topic).ok_or(error::internal(format!(
"Received unexpected subscribe notification for topic: {}",
topic
)))?;
subscriptions.insert(topic, sender);
signal
.send(())
.map_err(|()| error::internal("Error confirming subscription"))?;
Ok(true)
}

fn process_unsubscribe(
subscriptions: &mut BTreeMap<String, PubsubSink>,
topic: &str,
) -> Result<bool, error::Error> {
subscriptions.remove(topic).ok_or(error::internal(format!(
"Unexpected unsubscribe message: {}",
topic
)))?;
Ok(!subscriptions.is_empty())
}

fn process_message(
subscriptions: &BTreeMap<String, PubsubSink>,
topic: &str,
msg: resp::RespValue,
) -> Result<bool, error::Error> {
let sender = subscriptions.get(topic).ok_or(error::internal(format!(
"Unexpected message on topic: {}",
topic
)))?;
match sender.unbounded_send(Ok(msg)) {
Err(error) if !error.is_disconnected() => {
Err(error::internal(format!("Cannot send message: {}", error)))
}
_ => Ok(true),
}
}

impl Future for PubsubConnectionInner {
type Output = Result<(), error::Error>;

Expand Down
86 changes: 29 additions & 57 deletions src/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl RespValue {
}
}

/// A trait to be implemented for every time which can be read from a RESP value.
/// A trait to be implemented for every type which can be read from a RESP value.
///
/// Implementing this trait on a type means that type becomes a valid return type for calls such as `send` on
/// `client::PairedConnection`
Expand Down Expand Up @@ -207,22 +207,23 @@ impl<K: FromResp + Hash + Eq, T: FromResp, S: BuildHasher + Default> FromResp fo
fn from_resp_int(resp: RespValue) -> Result<HashMap<K, T, S>, Error> {
match resp {
RespValue::Array(ary) => {
let mut map = HashMap::with_capacity_and_hasher(ary.len(), S::default());
let mut items = ary.into_iter();

while let Some(k) = items.next() {
let key = K::from_resp(k)?;
let value = T::from_resp(items.next().ok_or_else(|| {
error::resp(
"Cannot convert an odd number of elements into a hashmap",
"".into(),
)
})?)?;

map.insert(key, value);
let len = ary.len();
if len % 2 != 0 {
return Err(error::resp(
"Cannot convert an odd number of elements into a hashmap",
RespValue::Array(ary),
));
}

Ok(map)
let mut items = ary.into_iter();
(0..(len / 2))
.map(|_| {
// It's safe to unwrap, because we checked the length before
let key = K::from_resp(items.next().unwrap())?;
let value = T::from_resp(items.next().unwrap())?;
Ok((key, value))
})
.collect()
}
_ => Err(error::resp("Cannot be converted into a hashmap", resp)),
}
Expand Down Expand Up @@ -365,52 +366,23 @@ pub trait IntoRespString {
}

macro_rules! string_into_resp {
($t:ty) => {
($(<$l:lifetime>)?|$i:ident : $t:ty| $e:expr) => {
impl $(<$l>)? IntoRespString for $t {
fn into_resp_string(self) -> RespValue {
let $i = self;
RespValue::BulkString($e)
}
}
into_resp!($t, into_resp_string);
};
}

impl IntoRespString for String {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self.into_bytes())
}
}
string_into_resp!(String);

impl<'a> IntoRespString for &'a String {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self.as_bytes().into())
}
}
string_into_resp!(&'a String);

impl<'a> IntoRespString for &'a str {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self.as_bytes().into())
}
}
string_into_resp!(&'a str);

impl<'a> IntoRespString for &'a [u8] {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self.to_vec())
}
}
string_into_resp!(&'a [u8]);

impl IntoRespString for Vec<u8> {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self)
}
}
string_into_resp!(Vec<u8>);

impl IntoRespString for Arc<str> {
fn into_resp_string(self) -> RespValue {
RespValue::BulkString(self.as_bytes().into())
}
}
string_into_resp!(Arc<str>);
string_into_resp!(|it: String| it.into_bytes());
string_into_resp!(<'a>|it: &'a String| it.as_bytes().into());
string_into_resp!(<'a>|it: &'a str| it.as_bytes().into());
string_into_resp!(<'a>|it: &'a [u8]| it.to_vec());
string_into_resp!(|it: Vec<u8>| it);
string_into_resp!(|it: Arc<str>| it.as_bytes().into());

pub trait IntoRespInteger {
fn into_resp_integer(self) -> RespValue;
Expand Down