-
Notifications
You must be signed in to change notification settings - Fork 139
Data tracks core #862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Data tracks core #862
Conversation
📝 WalkthroughWalkthroughAdds a new Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant LocalMgr as Local Manager
participant Pipeline as Pipeline
participant Packetizer as Packetizer
participant SFU as SFU (external)
participant Output as Output Events
User->>LocalMgr: publish_track(options)
LocalMgr->>SFU: SfuPublishRequest(handle,name,uses_e2ee)
SFU->>LocalMgr: SfuPublishResponse(handle, result)
LocalMgr->>User: LocalDataTrack (on success)
User->>LocalMgr: try_push(frame)
LocalMgr->>Pipeline: process_frame(frame)
Pipeline->>Packetizer: packetize(frame or encrypted_payload)
Packetizer->>LocalMgr: Vec<Packet>
LocalMgr->>Output: PacketsAvailable(packets)
sequenceDiagram
actor User
participant RemoteMgr as Remote Manager
participant SFU as SFU (external)
participant Pipeline as Pipeline
participant Depacketizer as Depacketizer
participant UserStream as Subscriber
User->>RemoteMgr: subscribe(sid)
RemoteMgr->>SFU: SubscribeRequest
SFU->>RemoteMgr: SfuPublicationUpdates & SubscriberHandles
RemoteMgr->>Pipeline: attach track
SFU->>RemoteMgr: Packet
RemoteMgr->>Pipeline: process_packet(packet)
Pipeline->>Depacketizer: push(packet)
Depacketizer->>Pipeline: DepacketizerFrame
Pipeline->>RemoteMgr: DataTrackFrame
RemoteMgr->>UserStream: deliver frame
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 15
🤖 Fix all issues with AI agents
In `@livekit-datatrack/Cargo.toml`:
- Line 13: The crate livekit-datatrack is using a pinned dependency thiserror =
"2.0.17" because it isn't part of the workspace members; either add
livekit-datatrack to the workspace members in the root Cargo.toml and change
thiserror in livekit-datatrack/Cargo.toml to thiserror = { workspace = true }
(then coordinate a workspace upgrade to thiserror 2.x), or if livekit-datatrack
is intentionally external, remove the TODO and add a short comment in
livekit-datatrack/Cargo.toml documenting why it is excluded from the workspace
and why it pins thiserror; update the thiserror entry and comments accordingly.
- Around line 1-7: The workspace is missing registration for the new crate: add
"livekit-datatrack" to the workspace members array so Cargo includes it in
builds/tests/publishing; update the [workspace] members list (near existing
"livekit-runtime") to include the exact member name livekit-datatrack.
In `@livekit-datatrack/src/frame.rs`:
- Around line 50-62: The current duration_since_timestamp may panic because
SystemTime + Duration can overflow; update duration calculation in
duration_since_timestamp to use SystemTime::checked_add instead of the Add impl:
take the optional user_timestamp, build a Duration::from_millis, call
UNIX_EPOCH.checked_add(...) to get an Option<SystemTime>, and then if
Some(epoch) call SystemTime::now().duration_since(epoch).inspect_err(...).ok()
(returning None if checked_add returns None or duration_since errors). Keep the
existing log::error call for duration_since errors and preserve the Option
return semantics.
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 240-250: In remove_descriptor, the logic in the Descriptor::Active
branch inverts the publish-check: it currently sends false only when
published_tx is already false. Change the condition to check if
*published_tx.borrow() is true and then send false to notify unpublish; i.e., in
the remove_descriptor implementation (handling Descriptor::Active and
published_tx) send false when the track is currently published so subscribers
are notified.
- Around line 32-40: The struct field name is wrong: in ManagerOptions rename
decryption_provider to encryption_provider and update its doc comment to
"Provider to use for encrypting outgoing frame payloads." Keep the type as
Option<Arc<dyn EncryptionProvider>> and then update all references/usages
(constructors, Manager::new, tests, builders, and any pattern matches) from
decryption_provider to encryption_provider to avoid breakage.
In `@livekit-datatrack/src/local/packetizer.rs`:
- Around line 56-86: The closure in packetize moves the non-Copy Header via
`..header`, causing a move on the first iteration; fix by creating a cloned base
header for each packet (e.g., call header.clone() or clone a prepared
base_header) inside the map/iterator so you set per-packet fields (marker,
sequence, extensions) on that clone rather than using `..header`; ensure the
Header type implements Clone if it doesn't already and update the map to use the
cloned header when building each Packet (see packetize, Header, frame_marker,
and sequence references).
In `@livekit-datatrack/src/local/pipeline.rs`:
- Around line 45-51: The constructor Pipeline::new currently uses
debug_assert_eq! to validate that options.info.uses_e2ee matches
options.encryption_provider.is_some(), which is stripped in release builds;
change that check to a runtime assertion (replace debug_assert_eq! with
assert_eq!) so mismatches are caught in all builds (include a clear error
message referencing uses_e2ee vs encryption_provider). Update the assertion in
Pipeline::new (and keep the rest of the initialization untouched) so
encrypt_if_needed cannot be left running without a provider when the server
indicates E2EE is required.
In `@livekit-datatrack/src/packet/deserialize.rs`:
- Around line 78-88: The ext_words u16 can overflow when computing ext_len by
doing ext_words + 1 in u16; update the calculation in the ext_flag branch (where
ext_words is read with raw.get_u16()) to convert ext_words to usize before
adding 1 and multiplying (i.e., compute ext_len using ext_words as usize: 4 *
(ext_words_as_usize + 1)) and keep the existing bounds checks
(MissingExtWords/HeaderOverrun) intact so malformed inputs cannot wrap and
bypass the header length validation.
- Around line 97-131: The deserializer currently ignores the len field and can
go out-of-sync; update Extensions::deserialize to use the len value for
alignment: after reading tag and len, validate that raw.remaining() >= len (or
return MalformedExt), and for known tags (E2eeExt::TAG, UserTimestampExt::TAG)
check that len equals the serialized payload-length (i.e. expected_payload_len -
1 as written by serialize.rs) before reading fields (E2eeExt::LEN and
UserTimestampExt::LEN should be used to compute the expected len), then read
exactly that payload; for EXT_TAG_PADDING and unknown tags, advance by len (or
len + 1 if serialize writes len as payload_len - 1 and you need to skip the
extra byte) to preserve alignment instead of just continuing, returning
MalformedExt on mismatch. Use the existing symbols Extensions::deserialize,
E2eeExt::TAG/LEN, UserTimestampExt::TAG/LEN, and EXT_TAG_PADDING to locate where
to add these checks and advances.
In `@livekit-datatrack/src/packet/time.rs`:
- Around line 52-79: The Clock::at method currently calls
instant.duration_since(self.epoch), which can panic if instant is earlier than
epoch; change that to instant.saturating_duration_since(self.epoch) to
defensively handle stale instants and avoid future panics. Update the
computation in Clock::at (function name: at, struct: Clock) to use
saturating_duration_since when computing elapsed before calling
duration_to_ticks, preserving the remaining logic (wrapping_add, monotonicity
check with prev) unchanged.
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 151-158: The expected packet count calculation in the
DepacketizerDropError construction is wrong when u16 sequence numbers wrap;
update the computation used in DepacketizerDropReason::Incomplete (where you
reference partial.frame_number, partial.start_sequence and end_sequence) to use
wrapping arithmetic instead of plain subtraction—e.g., compute expected by using
end_sequence.wrapping_sub(partial.start_sequence).wrapping_add(1) (and
cast/promote to the expected integer type such as usize or u32) so wrapped
ranges like start=65530 and end=5 produce the correct inclusive count.
In `@livekit-datatrack/src/remote/manager.rs`:
- Around line 210-219: handle_track_unpublished currently removes the
subscription entry but does not await the running subscription task, risking
races; change handle_track_unpublished to be async (matching shutdown()) and
when matching SubscriptionState::Active { sub_handle, .. } use
self.sub_handles.remove(&sub_handle) to take the task's JoinHandle (or
equivalent task_handle) and await it (e.g., task_handle.await) before
proceeding, then send published_tx(false) and remove descriptor; use the same
JoinHandle type and awaiting pattern used in shutdown() to ensure the background
task finishes before the manager continues.
- Around line 142-156: on_unsubscribe_request currently returns early when
descriptor.subscription matches SubscriptionState::Pending, leaving the
descriptor.subscription and any pending result receivers unresolved; update
on_unsubscribe_request (the match on descriptor.subscription) to, when
SubscriptionState::Pending is found, set descriptor.subscription =
SubscriptionState::None, clear any pending result receivers (notify or send an
error/closure to the pending result channel stored on the descriptor), and still
send the SfuUpdateSubscription event via self.event_out_tx; ensure you also
remove any related entries from self.sub_handles if applicable so state and
receivers are consistently cleaned up.
- Around line 253-257: The code creates a pipeline for tracks that use E2EE
(descriptor.info.uses_e2ee()) even when self.decryption_provider is None, which
can cause silent decryption failures; update the manager to detect this mismatch
and refuse subscription (or at minimum log a warning) before constructing the
pipeline: when descriptor.info.uses_e2ee() is true and
self.decryption_provider.is_none(), either return an Err (or skip subscription)
from the function creating the pipeline or emit a clear warning via the existing
logger, and only build Pipeline::new with the decryption_provider variable when
Some(Arc) is present so Pipeline::new never receives None for an E2EE track.
In `@livekit-datatrack/src/remote/pipeline.rs`:
- Around line 23-84: The code currently relies on
debug_assert_eq!(options.info.uses_e2ee, options.decryption_provider.is_some())
which is removed in release builds and can cause encrypted payloads to be
treated as plaintext; fix by performing a runtime invariant check in
Pipeline::new and by making decryption decision based on the track's E2EE flag.
Specifically: in Pipeline::new (where PipelineOptions, publisher_identity, and
decryption_provider are passed) replace the debug_assert with a runtime check —
if options.info.uses_e2ee is true but options.decryption_provider.is_none(), log
an error and fail construction (return a Result or panic) or set a safe
fallback; if uses_e2ee is false but a decryption_provider is provided, ignore it
or log a warning. Also add a field to Pipeline (e.g., uses_e2ee: bool or store
Arc<DataTrackInfo>) and update decrypt_if_needed to first check that stored
uses_e2ee is true before attempting decryption, and if uses_e2ee is true but
e2ee_provider is missing, log and return None to avoid emitting ciphertext as
plaintext.
🧹 Nitpick comments (10)
livekit-datatrack/src/remote/proto.rs (1)
66-74: Usemap_orfor broader Rust version compatibility.The crate does not declare an MSRV, and
Option::is_none_orrequires Rust 1.82.0 (stabilized October 2024). To support a wider range of Rust versions without explicitly committing to an MSRV, use the equivalent and more compatiblemap_orapproach:Suggested change
- .filter(|participant| { - local_participant_identity.is_none_or(|identity| participant.identity != identity) - }) + .filter(|participant| { + local_participant_identity.map_or(true, |identity| participant.identity != identity) + })livekit-datatrack/src/remote/mod.rs (1)
55-97: Extract subscription timeouts into named constants.
This aligns with the TODO and avoids magic numbers in the API flow.♻️ Suggested refactor
+const SUBSCRIBE_SEND_TIMEOUT_MS: u64 = 50; +const SUBSCRIBE_RESULT_TIMEOUT_SECS: u64 = 10; + pub async fn subscribe(&self) -> Result<impl Stream<Item = DataTrackFrame>, SubscribeError> { let (result_tx, result_rx) = oneshot::channel(); let subscribe_event = SubscribeRequest { sid: self.info.sid.clone(), result_tx }; self.inner() .event_in_tx .upgrade() .ok_or(SubscribeError::Disconnected)? - .send_timeout(subscribe_event.into(), Duration::from_millis(50)) + .send_timeout(subscribe_event.into(), Duration::from_millis(SUBSCRIBE_SEND_TIMEOUT_MS)) .await .map_err(|_| { SubscribeError::Internal(anyhow!("Failed to send subscribe event").into()) })?; // TODO: standardize timeout - let frame_rx = timeout(Duration::from_secs(10), result_rx) + let frame_rx = timeout(Duration::from_secs(SUBSCRIBE_RESULT_TIMEOUT_SECS), result_rx) .await .map_err(|_| SubscribeError::Timeout)? .map_err(|_| SubscribeError::Disconnected)??;livekit-datatrack/src/remote/manager.rs (2)
74-75: Consider documenting or increasing channel buffer sizes.The
mpsc::channel(4)buffer size is quite small. With high-frequency events or bursts of publications/subscriptions, this could cause backpressure. The TODO comment acknowledges this, but it would be good to either document the rationale or plan to tune this before production use.
466-467: Minor: Redundant field name in struct initialization.The field initialization can be simplified.
✨ Suggested fix
let task = - TrackTask { info: info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx }; + TrackTask { info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx };livekit-datatrack/src/remote/depacketizer.rs (1)
386-415: Test could be more comprehensive for duplicate sequence handling.The test verifies that duplicate sequences result in the later packet's payload being used, but doesn't validate:
- The final frame length (should be 6 bytes: 3 from seq 1 + 3 from seq 2, not 9)
- Complete frame content
The comment "Should retain the second packet" is slightly ambiguous—it replaces the first packet with the duplicate sequence, not retains both.
✨ Suggested test improvement
let result = depacketizer.push(packet.clone()); assert!(result.drop_error.is_none()); let frame = result.frame.unwrap(); - assert!(frame.payload.starts_with(&[0xCD; 3])); - // Should retain the second packet with duplicate sequence number + // Duplicate sequence replaces the first packet's payload + assert_eq!(frame.payload.len(), 6); // seq 1 (replaced) + seq 2 + assert_eq!(&frame.payload[..3], &[0xCD; 3]); // Replaced payload + assert_eq!(&frame.payload[3..], &[0xEF; 3]); // Final payload }livekit-datatrack/src/track.rs (1)
108-118: Consider adding length validation for DataTrackSid.The
TryFrom<String>implementation only validates the prefix. A SID like"DTR_"(empty ID portion) or"DTR_x"(very short ID) would be accepted. If the SFU has specific requirements for SID format, consider adding validation here.livekit-datatrack/src/local/manager.rs (3)
284-290: Consider usingbiasedselect for consistent priority handling.Unlike the remote
TrackTask, thistokio::select!doesn't usebiased, meaning thepublished_rx.changed()branch may not be prioritized over frame processing. For consistent behavior and responsive unpublish handling, consider addingbiased.✨ Suggested fix
while is_published { tokio::select! { + biased; _ = self.published_rx.changed() => { is_published = *self.published_rx.borrow(); } Some(frame) = self.frame_rx.recv() => self.process_and_send(frame) } }
355-358: Inconsistent timeout API usage.
publish_trackusestokio::time::timeout, but tests in this file uselivekit_runtime::timeout. For consistency and to ensure proper runtime behavior across different execution contexts, consider usinglivekit_runtime::timeouthere as well.✨ Suggested fix
- let track = tokio::time::timeout(Self::PUBLISH_TIMEOUT, result_rx) + let track = livekit_runtime::timeout(Self::PUBLISH_TIMEOUT, result_rx) .await - .map_err(|_| PublishError::Timeout)? + .map_err(|_| PublishError::Timeout)? .map_err(|_| PublishError::Disconnected)??;
433-436: Minor: Preferfirst()ornext()overnth(0).Using
into_iter().nth(0)is less idiomatic than alternatives.✨ Suggested fix
OutputEvent::PacketsAvailable(packets) => { - let packet = packets.into_iter().nth(0).unwrap(); + let packet = packets.into_iter().next().unwrap(); let payload = Packet::deserialize(packet).unwrap().payload;livekit-datatrack/src/local/mod.rs (1)
137-149: Consider validating non-empty track name.The doc comment states the name "Must not be empty", but
new()accepts any string including empty. Consider adding validation to enforce this requirement.✨ Suggested validation
/// Creates options with the given track name. /// /// The track name is used to identify the track to other participants. /// /// # Requirements /// - Must not be empty /// - Must be unique per publisher /// - pub fn new(name: impl Into<String>) -> Self { - Self { name: name.into() } + /// # Panics + /// Panics if `name` is empty. + /// + pub fn new(name: impl Into<String>) -> Self { + let name = name.into(); + assert!(!name.is_empty(), "Track name must not be empty"); + Self { name } }Alternatively, return a
Resultfor a non-panicking API.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (29)
Cargo.tomllivekit-datatrack/Cargo.tomllivekit-datatrack/README.mdlivekit-datatrack/src/e2ee.rslivekit-datatrack/src/error.rslivekit-datatrack/src/frame.rslivekit-datatrack/src/lib.rslivekit-datatrack/src/local/events.rslivekit-datatrack/src/local/manager.rslivekit-datatrack/src/local/mod.rslivekit-datatrack/src/local/packetizer.rslivekit-datatrack/src/local/pipeline.rslivekit-datatrack/src/local/proto.rslivekit-datatrack/src/packet/deserialize.rslivekit-datatrack/src/packet/extension.rslivekit-datatrack/src/packet/handle.rslivekit-datatrack/src/packet/mod.rslivekit-datatrack/src/packet/serialize.rslivekit-datatrack/src/packet/time.rslivekit-datatrack/src/remote/depacketizer.rslivekit-datatrack/src/remote/events.rslivekit-datatrack/src/remote/manager.rslivekit-datatrack/src/remote/mod.rslivekit-datatrack/src/remote/pipeline.rslivekit-datatrack/src/remote/proto.rslivekit-datatrack/src/track.rslivekit-datatrack/src/utils/bytes.rslivekit-datatrack/src/utils/counter.rslivekit-datatrack/src/utils/mod.rs
🧰 Additional context used
🧬 Code graph analysis (16)
livekit-datatrack/src/utils/mod.rs (1)
livekit-api/src/http_client.rs (1)
bytes(81-83)
livekit-datatrack/src/packet/mod.rs (6)
livekit-api/src/http_client.rs (1)
bytes(81-83)livekit-datatrack/src/frame.rs (2)
fmt(90-95)payload(41-43)livekit-datatrack/src/packet/extension.rs (1)
fmt(36-39)livekit-datatrack/src/packet/handle.rs (1)
fmt(64-66)livekit-datatrack/src/packet/deserialize.rs (3)
deserialize(44-49)deserialize(53-94)deserialize(98-134)livekit-datatrack/src/packet/serialize.rs (1)
serialize(50-57)
livekit-datatrack/src/lib.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)
livekit-datatrack/src/frame.rs (2)
livekit-datatrack/src/remote/depacketizer.rs (2)
from(190-192)from(196-198)livekit-datatrack/src/local/pipeline.rs (1)
from(81-89)
livekit-datatrack/src/packet/handle.rs (2)
livekit-datatrack/src/local/proto.rs (6)
try_from(45-48)try_from(54-58)try_from(64-73)from(27-31)from(35-37)from(97-109)livekit-datatrack/src/remote/proto.rs (2)
try_from(28-39)from(94-101)
livekit-datatrack/src/utils/counter.rs (1)
livekit-datatrack/src/packet/time.rs (1)
wrapping_add(39-41)
livekit-datatrack/src/local/packetizer.rs (4)
livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)livekit-datatrack/src/packet/time.rs (2)
new(54-56)random(23-25)livekit-datatrack/src/utils/counter.rs (1)
new(21-23)livekit-datatrack/src/packet/handle.rs (2)
from(52-54)from(58-60)
livekit-datatrack/src/remote/depacketizer.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)
livekit-datatrack/src/packet/deserialize.rs (4)
livekit-datatrack/src/frame.rs (6)
from(101-103)from(107-109)from(113-115)from(119-121)payload(41-43)user_timestamp(46-48)livekit-datatrack/src/packet/handle.rs (2)
from(52-54)from(58-60)livekit-datatrack/src/packet/time.rs (1)
from_ticks(27-29)livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)
livekit-datatrack/src/remote/proto.rs (1)
livekit-datatrack/src/packet/handle.rs (5)
try_from(34-39)try_from(45-48)from(52-54)from(58-60)get(78-81)
livekit-datatrack/src/remote/pipeline.rs (1)
livekit-datatrack/src/remote/depacketizer.rs (3)
new(39-41)from(190-192)from(196-198)
livekit-datatrack/src/remote/manager.rs (3)
livekit-datatrack/src/remote/mod.rs (4)
new(43-45)publisher_identity(100-102)published_rx(113-115)inner(47-52)livekit-datatrack/src/remote/pipeline.rs (1)
new(39-47)livekit-datatrack/src/track.rs (4)
info(38-40)sid(83-85)published_rx(63-68)uses_e2ee(91-93)
livekit-datatrack/src/packet/extension.rs (6)
livekit-datatrack/src/frame.rs (1)
fmt(90-95)livekit-datatrack/src/local/mod.rs (1)
fmt(231-236)livekit-datatrack/src/packet/handle.rs (1)
fmt(64-66)livekit-datatrack/src/packet/mod.rs (1)
fmt(60-65)livekit-datatrack/src/remote/depacketizer.rs (1)
fmt(228-237)livekit-datatrack/src/track.rs (1)
fmt(127-129)
livekit-datatrack/src/local/manager.rs (5)
livekit-datatrack/src/local/mod.rs (4)
new(41-43)new(146-148)new(203-205)inner(45-50)livekit-datatrack/src/local/pipeline.rs (1)
new(47-51)livekit-datatrack/src/remote/pipeline.rs (1)
new(39-47)livekit-datatrack/src/track.rs (4)
info(38-40)sid(83-85)uses_e2ee(91-93)name(87-89)livekit-runtime/src/dispatcher.rs (1)
timeout(115-128)
livekit-datatrack/src/packet/time.rs (1)
livekit-datatrack/src/local/packetizer.rs (1)
new(46-54)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/remote/mod.rs (2)
published_rx(113-115)inner(47-52)livekit-datatrack/src/local/proto.rs (3)
from(27-31)from(35-37)from(97-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (71)
livekit-datatrack/README.md (1)
1-6: Clear and appropriately scoped README.Concise guidance that the crate is internal and should be used via client SDK APIs is helpful and unambiguous.
Cargo.toml (1)
58-73: LGTM!The new workspace dependencies (
from_variants,futures-core,tokio-stream) are properly added in alphabetical order and align with existing dependency versions in the workspace (e.g.,futures-core = "0.3"matchesfutures = "0.3"andfutures-util = "0.3").livekit-datatrack/Cargo.toml (1)
9-25: Dependencies are well-structured.Good use of workspace dependencies for shared crates, and the inline comment on
anyhowclarifying its intended scope ("For internal error handling only") is helpful for maintainability.livekit-datatrack/src/error.rs (1)
17-24: LGTM: clear internal error wrapper.livekit-datatrack/src/utils/bytes.rs (3)
17-32: LGTM: clean chunking extension API.
34-52: LGTM: iterator flow is straightforward and safe.
55-85: Tests cover empty input and varied sizes.livekit-datatrack/src/frame.rs (3)
33-48: LGTM: compact frame container and accessors.
65-86: LGTM: builder-style constructors are clear.
89-121: LGTM: Debug and From impls improve ergonomics.livekit-datatrack/src/e2ee.rs (1)
22-57: LGTM: E2EE payload and provider traits are clear.livekit-datatrack/src/utils/mod.rs (1)
15-21: LGTM: simple utils re-export layer.livekit-datatrack/src/local/pipeline.rs (7)
25-35: LGTM: pipeline options and state are minimal and clear.
37-43: LGTM: error composition is straightforward.
53-57: LGTM: process_frame flow is clean.
59-74: LGTM: encryption path is straightforward.
76-77: LGTM: MTU constant is clearly scoped.
80-89: LGTM: frame-to-packetizer conversion is clear.
92-120: LGTM: test validates packetization output.livekit-datatrack/src/lib.rs (1)
17-59: LGTM: clear public API and backend re-exports.livekit-datatrack/src/utils/counter.rs (2)
15-36: Clean counter API.Monotonic wrap-around behavior is concise and easy to use.
38-53: WrappingIncrement impls look good.Macro coverage for unsigned ints is consistent.
livekit-datatrack/src/packet/deserialize.rs (3)
22-41: Clear error taxonomy.Variants map cleanly to parsing failure cases.
43-49: Straightforward packet assembly.Header parse + payload copy path is clean.
137-275: Good coverage of error and extension cases.The tests exercise critical parsing paths and edge cases well.
livekit-datatrack/src/packet/handle.rs (3)
18-67: Handle type + conversions are solid.Validation and display formatting are clear.
69-82: Allocator behavior looks correct.Monotonic allocation with overflow guard is fine.
84-88: Test-only Dummy impl is fine.Keeps test scaffolding concise.
livekit-datatrack/src/packet/mod.rs (4)
15-57: Core packet types are clean and focused.Public surface is minimal and readable.
59-65: Custom Debug output is appropriate.Avoids payload dumps while keeping shape visible.
68-90: Serialization constants are clear.Bitfield and size constants are easy to reason about.
92-109: Round‑trip test is a good sanity check.Validates serialization ↔ deserialization cohesion.
livekit-datatrack/src/packet/serialize.rs (4)
19-58: Packet serialization API looks consistent.Surface area aligns well with deserialization.
60-127: Header sizing + serialization logic reads correctly.Metrics and write order are clear.
130-168: Extension serialization is straightforward.Marker/length/payload layout is consistent with tests.
170-241: Serialization tests cover key paths.Good validation of lengths, padding, and payload layout.
livekit-datatrack/src/local/packetizer.rs (3)
22-54: Packetizer setup looks good.State fields and defaults are sensible.
90-99: Frame marker helper is clear.The branching is easy to follow and testable.
102-145: Packetization tests cover marker and sizing cases.Good breadth across payload sizes and MTUs.
livekit-datatrack/src/remote/proto.rs (5)
25-39: Subscriber handle mapping is clear.Error mapping and collection are clean.
47-64: Join/update extraction logic looks fine.Ownership transfer via
mem::takeis well‑documented.
82-89: Track-info extraction viamem::takeis clean.Avoids clones and preserves intent.
93-101: Subscription update mapping is correct.The protocol payload is populated consistently.
104-158: Tests exercise the conversion paths well.Covers handle mapping and track extraction.
livekit-datatrack/src/packet/extension.rs (2)
17-33: LGTM: extension container and typed wrappers are clean and test-friendly.
The derives and Option-wrapped fields keep the API minimal and clear.
35-52: Redacted Debug plus explicit TAG/LEN constants look solid.
Good balance between observability and security hygiene.livekit-datatrack/src/packet/time.rs (2)
18-42: Timestamp encapsulation and wraparound helpers look good.
91-122: Tests and Dummy impl provide good coverage and test ergonomics.livekit-datatrack/src/remote/pipeline.rs (1)
87-123: Frame conversion and the no‑E2EE test path look good.livekit-datatrack/src/local/proto.rs (6)
26-38: Publish/unpublish request conversions are clear and consistent.
42-59: Response conversions handle missing info and handle parsing cleanly.
61-73: DataTrackInfo conversion maps encryption and SID as expected.
76-92: RequestResponse error mapping looks sensible.
96-120: Sync-state conversion helpers are tidy and efficient.
122-204: Tests cover the critical conversion paths well.livekit-datatrack/src/remote/mod.rs (3)
42-53: Type-state wiring forRemotelooks good.
99-115: Publisher identity accessor and inner state look good.
118-128: SubscribeError variants are clear and user-facing.livekit-datatrack/src/remote/events.rs (2)
24-54: Input/Output event enums are well-scoped and readable.
56-126: Event payload structs are clear and map cleanly to protocol intent.livekit-datatrack/src/local/events.rs (2)
24-44: Input/Output event enums are well-structured.
48-131: Local event payload structs are clear and consistent.livekit-datatrack/src/remote/manager.rs (2)
378-411: LGTM!The
TrackTask::runimplementation correctly usesbiasedselect to prioritize state updates, properly handles subscriber drop detection viaframe_tx.closed(), and cleanly sends an unsubscribe request when all subscribers are gone.
419-424: LGTM!The
sendmethod appropriately usestry_sendfor synchronous contexts and properly wraps the error with context usinganyhow.livekit-datatrack/src/remote/depacketizer.rs (2)
115-121: LGTM!The buffer overflow protection correctly drops the entire frame and resets state. This prevents memory exhaustion from large fragmented frames while providing clear error reporting.
177-199: LGTM!The
DepacketizerPushResultdesign elegantly handles the case where a single push operation can both complete a frame and report a dropped frame. TheFromimplementations provide clean ergonomic construction.livekit-datatrack/src/track.rs (2)
21-69: LGTM!The
DataTrack<L>generic type with phantom marker for location is a clean design pattern. Thepublished_rx()method correctly dispatches to the appropriate inner type, andwait_for_unpublish()properly handles the already-unpublished case.
132-145: LGTM!The
fake::Dummyimplementation correctly generates realistic test SIDs using a base-57 alphabet that excludes ambiguous characters. The 12-character random suffix provides sufficient uniqueness for testing.livekit-datatrack/src/local/mod.rs (3)
40-96: LGTM!The
DataTrack<Local>implementation provides a clean API for frame publishing. Thetry_pushmethod correctly checks publication state before sending, and the error handling preserves the frame for potential retry. Theunpublishmethod consumingselfensures the track can't be used after unpublishing.
114-119: LGTM!The
Dropimplementation ensures the track is properly unpublished even if the user doesn't explicitly callunpublish(). This is good defensive programming for resource cleanup.
163-237: LGTM!The error types are well-designed with clear documentation.
PushFrameErrorpreserving the frame viainto_frame()enables application-level retry logic, which is a thoughtful API design choice.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 298-311: process_and_send currently uses
event_out_tx.try_send(...) which silently drops packets on full channel; change
the behavior to propagate backpressure instead of dropping: make
process_and_send async (or add a new async variant) and replace try_send(...)
with event_out_tx.send(packets.into()).await, returning or propagating the
Result so callers (e.g. try_push) can handle backpressure/failure; update
callers of process_and_send/try_push to await and handle the send error rather
than relying on the non-blocking try_send drop semantics.
In `@livekit-datatrack/src/local/mod.rs`:
- Around line 143-149: The constructor new currently accepts any string but the
doc says "Must not be empty", so add validation: convert the incoming name (in
new) to a String, check if .is_empty(), and if so return an error instead of
constructing the struct; update new's signature to return Result<Self,
SomeErrorType> (create a simple error enum like InvalidTrackName or reuse an
existing error type) and return Ok(Self { name }) on success; reference the new
function and the name field so reviewers can locate the change.
In `@livekit-datatrack/src/remote/mod.rs`:
- Around line 118-121: The SubscribeError enum defines an unused variant
SubscribeError::Unpublished; either remove this dead variant or wire it into the
subscription flow: update the subscribe() function (and any helpers in remote
module) to detect when a track becomes unpublished and return
SubscribeError::Unpublished at that point, or delete the Unpublished variant and
any references to it to avoid dead code. Ensure references to SubscribeError and
match arms in callers are adjusted accordingly.
🧹 Nitpick comments (5)
livekit-datatrack/src/remote/mod.rs (1)
94-96: Silently dropping lagged frames is intentional but worth noting.The
filter_map(|result| async move { result.ok() })silently discardsBroadcastStreamlag errors (when a subscriber can't keep up). This is a reasonable design choice for real-time data, but subscribers won't know if they've missed frames.Consider adding a brief doc comment noting this behavior for users who need guaranteed delivery.
livekit-datatrack/src/track.rs (1)
43-47: Minor: Simplifyis_published()implementation.The intermediate variable is unnecessary.
♻️ Suggested simplification
pub fn is_published(&self) -> bool { let published_rx = self.published_rx(); - let published = *published_rx.borrow(); - published + *published_rx.borrow() }livekit-datatrack/src/local/manager.rs (3)
252-265: Consider parallel task shutdown for efficiency.The shutdown awaits each active task sequentially. With many published tracks, this could delay shutdown. Consider
futures::future::join_allfor parallel completion.♻️ Parallel shutdown suggestion
async fn shutdown(self) { + let mut task_handles = Vec::new(); for (_, descriptor) in self.descriptors { match descriptor { Descriptor::Pending(result_tx) => { _ = result_tx.send(Err(PublishError::Disconnected)) } Descriptor::Active { published_tx, task_handle, .. } => { _ = published_tx.send(false); - task_handle.await; + task_handles.push(task_handle); } } } + futures::future::join_all(task_handles).await; }
367-376: Inconsistent channel send behavior.
publish_trackusestry_send(non-blocking), whilequery_tracksusessend().await(blocking). This inconsistency is likely intentional since queries are less time-critical, but it could cause unexpected blocking if the manager is overwhelmed.
433-436: Minor: Preferfirst()overnth(0).♻️ Style suggestion
OutputEvent::PacketsAvailable(packets) => { - let packet = packets.into_iter().nth(0).unwrap(); + let packet = packets.into_iter().next().unwrap(); let payload = Packet::deserialize(packet).unwrap().payload;
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (15)
Cargo.tomllivekit-datatrack/src/frame.rslivekit-datatrack/src/local/events.rslivekit-datatrack/src/local/manager.rslivekit-datatrack/src/local/mod.rslivekit-datatrack/src/local/pipeline.rslivekit-datatrack/src/packet/mod.rslivekit-datatrack/src/remote/depacketizer.rslivekit-datatrack/src/remote/mod.rslivekit-datatrack/src/track.rslivekit-ffi/src/conversion/participant.rslivekit/src/proto.rslivekit/src/room/mod.rslivekit/src/room/participant/mod.rsrelease-plz.toml
✅ Files skipped from review due to trivial changes (1)
- livekit-ffi/src/conversion/participant.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (5)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/remote/mod.rs (1)
published_rx(113-115)livekit-datatrack/src/local/proto.rs (3)
try_from(45-48)try_from(54-58)try_from(64-73)
livekit/src/room/mod.rs (7)
livekit-api/src/access_token.rs (2)
default(100-118)default(131-133)webrtc-sys/src/peer_connection.rs (1)
default(223-234)livekit-api/src/signal_client/mod.rs (2)
default(82-84)default(96-102)livekit/src/room/options.rs (1)
default(94-106)livekit/src/room/participant/rpc.rs (1)
default(29-36)libwebrtc/src/data_channel.rs (1)
default(34-44)libwebrtc/src/native/peer_connection_factory.rs (1)
default(44-55)
livekit-datatrack/src/local/mod.rs (1)
livekit-datatrack/src/remote/mod.rs (2)
inner(47-52)published_rx(113-115)
livekit-datatrack/src/local/pipeline.rs (3)
livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)livekit-datatrack/src/frame.rs (2)
new(69-71)payload(41-43)livekit-datatrack/src/local/packetizer.rs (1)
new(46-54)
livekit-datatrack/src/remote/mod.rs (3)
livekit-datatrack/src/local/mod.rs (5)
new(41-43)new(147-149)new(204-206)inner(45-50)published_rx(110-112)livekit-datatrack/src/remote/pipeline.rs (1)
new(39-47)livekit-datatrack/src/track.rs (2)
info(38-40)published_rx(63-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (x86_64-linux-android)
- GitHub Check: Test (x86_64-apple-darwin)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Build (x86_64-unknown-linux-gnu)
- GitHub Check: Build (x86_64-apple-darwin)
- GitHub Check: Build (aarch64-apple-ios-sim)
- GitHub Check: Build (aarch64-pc-windows-msvc)
- GitHub Check: Build (aarch64-linux-android)
- GitHub Check: Build (aarch64-apple-ios)
- GitHub Check: Build (armv7-linux-androideabi)
- GitHub Check: Build (aarch64-apple-darwin)
- GitHub Check: Build (x86_64-pc-windows-msvc)
- GitHub Check: Build (aarch64-unknown-linux-gnu)
🔇 Additional comments (11)
livekit/src/room/participant/mod.rs (1)
48-48: LGTM!Trailing comma additions to enum variants improve consistency and reduce diff noise for future additions.
Also applies to: 57-57
livekit-datatrack/src/remote/mod.rs (1)
39-52: LGTM!The type-state pattern with
PhantomData<Remote>and theinner()accessor is correctly implemented. Theunreachable!()is safe due to compile-time guarantees.livekit-datatrack/src/track.rs (3)
71-94: LGTM!
DataTrackInfoprovides clean encapsulation with internal fields and appropriate public accessors.
108-118: LGTM!The prefix-based validation is appropriate for SFU-assigned identifiers. The implementation correctly rejects malformed SIDs.
132-145: LGTM!The test fake implementation correctly generates valid SIDs with the required prefix and a base57 suffix for better readability in test output.
livekit-datatrack/src/local/mod.rs (3)
83-91: LGTM!The
try_pushimplementation correctly handles the race between checkingis_published()and sending. The early check provides a clearTrackUnpublishederror, whiletry_sendfailure catches edge cases with theDroppederror.
115-120: LGTM!The
Dropimplementation ensures proper cleanup by signaling unpublish, preventing resource leaks if the user forgets to callunpublish().
164-238: LGTM!The error types are well-designed:
PublishErrorcovers all failure modes clearlyPushFrameErrorallows frame recovery for retry logic- Good documentation on each variant
livekit-datatrack/src/local/manager.rs (3)
61-77: LGTM!The manager setup is clean. The TODO for buffer size tuning is noted - this may need adjustment based on real-world usage patterns.
139-156: LGTM!The
forward_publish_resulttask correctly usesbiasedselect to prioritize the result channel, and properly notifies the manager on cancellation via the weak sender.
390-463: LGTM!Good test coverage for the manager lifecycle and publish flow. The tests verify the complete round-trip including SFU response handling and packet emission.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| /// # Requirements | ||
| /// - Must not be empty | ||
| /// - Must be unique per publisher | ||
| /// | ||
| pub fn new(name: impl Into<String>) -> Self { | ||
| Self { name: name.into() } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider validating non-empty name requirement.
The documentation states the track name "Must not be empty", but new() doesn't enforce this. Consider adding validation or clarifying that empty names are validated server-side.
🛡️ Optional validation
pub fn new(name: impl Into<String>) -> Self {
- Self { name: name.into() }
+ let name = name.into();
+ debug_assert!(!name.is_empty(), "Track name must not be empty");
+ Self { name }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// # Requirements | |
| /// - Must not be empty | |
| /// - Must be unique per publisher | |
| /// | |
| pub fn new(name: impl Into<String>) -> Self { | |
| Self { name: name.into() } | |
| } | |
| /// # Requirements | |
| /// - Must not be empty | |
| /// - Must be unique per publisher | |
| /// | |
| pub fn new(name: impl Into<String>) -> Self { | |
| let name = name.into(); | |
| debug_assert!(!name.is_empty(), "Track name must not be empty"); | |
| Self { name } | |
| } |
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/local/mod.rs` around lines 143 - 149, The constructor
new currently accepts any string but the doc says "Must not be empty", so add
validation: convert the incoming name (in new) to a String, check if
.is_empty(), and if so return an error instead of constructing the struct;
update new's signature to return Result<Self, SomeErrorType> (create a simple
error enum like InvalidTrackName or reuse an existing error type) and return
Ok(Self { name }) on success; reference the new function and the name field so
reviewers can locate the change.
| #[derive(Debug, Error)] | ||
| pub enum SubscribeError { | ||
| #[error("The track has been unpublished and is no longer available")] | ||
| Unpublished, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
rg -n 'SubscribeError::Unpublished' livekit-datatrack/Repository: livekit/rust-sdks
Length of output: 43
🏁 Script executed:
rg -n 'subscribe' livekit-datatrack/src/remote/mod.rs -A 5 -B 2Repository: livekit/rust-sdks
Length of output: 1875
🏁 Script executed:
sed -n '75,100p' livekit-datatrack/src/remote/mod.rsRepository: livekit/rust-sdks
Length of output: 1158
Remove the unused SubscribeError::Unpublished variant or implement its use.
The Unpublished variant is defined in the enum but is never returned by subscribe() or used elsewhere in the crate. Either remove it as dead code or implement the logic to return this error when the track becomes unpublished.
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/mod.rs` around lines 118 - 121, The
SubscribeError enum defines an unused variant SubscribeError::Unpublished;
either remove this dead variant or wire it into the subscription flow: update
the subscribe() function (and any helpers in remote module) to detect when a
track becomes unpublished and return SubscribeError::Unpublished at that point,
or delete the Unpublished variant and any references to it to avoid dead code.
Ensure references to SubscribeError and match arms in callers are adjusted
accordingly.
This PR introduces livekit-datatrack, a new crate that encapsulates the core functionality and APIs for data tracks. The crate is intentionally designed with no dependencies on libwebrtc or the signaling client, making it fully unit-testable and enabling incremental adoption in Swift and other languages via the new livekit-uniffi interface.
Key modules:
Given the large scope of data tracks, integration with the Rust client (including E2E tests and examples) as well as FFI bindings have been split up into separate PRs:
Data tracks is also being developed in parallel for the web: