Skip to content

Commit

Permalink
feat: rewrite presence system to use sets
Browse files Browse the repository at this point in the history
  • Loading branch information
insertish committed Apr 21, 2023
1 parent 1df90ff commit 32542a8
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 156 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/bonfire/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "revolt-bonfire"
version = "0.5.17"
version = "0.5.18"
license = "AGPL-3.0-or-later"
edition = "2021"

Expand Down
2 changes: 1 addition & 1 deletion crates/delta/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "revolt-delta"
version = "0.5.17"
version = "0.5.18"
license = "AGPL-3.0-or-later"
authors = ["Paul Makles <[email protected]>"]
edition = "2018"
Expand Down
3 changes: 2 additions & 1 deletion crates/quark/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "revolt-quark"
version = "0.5.17"
version = "0.5.18"
license = "AGPL-3.0-or-later"
edition = "2021"

Expand Down Expand Up @@ -57,6 +57,7 @@ log = "0.4.14"
pretty_env_logger = "0.4.0"

# Util
rand = "0.8.5"
ulid = "0.5.0"
regex = "1.5.5"
nanoid = "0.4.0"
Expand Down
64 changes: 6 additions & 58 deletions crates/quark/src/presence/entry.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,12 @@
use std::env;

use serde::{Deserialize, Serialize};
use once_cell::sync::Lazy;

pub static REGION_ID: Lazy<u16> = Lazy::new(|| env::var("REGION_ID")
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap());
pub static REGION_ID: Lazy<u16> = Lazy::new(|| {
env::var("REGION_ID")
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap()
});

pub static REGION_KEY: Lazy<String> = Lazy::new(|| format!("region{}", &*REGION_ID));

/// Compact presence information for a user
#[derive(Serialize, Deserialize, Debug)]
pub struct PresenceEntry {
/// Region this session exists in
///
/// We can have up to 65535 regions
pub region_id: u16,

/// Unique session ID
pub session_id: u8,

/// Known flags about session
pub flags: u8,
}

impl PresenceEntry {
/// Create a new presence entry from a given session ID and known flags
pub fn from(session_id: u8, flags: u8) -> Self {
Self {
region_id: *REGION_ID,
session_id,
flags,
}
}
}

pub trait PresenceOp {
/// Find next available session ID
fn find_next_id(&self) -> u8;
}

impl PresenceOp for Vec<PresenceEntry> {
fn find_next_id(&self) -> u8 {
// O(n^2) scan algorithm
// should be relatively fast at low numbers anyways
for i in 0..255 {
let mut found = false;
for entry in self {
if entry.session_id == i {
found = true;
break;
}
}

if !found {
return i;
}
}

255
}
}
96 changes: 45 additions & 51 deletions crates/quark/src/presence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,87 +2,77 @@ use std::collections::HashSet;

use redis_kiss::{get_connection, AsyncCommands};

use rand::Rng;
mod entry;
mod operations;

use entry::{PresenceEntry, PresenceOp};
use operations::{
__add_to_set_sessions, __delete_key_presence_entry, __get_key_presence_entry,
__get_set_sessions, __remove_from_set_sessions, __set_key_presence_entry,
__add_to_set_string, __add_to_set_u32, __delete_key, __get_set_members_as_string,
__get_set_size, __remove_from_set_string, __remove_from_set_u32,
};

use crate::presence::operations::__delete_set_sessions;

use self::entry::REGION_KEY;

/// Create a new presence session, returns the ID of this session
pub async fn presence_create_session(user_id: &str, flags: u8) -> (bool, u8) {
pub async fn presence_create_session(user_id: &str, flags: u8) -> (bool, u32) {
info!("Creating a presence session for {user_id} with flags {flags}");

// Try to find the presence entry for this user.
let mut conn = get_connection().await.unwrap();
let mut entry: Vec<PresenceEntry> = __get_key_presence_entry(&mut conn, user_id)
.await
.unwrap_or_default();
if let Ok(mut conn) = get_connection().await {
// Check whether this is the first session
let was_empty = __get_set_size(&mut conn, user_id).await == 0;

// Return whether this was the first session.
let was_empty = entry.is_empty();
info!("User ID {} just came online.", &user_id);
// A session ID is comprised of random data and any flags ORed to the end
let session_id = {
let mut rng = rand::thread_rng();
(rng.gen::<u32>() ^ 1) | (flags as u32 & 1)
};

// Generate session ID and push new entry.
let session_id = entry.find_next_id();
entry.push(PresenceEntry::from(session_id, flags));
__set_key_presence_entry(&mut conn, user_id, entry).await;
// Add session to user's sessions and to the region
__add_to_set_u32(&mut conn, user_id, session_id).await;
__add_to_set_string(&mut conn, &REGION_KEY, &format!("{user_id}:{session_id}")).await;
info!("Created session for {user_id}, assigned them a session ID of {session_id}.");

// Add to region set in case of failure.
__add_to_set_sessions(&mut conn, &REGION_KEY, user_id, session_id).await;
(was_empty, session_id)
(was_empty, session_id)
} else {
// Fail through
(false, 0)
}
}

/// Delete existing presence session
pub async fn presence_delete_session(user_id: &str, session_id: u8) -> bool {
pub async fn presence_delete_session(user_id: &str, session_id: u32) -> bool {
presence_delete_session_internal(user_id, session_id, false).await
}

/// Delete existing presence session (but also choose whether to skip region)
async fn presence_delete_session_internal(
user_id: &str,
session_id: u8,
session_id: u32,
skip_region: bool,
) -> bool {
info!("Deleting presence session for {user_id} with id {session_id}");

// Return whether this was the last session.
let mut is_empty = false;

// Only continue if we can actually find one.
let mut conn = get_connection().await.unwrap();
let entry: Option<Vec<PresenceEntry>> = __get_key_presence_entry(&mut conn, user_id).await;
if let Some(entry) = entry {
let entries = entry
.into_iter()
.filter(|x| x.session_id != session_id)
.collect::<Vec<PresenceEntry>>();

// If entry is empty, then just delete it.
if entries.is_empty() {
__delete_key_presence_entry(&mut conn, user_id).await;
is_empty = true;
} else {
__set_key_presence_entry(&mut conn, user_id, entries).await;
}
if let Ok(mut conn) = get_connection().await {
// Remove the session
__remove_from_set_u32(&mut conn, user_id, session_id).await;

// Remove from region set.
// Remove from the region
if !skip_region {
__remove_from_set_sessions(&mut conn, &REGION_KEY, user_id, session_id).await;
__remove_from_set_string(&mut conn, &REGION_KEY, &format!("{user_id}:{session_id}"))
.await;
}
}

if is_empty {
info!("User ID {} just went offline.", &user_id);
}
// Return whether this was the last session
let is_empty = __get_set_size(&mut conn, user_id).await == 0;
if is_empty {
info!("User ID {} just went offline.", &user_id);
}

is_empty
is_empty
} else {
// Fail through
false
}
}

/// Check whether a given user ID is online
Expand All @@ -102,6 +92,10 @@ pub async fn presence_filter_online(user_ids: &'_ [String]) -> HashSet<String> {
return set;
}

// NOTE: at the point that we need mobile indicators
// you can interpret the data here and return a new data
// structure like HashMap<String /* id */, u8 /* flags */>

// We need to handle a special case where only one is present
// as for some reason or another, Redis does not like us sending
// a list of just one ID to the server.
Expand Down Expand Up @@ -136,7 +130,7 @@ pub async fn presence_clear_region(region_id: Option<&str>) {
let region_id = region_id.unwrap_or(&*REGION_KEY);
let mut conn = get_connection().await.expect("Redis connection");

let sessions = __get_set_sessions(&mut conn, region_id).await;
let sessions = __get_set_members_as_string(&mut conn, region_id).await;
if !sessions.is_empty() {
info!(
"Cleaning up {} sessions, this may take a while...",
Expand All @@ -155,7 +149,7 @@ pub async fn presence_clear_region(region_id: Option<&str>) {
}

// Then clear the set in Redis.
__delete_set_sessions(&mut conn, region_id).await;
__delete_key(&mut conn, region_id).await;

info!("Clean up complete.");
}
Expand Down
67 changes: 26 additions & 41 deletions crates/quark/src/presence/operations.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,42 @@
use redis_kiss::{AsyncCommands, Conn};

use super::entry::PresenceEntry;

/// Set presence entry by given ID
pub async fn __set_key_presence_entry(conn: &mut Conn, id: &str, data: Vec<PresenceEntry>) {
let _: Option<()> = conn.set(id, bincode::serialize(&data).unwrap()).await.ok();
/// Add to set (string)
pub async fn __add_to_set_string(conn: &mut Conn, key: &str, value: &str) {
let _: Option<()> = conn.sadd(key, value).await.ok();
}

/// Delete presence entry by given ID
pub async fn __delete_key_presence_entry(conn: &mut Conn, id: &str) {
let _: Option<()> = conn.del(id).await.ok();
/// Add to set (u32)
pub async fn __add_to_set_u32(conn: &mut Conn, key: &str, value: u32) {
let _: Option<()> = conn.sadd(key, value).await.ok();
}

/// Get presence entry by given ID
pub async fn __get_key_presence_entry(conn: &mut Conn, id: &str) -> Option<Vec<PresenceEntry>> {
conn.get::<_, Option<Vec<u8>>>(id)
.await
.unwrap()
.map(|entry| bincode::deserialize(&entry[..]).unwrap())
/// Remove from set (string)
pub async fn __remove_from_set_string(conn: &mut Conn, key: &str, value: &str) {
let _: Option<()> = conn.srem(key, value).await.ok();
}

/// Add to region session set
pub async fn __add_to_set_sessions(
conn: &mut Conn,
region_id: &str,
user_id: &str,
session_id: u8,
) {
let _: Option<()> = conn
.sadd(region_id, format!("{user_id}:{session_id}"))
.await
.ok();
/// Remove from set (u32)
pub async fn __remove_from_set_u32(conn: &mut Conn, key: &str, value: u32) {
let _: Option<()> = conn.srem(key, value).await.ok();
}

/// Remove from region session set
pub async fn __remove_from_set_sessions(
conn: &mut Conn,
region_id: &str,
user_id: &str,
session_id: u8,
) {
let _: Option<()> = conn
.srem(region_id, format!("{user_id}:{session_id}"))
/// Get set members as string
pub async fn __get_set_members_as_string(conn: &mut Conn, key: &str) -> Vec<String> {
conn.smembers::<_, Vec<String>>(key)
.await
.ok();
.expect("could not get set members as string")
}

/// Get region session set as list
pub async fn __get_set_sessions(conn: &mut Conn, region_id: &str) -> Vec<String> {
conn.smembers::<_, Vec<String>>(region_id).await.unwrap()
/// Get set size
pub async fn __get_set_size(conn: &mut Conn, id: &str) -> u32 {
conn.scard::<_, u32>(id)
.await
.expect("could not get set size")
}

/// Delete region session set
pub async fn __delete_set_sessions(conn: &mut Conn, region_id: &str) {
conn.del::<_, ()>(region_id).await.unwrap();
/// Delete key by id
pub async fn __delete_key(conn: &mut Conn, id: &str) {
conn.del::<_, ()>(id)
.await
.expect("could not delete key by id");
}
6 changes: 6 additions & 0 deletions docker-compose.db.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
version: "3.3"
services:
# Redis
redis:
image: eqalpha/keydb
ports:
- "6379:6379"

# MongoDB
database:
image: mongo
Expand Down

0 comments on commit 32542a8

Please sign in to comment.