Skip to content

Commit 0368c60

Browse files
jaymelldjc
authored andcommitted
Support redis::aio::MultiplexedConnection
The currently supported `redis::aio::Connection` has serious potential data inconsistency issues. Offer a ConnectionManager based on `MultiplexedConnection` as a safer alternative.
1 parent 040b7dc commit 0368c60

File tree

1 file changed

+44
-1
lines changed

1 file changed

+44
-1
lines changed

redis/src/lib.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ pub use bb8;
3939
pub use redis;
4040

4141
use async_trait::async_trait;
42-
use redis::{aio::Connection, ErrorKind};
42+
use redis::{
43+
aio::{Connection, MultiplexedConnection},
44+
ErrorKind,
45+
};
4346
use redis::{Client, IntoConnectionInfo, RedisError};
4447

4548
/// A `bb8::ManageConnection` for `redis::Client::get_async_connection`.
@@ -79,3 +82,43 @@ impl bb8::ManageConnection for RedisConnectionManager {
7982
false
8083
}
8184
}
85+
86+
/// A `bb8::ManageConnection` for `redis::Client::get_multiplexed_async_connection`.
87+
#[derive(Clone, Debug)]
88+
pub struct RedisMultiplexedConnectionManager {
89+
client: Client,
90+
}
91+
92+
impl RedisMultiplexedConnectionManager {
93+
/// Create a new `RedisMultiplexedConnectionManager`.
94+
/// See `redis::Client::open` for a description of the parameter types.
95+
pub fn new<T: IntoConnectionInfo>(
96+
info: T,
97+
) -> Result<RedisMultiplexedConnectionManager, RedisError> {
98+
Ok(RedisMultiplexedConnectionManager {
99+
client: Client::open(info.into_connection_info()?)?,
100+
})
101+
}
102+
}
103+
104+
#[async_trait]
105+
impl bb8::ManageConnection for RedisMultiplexedConnectionManager {
106+
type Connection = MultiplexedConnection;
107+
type Error = RedisError;
108+
109+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
110+
self.client.get_multiplexed_async_connection().await
111+
}
112+
113+
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
114+
let pong: String = redis::cmd("PING").query_async(conn).await?;
115+
match pong.as_str() {
116+
"PONG" => Ok(()),
117+
_ => Err((ErrorKind::ResponseError, "ping request").into()),
118+
}
119+
}
120+
121+
fn has_broken(&self, _: &mut Self::Connection) -> bool {
122+
false
123+
}
124+
}

0 commit comments

Comments
 (0)