Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ jobs:
target: x86_64-pc-windows-msvc
- os: macos-latest
target: x86_64-apple-darwin
e2e-testing: true
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
e2e-testing: true

name: Test (${{ matrix.target }})
runs-on: ${{ matrix.os }}
Expand All @@ -52,11 +54,29 @@ jobs:
sudo apt update -y
sudo apt install -y libssl-dev libx11-dev libgl1-mesa-dev libxext-dev libva-dev libdrm-dev libnvidia-decode-570 libnvidia-compute-570 nvidia-cuda-dev

- name: Install LiveKit server
if: ${{ matrix.e2e-testing }}
run: |
if [[ "${{ matrix.os }}" == "ubuntu-latest" ]]; then
curl -sSL https://get.livekit.io | bash
elif [[ "${{ matrix.os }}" == "macos-latest" ]]; then
brew install livekit
fi

- uses: actions/checkout@v3
with:
submodules: true

- name: Test
- name: Run LiveKit server
if: ${{ matrix.e2e-testing }}
run: livekit-server --dev &

- name: Test (no E2E)
if: ${{ !matrix.e2e-testing }}
run: cargo +nightly test --release --verbose --target ${{ matrix.target }} -- --nocapture

- name: Test (with E2E)
if: ${{ matrix.e2e-testing }}
run: cargo +nightly test --release --verbose --target ${{ matrix.target }} --features __lk-e2e-test -- --nocapture


5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ native-tls-vendored = ["livekit-api/native-tls-vendored"]
rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"]
rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"]
__rustls-tls = ["livekit-api/__rustls-tls"]

# internal features (used by livekit-ffi)
__lk-internal = []
__lk-internal = [] # internal features (used by livekit-ffi)
__lk-e2e-test = [] # end-to-end testing with a LiveKit server

[dependencies]
livekit-runtime = { workspace = true, default-features = false }
Expand All @@ -45,3 +44,6 @@ semver = "1.0"
libloading = { version = "0.8.6" }
bytes = "1.10.1"
bmrng = "0.5.2"

[dev-dependencies]
anyhow = "1.0.99"
20 changes: 2 additions & 18 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,6 @@ pub struct RoomOptions {
pub rtc_config: RtcConfiguration,
pub join_retries: u32,
pub sdk_options: RoomSdkOptions,
pub preregistration: Option<PreRegistration>,
}

#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PreRegistration {
text_stream_topics: Vec<String>,
byte_stream_topics: Vec<String>,
}

impl Default for RoomOptions {
Expand All @@ -381,7 +373,6 @@ impl Default for RoomOptions {
},
join_retries: 3,
sdk_options: RoomSdkOptions::default(),
preregistration: None,
}
}
}
Expand Down Expand Up @@ -577,8 +568,6 @@ impl Room {
let (incoming_stream_manager, open_rx) = IncomingStreamManager::new();
let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new();

let identity = local_participant.identity().clone();

let room_info = join_response.room.unwrap();
let inner = Arc::new(RoomSession {
sid_promise: Promise::new(),
Expand Down Expand Up @@ -671,7 +660,6 @@ impl Room {
));
let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task(
packet_rx,
identity,
rtc_engine.clone(),
close_rx.resubscribe(),
));
Expand Down Expand Up @@ -1187,8 +1175,7 @@ impl RoomSession {
}),
publish_tracks: self.local_participant.published_tracks_info(),
data_channels: dcs,
// unimplemented, stubbed for now
datachannel_receive_states: Vec::new(),
datachannel_receive_states: session.data_channel_receive_states(),
};

log::debug!("sending sync state {:?}", sync_state);
Expand Down Expand Up @@ -1737,15 +1724,12 @@ async fn incoming_data_stream_task(
/// Receives packets from the outgoing stream manager and send them.
async fn outgoing_data_stream_task(
mut packet_rx: UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>,
participant_identity: ParticipantIdentity,
engine: Arc<RtcEngine>,
mut close_rx: broadcast::Receiver<()>,
) {
loop {
tokio::select! {
Ok((mut packet, responder)) = packet_rx.recv() => {
// Set packet's participant identity field
packet.participant_identity = participant_identity.0.clone();
Ok((packet, responder)) = packet_rx.recv() => {
let result = engine.publish_data(packet, DataPacketKind::Reliable).await;
let _ = responder.respond(result);
},
Expand Down
2 changes: 2 additions & 0 deletions livekit/src/room/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

pub mod take_cell;
pub(crate) mod ttl_map;
pub(crate) mod tx_queue;
pub mod utf8_chunk;

pub fn calculate_changed_attributes(
Expand Down
143 changes: 143 additions & 0 deletions livekit/src/room/utils/ttl_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
collections::HashMap,
fmt::Debug,
hash::Hash,
time::{Duration, SystemTime},
};

/// Time to live (TTL) map
///
/// Elements older than the TTL duration are automatically removed.
///
#[derive(Debug)]
pub struct TtlMap<K, V> {
inner: HashMap<K, Entry<V>>,
last_cleanup: SystemTime,
ttl: Duration,
}

#[derive(Debug)]
struct Entry<V> {
value: V,
expires_at: SystemTime,
}

impl<K, V> TtlMap<K, V> {
/// Creates an empty `TtlMap`.
pub fn new(ttl: Duration) -> Self {
Self { inner: HashMap::new(), last_cleanup: SystemTime::now(), ttl }
}

/// Returns the number of elements in the map.
pub fn len(&mut self) -> usize {
self.cleanup();
self.inner.len()
}

/// An iterator visiting all key-value pairs in arbitrary order.
/// The iterator element type is `(&'a K, &'a V)`.
pub fn iter(&mut self) -> impl Iterator<Item = (&K, &V)> {
self.cleanup();
self.inner.iter().map(|(key, entry)| (key, &entry.value))
}

/// Removes expired elements.
fn cleanup(&mut self) {
let now = SystemTime::now();
self.inner.retain(|_, entry| entry.expires_at >= now);
self.last_cleanup = now;
}
}

impl<K, V> TtlMap<K, V>
where
K: Eq + Hash + Clone,
{
/// Returns a reference to the value corresponding to the key.
pub fn get(&mut self, k: &K) -> Option<&V> {
let expires_at = self.inner.get(k).map(|entry| entry.expires_at)?;
let now = SystemTime::now();
if expires_at < now {
_ = self.inner.remove(k);
return None;
}
Some(&self.inner.get(k).unwrap().value)
}

/// Sets the value for the given key.
pub fn set(&mut self, k: &K, v: Option<V>) {
let now = SystemTime::now();
let Ok(elapsed) = now.duration_since(self.last_cleanup) else {
log::error!("System clock anomaly detected");
return;
};
let half_ttl = self.ttl.div_f64(2.0);
if elapsed > half_ttl {
self.cleanup();
}

let Some(value) = v else {
_ = self.inner.remove(&k);
return;
};
let expires_at = now + self.ttl;
let entry = Entry { value, expires_at };
self.inner.insert(k.clone(), entry);
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use tokio::time::sleep;

const SHORT_TTL: Duration = Duration::from_millis(100);

#[tokio::test]
async fn test_expiration() {
let mut map = TtlMap::<char, u8>::new(SHORT_TTL);
map.set(&'a', Some(1));
map.set(&'b', Some(2));
map.set(&'c', Some(3));

assert_eq!(map.len(), 3);
assert!(map.get(&'a').is_some());
assert!(map.get(&'b').is_some());
assert!(map.get(&'c').is_some());

sleep(SHORT_TTL).await;

assert_eq!(map.len(), 0);
assert!(map.get(&'a').is_none());
assert!(map.get(&'b').is_none());
assert!(map.get(&'c').is_none());
}

#[test]
fn test_iter() {
let mut map = TtlMap::<char, u8>::new(SHORT_TTL);
map.set(&'a', Some(1));
map.set(&'b', Some(2));
map.set(&'c', Some(3));

let elements: HashSet<_> = map.iter().map(|(k, v)| (*k, *v)).collect();
assert!(elements.contains(&('a', 1)));
assert!(elements.contains(&('b', 2)));
assert!(elements.contains(&('c', 3)));
}
}
Loading
Loading