Skip to content
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
bc411aa
Move basic Agent files
pblazej Sep 18, 2025
43861e8
Fix inconsistencies
pblazej Sep 18, 2025
314b1c1
Media state from participant
pblazej Sep 18, 2025
93f3081
Naming
pblazej Sep 19, 2025
319798f
Attributes gen
pblazej Sep 23, 2025
d4a496e
Transcription tests
pblazej Sep 23, 2025
dfa4db6
Extract tests
pblazej Sep 23, 2025
54acf68
Renaming
pblazej Sep 23, 2025
bec88b6
Pass token sources
pblazej Oct 16, 2025
33b02f9
Renaming
pblazej Oct 16, 2025
d141d6a
Extract Options
pblazej Oct 16, 2025
0e49e6f
Split options
pblazej Oct 16, 2025
7b954da
Nest
pblazej Oct 16, 2025
06609c1
Weak
pblazej Oct 16, 2025
ac90eb4
Fix existential
pblazej Oct 16, 2025
d84ef9b
Errors
pblazej Oct 16, 2025
4fcd651
Sendable
pblazej Oct 16, 2025
d5e6437
Older Swift
pblazej Oct 16, 2025
9cf68ff
CR: Session.withAgent factory
pblazej Oct 17, 2025
ae38145
CR: Don't expose multiple agents
pblazej Oct 21, 2025
2327745
Naming
pblazej Oct 21, 2025
ddf20a5
Use ordered dict
pblazej Oct 21, 2025
89e25f5
Merge branch 'main' into blaze/agent-conversation
pblazej Oct 22, 2025
a93fcf0
Alt design: Agent struct/enum
pblazej Oct 22, 2025
961b7c4
Discussion: update logic from JS
pblazej Oct 22, 2025
15509a9
Expose state again
pblazej Oct 22, 2025
00d74cd
Labels
pblazej Oct 22, 2025
24e6f18
Move
pblazej Oct 23, 2025
8ebbc9c
Mutable state with explicit transitions
pblazej Oct 23, 2025
095ffe7
Ext
pblazej Oct 23, 2025
a2654a0
State machine improvements
pblazej Oct 23, 2025
44e2af9
Cmts
pblazej Oct 23, 2025
9220587
Reconnect
pblazej Oct 23, 2025
624d580
Change
pblazej Oct 23, 2025
6cb1a38
Hide room conn state
pblazej Oct 23, 2025
565db30
Move token inside preconnect
pblazej Oct 23, 2025
160f8fb
Fix pre-connect gap
pblazej Oct 23, 2025
a787606
Simplify pre-connect state
pblazej Oct 23, 2025
7a71453
Enable mic w/o preconnect
pblazej Oct 23, 2025
585036b
Revert states on catch
pblazej Oct 24, 2025
385d2bb
Move wait
pblazej Oct 24, 2025
6846414
Catch receiver errors
pblazej Oct 24, 2025
99e74b8
Video device error handling
pblazej Oct 24, 2025
d8cf47d
Rename errors
pblazej Oct 24, 2025
b32b5db
Optional message return
pblazej Oct 24, 2025
ce657d4
Finish on deinit
pblazej Oct 24, 2025
ead1a44
Move fetch to extension
pblazej Oct 27, 2025
b20d1dc
ci: Update sims
pblazej Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/agent-session
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
minor type="added" "Agent and Session APIs for creating agent-based apps"
151 changes: 151 additions & 0 deletions Sources/LiveKit/Agent/Agent.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// Represents a LiveKit Agent.
///
/// The ``Agent`` struct represents the state of a LiveKit agent within a ``Session``.
/// It provides information about the agent's connection status, its current state
/// (e.g., listening, thinking, speaking), and its media tracks.
///
/// The ``Agent``'s properties are updated automatically by the ``Session`` as the agent's
/// state changes. This allows the application to react to the agent's
/// behavior, such as displaying its avatar video or indicating when it is speaking.
/// The ``agentState`` property is particularly useful for building UIs that reflect
/// the agent's current activity.
///
/// - SeeAlso: [LiveKit SwiftUI Agent Starter](https://github.com/livekit-examples/agent-starter-swift).
/// - SeeAlso: [LiveKit Agents documentation](https://docs.livekit.io/agents/).
public struct Agent: Loggable {
// MARK: - Error

public enum Error: LocalizedError {
case timeout

public var errorDescription: String? {
switch self {
case .timeout:
"Agent did not connect"
}
}
}

// MARK: - State

private enum State {
case disconnected
case connecting(buffering: Bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question
what this |buffering| mean ? or prebuffering is what it means ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's equivalent to JS isBufferingSpeech so pre-connect buffer, it's not exposed anywhere, but we can rename it

case connected(agentState: AgentState, audioTrack: (any AudioTrack)?, avatarVideoTrack: (any VideoTrack)?)
case failed(error: Error)
}

private var state: State = .disconnected

// MARK: - Transitions

mutating func disconnected() {
log("Agent disconnected from \(state)")
// From any state
state = .disconnected
}

mutating func failed(error: Error) {
log("Agent failed with error \(error) from \(state)")
// From any state
state = .failed(error: error)
}

mutating func connecting(buffering: Bool) {
log("Agent connecting from \(state)")
switch state {
case .disconnected, .connecting:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question
should you allow .connected here ? that might means reconnect ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no .reconnecting state in the agent itself, this overall Agent.State is a little artificial as it's derived partially from the room, as mentioned above: #789 (comment)

state = .connecting(buffering: buffering)
default:
log("Invalid transition from \(state) to connecting", .warning)
}
}

mutating func connected(participant: Participant) {
log("Agent connected to \(participant) from \(state)")
switch state {
case .connecting, .connected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick

should you handle the noOp operation ? like adding a

private func assign(_ new: State) {
guard new != state else { return }
state = new
}

Then use
assign(.connected(agentState: participant.agentState,
audioTrack: participant.agentAudioTrack,
avatarVideoTrack: participant.avatarVideoTrack))

Copy link
Contributor Author

@pblazej pblazej Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the perspective of UI framework, it does not really matter (for perf) https://medium.com/airbnb-engineering/understanding-and-improving-swiftui-performance-37b77ac61896 as we use the non-equatable (default) comparison for the State struct.

state = .connected(agentState: participant.agentState,
audioTrack: participant.agentAudioTrack,
avatarVideoTrack: participant.avatarVideoTrack)
default:
log("Invalid transition from \(state) to connected", .warning)
}
}

// MARK: - Public

/// A boolean value indicating whether the agent is connected.
public var isConnected: Bool {
switch state {
case .connected: true
default: false
}
}

/// The current conversational state of the agent.
public var agentState: AgentState? {
switch state {
case let .connected(agentState, _, _): agentState
default: nil
}
}

/// The agent's audio track.
public var audioTrack: (any AudioTrack)? {
switch state {
case let .connected(_, audioTrack, _): audioTrack
default: nil
}
}

/// The agent's avatar video track.
public var avatarVideoTrack: (any VideoTrack)? {
switch state {
case let .connected(_, _, avatarVideoTrack): avatarVideoTrack
default: nil
}
}

/// The last error that occurred.
public var error: Error? {
switch state {
case let .failed(error): error
default: nil
}
}
}

private extension Participant {
var agentAudioTrack: (any AudioTrack)? {
audioTracks.first(where: { $0.source == .microphone })?.track as? AudioTrack
}

var avatarVideoTrack: (any VideoTrack)? {
avatarWorker?.firstCameraVideoTrack
}
}

extension AgentState: CustomStringConvertible {
public var description: String {
rawValue.capitalized
}
}
41 changes: 41 additions & 0 deletions Sources/LiveKit/Agent/Chat/Message.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// A message received from the agent.
public struct ReceivedMessage: Identifiable, Equatable, Codable, Sendable {
public let id: String
public let timestamp: Date
public let content: Content

public enum Content: Equatable, Codable, Sendable {
case agentTranscript(String)
case userTranscript(String)
case userInput(String)
}
}

/// A message sent to the agent.
public struct SentMessage: Identifiable, Equatable, Codable, Sendable {
public let id: String
public let timestamp: Date
public let content: Content

public enum Content: Equatable, Codable, Sendable {
case userInput(String)
}
}
25 changes: 25 additions & 0 deletions Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// A protocol that defines a message receiver.
///
/// A message receiver is responsible for creating a stream of messages from the agent.
/// It is used to receive messages from the agent and update the message feed.
public protocol MessageReceiver: Sendable {
func messages() async throws -> AsyncStream<ReceivedMessage>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// An actor that receives transcription messages from the room and yields them as messages.
///
/// Room delegate methods are called multiple times for each message, with a stable message ID
/// that can be direcly used for diffing.
///
/// Example:
/// ```
/// { id: "1", content: "Hello" }
/// { id: "1", content: "Hello world!" }
/// ```
@available(*, deprecated, message: "Use TranscriptionStreamReceiver compatible with livekit-agents 1.0")
actor TranscriptionDelegateReceiver: MessageReceiver, RoomDelegate {
private let room: Room
private var continuation: AsyncStream<ReceivedMessage>.Continuation?

init(room: Room) {
self.room = room
room.add(delegate: self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question

should this TranscriptionDelegateReceiver hold the room strong reference ? or it should be weak ?

Could you please confirm that there is no cycle reference if they hold each other strongly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Room does not keep a strong ref to the delegates

let delegates = NSHashTable<AnyObject>.weakObjects()

}

deinit {
continuation?.finish()
room.remove(delegate: self)
}

/// Creates a new message stream for the transcription delegate receiver.
func messages() -> AsyncStream<ReceivedMessage> {
let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self)
self.continuation = continuation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question
Can this messages() will be called multiple times?
If yes, the current code stores a single continuation, calling messages() again overwrites it and could it leave the old stream hanging ?
In that case, I am a bit worrying that it never finish the stream either, so consumers can hang ?

And I wonder if we should have an explicit stop function like
func stop() {
room?.remove(delegate: self)
room = nil
continuation?.finish()
continuation = nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a general problem with AsyncStream<> being exposed here (without storing anything internally). It's not intended to be used by multiple consumers at all (known issue). There's also no equivalent AnyAsyncSequence (as AnyPublisher).

it never finish the stream either,

It can be cancelled from the outside like this:

let locations = AsyncLocationStream()

let task = Task {
    for await location in locations.stream {
        print(location)
    }
}

task.cancel()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: stream idempotence, I think we've got 2 choices:

  • leave is as is - it won't register another consumer for this topic, just throwing StreamError.handlerAlreadyRegistered
  • unregister before registering, so the previous stream will stop working - I think that's the worst one

return stream
}

nonisolated func room(_: Room, participant: Participant, trackPublication _: TrackPublication, didReceiveTranscriptionSegments segments: [TranscriptionSegment]) {
segments
.filter { !$0.text.isEmpty }
.forEach { segment in
let message = ReceivedMessage(
id: segment.id,
timestamp: segment.lastReceivedTime,
content: participant.isAgent ? .agentTranscript(segment.text) : .userTranscript(segment.text)
)
Task {
await yield(message)
}
}
}

private func yield(_ message: ReceivedMessage) {
continuation?.yield(message)
}
}
Loading
Loading