Skip to content

Commit ff02780

Browse files
authored
Merge pull request #11 from dorianim/feat/redis-channels
Feat/redis channels
2 parents 266aae4 + e518f9b commit ff02780

File tree

2 files changed

+69
-40
lines changed

2 files changed

+69
-40
lines changed

src/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tracing::Span;
1111
mod color;
1212
mod routes;
1313

14+
use tokio::sync::broadcast;
1415
#[derive(Serialize, Deserialize, Clone, Debug)]
1516
pub struct Segment {
1617
label: String,
@@ -19,7 +20,7 @@ pub struct Segment {
1920
color: Option<Color>,
2021
}
2122

22-
#[derive(Serialize, Deserialize, Clone, Default)]
23+
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
2324
pub struct Timer {
2425
// Return after TimerRequest
2526
pub segments: Vec<Segment>,
@@ -44,9 +45,9 @@ struct InstanceProperties {
4445
type SharedState = Arc<AppState>;
4546
pub struct AppState {
4647
redis: redis::aio::ConnectionManager,
47-
redis_client: redis::Client,
4848
jwt_key: String,
4949
instance_properties: InstanceProperties,
50+
redis_task_rx: broadcast::Receiver<Timer>,
5051
}
5152

5253
#[tokio::main]
@@ -58,7 +59,7 @@ async fn main() {
5859

5960
let redis_string = env::var("REDIS_STRING").expect("REDIS_STRING is not set");
6061
let jwt_key = env::var("JWT_KEY").expect("JWT_KEY is not set");
61-
let client = redis::Client::open(redis_string.to_owned()).unwrap();
62+
let client = redis::Client::open(redis_string.to_owned()).expect("Could not connect to redis");
6263
let manager = redis::aio::ConnectionManager::new(client.clone())
6364
.await
6465
.unwrap();
@@ -70,23 +71,28 @@ async fn main() {
7071
.unwrap_or(None),
7172
};
7273

74+
let (redis_task_tx, redis_task_rx) = broadcast::channel::<Timer>(10);
75+
7376
let state: SharedState = Arc::new(AppState {
74-
redis: manager,
75-
redis_client: client,
77+
redis: manager.clone(),
7678
jwt_key,
7779
instance_properties,
80+
redis_task_rx,
7881
});
7982

8083
let app = Router::new()
81-
.nest("/api/ws", routes::ws::routes())
84+
.nest(
85+
"/api/ws",
86+
routes::ws::routes(manager, client, redis_task_tx),
87+
)
8288
.nest("/api/timer", routes::timer::routes(state.clone()))
8389
.nest("/api/instance", routes::instance::routes())
8490
.fallback(routes::client::client_assets)
8591
.layer(cors)
8692
.layer(
8793
TraceLayer::new_for_http()
88-
.on_request(|_request: &Request<_>, _span: &Span| {
89-
println!("Request {}", _request.uri());
94+
.on_request(|request: &Request<_>, _span: &Span| {
95+
println!("Request {} {}", request.method(), request.uri());
9096
})
9197
.on_response(|_response: &Response, _latency: Duration, _span: &Span| {
9298
println!(

src/routes/ws.rs

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
1717
use serde_json;
1818

1919
use redis::AsyncCommands;
20+
use tokio::sync::broadcast;
2021
use tokio::sync::mpsc::{Receiver, Sender};
2122
use tokio::task::JoinHandle;
2223

@@ -41,16 +42,6 @@ struct WsConnection {}
4142

4243
impl WsConnection {
4344
async fn new(state: SharedState, socket: WebSocket) {
44-
let mut connection = state.redis_client.get_async_connection().await.unwrap();
45-
let _: () = redis::cmd("CONFIG")
46-
.arg("SET")
47-
.arg("notify-keyspace-events")
48-
.arg("KEA")
49-
.query_async(&mut connection)
50-
.await
51-
.unwrap();
52-
let pubsub = connection.into_pubsub();
53-
5445
let (ws_sender, ws_receiver) = socket.split();
5546
let (ws_message_tx, ws_message_rx) = tokio::sync::mpsc::channel::<WSMessage>(32);
5647
let (redis_listen_id_tx, redis_listen_id_rx) = tokio::sync::mpsc::channel::<String>(32);
@@ -63,10 +54,9 @@ impl WsConnection {
6354
ws_receiver,
6455
);
6556
let redis_listener_task = WsConnection::spawn_redis_listener_task(
66-
state.redis.clone(),
6757
ws_message_tx,
6858
redis_listen_id_rx,
69-
pubsub,
59+
state.redis_task_rx.resubscribe(),
7060
);
7161

7262
ws_receiver_task.await.unwrap();
@@ -76,10 +66,9 @@ impl WsConnection {
7666
}
7767

7868
fn spawn_redis_listener_task(
79-
mut redis: redis::aio::ConnectionManager,
8069
ws_message_tx: Sender<WSMessage>,
8170
mut redis_listen_id_rx: Receiver<String>,
82-
mut pubsub: redis::aio::PubSub,
71+
mut redis_task_rx: tokio::sync::broadcast::Receiver<Timer>,
8372
) -> JoinHandle<()> {
8473
tokio::spawn(async move {
8574
let mut msg = None;
@@ -88,25 +77,14 @@ impl WsConnection {
8877
}
8978

9079
let timer_id = msg.unwrap();
91-
pubsub
92-
.psubscribe(format!("__keyspace@*__:{}", &timer_id))
93-
.await
94-
.unwrap();
95-
println!("Redis listening!");
96-
redis_listen_id_rx.close();
97-
98-
let mut pubsub = pubsub.into_on_message();
99-
100-
while let Some(msg) = pubsub.next().await {
101-
println!("Updated! {:?}", msg);
10280

103-
let timer: Timer = serde_json::from_str(
104-
&redis.get::<String, String>(timer_id.clone()).await.unwrap(),
105-
)
106-
.unwrap();
81+
while let Ok(timer) = redis_task_rx.recv().await {
82+
if timer.id == timer_id {
83+
println!("Updated! {:?}", timer);
10784

108-
let response = WSMessage::Timer(timer.into());
109-
ws_message_tx.send(response).await.unwrap();
85+
let response = WSMessage::Timer(timer.into());
86+
ws_message_tx.send(response).await.unwrap();
87+
}
11088
}
11189
})
11290
}
@@ -214,6 +192,51 @@ pub async fn ws_handler(
214192
ws.on_upgrade(move |socket| WsConnection::new(state, socket))
215193
}
216194

217-
pub fn routes() -> Router<SharedState> {
195+
fn spawn_global_redis_listener_task(
196+
mut redis: redis::aio::ConnectionManager,
197+
redis_client: redis::Client,
198+
redis_task_tx: broadcast::Sender<Timer>,
199+
) -> JoinHandle<()> {
200+
tokio::spawn(async move {
201+
let mut connection = redis_client.get_async_connection().await.unwrap();
202+
let _: () = redis::cmd("CONFIG")
203+
.arg("SET")
204+
.arg("notify-keyspace-events")
205+
.arg("KEA")
206+
.query_async(&mut connection)
207+
.await
208+
.unwrap();
209+
210+
let mut pubsub = connection.into_pubsub();
211+
212+
pubsub
213+
.psubscribe("__keyspace@*__:*")
214+
.await
215+
.expect("Failed to subscribe to redis channel");
216+
217+
let mut pubsub = pubsub.into_on_message();
218+
219+
while let Some(msg) = pubsub.next().await {
220+
println!("Updated! {:?}", msg);
221+
let timer_id = msg.get_channel_name().split(":").last().unwrap();
222+
223+
let timer_str = &redis
224+
.get::<String, String>(String::from(timer_id))
225+
.await
226+
.expect("Did not find timer in redis");
227+
let timer: Timer = serde_json::from_str(timer_str).unwrap();
228+
229+
// Broadcast to all listeners
230+
redis_task_tx.send(timer).unwrap();
231+
}
232+
})
233+
}
234+
235+
pub fn routes(
236+
redis: redis::aio::ConnectionManager,
237+
redis_client: redis::Client,
238+
redis_task_tx: broadcast::Sender<Timer>,
239+
) -> Router<SharedState> {
240+
spawn_global_redis_listener_task(redis, redis_client, redis_task_tx);
218241
Router::new().route("/", get(ws_handler))
219242
}

0 commit comments

Comments
 (0)