Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
30e8c9c
Proto v1.41.0
pblazej Sep 5, 2025
2ef85c0
Todo
pblazej Sep 5, 2025
a1f2230
Lib v137.7151.06
pblazej Sep 8, 2025
a8ac611
Minimal impl
pblazej Sep 8, 2025
c29edbc
Lib v137.7151.07
pblazej Sep 10, 2025
37e1267
Basic test
pblazej Sep 10, 2025
37a5f27
Participant keys
pblazej Sep 10, 2025
9e2acec
Failure tests
pblazej Sep 10, 2025
c9c5683
Fix sequencing
pblazej Sep 10, 2025
f0e7299
Fix pod
pblazej Sep 10, 2025
a473c49
Add current key infra
pblazej Sep 10, 2025
13a2f8d
Ratcheting test
pblazej Sep 10, 2025
414e235
Simplify extensions
pblazej Sep 10, 2025
dbb1ef5
Retry cosmetics
pblazej Sep 10, 2025
621d9ad
Key index test
pblazej Sep 10, 2025
1a16920
Stream handling and delegates
pblazej Sep 10, 2025
bf349fa
Deprecate old methods and fix tests
pblazej Sep 11, 2025
cde547a
Cosmetics
pblazej Sep 11, 2025
96eccc0
Deprecate e2ee options
pblazej Sep 11, 2025
c7eed63
Move data channel tests around
pblazej Sep 11, 2025
c21e498
Assert
pblazej Sep 11, 2025
bb1c86a
Minor simplifications
pblazej Sep 11, 2025
592c0d4
Change
pblazej Sep 11, 2025
5bf4652
Fix fallback
pblazej Sep 11, 2025
87b49c0
Merge branch 'main' into blaze/dc-e2ee
pblazej Sep 11, 2025
70af9b6
Update proto plugin
pblazej Sep 11, 2025
bdbe79c
Proto v1.41.0, new generator
pblazej Sep 11, 2025
d569de4
Fix default ratchet window
pblazej Sep 11, 2025
b34a38f
Revert "Fix default ratchet window"
pblazej Sep 12, 2025
15dfb96
Cmt
pblazej Sep 12, 2025
d48f241
Param names
pblazej Sep 12, 2025
6070ec2
Always decrypt
pblazej Sep 15, 2025
9b567f8
Split inits
pblazej Sep 15, 2025
825ae80
ObjC
pblazej Sep 15, 2025
7529f9c
Merge branch 'main' into blaze/dc-e2ee
hiroshihorie Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/encrypted-dc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
minor type="added" "Added support for data channel encryption, deprecated existing E2EE options"
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let package = Package(
dependencies: [
// LK-Prefixed Dynamic WebRTC XCFramework
.package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "137.7151.09"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.29.0"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.2"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"),
// Only used for DocC generation
Expand Down
2 changes: 1 addition & 1 deletion [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
dependencies: [
// LK-Prefixed Dynamic WebRTC XCFramework
.package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "137.7151.09"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.29.0"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.2"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"),
// Only used for DocC generation
Expand Down
53 changes: 48 additions & 5 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal import LiveKitWebRTC

protocol DataChannelDelegate: Sendable {
func dataChannel(_ dataChannelPair: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket)
func dataChannel(_ dataChannelPair: DataChannelPair, didFailToDecryptDataPacket dataPacket: Livekit_DataPacket, error: LiveKitError)
}

class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
Expand All @@ -34,6 +35,8 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {

var isOpen: Bool { _state.isOpen }

var e2eeManager: E2EEManager?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this can be weak ? I think the Room holds the instance. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be careful with that - I mean, there's no point in making sure that there's only 1 strong ref... if there's no cycle here... Let's double-check if the DC is deallocated properly (if needed).


// MARK: - Private

private struct State {
Expand Down Expand Up @@ -87,7 +90,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
func peek() -> PublishDataRequest? { queue.first }

mutating func enqueue(_ request: PublishDataRequest) {
queue.append(request)
queue.append(request.withoutContinuation())
currentAmount += UInt64(request.data.data.count)
}

Expand All @@ -110,6 +113,10 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
let data: LKRTCDataBuffer
let sequence: UInt32
let continuation: CheckedContinuation<Void, any Error>?

func withoutContinuation() -> Self {
.init(data: data, sequence: sequence, continuation: nil)
}
}

private struct ChannelEvent: Sendable {
Expand Down Expand Up @@ -234,8 +241,9 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
log("Wrong packet sequence while retrying: \(first.sequence) > \(lastSeq + 1), \(first.sequence - lastSeq - 1) packets missing", .warning)
}
while let request = buffer.dequeue() {
assert(request.continuation == nil, "Continuation may fire multiple times while retrying causing crash")
if request.sequence > lastSeq {
let event = ChannelEvent(channelKind: .reliable, detail: .publishData(PublishDataRequest(data: request.data, sequence: request.sequence, continuation: nil)))
let event = ChannelEvent(channelKind: .reliable, detail: .publishData(request))
_state.eventContinuation?.yield(event)
}
}
Expand Down Expand Up @@ -315,7 +323,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}

func send(dataPacket packet: Livekit_DataPacket) async throws {
let packet = withSequence(packet)
let packet = try withEncryption(withSequence(packet))
let serializedData = try packet.serializedData()
let rtcData = RTC.createDataBuffer(data: serializedData)

Expand All @@ -333,6 +341,20 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}
}

private func withEncryption(_ packet: Livekit_DataPacket) throws -> Livekit_DataPacket {
guard let e2eeManager, e2eeManager.isDataChannelEncryptionEnabled,
let payload = Livekit_EncryptedPacketPayload(dataPacket: packet) else { return packet }
var packet = packet
do {
let payloadData = try payload.serializedData()
let rtcEncryptedPacket = try e2eeManager.encrypt(data: payloadData)
packet.encryptedPacket = Livekit_EncryptedPacket(rtcPacket: rtcEncryptedPacket)
} catch {
throw LiveKitError(.encryptionFailed, internalError: error)
}
return packet
}

private func withSequence(_ packet: Livekit_DataPacket) -> Livekit_DataPacket {
guard packet.kind == .reliable, packet.sequence == 0 else { return packet }
var packet = packet
Expand Down Expand Up @@ -413,8 +435,29 @@ extension DataChannelPair: LKRTCDataChannelDelegate {
}
}

delegates.notify {
$0.dataChannel(self, didReceiveDataPacket: dataPacket)
if let encryptedPacket = dataPacket.encryptedPacketOrNil,
let e2eeManager
{
do {
let decryptedData = try e2eeManager.handle(encryptedData: encryptedPacket.toRTCEncryptedPacket(), participantIdentity: dataPacket.participantIdentity)
let decryptedPayload = try Livekit_EncryptedPacketPayload(serializedBytes: decryptedData)

var dataPacket = dataPacket
decryptedPayload.applyTo(&dataPacket)

delegates.notify { [dataPacket] in
$0.dataChannel(self, didReceiveDataPacket: dataPacket)
}
} catch {
log("Failed to decrypt data packet: \(error)", .error)
delegates.notify {
$0.dataChannel(self, didFailToDecryptDataPacket: dataPacket, error: LiveKitError(.decryptionFailed, internalError: error))
}
}
} else {
delegates.notify {
$0.dataChannel(self, didReceiveDataPacket: dataPacket)
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Room+EngineDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,20 @@ extension Room {
await publication.set(track: nil)
}

func engine(_ engine: Room, didReceiveUserPacket packet: Livekit_UserPacket) {
func engine(_ engine: Room, didReceiveUserPacket packet: Livekit_UserPacket, encryptionType: EncryptionType) {
// participant could be null if data broadcasted from server
let identity = Participant.Identity(from: packet.participantIdentity)
let participant = _state.remoteParticipants[identity]

if case .connected = engine._state.connectionState {
delegates.notify(label: { "room.didReceive data: \(packet.payload)" }) {
$0.room?(self, participant: participant, didReceiveData: packet.payload, forTopic: packet.topic)
$0.room?(self, participant: participant, didReceiveData: packet.payload, forTopic: packet.topic, encryptionType: encryptionType)
}

if let participant {
participant.delegates.notify(label: { "participant.didReceive data: \(packet.payload)" }) { [weak participant] delegate in
guard let participant else { return }
delegate.participant?(participant, didReceiveData: packet.payload, forTopic: packet.topic)
delegate.participant?(participant, didReceiveData: packet.payload, forTopic: packet.topic, encryptionType: encryptionType)
}
}
}
Expand Down
33 changes: 26 additions & 7 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
let incomingStreamManager = IncomingStreamManager()
lazy var outgoingStreamManager = OutgoingStreamManager { [weak self] packet in
try await self?.send(dataPacket: packet)
} encryptionProvider: { [weak self] in
self?.e2eeManager?.dataChannelEncryptionType ?? .none
}

// MARK: - PreConnect
Expand Down Expand Up @@ -340,15 +342,26 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
_state.mutate { $0.connectOptions = connectOptions }
}

await cleanUp()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved due to E2EEManager.cleanUp called after setup


try Task.checkCancellation()

// enable E2EE
if let e2eeOptions = state.roomOptions.e2eeOptions {
e2eeManager = E2EEManager(e2eeOptions: e2eeOptions)
e2eeManager!.setup(room: self)
}
} else if let encryptionOptions = state.roomOptions.encryptionOptions {
e2eeManager = E2EEManager(options: encryptionOptions)
e2eeManager!.setup(room: self)

await cleanUp()
subscriberDataChannel.e2eeManager = e2eeManager
publisherDataChannel.e2eeManager = e2eeManager
} else {
e2eeManager = nil

try Task.checkCancellation()
subscriberDataChannel.e2eeManager = nil
publisherDataChannel.e2eeManager = nil
}

_state.mutate { $0.connectionState = .connecting }

Expand Down Expand Up @@ -620,15 +633,21 @@ extension Room: DataChannelDelegate {
func dataChannel(_: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) {
switch dataPacket.value {
case let .speaker(update): engine(self, didUpdateSpeakers: update.speakers)
case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket)
case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket, encryptionType: dataPacket.encryptedPacket.encryptionType.toLKType())
case let .transcription(packet): room(didReceiveTranscriptionPacket: packet)
case let .rpcResponse(response): room(didReceiveRpcResponse: response)
case let .rpcAck(ack): room(didReceiveRpcAck: ack)
case let .rpcRequest(request): room(didReceiveRpcRequest: request, from: dataPacket.participantIdentity)
case let .streamHeader(header): Task { await incomingStreamManager.handle(header: header, from: dataPacket.participantIdentity) }
case let .streamChunk(chunk): Task { await incomingStreamManager.handle(chunk: chunk) }
case let .streamTrailer(trailer): Task { await incomingStreamManager.handle(trailer: trailer) }
case let .streamHeader(header): Task { await incomingStreamManager.handle(header: header, from: dataPacket.participantIdentity, encryptionType: dataPacket.encryptedPacket.encryptionType.toLKType()) }
case let .streamChunk(chunk): Task { await incomingStreamManager.handle(chunk: chunk, encryptionType: dataPacket.encryptedPacket.encryptionType.toLKType()) }
case let .streamTrailer(trailer): Task { await incomingStreamManager.handle(trailer: trailer, encryptionType: dataPacket.encryptedPacket.encryptionType.toLKType()) }
default: return
}
}

func dataChannel(_: DataChannelPair, didFailToDecryptDataPacket _: Livekit_DataPacket, error: LiveKitError) {
delegates.notify {
$0.room?(self, didFailToDecryptDataWithEror: error)
}
}
}
3 changes: 3 additions & 0 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ private extension SignalClient {

case .roomMoved:
log("Received roomMoved message")

case .mediaSectionsRequirement:
log("Received mediaSectionsRequirement message")
}
}
}
Expand Down
41 changes: 32 additions & 9 deletions Sources/LiveKit/DataStream/Incoming/IncomingStreamManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ actor IncomingStreamManager: Loggable {
// MARK: - Packet processing

/// Handles a data stream header.
func handle(header: Livekit_DataStream.Header, from identityString: String) {
func handle(header: Livekit_DataStream.Header, from identityString: String, encryptionType: EncryptionType) {
let identity = Participant.Identity(from: identityString)

guard let streamInfo = Self.streamInfo(from: header) else {
guard let streamInfo = Self.streamInfo(from: header, encryptionType: encryptionType) else {
return
}
openStream(with: streamInfo, from: identity)
Expand Down Expand Up @@ -110,9 +110,18 @@ actor IncomingStreamManager: Loggable {
}

/// Handles a data stream chunk.
func handle(chunk: Livekit_DataStream.Chunk) {
func handle(chunk: Livekit_DataStream.Chunk, encryptionType: EncryptionType) {
guard !chunk.content.isEmpty, let descriptor = openStreams[chunk.streamID] else { return }

if descriptor.info.encryptionType != encryptionType {
let error = StreamError.encryptionTypeMismatch(
expected: descriptor.info.encryptionType,
received: encryptionType
)
descriptor.continuation.finish(throwing: error)
return
}

let readLength = descriptor.readLength + chunk.content.count

if let totalLength = descriptor.info.totalLength {
Expand All @@ -126,10 +135,20 @@ actor IncomingStreamManager: Loggable {
}

/// Handles a data stream trailer.
func handle(trailer: Livekit_DataStream.Trailer) {
func handle(trailer: Livekit_DataStream.Trailer, encryptionType: EncryptionType) {
guard let descriptor = openStreams[trailer.streamID] else {
return
}

if descriptor.info.encryptionType != encryptionType {
let error = StreamError.encryptionTypeMismatch(
expected: descriptor.info.encryptionType,
received: encryptionType
)
descriptor.continuation.finish(throwing: error)
return
}

if let totalLength = descriptor.info.totalLength {
guard descriptor.readLength == totalLength else {
descriptor.continuation.finish(throwing: StreamError.incomplete)
Expand Down Expand Up @@ -186,10 +205,10 @@ public typealias TextStreamHandler = @Sendable (TextStreamReader, Participant.Id
// MARK: - From protocol types

extension IncomingStreamManager {
static func streamInfo(from header: Livekit_DataStream.Header) -> StreamInfo? {
static func streamInfo(from header: Livekit_DataStream.Header, encryptionType: EncryptionType) -> StreamInfo? {
switch header.contentHeader {
case let .byteHeader(byteHeader): ByteStreamInfo(header, byteHeader)
case let .textHeader(textHeader): TextStreamInfo(header, textHeader)
case let .byteHeader(byteHeader): ByteStreamInfo(header, byteHeader, encryptionType)
case let .textHeader(textHeader): TextStreamInfo(header, textHeader, encryptionType)
default: nil
}
}
Expand All @@ -198,14 +217,16 @@ extension IncomingStreamManager {
extension ByteStreamInfo {
convenience init(
_ header: Livekit_DataStream.Header,
_ byteHeader: Livekit_DataStream.ByteHeader
_ byteHeader: Livekit_DataStream.ByteHeader,
_ encryptionType: EncryptionType
) {
self.init(
id: header.streamID,
topic: header.topic,
timestamp: header.timestampDate,
totalLength: header.hasTotalLength ? Int(header.totalLength) : nil,
attributes: header.attributes,
encryptionType: encryptionType,
// ---
mimeType: header.mimeType,
name: byteHeader.name
Expand All @@ -216,14 +237,16 @@ extension ByteStreamInfo {
extension TextStreamInfo {
convenience init(
_ header: Livekit_DataStream.Header,
_ textHeader: Livekit_DataStream.TextHeader
_ textHeader: Livekit_DataStream.TextHeader,
_ encryptionType: EncryptionType
) {
self.init(
id: header.streamID,
topic: header.topic,
timestamp: header.timestampDate,
totalLength: header.hasTotalLength ? Int(header.totalLength) : nil,
attributes: header.attributes,
encryptionType: encryptionType,
// ---
operationType: TextStreamInfo.OperationType(textHeader.operationType),
version: Int(textHeader.version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import Foundation
/// Manages state of outgoing data streams.
actor OutgoingStreamManager: Loggable {
typealias PacketHandler = @Sendable (Livekit_DataPacket) async throws -> Void
typealias EncryptionProvider = @Sendable () -> EncryptionType

private nonisolated let packetHandler: PacketHandler
private nonisolated let encryptionProvider: EncryptionProvider

init(packetHandler: @escaping PacketHandler) {
init(packetHandler: @escaping PacketHandler, encryptionProvider: @escaping EncryptionProvider) {
self.packetHandler = packetHandler
self.encryptionProvider = encryptionProvider
}

// MARK: - Opening streams
Expand All @@ -35,6 +38,7 @@ actor OutgoingStreamManager: Loggable {
timestamp: Date(),
totalLength: text.utf8.count, // Number of bytes in UTF-8 representation
attributes: options.attributes,
encryptionType: encryptionProvider(),
operationType: .create,
version: options.version,
replyToStreamID: options.replyToStreamID,
Expand All @@ -61,6 +65,7 @@ actor OutgoingStreamManager: Loggable {
timestamp: Date(),
totalLength: fileInfo.size, // Not overridable
attributes: options.attributes,
encryptionType: encryptionProvider(),
mimeType: options.mimeType ?? fileInfo.mimeType ?? Self.byteMimeType,
name: options.name ?? fileInfo.name
)
Expand All @@ -81,6 +86,7 @@ actor OutgoingStreamManager: Loggable {
timestamp: Date(),
totalLength: nil,
attributes: options.attributes,
encryptionType: encryptionProvider(),
operationType: .create,
version: options.version,
replyToStreamID: options.replyToStreamID,
Expand All @@ -100,6 +106,7 @@ actor OutgoingStreamManager: Loggable {
timestamp: Date(),
totalLength: options.totalSize,
attributes: options.attributes,
encryptionType: encryptionProvider(),
mimeType: options.mimeType ?? Self.byteMimeType,
name: options.name
)
Expand Down
3 changes: 3 additions & 0 deletions Sources/LiveKit/DataStream/StreamError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ public enum StreamError: Error, Equatable {

/// Unable to read information about the file to send.
case fileInfoUnavailable

/// Encryption type mismatch between stream header and chunk/trailer.
case encryptionTypeMismatch(expected: EncryptionType, received: EncryptionType)
}
Loading
Loading