Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions livekit/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ impl From<participant_info::Kind> for participant::ParticipantKind {
}
}

impl From<participant_info::State> for participant::ParticipantState {
fn from(value: participant_info::State) -> Self {
match value {
participant_info::State::Joining => participant::ParticipantState::Joining,
participant_info::State::Joined => participant::ParticipantState::Joined,
participant_info::State::Active => participant::ParticipantState::Active,
participant_info::State::Disconnected => participant::ParticipantState::Disconnected,
}
}
}

impl From<ChatMessage> for RoomChatMessage {
fn from(proto_msg: ChatMessage) -> Self {
RoomChatMessage {
Expand Down
21 changes: 20 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use self::{
};
pub use crate::rtc_engine::SimulateScenario;
use crate::{
participant::ConnectionQuality,
participant::{ConnectionQuality, ParticipantState},
prelude::*,
registered_audio_filter_plugins,
rtc_engine::{
Expand Down Expand Up @@ -86,7 +86,16 @@ pub enum RoomError {
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum RoomEvent {
/// Remote participant joined the room.
///
/// This event is fired immediately after a participant joins before
/// it is able to receive data messages. To send data messages in response
/// to a participant joining, respond to the [`Self::ParticipantActive`] event instead.
///
ParticipantConnected(RemoteParticipant),
/// Remote participant is active and ready to receive data messages.
ParticipantActive(RemoteParticipant),
/// Remote participant disconnected from the room.
ParticipantDisconnected(RemoteParticipant),
LocalTrackPublished {
publication: LocalTrackPublication,
Expand Down Expand Up @@ -493,6 +502,7 @@ impl Room {
let local_participant = LocalParticipant::new(
rtc_engine.clone(),
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand Down Expand Up @@ -639,6 +649,7 @@ impl Room {
let pi = pi.clone();
inner.create_participant(
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand Down Expand Up @@ -976,14 +987,20 @@ impl RoomSession {
// disconnected
}
} else if let Some(remote_participant) = remote_participant {
let already_active = remote_participant.is_active();
remote_participant.update_info(pi.clone());
if !already_active && remote_participant.is_active() {
self.dispatcher
.dispatch(&RoomEvent::ParticipantActive(remote_participant.clone()));
}
participants.push(Participant::Remote(remote_participant));
} else {
// Create a new participant
let remote_participant = {
let pi = pi.clone();
self.create_participant(
pi.kind().into(),
pi.state().into(),
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
Expand Down Expand Up @@ -1547,6 +1564,7 @@ impl RoomSession {
fn create_participant(
self: &Arc<Self>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -1556,6 +1574,7 @@ impl RoomSession {
let participant = RemoteParticipant::new(
self.rtc_engine.clone(),
kind,
state,
sid.clone(),
identity.clone(),
name,
Expand Down
14 changes: 12 additions & 2 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use std::{
time::Duration,
};

use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantTrackPermission};
use super::{
ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState,
ParticipantTrackPermission,
};
use crate::{
data_stream::{
ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions,
Expand Down Expand Up @@ -106,6 +109,7 @@ impl LocalParticipant {
pub(crate) fn new(
rtc_engine: Arc<RtcEngine>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -114,7 +118,9 @@ impl LocalParticipant {
encryption_type: EncryptionType,
) -> Self {
Self {
inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind),
inner: super::new_inner(
rtc_engine, sid, identity, name, metadata, attributes, kind, state,
),
local: Arc::new(LocalInfo {
events: LocalEvents::default(),
encryption_type,
Expand Down Expand Up @@ -665,6 +671,10 @@ impl LocalParticipant {
self.inner.info.read().name.clone()
}

pub fn is_active(&self) -> bool {
self.inner.info.read().state == ParticipantState::Active
}

pub fn metadata(&self) -> String {
self.inner.info.read().metadata.clone()
}
Expand Down
13 changes: 13 additions & 0 deletions livekit/src/room/participant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ pub enum ParticipantKind {
Agent,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum ParticipantState {
Joining,
Joined,
Active,
Disconnected,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DisconnectReason {
UnknownReason,
Expand Down Expand Up @@ -78,6 +86,7 @@ impl Participant {
pub fn sid(self: &Self) -> ParticipantSid;
pub fn identity(self: &Self) -> ParticipantIdentity;
pub fn name(self: &Self) -> String;
pub fn is_active(self: &Self) -> bool;
pub fn metadata(self: &Self) -> String;
pub fn attributes(self: &Self) -> HashMap<String, String>;
pub fn is_speaking(self: &Self) -> bool;
Expand Down Expand Up @@ -114,6 +123,7 @@ struct ParticipantInfo {
pub audio_level: f32,
pub connection_quality: ConnectionQuality,
pub kind: ParticipantKind,
pub state: ParticipantState,
pub disconnect_reason: DisconnectReason,
}

Expand Down Expand Up @@ -154,6 +164,7 @@ pub(super) fn new_inner(
metadata: String,
attributes: HashMap<String, String>,
kind: ParticipantKind,
state: ParticipantState,
) -> Arc<ParticipantInner> {
Arc::new(ParticipantInner {
rtc_engine,
Expand All @@ -164,6 +175,7 @@ pub(super) fn new_inner(
metadata,
attributes,
kind,
state,
speaking: false,
audio_level: 0.0,
connection_quality: ConnectionQuality::Excellent,
Expand All @@ -180,6 +192,7 @@ pub(super) fn update_info(
new_info: proto::ParticipantInfo,
) {
let mut info = inner.info.write();
info.state = new_info.state().into();
info.disconnect_reason = new_info.disconnect_reason().into();
info.kind = new_info.kind().into();
info.sid = new_info.sid.try_into().unwrap();
Expand Down
11 changes: 9 additions & 2 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use livekit_protocol as proto;
use livekit_runtime::timeout;
use parking_lot::Mutex;

use super::{ConnectionQuality, ParticipantInner, ParticipantKind, TrackKind};
use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, TrackKind};
use crate::{prelude::*, rtc_engine::RtcEngine, track::TrackError};

const ADD_TRACK_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -71,6 +71,7 @@ impl RemoteParticipant {
pub(crate) fn new(
rtc_engine: Arc<RtcEngine>,
kind: ParticipantKind,
state: ParticipantState,
sid: ParticipantSid,
identity: ParticipantIdentity,
name: String,
Expand All @@ -79,7 +80,9 @@ impl RemoteParticipant {
auto_subscribe: bool,
) -> Self {
Self {
inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind),
inner: super::new_inner(
rtc_engine, sid, identity, name, metadata, attributes, kind, state,
),
remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }),
}
}
Expand Down Expand Up @@ -450,6 +453,10 @@ impl RemoteParticipant {
self.inner.info.read().name.clone()
}

pub fn is_active(&self) -> bool {
self.inner.info.read().state == ParticipantState::Active
}

pub fn metadata(&self) -> String {
self.inner.info.read().metadata.clone()
}
Expand Down
Loading