From e9e7aebbd1eef466feccfb41c29a4eb1b49ca638 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 6 Aug 2025 12:37:04 +1000 Subject: [PATCH 01/24] Create TTL map Use system time and duration Add iterator method Organize Doc Add cleanup --- livekit/src/room/utils/mod.rs | 1 + livekit/src/room/utils/ttl_map.rs | 140 ++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 livekit/src/room/utils/ttl_map.rs diff --git a/livekit/src/room/utils/mod.rs b/livekit/src/room/utils/mod.rs index 0cc43b69b..601f23339 100644 --- a/livekit/src/room/utils/mod.rs +++ b/livekit/src/room/utils/mod.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +pub mod ttl_map; pub mod take_cell; pub mod utf8_chunk; diff --git a/livekit/src/room/utils/ttl_map.rs b/livekit/src/room/utils/ttl_map.rs new file mode 100644 index 000000000..32c865c1d --- /dev/null +++ b/livekit/src/room/utils/ttl_map.rs @@ -0,0 +1,140 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::HashMap, + hash::Hash, + time::{Duration, SystemTime}, +}; + +/// Time to live (TTL) map +/// +/// Elements older than the TTL duration are automatically removed. +/// +pub struct TtlMap { + inner: HashMap>, + last_cleanup: SystemTime, + ttl: Duration, +} + +struct Entry { + value: V, + expires_at: SystemTime, +} + +impl TtlMap { + /// Creates an empty `TtlMap`. + pub fn new(ttl: Duration) -> Self { + Self { inner: HashMap::new(), last_cleanup: SystemTime::now(), ttl } + } + + /// Returns the number of elements in the map. + pub fn len(&mut self) -> usize { + self.cleanup(); + self.inner.len() + } + + /// An iterator visiting all key-value pairs in arbitrary order. + /// The iterator element type is `(&'a K, &'a V)`. + pub fn iter(&mut self) -> impl Iterator { + self.cleanup(); + self.inner.iter().map(|(key, entry)| (key, &entry.value)) + } + + /// Removes expired elements. + fn cleanup(&mut self) { + let now = SystemTime::now(); + self.inner.retain(|_, entry| entry.expires_at >= now); + self.last_cleanup = now; + } +} + +impl TtlMap +where + K: Eq + Hash, +{ + /// Returns a reference to the value corresponding to the key. + pub fn get(&mut self, k: &K) -> Option<&V> { + let expires_at = self.inner.get(k).map(|entry| entry.expires_at)?; + let now = SystemTime::now(); + if expires_at < now { + _ = self.inner.remove(k); + return None; + } + Some(&self.inner.get(k).unwrap().value) + } + + /// Sets the value for the given key. + pub fn set(&mut self, k: K, v: Option) { + let now = SystemTime::now(); + let Ok(elapsed) = now.duration_since(self.last_cleanup) else { + log::error!("System clock anomaly detected"); + return; + }; + let half_ttl = self.ttl.div_f64(2.0); + if elapsed > half_ttl { + self.cleanup(); + } + + let Some(value) = v else { + _ = self.inner.remove(&k); + return; + }; + let expires_at = now + self.ttl; + let entry = Entry { value, expires_at }; + self.inner.insert(k, entry); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + use tokio::time::sleep; + + const SHORT_TTL: Duration = Duration::from_millis(100); + + #[tokio::test] + async fn test_expiration() { + let mut map = TtlMap::::new(SHORT_TTL); + map.set('a', Some(1)); + map.set('b', Some(2)); + map.set('c', Some(3)); + + assert_eq!(map.len(), 3); + assert!(map.get(&'a').is_some()); + assert!(map.get(&'b').is_some()); + assert!(map.get(&'c').is_some()); + + sleep(SHORT_TTL).await; + + assert_eq!(map.len(), 0); + assert!(map.get(&'a').is_none()); + assert!(map.get(&'b').is_none()); + assert!(map.get(&'c').is_none()); + } + + #[test] + fn test_iter() { + let mut map = TtlMap::::new(SHORT_TTL); + map.set('a', Some(1)); + map.set('b', Some(2)); + map.set('c', Some(3)); + + let elements: HashSet<_> = map.iter().map(|(k, v)| (*k, *v)).collect(); + assert!(elements.contains(&('a', 1))); + assert!(elements.contains(&('b', 2))); + assert!(elements.contains(&('c', 3))); + } +} From 83d75cfa11f9cbadc1a689e571901002df06cd6b Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 6 Aug 2025 13:22:54 +1000 Subject: [PATCH 02/24] Remove dead code --- livekit/src/room/mod.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index ee428e595..2c5420c45 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -353,15 +353,7 @@ pub struct RoomOptions { pub e2ee: Option, pub rtc_config: RtcConfiguration, pub join_retries: u32, - pub sdk_options: RoomSdkOptions, - pub preregistration: Option, -} - -#[derive(Debug, Clone)] -#[non_exhaustive] -pub struct PreRegistration { - text_stream_topics: Vec, - byte_stream_topics: Vec, + pub sdk_options: RoomSdkOptions } impl Default for RoomOptions { @@ -380,8 +372,7 @@ impl Default for RoomOptions { ice_transport_type: IceTransportsType::All, }, join_retries: 3, - sdk_options: RoomSdkOptions::default(), - preregistration: None, + sdk_options: RoomSdkOptions::default() } } } From f39a47e3090232b39fef4bae87eeae0a265e30f7 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 6 Aug 2025 14:12:50 +1000 Subject: [PATCH 03/24] Make internal --- livekit/src/room/utils/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit/src/room/utils/mod.rs b/livekit/src/room/utils/mod.rs index 601f23339..183c1ceb4 100644 --- a/livekit/src/room/utils/mod.rs +++ b/livekit/src/room/utils/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -pub mod ttl_map; +pub(crate) mod ttl_map; pub mod take_cell; pub mod utf8_chunk; From b1d51ad978cc12901598fcbcba1ed77145c1297e Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Wed, 6 Aug 2025 15:31:16 +1000 Subject: [PATCH 04/24] Expose channel kind --- livekit/src/rtc_engine/rtc_events.rs | 23 ++++++++++++++++++----- livekit/src/rtc_engine/rtc_session.rs | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/livekit/src/rtc_engine/rtc_events.rs b/livekit/src/rtc_engine/rtc_events.rs index 004fb2292..8206a8bb0 100644 --- a/livekit/src/rtc_engine/rtc_events.rs +++ b/livekit/src/rtc_engine/rtc_events.rs @@ -17,7 +17,10 @@ use livekit_protocol as proto; use tokio::sync::mpsc; use super::peer_transport::PeerTransport; -use crate::{rtc_engine::peer_transport::OnOfferCreated, DataPacketKind}; +use crate::{ + rtc_engine::{peer_transport::OnOfferCreated, rtc_session::RELIABLE_DC_LABEL}, + DataPacketKind, +}; pub type RtcEmitter = mpsc::UnboundedSender; pub type RtcEvents = mpsc::UnboundedReceiver; @@ -50,6 +53,7 @@ pub enum RtcEvent { Data { data: Vec, binary: bool, + kind: DataPacketKind, }, DataChannelBufferedAmountChange { sent: u64, @@ -90,7 +94,12 @@ fn on_data_channel( emitter: RtcEmitter, ) -> rtc::peer_connection::OnDataChannel { Box::new(move |data_channel| { - data_channel.on_message(Some(on_message(emitter.clone()))); + let kind = if data_channel.label() == RELIABLE_DC_LABEL { + DataPacketKind::Reliable + } else { + DataPacketKind::Lossy + }; + data_channel.on_message(Some(on_message(emitter.clone(), kind))); let _ = emitter.send(RtcEvent::DataChannel { data_channel, target }); }) @@ -140,9 +149,13 @@ pub fn forward_pc_events(transport: &mut PeerTransport, rtc_emitter: RtcEmitter) transport.on_offer(Some(on_offer(signal_target, rtc_emitter))); } -fn on_message(emitter: RtcEmitter) -> rtc::data_channel::OnMessage { +fn on_message(emitter: RtcEmitter, kind: DataPacketKind) -> rtc::data_channel::OnMessage { Box::new(move |buffer| { - let _ = emitter.send(RtcEvent::Data { data: buffer.data.to_vec(), binary: buffer.binary }); + let _ = emitter.send(RtcEvent::Data { + data: buffer.data.to_vec(), + binary: buffer.binary, + kind, + }); }) } @@ -158,6 +171,6 @@ fn on_buffered_amount_change( } pub fn forward_dc_events(dc: &mut DataChannel, kind: DataPacketKind, rtc_emitter: RtcEmitter) { - dc.on_message(Some(on_message(rtc_emitter.clone()))); + dc.on_message(Some(on_message(rtc_emitter.clone(), kind))); dc.on_buffered_amount_change(Some(on_buffered_amount_change(rtc_emitter, dc.clone(), kind))); } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 9ea670072..f3701a1f7 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -825,7 +825,7 @@ impl SessionInner { log::warn!("Track event with no streams"); } } - RtcEvent::Data { data, binary } => { + RtcEvent::Data { data, binary, kind } => { if !binary { Err(EngineError::Internal("text messages aren't supported".into()))?; } From b9c6e465c1c4cda4cdd1421f28c5c1e7069c99c3 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 12:25:07 +1000 Subject: [PATCH 05/24] Refactor data channel event - Root kind field applying to all event types - Detail enum - Optional completion tx channel - Prefer named fields - Publish data request gets its own type --- livekit/src/rtc_engine/rtc_session.rs | 81 ++++++++++++++++++--------- 1 file changed, 56 insertions(+), 25 deletions(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index f3701a1f7..838c9f1d8 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -187,9 +187,28 @@ pub enum SessionEvent { } #[derive(Debug)] -enum DataChannelEvent { - PublishData(proto::DataPacket, DataPacketKind, oneshot::Sender>), - BufferedAmountChange(u64, DataPacketKind), +struct DataChannelEvent { + kind: DataPacketKind, + detail: DataChannelEventDetail, +} + +#[derive(Debug)] +enum DataChannelEventDetail { + PublishData(PublishDataRequest), + BufferedAmountChange(u64), +} + +#[derive(Debug)] +struct PublishDataRequest { + /// Encoded packet data. + data: Vec, + /// Packet's sequence number ([`proto::DataPacket::sequence`]). + sequence: u32, + /// Notifies the caller once the request has been fulfilled. + /// + /// For retries, this will be [`Option::None`]. + /// + completion_tx: Option>>, } #[derive(Serialize, Deserialize)] @@ -583,24 +602,23 @@ impl SessionInner { break; }; - match event { - DataChannelEvent::PublishData(packet, kind, tx) => { - let data = packet.encode_to_vec(); - match kind { + match event.detail { + DataChannelEventDetail::PublishData(request) => { + match event.kind { DataPacketKind::Lossy => { - lossy_queue.push_back((data, kind, tx)); + lossy_queue.push_back(request); let threshold = self.lossy_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(threshold, &mut lossy_buffered_amount, &mut lossy_queue); + self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue); } DataPacketKind::Reliable => { - reliable_queue.push_back((data, kind, tx)); + reliable_queue.push_back(request); let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(threshold, &mut reliable_buffered_amount, &mut reliable_queue); + self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue); } } } - DataChannelEvent::BufferedAmountChange(sent, kind) => { - match kind { + DataChannelEventDetail::BufferedAmountChange(sent) => { + match event.kind { DataPacketKind::Lossy => { if lossy_buffered_amount < sent { // I believe never reach here but adding logs just in case @@ -610,7 +628,7 @@ impl SessionInner { lossy_buffered_amount -= sent; } let threshold = self.lossy_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(threshold, &mut lossy_buffered_amount, &mut lossy_queue); + self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue); } DataPacketKind::Reliable => { if reliable_buffered_amount < sent { @@ -620,7 +638,7 @@ impl SessionInner { reliable_buffered_amount -= sent; } let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(threshold, &mut reliable_buffered_amount, &mut reliable_queue); + self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue); } } } @@ -638,15 +656,16 @@ impl SessionInner { fn _send_until_threshold( self: &Arc, + kind: DataPacketKind, threshold: u64, buffered_amount: &mut u64, - queue: &mut VecDeque<(Vec, DataPacketKind, oneshot::Sender>)>, + queue: &mut VecDeque, ) { while *buffered_amount <= threshold { - let Some((data, kind, tx)) = queue.pop_front() else { + let Some(request) = queue.pop_front() else { break; }; - + let data: Vec = request.data.into(); *buffered_amount += data.len() as u64; let result = self .data_channel(SignalTarget::Publisher, kind) @@ -656,7 +675,9 @@ impl SessionInner { EngineError::Internal(format!("failed to send data packet: {:?}", err).into()) }); - let _ = tx.send(result); + if let Some(completion_tx) = request.completion_tx { + _ = completion_tx.send(result); + } } } @@ -965,9 +986,11 @@ impl SessionInner { } } RtcEvent::DataChannelBufferedAmountChange { sent, amount: _, kind } => { - if let Err(err) = - self.dc_emitter.send(DataChannelEvent::BufferedAmountChange(sent, kind)) - { + let ev = DataChannelEvent { + kind, + detail: DataChannelEventDetail::BufferedAmountChange(sent), + }; + if let Err(err) = self.dc_emitter.send(ev) { log::error!("failed to send dc_event buffer_amount_change: {:?}", err); } } @@ -1183,13 +1206,21 @@ impl SessionInner { ) -> Result<(), EngineError> { self.ensure_publisher_connected(kind).await?; - let (tx, rx) = oneshot::channel(); - if let Err(err) = self.dc_emitter.send(DataChannelEvent::PublishData(data, kind, tx)) { + let (completion_tx, completion_rx) = oneshot::channel(); + let ev = DataChannelEvent { + kind, + detail: DataChannelEventDetail::PublishData(PublishDataRequest { + data: data.encode_to_vec(), + sequence: data.sequence, + completion_tx: Some(completion_tx) + }) + }; + if let Err(err) = self.dc_emitter.send(ev) { return Err(EngineError::Internal( format!("failed to push data into queue: {:?}", err).into(), )); }; - rx.await.map_err(|e| { + completion_rx.await.map_err(|e| { EngineError::Internal(format!("failed to receive data from dc_task: {:?}", e).into()) })? } From 2af51f0fb097fd57fe325a359746302225ed59b4 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 14:18:22 +1000 Subject: [PATCH 06/24] Avoid cloning keys --- livekit/src/room/utils/ttl_map.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/livekit/src/room/utils/ttl_map.rs b/livekit/src/room/utils/ttl_map.rs index 32c865c1d..b375ddf6c 100644 --- a/livekit/src/room/utils/ttl_map.rs +++ b/livekit/src/room/utils/ttl_map.rs @@ -62,7 +62,7 @@ impl TtlMap { impl TtlMap where - K: Eq + Hash, + K: Eq + Hash + Clone, { /// Returns a reference to the value corresponding to the key. pub fn get(&mut self, k: &K) -> Option<&V> { @@ -76,7 +76,7 @@ where } /// Sets the value for the given key. - pub fn set(&mut self, k: K, v: Option) { + pub fn set(&mut self, k: &K, v: Option) { let now = SystemTime::now(); let Ok(elapsed) = now.duration_since(self.last_cleanup) else { log::error!("System clock anomaly detected"); @@ -93,7 +93,7 @@ where }; let expires_at = now + self.ttl; let entry = Entry { value, expires_at }; - self.inner.insert(k, entry); + self.inner.insert(k.clone(), entry); } } @@ -108,9 +108,9 @@ mod tests { #[tokio::test] async fn test_expiration() { let mut map = TtlMap::::new(SHORT_TTL); - map.set('a', Some(1)); - map.set('b', Some(2)); - map.set('c', Some(3)); + map.set(&'a', Some(1)); + map.set(&'b', Some(2)); + map.set(&'c', Some(3)); assert_eq!(map.len(), 3); assert!(map.get(&'a').is_some()); @@ -128,9 +128,9 @@ mod tests { #[test] fn test_iter() { let mut map = TtlMap::::new(SHORT_TTL); - map.set('a', Some(1)); - map.set('b', Some(2)); - map.set('c', Some(3)); + map.set(&'a', Some(1)); + map.set(&'b', Some(2)); + map.set(&'c', Some(3)); let elements: HashSet<_> = map.iter().map(|(k, v)| (*k, *v)).collect(); assert!(elements.contains(&('a', 1))); From ef5a499e42f3d0e616b121710d3bec9513e48857 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 14:48:28 +1000 Subject: [PATCH 07/24] Create TxQueue --- livekit/src/room/utils/mod.rs | 1 + livekit/src/room/utils/tx_queue.rs | 116 +++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 livekit/src/room/utils/tx_queue.rs diff --git a/livekit/src/room/utils/mod.rs b/livekit/src/room/utils/mod.rs index 183c1ceb4..4e3b49cb6 100644 --- a/livekit/src/room/utils/mod.rs +++ b/livekit/src/room/utils/mod.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +pub(crate) mod tx_queue; pub(crate) mod ttl_map; pub mod take_cell; pub mod utf8_chunk; diff --git a/livekit/src/room/utils/tx_queue.rs b/livekit/src/room/utils/tx_queue.rs new file mode 100644 index 000000000..8e3ce7418 --- /dev/null +++ b/livekit/src/room/utils/tx_queue.rs @@ -0,0 +1,116 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +#[derive(Debug)] +pub struct TxQueue { + inner: VecDeque, + buffered_size: usize +} + +impl TxQueue { + /// Creates an empty queue. + pub fn new() -> Self { + Self { + inner: VecDeque::new(), + buffered_size: 0 + } + } + + /// Number of elements in the queue. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Total size in bytes of all items currently in the queue. + pub fn buffered_size(&self) -> usize { + self.buffered_size + } + + /// Provides a reference to the front element, or `None if the queue is empty. + pub fn peek(&self) -> Option<&T> { + self.inner.front() + } + + /// Appends an item to the back of the queue. + pub fn enqueue(&mut self, item: T) { + let size = item.buffered_size(); + self.inner.push_back(item); + self.buffered_size += size; + } + + /// Removes the first item and returns it, or `None` if the queue is empty. + pub fn dequeue(&mut self) -> Option { + let item = self.inner.pop_front()?; + let size = item.buffered_size(); + self.buffered_size -= size; + return Some(item); + } + + /// Dequeue and discard items until the buffered size is less than or + /// equal to the given target. + pub fn trim(&mut self, target_buffer_size: usize) { + while self.buffered_size > target_buffer_size { + _ = self.dequeue() + } + } +} + +/// Item in a [`TxQueue`]. +pub trait TxQueueItem { + /// Amount in bytes this item adds to the [`TxQueue`]'s buffered size. + fn buffered_size(&self) -> usize; +} + +impl TxQueueItem for Vec { + fn buffered_size(&self) -> usize { + self.len() + } +} + +#[cfg(test)] +mod tests { + use super::TxQueue; + + #[test] + fn test_buffered_size() { + let mut queue = TxQueue::new(); + + queue.enqueue(vec![0xFF, 0xFA]); + assert_eq!(queue.buffered_size(), 2); + + queue.enqueue(vec![0x0F, 0xFC, 0xAF]); + assert_eq!(queue.buffered_size(), 5); + + assert_eq!(queue.dequeue(), Some(vec![0xFF, 0xFA])); + assert_eq!(queue.buffered_size, 3); + + assert_eq!(queue.dequeue(), Some(vec![0x0F, 0xFC, 0xAF])); + assert_eq!(queue.buffered_size, 0); + + assert_eq!(queue.dequeue(), None); + } + + #[test] + fn test_trim() { + let mut queue = TxQueue::new(); + queue.enqueue(vec![0xFF, 0xFA]); + queue.enqueue(vec![0x0F, 0xFC, 0xAF]); + queue.enqueue(vec![0xAA]); + + queue.trim(1); + assert_eq!(queue.buffered_size(), 1); + } +} \ No newline at end of file From 875ad55311e0816c1083ed42c892bca49e2d1e0c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:03:45 +1000 Subject: [PATCH 08/24] Implement reliable retry --- livekit/src/room/mod.rs | 3 +- livekit/src/rtc_engine/rtc_session.rs | 176 +++++++++++++++++++++++--- 2 files changed, 162 insertions(+), 17 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 2c5420c45..276fe62c9 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1178,8 +1178,7 @@ impl RoomSession { }), publish_tracks: self.local_participant.published_tracks_info(), data_channels: dcs, - // unimplemented, stubbed for now - datachannel_receive_states: Vec::new(), + datachannel_receive_states: session.data_channel_receive_states() }; log::debug!("sending sync state {:?}", sync_state); diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 838c9f1d8..1333b4c3a 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -18,7 +18,7 @@ use std::{ fmt::Debug, ops::Not, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, }, time::Duration, @@ -38,7 +38,14 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot, watch, Notify}; use super::{rtc_events, EngineError, EngineOptions, EngineResult, SimulateScenario}; -use crate::{id::ParticipantIdentity, ChatMessage, TranscriptionSegment}; +use crate::{ + id::ParticipantIdentity, + utils::{ + ttl_map::TtlMap, + tx_queue::{TxQueue, TxQueueItem}, + }, + ChatMessage, TranscriptionSegment, +}; use crate::{ id::ParticipantSid, options::TrackPublishOptions, @@ -57,6 +64,7 @@ pub const ICE_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); pub const TRACK_PUBLISH_TIMEOUT: Duration = Duration::from_secs(10); pub const LOSSY_DC_LABEL: &str = "_lossy"; pub const RELIABLE_DC_LABEL: &str = "_reliable"; +pub const RELIABLE_RECEIVED_STATE_TTL: Duration = Duration::from_secs(30); pub const PUBLISHER_NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150); pub const INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD: u64 = 2 * 1024 * 1024; @@ -194,23 +202,57 @@ struct DataChannelEvent { #[derive(Debug)] enum DataChannelEventDetail { + /// Publish data packet. + PublishPacket(PublishPacketRequest), + /// Publish data packet that has already been encoded. PublishData(PublishDataRequest), + /// RTC buffered amount changed. BufferedAmountChange(u64), + /// Enqueue reliable packets for retry starting from the given sequence number. + RetryFrom(u32), +} + +#[derive(Debug)] +struct PublishPacketRequest { + /// Unencoded data packewt. + packet: proto::DataPacket, + + /// Notifies the caller once the request has been fulfilled. + completion_tx: oneshot::Sender>, } #[derive(Debug)] struct PublishDataRequest { - /// Encoded packet data. - data: Vec, - /// Packet's sequence number ([`proto::DataPacket::sequence`]). - sequence: u32, + /// Encoded data packet. + encoded_packet: EncodedPacket, + /// Notifies the caller once the request has been fulfilled. /// - /// For retries, this will be [`Option::None`]. + /// For retries, this will be `None`. /// completion_tx: Option>>, } +#[derive(Debug)] +struct EncodedPacket { + /// Encoded packet data. + data: Vec, + /// Packet's sequence number from [`proto::DataPacket::sequence`]. + sequence: u32, +} + +impl Into for proto::DataPacket { + fn into(self) -> EncodedPacket { + EncodedPacket { data: self.encode_to_vec(), sequence: self.sequence } + } +} + +impl TxQueueItem for EncodedPacket { + fn buffered_size(&self) -> usize { + self.data.len() + } +} + #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct IceCandidateJson { @@ -236,6 +278,13 @@ struct SessionInner { lossy_dc_buffered_amount_low_threshold: AtomicU64, reliable_dc: DataChannel, reliable_dc_buffered_amount_low_threshold: AtomicU64, + + /// Next sequence number for reliable packets. + next_packet_sequence: AtomicU32, + + /// Time to live (TTL) map between publisher SID and last sequence number. + dc_receive_state: Mutex>, + dc_emitter: mpsc::UnboundedSender, // Keep a strong reference to the subscriber datachannels, @@ -341,6 +390,8 @@ impl RtcSession { reliable_dc_buffered_amount_low_threshold: AtomicU64::new( INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, ), + next_packet_sequence: 1.into(), + dc_receive_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)), dc_emitter, sub_lossy_dc: Mutex::new(None), sub_reliable_dc: Mutex::new(None), @@ -489,6 +540,10 @@ impl RtcSession { .send(SessionEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold }); } + pub fn data_channel_receive_states(&self) -> Vec { + self.inner.data_channel_receive_states() + } + pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse { self.inner.get_response(request_id).await } @@ -593,6 +648,7 @@ impl SessionInner { let mut reliable_buffered_amount = 0; let mut lossy_queue = VecDeque::new(); let mut reliable_queue = VecDeque::new(); + let mut retry_queue = TxQueue::new(); loop { tokio::select! { @@ -603,6 +659,21 @@ impl SessionInner { }; match event.detail { + DataChannelEventDetail::PublishPacket(mut request) => { + if event.kind == DataPacketKind::Reliable { + request.packet.sequence = self.next_packet_sequence.fetch_add(1, Ordering::Relaxed); + } + let ev = DataChannelEvent { + kind: event.kind, + detail: DataChannelEventDetail::PublishData(PublishDataRequest { + encoded_packet: request.packet.into(), + completion_tx: request.completion_tx.into() + }) + }; + if let Err(err) = self.dc_emitter.send(ev) { + log::error!("Failed to enqueue send data request: {}", err) + } + } DataChannelEventDetail::PublishData(request) => { match event.kind { DataPacketKind::Lossy => { @@ -639,9 +710,16 @@ impl SessionInner { } let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue); + // TODO: Ensure this is the proper quantity + let retry_min_amount = (threshold as usize * 5) / 4; // threshold * 1.25 + retry_queue.trim((sent as usize) + retry_min_amount); } } } + DataChannelEventDetail::RetryFrom(last_sequence) => { + assert!(event.kind == DataPacketKind::Reliable); + self._enqueue_for_retry_from(last_sequence, &mut retry_queue); + } } }, @@ -665,7 +743,7 @@ impl SessionInner { let Some(request) = queue.pop_front() else { break; }; - let data: Vec = request.data.into(); + let data = request.encoded_packet.data; *buffered_amount += data.len() as u64; let result = self .data_channel(SignalTarget::Publisher, kind) @@ -681,6 +759,53 @@ impl SessionInner { } } + fn _enqueue_for_retry_from( + self: &Arc, + last_sequence: u32, + queue: &mut TxQueue, + ) { + if let Some(first) = queue.peek() { + if first.sequence > last_sequence + 1 { + log::warn!( + "Wrong packet sequence while retrying: {} > {}, {} packets missing", + first.sequence, + last_sequence + 1, + first.sequence - last_sequence - 1 + ); + } + } + while let Some(encoded_packet) = queue.dequeue() { + if encoded_packet.sequence <= last_sequence { + continue; + }; + let ev = DataChannelEvent { + kind: DataPacketKind::Reliable, + detail: DataChannelEventDetail::PublishData(PublishDataRequest { + encoded_packet, + completion_tx: None, + }), + }; + if let Err(err) = self.dc_emitter.send(ev) { + log::error!("Failed to enqueue data for retry: {}", err); + } + } + } + + fn update_reliable_received_state(&self, packet: &proto::DataPacket) { + if packet.sequence <= 0 || packet.participant_sid.is_empty() { + return; + }; + let mut state = self.dc_receive_state.lock(); + if state + .get(&packet.participant_sid) + .is_some_and(|&last_sequence| packet.sequence <= last_sequence) + { + log::warn!("Ignoring duplicate/out-of-order reliable data message"); + return; + } + state.set(&packet.participant_sid, Some(packet.sequence)); + } + async fn on_signal_event(&self, event: proto::signal_response::Message) -> EngineResult<()> { match event { proto::signal_response::Message::Answer(answer) => { @@ -852,6 +977,9 @@ impl SessionInner { } let data = proto::DataPacket::decode(&*data).unwrap(); + if kind == DataPacketKind::Reliable { + self.update_reliable_received_state(&data); + } if let Some(packet) = data.value.as_ref() { match packet { proto::data_packet::Value::User(user) => { @@ -1201,7 +1329,7 @@ impl SessionInner { async fn publish_data( self: &Arc, - data: proto::DataPacket, + packet: proto::DataPacket, kind: DataPacketKind, ) -> Result<(), EngineError> { self.ensure_publisher_connected(kind).await?; @@ -1209,15 +1337,14 @@ impl SessionInner { let (completion_tx, completion_rx) = oneshot::channel(); let ev = DataChannelEvent { kind, - detail: DataChannelEventDetail::PublishData(PublishDataRequest { - data: data.encode_to_vec(), - sequence: data.sequence, - completion_tx: Some(completion_tx) - }) + detail: DataChannelEventDetail::PublishPacket(PublishPacketRequest { + packet, + completion_tx, + }), }; if let Err(err) = self.dc_emitter.send(ev) { return Err(EngineError::Internal( - format!("failed to push data into queue: {:?}", err).into(), + format!("Failed to enqueue publish packet request: {:?}", err).into(), )); }; completion_rx.await.map_err(|e| { @@ -1236,6 +1363,14 @@ impl SessionInner { self.publisher_pc.peer_connection().set_configuration(rtc_config.clone())?; self.subscriber_pc.peer_connection().set_configuration(rtc_config)?; + let ev = DataChannelEvent { + kind: DataPacketKind::Reliable, + detail: DataChannelEventDetail::RetryFrom(reconnect_response.last_message_seq), + }; + if let Err(err) = self.dc_emitter.send(ev) { + log::error!("Failed to request reliable retry: {:?}", err); + } + Ok(reconnect_response) } @@ -1444,6 +1579,17 @@ impl SessionInner { } } + fn data_channel_receive_states(self: &Arc) -> Vec { + let mut state = self.dc_receive_state.lock(); + state + .iter() + .map(|(publisher_sid, last_seq)| proto::DataChannelReceiveState { + publisher_sid: publisher_sid.to_string(), + last_seq: *last_seq, + }) + .collect() + } + async fn get_response(&self, request_id: u32) -> proto::RequestResponse { let (tx, rx) = oneshot::channel(); self.pending_requests.lock().insert(request_id, tx); From 618534b6c28f18425a45b1551fb5e529f3710047 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:32:21 +1000 Subject: [PATCH 09/24] Set participant SID on all outgoing data packets --- livekit/src/room/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 276fe62c9..388d396c0 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -568,7 +568,8 @@ impl Room { let (incoming_stream_manager, open_rx) = IncomingStreamManager::new(); let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new(); - let identity = local_participant.identity().clone(); + let local_participant_identity = local_participant.identity(); + let local_participant_sid = local_participant.sid(); let room_info = join_response.room.unwrap(); let inner = Arc::new(RoomSession { @@ -662,7 +663,8 @@ impl Room { )); let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task( packet_rx, - identity, + local_participant_sid, + local_participant_identity, rtc_engine.clone(), close_rx.resubscribe(), )); @@ -1727,6 +1729,7 @@ async fn incoming_data_stream_task( /// Receives packets from the outgoing stream manager and send them. async fn outgoing_data_stream_task( mut packet_rx: UnboundedRequestReceiver>, + participant_sid: ParticipantSid, participant_identity: ParticipantIdentity, engine: Arc, mut close_rx: broadcast::Receiver<()>, @@ -1735,6 +1738,7 @@ async fn outgoing_data_stream_task( tokio::select! { Ok((mut packet, responder)) = packet_rx.recv() => { // Set packet's participant identity field + packet.participant_sid = participant_sid.clone().into(); packet.participant_identity = participant_identity.0.clone(); let result = engine.publish_data(packet, DataPacketKind::Reliable).await; let _ = responder.respond(result); From f989a3d5989cbc49a9376e034443f864799e464c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:11:17 +1000 Subject: [PATCH 10/24] Derive debug --- livekit/src/room/utils/ttl_map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/livekit/src/room/utils/ttl_map.rs b/livekit/src/room/utils/ttl_map.rs index b375ddf6c..313886f8c 100644 --- a/livekit/src/room/utils/ttl_map.rs +++ b/livekit/src/room/utils/ttl_map.rs @@ -13,21 +13,21 @@ // limitations under the License. use std::{ - collections::HashMap, - hash::Hash, - time::{Duration, SystemTime}, + collections::HashMap, fmt::Debug, hash::Hash, time::{Duration, SystemTime} }; /// Time to live (TTL) map /// /// Elements older than the TTL duration are automatically removed. /// +#[derive(Debug)] pub struct TtlMap { inner: HashMap>, last_cleanup: SystemTime, ttl: Duration, } +#[derive(Debug)] struct Entry { value: V, expires_at: SystemTime, From 025988963fb74ddcc1617073905d84eddf88d035 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 16:11:11 +1000 Subject: [PATCH 11/24] Avoid unsafe unwrap --- livekit/src/rtc_engine/rtc_session.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 1333b4c3a..ada08b272 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -976,7 +976,9 @@ impl SessionInner { Err(EngineError::Internal("text messages aren't supported".into()))?; } - let data = proto::DataPacket::decode(&*data).unwrap(); + let data = proto::DataPacket::decode(&*data).map_err(|err| { + EngineError::Internal(format!("failed to decode data packet: {}", err).into()) + })?; if kind == DataPacketKind::Reliable { self.update_reliable_received_state(&data); } From 017dffcf756b7e7010e53ad375500f07911b045b Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 16:17:18 +1000 Subject: [PATCH 12/24] Obtain packet kind from RTC event --- livekit/src/rtc_engine/rtc_session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index ada08b272..c85419cee 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1000,7 +1000,7 @@ impl SessionInner { }; let _ = self.emitter.send(SessionEvent::Data { - kind: data.kind().into(), + kind, participant_sid: participant_sid.map(|s| s.try_into().unwrap()), participant_identity: participant_identity .map(|s| s.try_into().unwrap()), From 412af8da5c030f3a8e52050d9eee96bafadb173d Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 17:38:51 +1000 Subject: [PATCH 13/24] Refactor incoming data packet handling - Prefer packet level participant identity and SID for user packet - Take packet value to avoid many unnecessary clones --- livekit/src/rtc_engine/rtc_session.rs | 254 ++++++++++++-------------- 1 file changed, 120 insertions(+), 134 deletions(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index c85419cee..ab859a09c 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -975,144 +975,14 @@ impl SessionInner { if !binary { Err(EngineError::Internal("text messages aren't supported".into()))?; } - - let data = proto::DataPacket::decode(&*data).map_err(|err| { + let mut packet = proto::DataPacket::decode(&*data).map_err(|err| { EngineError::Internal(format!("failed to decode data packet: {}", err).into()) })?; if kind == DataPacketKind::Reliable { - self.update_reliable_received_state(&data); + self.update_reliable_received_state(&packet); } - if let Some(packet) = data.value.as_ref() { - match packet { - proto::data_packet::Value::User(user) => { - let participant_sid = user - .participant_sid - .is_empty() - .not() - .then_some(user.participant_sid.clone()); - - let participant_identity = if !data.participant_identity.is_empty() { - Some(data.participant_identity.clone()) - } else if !user.participant_identity.is_empty() { - Some(user.participant_identity.clone()) - } else { - None - }; - - let _ = self.emitter.send(SessionEvent::Data { - kind, - participant_sid: participant_sid.map(|s| s.try_into().unwrap()), - participant_identity: participant_identity - .map(|s| s.try_into().unwrap()), - payload: user.payload.clone(), - topic: user.topic.clone(), - }); - } - proto::data_packet::Value::SipDtmf(dtmf) => { - let participant_identity = data - .participant_identity - .is_empty() - .not() - .then_some(data.participant_identity.clone()); - let digit = dtmf.digit.is_empty().not().then_some(dtmf.digit.clone()); - - let _ = self.emitter.send(SessionEvent::SipDTMF { - participant_identity: participant_identity - .map(|s| s.try_into().unwrap()), - digit: digit.map(|s| s.try_into().unwrap()), - code: dtmf.code, - }); - } - proto::data_packet::Value::Speaker(_) => {} - proto::data_packet::Value::Transcription(transcription) => { - let track_sid = transcription.track_id.clone(); - // let segments = transcription.segments.clone(); - let segments = transcription - .segments - .iter() - .map(|s| TranscriptionSegment { - id: s.id.clone(), - start_time: s.start_time, - end_time: s.end_time, - text: s.text.clone(), - language: s.language.clone(), - r#final: s.r#final, - }) - .collect(); - let participant_identity: ParticipantIdentity = - transcription.transcribed_participant_identity.clone().into(); - let _ = self.emitter.send(SessionEvent::Transcription { - participant_identity, - track_sid, - segments, - }); - } - proto::data_packet::Value::RpcRequest(rpc_request) => { - let caller_identity = data - .participant_identity - .is_empty() - .not() - .then_some(data.participant_identity.clone()) - .map(|s| s.try_into().unwrap()); - let _ = self.emitter.send(SessionEvent::RpcRequest { - caller_identity, - request_id: rpc_request.id.clone(), - method: rpc_request.method.clone(), - payload: rpc_request.payload.clone(), - response_timeout: Duration::from_millis( - rpc_request.response_timeout_ms as u64, - ), - version: rpc_request.version, - }); - } - proto::data_packet::Value::RpcResponse(rpc_response) => { - let _ = self.emitter.send(SessionEvent::RpcResponse { - request_id: rpc_response.request_id.clone(), - payload: rpc_response.value.as_ref().and_then(|v| match v { - proto::rpc_response::Value::Payload(payload) => { - Some(payload.clone()) - } - _ => None, - }), - error: rpc_response.value.as_ref().and_then(|v| match v { - proto::rpc_response::Value::Error(error) => Some(error.clone()), - _ => None, - }), - }); - } - proto::data_packet::Value::RpcAck(rpc_ack) => { - let _ = self.emitter.send(SessionEvent::RpcAck { - request_id: rpc_ack.request_id.clone(), - }); - } - proto::data_packet::Value::ChatMessage(message) => { - let _ = self.emitter.send(SessionEvent::ChatMessage { - participant_identity: ParticipantIdentity( - data.participant_identity, - ), - message: ChatMessage::from(message.clone()), - }); - } - proto::data_packet::Value::StreamHeader(message) => { - let _ = self.emitter.send(SessionEvent::DataStreamHeader { - header: message.clone(), - participant_identity: data.participant_identity.clone(), - }); - } - proto::data_packet::Value::StreamChunk(message) => { - let _ = self.emitter.send(SessionEvent::DataStreamChunk { - chunk: message.clone(), - participant_identity: data.participant_identity.clone(), - }); - } - proto::data_packet::Value::StreamTrailer(message) => { - let _ = self.emitter.send(SessionEvent::DataStreamTrailer { - trailer: message.clone(), - participant_identity: data.participant_identity.clone(), - }); - } - _ => {} - } + if let Some(detail) = packet.value.take() { + self.emit_incoming_packet(kind, packet, detail); } } RtcEvent::DataChannelBufferedAmountChange { sent, amount: _, kind } => { @@ -1129,6 +999,122 @@ impl SessionInner { Ok(()) } + fn emit_incoming_packet( + &self, + kind: DataPacketKind, + packet: proto::DataPacket, + value: proto::data_packet::Value, + ) { + // TODO: Standardize how participant identity is emitted in events; + // Option, ParticipantIdentity, and String are all used. + let participant_sid: Option = packet.participant_sid.try_into().ok(); + let participant_identity: Option = + packet.participant_identity.try_into().ok(); + + let send_result = match value { + proto::data_packet::Value::User(user) => { + // Participant SID and identity used to be defined on user packet, but + // they have been moved to the packet root. For backwards compatibility, + // we take the user packet's values if the top-level fields are not set. + let participant_sid = participant_sid + .is_none() + .then_some(user.participant_sid) + .and_then(|sid| sid.try_into().ok()); + let participant_identity = participant_identity + .is_none() + .then_some(user.participant_identity) + .and_then(|identity| identity.try_into().ok()); + + self.emitter.send(SessionEvent::Data { + kind, + participant_sid, + participant_identity, + payload: user.payload, + topic: user.topic, + }) + } + proto::data_packet::Value::SipDtmf(dtmf) => self.emitter.send(SessionEvent::SipDTMF { + participant_identity, + digit: (!dtmf.digit.is_empty()).then_some(dtmf.digit), + code: dtmf.code, + }), + proto::data_packet::Value::Transcription(transcription) => { + let segments = transcription + .segments + .into_iter() + .map(|s| TranscriptionSegment { + id: s.id, + start_time: s.start_time, + end_time: s.end_time, + text: s.text, + language: s.language, + r#final: s.r#final, + }) + .collect(); + let participant_identity = + transcription.transcribed_participant_identity.into(); + + self.emitter.send(SessionEvent::Transcription { + participant_identity, + track_sid: transcription.track_id, + segments, + }) + } + proto::data_packet::Value::RpcRequest(rpc_request) => { + let caller_identity = participant_identity; + self.emitter.send(SessionEvent::RpcRequest { + caller_identity, + request_id: rpc_request.id, + method: rpc_request.method, + payload: rpc_request.payload, + response_timeout: Duration::from_millis(rpc_request.response_timeout_ms as u64), + version: rpc_request.version, + }) + } + proto::data_packet::Value::RpcResponse(rpc_response) => { + let (payload, error) = match rpc_response.value { + None => (None, None), + Some(proto::rpc_response::Value::Payload(payload)) => (Some(payload), None), + Some(proto::rpc_response::Value::Error(err)) => (None, Some(err)), + }; + self.emitter.send(SessionEvent::RpcResponse { + request_id: rpc_response.request_id, + payload, + error, + }) + } + proto::data_packet::Value::RpcAck(rpc_ack) => { + self.emitter.send(SessionEvent::RpcAck { request_id: rpc_ack.request_id }) + } + proto::data_packet::Value::ChatMessage(message) => { + self.emitter.send(SessionEvent::ChatMessage { + participant_identity: participant_identity + .unwrap_or(ParticipantIdentity("".into())), + message: ChatMessage::from(message), + }) + } + proto::data_packet::Value::StreamHeader(header) => { + let participant_identity = + participant_identity.map_or("".into(), |identity| identity.0); + self.emitter.send(SessionEvent::DataStreamHeader { header, participant_identity }) + } + proto::data_packet::Value::StreamChunk(chunk) => { + let participant_identity = + participant_identity.map_or("".into(), |identity| identity.0); + self.emitter.send(SessionEvent::DataStreamChunk { chunk, participant_identity }) + } + proto::data_packet::Value::StreamTrailer(trailer) => { + let participant_identity = + participant_identity.map_or("".into(), |identity| identity.0); + self.emitter.send(SessionEvent::DataStreamTrailer { trailer, participant_identity }) + } + _ => Ok(()), + }; + if let Err(err) = send_result { + log::error!("failed to emit incoming data packet: {:?}", err); + } + } + async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult { let (tx, rx) = oneshot::channel(); let cid = req.cid.clone(); From adf61f359a20f6702d9382aad1b5b14fa4ceda2c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 19:41:33 +1000 Subject: [PATCH 14/24] Revert "Set participant SID on all outgoing data packets" This reverts commit 618534b6c28f18425a45b1551fb5e529f3710047. --- livekit/src/room/mod.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 388d396c0..276fe62c9 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -568,8 +568,7 @@ impl Room { let (incoming_stream_manager, open_rx) = IncomingStreamManager::new(); let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new(); - let local_participant_identity = local_participant.identity(); - let local_participant_sid = local_participant.sid(); + let identity = local_participant.identity().clone(); let room_info = join_response.room.unwrap(); let inner = Arc::new(RoomSession { @@ -663,8 +662,7 @@ impl Room { )); let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task( packet_rx, - local_participant_sid, - local_participant_identity, + identity, rtc_engine.clone(), close_rx.resubscribe(), )); @@ -1729,7 +1727,6 @@ async fn incoming_data_stream_task( /// Receives packets from the outgoing stream manager and send them. async fn outgoing_data_stream_task( mut packet_rx: UnboundedRequestReceiver>, - participant_sid: ParticipantSid, participant_identity: ParticipantIdentity, engine: Arc, mut close_rx: broadcast::Receiver<()>, @@ -1738,7 +1735,6 @@ async fn outgoing_data_stream_task( tokio::select! { Ok((mut packet, responder)) = packet_rx.recv() => { // Set packet's participant identity field - packet.participant_sid = participant_sid.clone().into(); packet.participant_identity = participant_identity.0.clone(); let result = engine.publish_data(packet, DataPacketKind::Reliable).await; let _ = responder.respond(result); From c48cc8463f6c1d6d0bd21f7a345835e87f9716db Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 20:46:12 +1000 Subject: [PATCH 15/24] Set participant info on outgoing data packets Ensure all outgoing data packets have their participant identity and SID fields set by handling this as a low-level protocol detail in session --- livekit/src/room/mod.rs | 8 +----- livekit/src/rtc_engine/rtc_session.rs | 35 ++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 276fe62c9..b015e0ba3 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -568,8 +568,6 @@ impl Room { let (incoming_stream_manager, open_rx) = IncomingStreamManager::new(); let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new(); - let identity = local_participant.identity().clone(); - let room_info = join_response.room.unwrap(); let inner = Arc::new(RoomSession { sid_promise: Promise::new(), @@ -662,7 +660,6 @@ impl Room { )); let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task( packet_rx, - identity, rtc_engine.clone(), close_rx.resubscribe(), )); @@ -1727,15 +1724,12 @@ async fn incoming_data_stream_task( /// Receives packets from the outgoing stream manager and send them. async fn outgoing_data_stream_task( mut packet_rx: UnboundedRequestReceiver>, - participant_identity: ParticipantIdentity, engine: Arc, mut close_rx: broadcast::Receiver<()>, ) { loop { tokio::select! { - Ok((mut packet, responder)) = packet_rx.recv() => { - // Set packet's participant identity field - packet.participant_identity = participant_identity.0.clone(); + Ok((packet, responder)) = packet_rx.recv() => { let result = engine.publish_data(packet, DataPacketKind::Reliable).await; let _ = responder.respond(result); }, diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index ab859a09c..44eb4f325 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -16,7 +16,6 @@ use std::{ collections::{HashMap, VecDeque}, convert::TryInto, fmt::Debug, - ops::Not, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, @@ -285,6 +284,8 @@ struct SessionInner { /// Time to live (TTL) map between publisher SID and last sequence number. dc_receive_state: Mutex>, + participant_info: SessionParticipantInfo, + dc_emitter: mpsc::UnboundedSender, // Keep a strong reference to the subscriber datachannels, @@ -302,6 +303,24 @@ struct SessionInner { pending_requests: Mutex>>, } +/// Information about the local participant needed for outgoing +/// data packets. +struct SessionParticipantInfo { + sid: ParticipantSid, + identity: ParticipantIdentity, +} + +impl SessionParticipantInfo { + /// Extracts participant info from a join response. + fn from_join(join_response: &proto::JoinResponse) -> Option { + let Some(info) = &join_response.participant else { None? }; + Some(Self { + sid: info.sid.clone().try_into().ok()?, + identity: info.identity.clone().try_into().ok()?, + }) + } +} + /// This struct holds a WebRTC session /// The session changes at every reconnection /// @@ -337,6 +356,10 @@ impl RtcSession { let signal_client = Arc::new(signal_client); log::debug!("received JoinResponse: {:?}", join_response); + let Some(participant_info) = SessionParticipantInfo::from_join(&join_response) else { + Err(EngineError::Internal("Join response missing participant info".into()))? + }; + let (rtc_emitter, rtc_events) = mpsc::unbounded_channel(); let rtc_config = make_rtc_config_join(join_response.clone(), options.rtc_config.clone()); @@ -392,6 +415,7 @@ impl RtcSession { ), next_packet_sequence: 1.into(), dc_receive_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)), + participant_info, dc_emitter, sub_lossy_dc: Mutex::new(None), sub_reliable_dc: Mutex::new(None), @@ -1051,8 +1075,7 @@ impl SessionInner { r#final: s.r#final, }) .collect(); - let participant_identity = - transcription.transcribed_participant_identity.into(); + let participant_identity = transcription.transcribed_participant_identity.into(); self.emitter.send(SessionEvent::Transcription { participant_identity, @@ -1317,11 +1340,15 @@ impl SessionInner { async fn publish_data( self: &Arc, - packet: proto::DataPacket, + mut packet: proto::DataPacket, kind: DataPacketKind, ) -> Result<(), EngineError> { self.ensure_publisher_connected(kind).await?; + // Populate local participant info fields + packet.participant_identity = self.participant_info.identity.to_string(); + packet.participant_sid = self.participant_info.sid.to_string(); + let (completion_tx, completion_rx) = oneshot::channel(); let ev = DataChannelEvent { kind, From b89a0ad70c54e354bbf4720f6ffcef78fd0dc27b Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:01:28 +1000 Subject: [PATCH 16/24] Rename --- livekit/src/rtc_engine/rtc_session.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 44eb4f325..ebc5de8d8 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -282,7 +282,7 @@ struct SessionInner { next_packet_sequence: AtomicU32, /// Time to live (TTL) map between publisher SID and last sequence number. - dc_receive_state: Mutex>, + packet_rx_state: Mutex>, participant_info: SessionParticipantInfo, @@ -414,7 +414,7 @@ impl RtcSession { INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, ), next_packet_sequence: 1.into(), - dc_receive_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)), + packet_rx_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)), participant_info, dc_emitter, sub_lossy_dc: Mutex::new(None), @@ -815,19 +815,20 @@ impl SessionInner { } } - fn update_reliable_received_state(&self, packet: &proto::DataPacket) { + /// Updates the packet receive state (TTL map) for reliable packets. + fn update_packet_rx_state(&self, packet: &proto::DataPacket) { if packet.sequence <= 0 || packet.participant_sid.is_empty() { return; }; - let mut state = self.dc_receive_state.lock(); - if state + let mut rx_state = self.packet_rx_state.lock(); + if rx_state .get(&packet.participant_sid) .is_some_and(|&last_sequence| packet.sequence <= last_sequence) { log::warn!("Ignoring duplicate/out-of-order reliable data message"); return; } - state.set(&packet.participant_sid, Some(packet.sequence)); + rx_state.set(&packet.participant_sid, Some(packet.sequence)); } async fn on_signal_event(&self, event: proto::signal_response::Message) -> EngineResult<()> { @@ -1003,7 +1004,7 @@ impl SessionInner { EngineError::Internal(format!("failed to decode data packet: {}", err).into()) })?; if kind == DataPacketKind::Reliable { - self.update_reliable_received_state(&packet); + self.update_packet_rx_state(&packet); } if let Some(detail) = packet.value.take() { self.emit_incoming_packet(kind, packet, detail); @@ -1595,7 +1596,7 @@ impl SessionInner { } fn data_channel_receive_states(self: &Arc) -> Vec { - let mut state = self.dc_receive_state.lock(); + let mut state = self.packet_rx_state.lock(); state .iter() .map(|(publisher_sid, last_seq)| proto::DataChannelReceiveState { From e216d29fbec125322f04b360636b8cf54dfeabb8 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:18:19 +1000 Subject: [PATCH 17/24] Create helpers for E2E testing --- Cargo.lock | 5 ++-- livekit/Cargo.toml | 3 ++ livekit/tests/common/mod.rs | 56 +++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 livekit/tests/common/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b6c575546..0108df514 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,9 +54,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" [[package]] name = "async-channel" @@ -1618,6 +1618,7 @@ checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" name = "livekit" version = "0.7.16" dependencies = [ + "anyhow", "bmrng", "bytes", "chrono", diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index c476164bb..157c237b8 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -45,3 +45,6 @@ semver = "1.0" libloading = { version = "0.8.6" } bytes = "1.10.1" bmrng = "0.5.2" + +[dev-dependencies] +anyhow = "1.0.99" \ No newline at end of file diff --git a/livekit/tests/common/mod.rs b/livekit/tests/common/mod.rs new file mode 100644 index 000000000..676c758c4 --- /dev/null +++ b/livekit/tests/common/mod.rs @@ -0,0 +1,56 @@ +use anyhow::{Context, Result}; +use futures_util::future::try_join_all; +use libwebrtc::native::create_random_uuid; +use livekit::{Room, RoomEvent, RoomOptions}; +use livekit_api::access_token::{AccessToken, VideoGrants}; +use std::{env, time::Duration}; +use tokio::sync::mpsc::UnboundedReceiver; + +struct TestEnvironment { + api_key: String, + api_secret: String, + server_url: String, +} + +impl TestEnvironment { + /// Reads API key, secret, and server URL from the environment, using the + /// development defaults for values that are not present. + pub fn from_env_or_defaults() -> Self { + Self { + api_key: env::var("LIVEKIT_API_KEY").unwrap_or("devkey".into()), + api_secret: env::var("LIVEKIT_API_SECRET").unwrap_or("secret".into()), + server_url: env::var("LIVEKIT_URL").unwrap_or("http://localhost:7880".into()), + } + } +} + +/// Creates the specified number of connections to a shared room for testing. +pub async fn test_rooms(count: usize) -> Result)>> { + let test_env = TestEnvironment::from_env_or_defaults(); + let room_name = format!("test_room_{}", create_random_uuid()); + + let tokens = (0..count) + .into_iter() + .map(|id| -> Result { + let grants = + VideoGrants { room_join: true, room: room_name.clone(), ..Default::default() }; + Ok(AccessToken::with_api_key(&test_env.api_key, &test_env.api_secret) + .with_ttl(Duration::from_secs(30 * 60)) // 30 minutes + .with_grants(grants) + .with_identity(&format!("p{}", id)) + .to_jwt() + .context("Failed to generate JWT")?) + }) + .collect::>>()?; + + let rooms = try_join_all(tokens.into_iter().map(|token| { + let server_url = test_env.server_url.clone(); + async move { + let options = RoomOptions::default(); + Room::connect(&server_url, &token, options).await.context("Failed to connect to room") + } + })) + .await?; + + Ok(rooms) +} From 9dd5d332359aaf325ab4b0eaa68c42bc2811ee0b Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:19:40 +1000 Subject: [PATCH 18/24] Create E2E test for data channel reliability --- livekit/tests/data_channel_test.rs | 75 ++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 livekit/tests/data_channel_test.rs diff --git a/livekit/tests/data_channel_test.rs b/livekit/tests/data_channel_test.rs new file mode 100644 index 000000000..bc9d97308 --- /dev/null +++ b/livekit/tests/data_channel_test.rs @@ -0,0 +1,75 @@ +use crate::common::test_rooms; +use anyhow::{anyhow, Result}; +use livekit::{DataPacket, RoomEvent, SimulateScenario}; +use std::{sync::Arc, time::Duration}; +use tokio::{sync::oneshot, time}; + +mod common; + +#[tokio::test] +async fn test_reliable_retry() -> Result<()> { + const ITERATIONS: usize = 128; + const PAYLOAD_SIZE: usize = 4096; + + // Set up test rooms + let mut rooms = test_rooms(2).await?; + let (sending_room, _) = rooms.pop().unwrap(); + let (receiving_room, mut receiving_event_rx) = rooms.pop().unwrap(); + + let sending_room = Arc::new(sending_room); + let receiving_room = Arc::new(receiving_room); + + let receiving_identity = receiving_room.local_participant().identity(); + let (fulfill, expectation) = oneshot::channel(); + + tokio::spawn({ + let sending_room = sending_room.clone(); + async move { + time::sleep(Duration::from_millis(200)).await; + _ = sending_room.simulate_scenario(SimulateScenario::SignalReconnect).await; + println!("Reconnecting sending room"); + } + }); + tokio::spawn({ + let receiving_room = receiving_room.clone(); + async move { + time::sleep(Duration::from_millis(400)).await; + _ = receiving_room.simulate_scenario(SimulateScenario::SignalReconnect).await; + println!("Reconnecting receiving room"); + } + }); + + tokio::spawn({ + let fulfill = fulfill; + async move { + let mut packets_received = 0; + while let Some(event) = receiving_event_rx.recv().await { + if let RoomEvent::DataReceived { payload, .. } = event { + assert_eq!(payload.len(), PAYLOAD_SIZE); + packets_received += 1; + if packets_received == ITERATIONS { + fulfill.send(()).ok(); + break; + } + } + } + } + }); + + for _ in 0..ITERATIONS { + let packet = DataPacket { + reliable: true, + payload: [0xFA; PAYLOAD_SIZE].to_vec(), + destination_identities: vec![receiving_identity.clone()], + ..Default::default() + }; + sending_room.local_participant().publish_data(packet).await?; + time::sleep(Duration::from_millis(10)).await; + } + + match time::timeout(Duration::from_secs(30), expectation).await { + Ok(Ok(())) => Ok(()), + Ok(Err(_)) => Err(anyhow!("Not all packets were received")), + Err(_) => Err(anyhow!("Timed out waiting for packets")), + } +} From 1993355674f22a4aefbf23aabf8a49658a6164ff Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:20:53 +1000 Subject: [PATCH 19/24] Allow E2E tests to be run in CI Install and run a LiveKit server before running tests --- .github/workflows/tests.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7f398403f..7be80b432 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -52,10 +52,16 @@ jobs: sudo apt update -y sudo apt install -y libssl-dev libx11-dev libgl1-mesa-dev libxext-dev libva-dev libdrm-dev libnvidia-decode-570 libnvidia-compute-570 nvidia-cuda-dev + - name: Install LiveKit server + run: curl -sSL https://get.livekit.io | bash + - uses: actions/checkout@v3 with: submodules: true + - name: Run LiveKit server + run: livekit-server --dev & + - name: Test run: cargo +nightly test --release --verbose --target ${{ matrix.target }} -- --nocapture From b7cb719d8442fb59e1a3a7dcff56781a6b211e29 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:43:04 +1000 Subject: [PATCH 20/24] Put E2E tests behind an internal feature --- livekit/Cargo.toml | 5 ++--- livekit/tests/data_channel_test.rs | 10 ++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 157c237b8..c7de67c20 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -22,9 +22,8 @@ native-tls-vendored = ["livekit-api/native-tls-vendored"] rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"] rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"] __rustls-tls = ["livekit-api/__rustls-tls"] - -# internal features (used by livekit-ffi) -__lk-internal = [] +__lk-internal = [] # internal features (used by livekit-ffi) +__lk-e2e-test = [] # end-to-end testing with a LiveKit server [dependencies] livekit-runtime = { workspace = true, default-features = false } diff --git a/livekit/tests/data_channel_test.rs b/livekit/tests/data_channel_test.rs index bc9d97308..d0eb9a33e 100644 --- a/livekit/tests/data_channel_test.rs +++ b/livekit/tests/data_channel_test.rs @@ -1,3 +1,4 @@ +#![allow(unused_imports)] use crate::common::test_rooms; use anyhow::{anyhow, Result}; use livekit::{DataPacket, RoomEvent, SimulateScenario}; @@ -6,6 +7,15 @@ use tokio::{sync::oneshot, time}; mod common; +// These tests depend on a LiveKit server, and thus are not enabled by default; +// to run them, start a local LiveKit server in development mode, and enable the +// E2E test feature: +// +// > livekit-server --dev +// > cargo test --features __lk-e2e-test +// + +#[cfg(feature = "__lk-e2e-test")] #[tokio::test] async fn test_reliable_retry() -> Result<()> { const ITERATIONS: usize = 128; From 97102652fb27f4596a81b4a5e2dc74c88bf2eb86 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:46:14 +1000 Subject: [PATCH 21/24] Enable E2E tests in CI --- .github/workflows/tests.yml | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7be80b432..1e214e666 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -35,8 +35,10 @@ jobs: target: x86_64-pc-windows-msvc - os: macos-latest target: x86_64-apple-darwin + e2e-testing: true - os: ubuntu-latest target: x86_64-unknown-linux-gnu + e2e-testing: true name: Test (${{ matrix.target }}) runs-on: ${{ matrix.os }} @@ -53,16 +55,28 @@ jobs: sudo apt install -y libssl-dev libx11-dev libgl1-mesa-dev libxext-dev libva-dev libdrm-dev libnvidia-decode-570 libnvidia-compute-570 nvidia-cuda-dev - name: Install LiveKit server - run: curl -sSL https://get.livekit.io | bash + if: ${{ matrix.e2e-testing }} + run: | + if [[ "${{ matrix.os }}" == "ubuntu-latest" ]]; then + curl -sSL https://get.livekit.io | bash + elif [[ "${{ matrix.os }}" == "macos-latest" ]]; then + brew install livekit + fi - uses: actions/checkout@v3 with: submodules: true - name: Run LiveKit server + if: ${{ matrix.e2e-testing }} run: livekit-server --dev & - - name: Test + - name: Test (no E2E) + if: ${{ !matrix.e2e-testing }} run: cargo +nightly test --release --verbose --target ${{ matrix.target }} -- --nocapture + - name: Test (with E2E) + if: ${{ matrix.e2e-testing }} + run: cargo +nightly test --release --verbose --target ${{ matrix.target }} --features __lk-e2e-test -- --nocapture + From 48cfebec87c46cb62cf70effa0cdc2bd280fa37c Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 28 Aug 2025 22:09:56 +1000 Subject: [PATCH 22/24] Format --- livekit/src/room/mod.rs | 6 +++--- livekit/src/room/utils/mod.rs | 4 ++-- livekit/src/room/utils/ttl_map.rs | 5 ++++- livekit/src/room/utils/tx_queue.rs | 9 +++------ 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index b015e0ba3..de72cb808 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -353,7 +353,7 @@ pub struct RoomOptions { pub e2ee: Option, pub rtc_config: RtcConfiguration, pub join_retries: u32, - pub sdk_options: RoomSdkOptions + pub sdk_options: RoomSdkOptions, } impl Default for RoomOptions { @@ -372,7 +372,7 @@ impl Default for RoomOptions { ice_transport_type: IceTransportsType::All, }, join_retries: 3, - sdk_options: RoomSdkOptions::default() + sdk_options: RoomSdkOptions::default(), } } } @@ -1175,7 +1175,7 @@ impl RoomSession { }), publish_tracks: self.local_participant.published_tracks_info(), data_channels: dcs, - datachannel_receive_states: session.data_channel_receive_states() + datachannel_receive_states: session.data_channel_receive_states(), }; log::debug!("sending sync state {:?}", sync_state); diff --git a/livekit/src/room/utils/mod.rs b/livekit/src/room/utils/mod.rs index 4e3b49cb6..e534425a0 100644 --- a/livekit/src/room/utils/mod.rs +++ b/livekit/src/room/utils/mod.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -pub(crate) mod tx_queue; -pub(crate) mod ttl_map; pub mod take_cell; +pub(crate) mod ttl_map; +pub(crate) mod tx_queue; pub mod utf8_chunk; pub fn calculate_changed_attributes( diff --git a/livekit/src/room/utils/ttl_map.rs b/livekit/src/room/utils/ttl_map.rs index 313886f8c..bb67a7442 100644 --- a/livekit/src/room/utils/ttl_map.rs +++ b/livekit/src/room/utils/ttl_map.rs @@ -13,7 +13,10 @@ // limitations under the License. use std::{ - collections::HashMap, fmt::Debug, hash::Hash, time::{Duration, SystemTime} + collections::HashMap, + fmt::Debug, + hash::Hash, + time::{Duration, SystemTime}, }; /// Time to live (TTL) map diff --git a/livekit/src/room/utils/tx_queue.rs b/livekit/src/room/utils/tx_queue.rs index 8e3ce7418..5a73dd15a 100644 --- a/livekit/src/room/utils/tx_queue.rs +++ b/livekit/src/room/utils/tx_queue.rs @@ -17,16 +17,13 @@ use std::collections::VecDeque; #[derive(Debug)] pub struct TxQueue { inner: VecDeque, - buffered_size: usize + buffered_size: usize, } impl TxQueue { /// Creates an empty queue. pub fn new() -> Self { - Self { - inner: VecDeque::new(), - buffered_size: 0 - } + Self { inner: VecDeque::new(), buffered_size: 0 } } /// Number of elements in the queue. @@ -113,4 +110,4 @@ mod tests { queue.trim(1); assert_eq!(queue.buffered_size(), 1); } -} \ No newline at end of file +} From 14390207865321ddffe6444ea4dee0dfef6808ec Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Fri, 5 Sep 2025 10:54:20 +1000 Subject: [PATCH 23/24] Remove additional buffering --- livekit/src/rtc_engine/rtc_session.rs | 33 ++++++++++++++------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index ebc5de8d8..a95c3aa27 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -703,12 +703,12 @@ impl SessionInner { DataPacketKind::Lossy => { lossy_queue.push_back(request); let threshold = self.lossy_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue); + self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue, &mut retry_queue); } DataPacketKind::Reliable => { reliable_queue.push_back(request); let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue); + self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue, &mut retry_queue); } } } @@ -723,7 +723,7 @@ impl SessionInner { lossy_buffered_amount -= sent; } let threshold = self.lossy_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue); + self._send_until_threshold(DataPacketKind::Lossy, threshold, &mut lossy_buffered_amount, &mut lossy_queue, &mut retry_queue); } DataPacketKind::Reliable => { if reliable_buffered_amount < sent { @@ -733,10 +733,8 @@ impl SessionInner { reliable_buffered_amount -= sent; } let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); - self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue); - // TODO: Ensure this is the proper quantity - let retry_min_amount = (threshold as usize * 5) / 4; // threshold * 1.25 - retry_queue.trim((sent as usize) + retry_min_amount); + self._send_until_threshold(DataPacketKind::Reliable, threshold, &mut reliable_buffered_amount, &mut reliable_queue, &mut retry_queue); + retry_queue.trim(sent as usize); } } } @@ -761,34 +759,36 @@ impl SessionInner { kind: DataPacketKind, threshold: u64, buffered_amount: &mut u64, - queue: &mut VecDeque, + request_queue: &mut VecDeque, + retry_queue: &mut TxQueue, ) { while *buffered_amount <= threshold { - let Some(request) = queue.pop_front() else { + let Some(request) = request_queue.pop_front() else { break; }; - let data = request.encoded_packet.data; - *buffered_amount += data.len() as u64; + *buffered_amount += request.encoded_packet.data.len() as u64; let result = self .data_channel(SignalTarget::Publisher, kind) .unwrap() - .send(&data, true) + .send(&request.encoded_packet.data, true) .map_err(|err| { EngineError::Internal(format!("failed to send data packet: {:?}", err).into()) }); - if let Some(completion_tx) = request.completion_tx { _ = completion_tx.send(result); } + if kind == DataPacketKind::Reliable { + retry_queue.enqueue(request.encoded_packet); + } } } fn _enqueue_for_retry_from( self: &Arc, last_sequence: u32, - queue: &mut TxQueue, + retry_queue: &mut TxQueue, ) { - if let Some(first) = queue.peek() { + if let Some(first) = retry_queue.peek() { if first.sequence > last_sequence + 1 { log::warn!( "Wrong packet sequence while retrying: {} > {}, {} packets missing", @@ -798,7 +798,8 @@ impl SessionInner { ); } } - while let Some(encoded_packet) = queue.dequeue() { + + while let Some(encoded_packet) = retry_queue.dequeue() { if encoded_packet.sequence <= last_sequence { continue; }; From 582f47261022443a2730736718cc91ccd952ddf5 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Fri, 5 Sep 2025 10:54:44 +1000 Subject: [PATCH 24/24] Reduce test timeout --- livekit/tests/data_channel_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit/tests/data_channel_test.rs b/livekit/tests/data_channel_test.rs index d0eb9a33e..ba155aa8f 100644 --- a/livekit/tests/data_channel_test.rs +++ b/livekit/tests/data_channel_test.rs @@ -77,7 +77,7 @@ async fn test_reliable_retry() -> Result<()> { time::sleep(Duration::from_millis(10)).await; } - match time::timeout(Duration::from_secs(30), expectation).await { + match time::timeout(Duration::from_secs(15), expectation).await { Ok(Ok(())) => Ok(()), Ok(Err(_)) => Err(anyhow!("Not all packets were received")), Err(_) => Err(anyhow!("Timed out waiting for packets")),