Skip to content

Commit

Permalink
Merge pull request #54 from mozilla/switchboard-fixes
Browse files Browse the repository at this point in the history
Refactor & fix switchboard join/leave tracking
  • Loading branch information
mqp authored Apr 23, 2020
2 parents 63d3401 + 598359f commit 3694c36
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 95 deletions.
65 changes: 39 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use once_cell::sync::{Lazy, OnceCell};
use serde::de::DeserializeOwned;
use serde_json::json;
use serde_json::Value as JsonValue;
use sessions::{JoinState, Session, SessionState};
use sessions::{JoinKind, JoinState, Session, SessionState};
use std::error::Error;
use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int};
Expand Down Expand Up @@ -309,12 +309,17 @@ extern "C" fn destroy_session(handle: *mut PluginSession, error: *mut c_int) {
Ok(sess) => {
janus_info!("Destroying SFU session {:p}...", sess.handle);
let mut switchboard = SWITCHBOARD.write().expect("Switchboard is poisoned :(");
switchboard.remove_session(&sess);
switchboard.disconnect(&sess);
if let Some(joined) = sess.join_state.get() {
// if they are entirely disconnected, notify their roommates
match joined.kind {
JoinKind::Publisher => switchboard.leave_publisher(&sess),
JoinKind::Subscriber => switchboard.leave_subscriber(&sess)
}
// if this user is entirely disconnected, notify their roommates.
// todo: is it better if this is instead when their publisher disconnects?
if !switchboard.is_connected(&joined.user_id) {
let response = json!({ "event": "leave", "user_id": &joined.user_id, "room_id": &joined.room_id });
let occupants = switchboard.occupants_of(&joined.room_id);
let occupants = switchboard.publishers_occupying(&joined.room_id);
notify_except(&response, &joined.user_id, occupants);
}
}
Expand Down Expand Up @@ -433,33 +438,37 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
let mut switchboard = SWITCHBOARD.write()?;
let body = json!({ "users": { room_id.as_str(): switchboard.get_users(&room_id) }});

let mut is_master_handle = false;
if let Some(subscription) = subscribe.as_ref() {
let room_is_full = switchboard.occupants_of(&room_id).len() > config.max_room_size;
let server_is_full = switchboard.sessions().len() > config.max_ccu;
is_master_handle = subscription.data; // hack -- assume there is only one "master" data connection per user
if is_master_handle && room_is_full {
// hack -- use data channel subscription to infer this, it would probably be nicer if
// connections announced explicitly whether they were a publisher or subscriber
let gets_data_channel = subscribe.as_ref().map(|s| s.data).unwrap_or(false);
let join_kind = if gets_data_channel { JoinKind::Publisher } else { JoinKind::Subscriber };

if join_kind == JoinKind::Publisher {
if switchboard.publishers_occupying(&room_id).len() > config.max_room_size {
return Err(From::from("Room is full."));
}
if is_master_handle && server_is_full {
if switchboard.sessions().len() > config.max_ccu {
return Err(From::from("Server is full."));
}
}

if let Err(_existing) = from.join_state.set(JoinState::new(room_id.clone(), user_id.clone())) {
if let Err(_existing) = from.join_state.set(JoinState::new(join_kind, room_id.clone(), user_id.clone())) {
return Err(From::from("Handles may only join once!"));
}

if join_kind == JoinKind::Publisher {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
switchboard.join_publisher(Arc::clone(from), user_id.clone(), room_id.clone());
notify_except(&notification, &user_id, switchboard.publishers_occupying(&room_id));
} else {
switchboard.join_subscriber(Arc::clone(from), user_id.clone(), room_id.clone());
}

if let Some(subscription) = subscribe {
janus_info!("Processing join-time subscription from {:p}: {:?}.", from.handle, subscription);
if let Err(_existing) = from.subscription.set(subscription.clone()) {
return Err(From::from("Handles may only subscribe once!"));
};
if is_master_handle {
let notification = json!({ "event": "join", "user_id": user_id, "room_id": room_id });
switchboard.join_room(Arc::clone(from), user_id.clone(), room_id.clone());
notify_except(&notification, &user_id, switchboard.occupants_of(&room_id));
}
if let Some(ref publisher_id) = subscription.media {
let publisher = switchboard
.get_publisher(publisher_id)
Expand All @@ -473,6 +482,7 @@ fn process_join(from: &Arc<Session>, room_id: RoomId, user_id: UserId, subscribe
return Ok(MessageResponse::new(body, jsep));
}
}

Ok(MessageResponse::msg(body))
}

Expand All @@ -485,10 +495,15 @@ fn process_kick(from: &Arc<Session>, room_id: RoomId, user_id: UserId, token: St
janus_info!("Processing kick from {:p} targeting user ID {} in room ID {}.", from.handle, user_id, room_id);
let end_session = gateway_callbacks().end_session;
let switchboard = SWITCHBOARD.read()?;
let sessions = switchboard.get_sessions(&room_id, &user_id);
for sess in sessions {
janus_info!("Kicking session {:p}.", from.handle);
end_session(sess.as_ptr());
if let Some(publisher) = switchboard.get_publisher(&user_id) {
janus_info!("Kicking session {:p}.", publisher.handle);
end_session(publisher.as_ptr());
}
if let Some(subscribers) = switchboard.get_subscribers(&user_id) {
for subscriber in subscribers {
janus_info!("Kicking session {:p}.", subscriber.handle);
end_session(subscriber.as_ptr());
}
}
} else {
janus_warn!("Ignoring kick from {:p} because they didn't have kick permissions.", from.handle);
Expand All @@ -509,7 +524,7 @@ fn process_block(from: &Arc<Session>, whom: UserId) -> MessageResult {
if let Some(joined) = from.join_state.get() {
let mut switchboard = SWITCHBOARD.write()?;
let event = json!({ "event": "blocked", "by": &joined.user_id });
notify_user(&event, &whom, switchboard.occupants_of(&joined.room_id));
notify_user(&event, &whom, switchboard.publishers_occupying(&joined.room_id));
switchboard.establish_block(joined.user_id.clone(), whom);
Ok(MessageResponse::msg(json!({})))
} else {
Expand All @@ -526,7 +541,7 @@ fn process_unblock(from: &Arc<Session>, whom: UserId) -> MessageResult {
send_fir(&[publisher]);
}
let event = json!({ "event": "unblocked", "by": &joined.user_id });
notify_user(&event, &whom, switchboard.occupants_of(&joined.room_id));
notify_user(&event, &whom, switchboard.publishers_occupying(&joined.room_id));
Ok(MessageResponse::msg(json!({})))
} else {
Err(From::from("Cannot unblock when not in a room."))
Expand Down Expand Up @@ -560,7 +575,7 @@ fn process_data(from: &Arc<Session>, whom: Option<UserId>, body: &str) -> Messag
let payload = json!({ "event": "data", "body": body });
let switchboard = SWITCHBOARD.write()?;
if let Some(joined) = from.join_state.get() {
let occupants = switchboard.occupants_of(&joined.room_id);
let occupants = switchboard.publishers_occupying(&joined.room_id);
if let Some(user_id) = whom {
send_data_user(&payload, &user_id, occupants);
} else {
Expand Down Expand Up @@ -686,8 +701,6 @@ fn handle_message_async(RawMessage { jsep, msg, txn, from }: RawMessage) -> Janu
if let Some(ref from) = from.upgrade() {
janus_huge!("Processing txid {} from {:p}: msg={:?}, jsep={:?}", txn, from.handle, msg, jsep);
if !from.destroyed.load(Ordering::Relaxed) {
// process the message first, because processing a JSEP can cause us to want to send an RTCP
// FIR to our subscribers, which may have been established in the message
let parsed_msg = msg.and_then(|x| try_parse_jansson(&x).transpose());
let parsed_jsep = jsep.and_then(|x| try_parse_jansson(&x).transpose());
let msg_result = parsed_msg.map(|x| x.and_then(|msg| process_message(from, msg)));
Expand Down
29 changes: 22 additions & 7 deletions src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ use once_cell::sync::OnceCell;
use std::sync::atomic::{AtomicBool, AtomicIsize};
use std::sync::{Arc, Mutex};

/// State pertaining to this session's join of a particular room as a particular user ID.
/// Once they join a room, all sessions are classified as either subscribers or publishers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
Publisher,
Subscriber,
}

/// State pertaining to all sessions that have joined a room.
#[derive(Debug, Clone)]
pub struct JoinState {
/// Whether this session is a subscriber or a publisher.
pub kind: JoinKind,

/// The room ID that this session is in.
pub room_id: RoomId,

Expand All @@ -17,28 +27,33 @@ pub struct JoinState {
}

impl JoinState {
pub fn new(room_id: RoomId, user_id: UserId) -> Self {
Self { room_id, user_id }
pub fn new(kind: JoinKind, room_id: RoomId, user_id: UserId) -> Self {
Self { kind, room_id, user_id }
}
}


/// The state associated with a single session.
#[derive(Debug)]
pub struct SessionState {
/// Whether this session has been destroyed.
pub destroyed: AtomicBool,

/// The current FIR sequence number for this session's video.
pub fir_seq: AtomicIsize,

/// Information pertaining to this session's user and room, if joined.
pub join_state: OnceCell<JoinState>,

/// The subscription this user has established, if any.
// todo: these following fields should be unified with the JoinState, but it's
// annoying in practice because they are established during JSEP negotiation
// rather than during the join flow

/// If this is a subscriber, the subscription this user has established, if any.
pub subscription: OnceCell<Subscription>,

/// If this is a publisher, the offer for subscribing to it.
pub subscriber_offer: Arc<Mutex<Option<Sdp>>>,

/// The current FIR sequence number for this session's video.
pub fir_seq: AtomicIsize,
}

/// Rust representation of a single Janus session, i.e. a single `RTCPeerConnection`.
Expand Down
103 changes: 41 additions & 62 deletions src/switchboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::sessions::Session;
use janus_plugin::janus_err;
use multimap::MultiMap;
use std::borrow::Borrow;
use std::collections::hash_map::Entry;
/// Tools for managing the set of subscriptions between connections.
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
Expand Down Expand Up @@ -103,12 +102,14 @@ where
/// connections should be sending traffic to which other connections.
#[derive(Debug)]
pub struct Switchboard {
/// All active connections.
/// All active connections, whether or not they have joined a room.
sessions: Vec<Box<Arc<Session>>>,
/// Connections which have joined a room, per room.
occupants: HashMap<RoomId, Vec<Arc<Session>>>,
/// Which room a user belongs to.
users_to_room: HashMap<UserId, RoomId>,
/// All joined publisher connections, by which room they have joined.
publishers_by_room: MultiMap<RoomId, Arc<Session>>,
/// All joined publisher connections, by which user they have joined as.
publishers_by_user: HashMap<UserId, Arc<Session>>,
/// All joined subscriber connections, by which user they have joined as.
subscribers_by_user: MultiMap<UserId, Arc<Session>>,
/// Which connections are subscribing to traffic from which other connections.
publisher_to_subscribers: BidirectionalMultimap<Arc<Session>, Arc<Session>>,
/// Which users have explicitly blocked traffic to and from other users.
Expand All @@ -119,8 +120,9 @@ impl Switchboard {
pub fn new() -> Self {
Self {
sessions: Vec::new(),
occupants: HashMap::new(),
users_to_room: HashMap::new(),
publishers_by_room: MultiMap::new(),
publishers_by_user: HashMap::new(),
subscribers_by_user: MultiMap::new(),
publisher_to_subscribers: BidirectionalMultimap::new(),
blockers_to_miscreants: BidirectionalMultimap::new(),
}
Expand All @@ -130,6 +132,10 @@ impl Switchboard {
self.sessions.push(session);
}

pub fn disconnect(&mut self, session: &Session) {
self.sessions.retain(|s| s.handle != session.handle);
}

pub fn is_connected(&self, user: &UserId) -> bool {
self.sessions.iter().any(|s| match s.join_state.get() {
None => false,
Expand All @@ -145,30 +151,31 @@ impl Switchboard {
self.blockers_to_miscreants.disassociate(from, target);
}

pub fn join_room(&mut self, session: Arc<Session>, user: UserId, room: RoomId) {
self.users_to_room.entry(user).or_insert(room.clone());
self.occupants.entry(room).or_insert_with(Vec::new).push(session);
pub fn join_publisher(&mut self, session: Arc<Session>, user: UserId, room: UserId) {
self.publishers_by_user.entry(user).or_insert(session.clone());
self.publishers_by_room.insert(room, session);
}

pub fn leave_room(&mut self, session: &Session, user: UserId, room: RoomId) {
if let Entry::Occupied(mut cohabitators) = self.occupants.entry(room) {
cohabitators.get_mut().retain(|x| x.as_ref() != session);
if cohabitators.get().is_empty() {
cohabitators.remove_entry();
}
}
pub fn join_subscriber(&mut self, session: Arc<Session>, user: UserId, _room: UserId) {
self.subscribers_by_user.insert(user, session);
}

if let Entry::Occupied(room) = self.users_to_room.entry(user) {
room.remove_entry();
pub fn leave_publisher(&mut self, session: &Session) {
self.publisher_to_subscribers.remove_key(session);
if let Some(joined) = session.join_state.get() {
self.publishers_by_user.remove(&joined.user_id);
if let Some(sessions) = self.publishers_by_room.get_vec_mut(&joined.room_id) {
sessions.retain(|x| x.handle != session.handle);
}
}
}

pub fn remove_session(&mut self, session: &Session) {
self.publisher_to_subscribers.remove_key(session);
pub fn leave_subscriber(&mut self, session: &Session) {
self.publisher_to_subscribers.remove_value(session);
self.sessions.retain(|s| s.handle != session.handle);
if let Some(joined) = session.join_state.get() {
self.leave_room(session, joined.user_id.clone(), joined.room_id.clone());
if let Some(sessions) = self.subscribers_by_user.get_vec_mut(&joined.user_id) {
sessions.retain(|x| x.handle != session.handle);
}
}
}

Expand All @@ -188,8 +195,8 @@ impl Switchboard {
&self.sessions
}

pub fn occupants_of(&self, room: &RoomId) -> &[Arc<Session>] {
self.occupants.get(room).map(Vec::as_slice).unwrap_or(&[])
pub fn publishers_occupying(&self, room: &RoomId) -> &[Arc<Session>] {
self.publishers_by_room.get_vec(room).map(Vec::as_slice).unwrap_or(&[])
}

pub fn media_recipients_for(&self, sender: &Session) -> impl Iterator<Item = &Arc<Session>> {
Expand Down Expand Up @@ -234,7 +241,7 @@ impl Switchboard {
Some(joined) => (
self.blockers_to_miscreants.get_keys(&joined.user_id),
self.blockers_to_miscreants.get_values(&joined.user_id),
self.occupants_of(&joined.room_id),
self.publishers_occupying(&joined.room_id),
),
};
cohabitators.iter().filter(move |cohabitator| {
Expand All @@ -252,47 +259,19 @@ impl Switchboard {

pub fn get_users(&self, room: &RoomId) -> HashSet<&UserId> {
let mut result = HashSet::new();
if let Some(sessions) = self.occupants.get(room) {
for session in sessions {
if let Some(joined) = session.join_state.get() {
result.insert(&joined.user_id);
}
for session in self.publishers_occupying(room) {
if let Some(joined) = session.join_state.get() {
result.insert(&joined.user_id);
}
}
result
}

pub fn get_publisher(&self, user_id: &UserId) -> Option<&Arc<Session>> {
let mut result = None;

if let Some(room_id) = self.users_to_room.get(user_id) {
if let Some(sessions) = self.occupants.get(room_id) {
result = sessions
.iter()
.find(|s| {
let subscriber_offer = s.subscriber_offer.lock().unwrap();
let join_state = s.join_state.get();
match (subscriber_offer.as_ref(), join_state) {
(Some(_), Some(state)) if &state.user_id == user_id => true,
_ => false,
}
})
}
}

return result;
pub fn get_publisher(&self, user: &UserId) -> Option<&Arc<Session>> {
self.publishers_by_user.get(user)
}

pub fn get_sessions(&self, room_id: &RoomId, user_id: &UserId) -> Vec<&Box<Arc<Session>>> {
self.sessions
.iter()
.filter(|s| {
let join_state = s.join_state.get();
match join_state {
Some(state) if &state.user_id == user_id && &state.room_id == room_id => true,
_ => false,
}
})
.collect::<_>()
pub fn get_subscribers(&self, user: &UserId) -> Option<&Vec<Arc<Session>>> {
self.subscribers_by_user.get_vec(user)
}
}

0 comments on commit 3694c36

Please sign in to comment.