diff --git a/livekit/src/proto.rs b/livekit/src/proto.rs index ef833a95a..372d69c38 100644 --- a/livekit/src/proto.rs +++ b/livekit/src/proto.rs @@ -148,6 +148,17 @@ impl From for participant::ParticipantKind { } } +impl From for participant::ParticipantState { + fn from(value: participant_info::State) -> Self { + match value { + participant_info::State::Joining => participant::ParticipantState::Joining, + participant_info::State::Joined => participant::ParticipantState::Joined, + participant_info::State::Active => participant::ParticipantState::Active, + participant_info::State::Disconnected => participant::ParticipantState::Disconnected, + } + } +} + impl From for RoomChatMessage { fn from(proto_msg: ChatMessage) -> Self { RoomChatMessage { diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index ee428e595..0944a3145 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -45,7 +45,7 @@ pub use self::{ }; pub use crate::rtc_engine::SimulateScenario; use crate::{ - participant::ConnectionQuality, + participant::{ConnectionQuality, ParticipantState}, prelude::*, registered_audio_filter_plugins, rtc_engine::{ @@ -86,7 +86,16 @@ pub enum RoomError { #[derive(Clone, Debug)] #[non_exhaustive] pub enum RoomEvent { + /// Remote participant joined the room. + /// + /// This event is fired immediately after a participant joins before + /// it is able to receive data messages. To send data messages in response + /// to a participant joining, respond to the [`Self::ParticipantActive`] event instead. + /// ParticipantConnected(RemoteParticipant), + /// Remote participant is active and ready to receive data messages. + ParticipantActive(RemoteParticipant), + /// Remote participant disconnected from the room. ParticipantDisconnected(RemoteParticipant), LocalTrackPublished { publication: LocalTrackPublication, @@ -493,6 +502,7 @@ impl Room { let local_participant = LocalParticipant::new( rtc_engine.clone(), pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -639,6 +649,7 @@ impl Room { let pi = pi.clone(); inner.create_participant( pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -976,7 +987,12 @@ impl RoomSession { // disconnected } } else if let Some(remote_participant) = remote_participant { + let already_active = remote_participant.is_active(); remote_participant.update_info(pi.clone()); + if !already_active && remote_participant.is_active() { + self.dispatcher + .dispatch(&RoomEvent::ParticipantActive(remote_participant.clone())); + } participants.push(Participant::Remote(remote_participant)); } else { // Create a new participant @@ -984,6 +1000,7 @@ impl RoomSession { let pi = pi.clone(); self.create_participant( pi.kind().into(), + pi.state().into(), pi.sid.try_into().unwrap(), pi.identity.into(), pi.name, @@ -1547,6 +1564,7 @@ impl RoomSession { fn create_participant( self: &Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -1556,6 +1574,7 @@ impl RoomSession { let participant = RemoteParticipant::new( self.rtc_engine.clone(), kind, + state, sid.clone(), identity.clone(), name, diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index f4a12d627..ef362c6a7 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -22,7 +22,10 @@ use std::{ time::Duration, }; -use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantTrackPermission}; +use super::{ + ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, + ParticipantTrackPermission, +}; use crate::{ data_stream::{ ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions, @@ -106,6 +109,7 @@ impl LocalParticipant { pub(crate) fn new( rtc_engine: Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -114,7 +118,9 @@ impl LocalParticipant { encryption_type: EncryptionType, ) -> Self { Self { - inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind), + inner: super::new_inner( + rtc_engine, sid, identity, name, metadata, attributes, kind, state, + ), local: Arc::new(LocalInfo { events: LocalEvents::default(), encryption_type, @@ -665,6 +671,10 @@ impl LocalParticipant { self.inner.info.read().name.clone() } + pub fn is_active(&self) -> bool { + self.inner.info.read().state == ParticipantState::Active + } + pub fn metadata(&self) -> String { self.inner.info.read().metadata.clone() } diff --git a/livekit/src/room/participant/mod.rs b/livekit/src/room/participant/mod.rs index b5ab45f73..787d0dfd1 100644 --- a/livekit/src/room/participant/mod.rs +++ b/livekit/src/room/participant/mod.rs @@ -46,6 +46,14 @@ pub enum ParticipantKind { Agent, } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum ParticipantState { + Joining, + Joined, + Active, + Disconnected, +} + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum DisconnectReason { UnknownReason, @@ -78,6 +86,7 @@ impl Participant { pub fn sid(self: &Self) -> ParticipantSid; pub fn identity(self: &Self) -> ParticipantIdentity; pub fn name(self: &Self) -> String; + pub fn is_active(self: &Self) -> bool; pub fn metadata(self: &Self) -> String; pub fn attributes(self: &Self) -> HashMap; pub fn is_speaking(self: &Self) -> bool; @@ -114,6 +123,7 @@ struct ParticipantInfo { pub audio_level: f32, pub connection_quality: ConnectionQuality, pub kind: ParticipantKind, + pub state: ParticipantState, pub disconnect_reason: DisconnectReason, } @@ -154,6 +164,7 @@ pub(super) fn new_inner( metadata: String, attributes: HashMap, kind: ParticipantKind, + state: ParticipantState, ) -> Arc { Arc::new(ParticipantInner { rtc_engine, @@ -164,6 +175,7 @@ pub(super) fn new_inner( metadata, attributes, kind, + state, speaking: false, audio_level: 0.0, connection_quality: ConnectionQuality::Excellent, @@ -180,6 +192,7 @@ pub(super) fn update_info( new_info: proto::ParticipantInfo, ) { let mut info = inner.info.write(); + info.state = new_info.state().into(); info.disconnect_reason = new_info.disconnect_reason().into(); info.kind = new_info.kind().into(); info.sid = new_info.sid.try_into().unwrap(); diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index 58a1659e7..41a483ad8 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -24,7 +24,7 @@ use livekit_protocol as proto; use livekit_runtime::timeout; use parking_lot::Mutex; -use super::{ConnectionQuality, ParticipantInner, ParticipantKind, TrackKind}; +use super::{ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantState, TrackKind}; use crate::{prelude::*, rtc_engine::RtcEngine, track::TrackError}; const ADD_TRACK_TIMEOUT: Duration = Duration::from_secs(5); @@ -71,6 +71,7 @@ impl RemoteParticipant { pub(crate) fn new( rtc_engine: Arc, kind: ParticipantKind, + state: ParticipantState, sid: ParticipantSid, identity: ParticipantIdentity, name: String, @@ -79,7 +80,9 @@ impl RemoteParticipant { auto_subscribe: bool, ) -> Self { Self { - inner: super::new_inner(rtc_engine, sid, identity, name, metadata, attributes, kind), + inner: super::new_inner( + rtc_engine, sid, identity, name, metadata, attributes, kind, state, + ), remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }), } } @@ -450,6 +453,10 @@ impl RemoteParticipant { self.inner.info.read().name.clone() } + pub fn is_active(&self) -> bool { + self.inner.info.read().state == ParticipantState::Active + } + pub fn metadata(&self) -> String { self.inner.info.read().metadata.clone() }