- 
                Notifications
    You must be signed in to change notification settings 
- Fork 240
Comments on Agent SDK prototype #237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
356dfb7
              effb228
              ec87981
              ed4a803
              a926e16
              809e322
              7760aec
              5e35853
              3978348
              b2355fc
              2bf6419
              abe2026
              4c1c024
              8fcd2ed
              9425833
              4443c04
              994b209
              1658dc9
              76a3962
              fb01009
              b4ba41f
              b192a46
              6b9dfc5
              58a5ad3
              d38ddf6
              91f4a08
              728258b
              0d312e3
              22282c2
              4d22785
              b2602b8
              6f0eeb5
              d24be87
              8f6301b
              00674f4
              34c3645
              0d6288e
              6fae521
              82a5fe2
              e5b1fc0
              f4eeace
              ad4c236
              730dbed
              80038a0
              c3993be
              fa6cac4
              e47c9af
              6f12617
              1d716d7
              a7ff04f
              9d7c5b3
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,228 @@ | ||
| import type TypedEventEmitter from 'typed-emitter'; | ||
| import { EventEmitter } from "events"; | ||
| import { ConnectionState, ParticipantEvent, ParticipantKind, RemoteParticipant, Room, RoomEvent, Track } from 'livekit-client'; | ||
| import { getParticipantTrackRefs, participantTrackEvents, TrackReference } from '@/agent-sdk/external-deps/components-js'; | ||
| import { ParticipantEventCallbacks } from '@/agent-sdk/external-deps/client-sdk-js'; | ||
| import { ParticipantAttributes } from '@/agent-sdk/lib/participant-attributes'; | ||
|  | ||
| /** State representing the current connection status to the server hosted agent */ | ||
| export type AgentConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'signalReconnecting'; | ||
|  | ||
| /** State representing the current status of the agent, whether it is ready for speach, etc */ | ||
| export type AgentConversationalState = 'disconnected' | 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking'; | ||
|  | ||
| export enum AgentEvent { | ||
| VideoTrackChanged = 'videoTrackChanged', | ||
| AudioTrackChanged = 'videoTrackChanged', | ||
| AgentAttributesChanged = 'agentAttributesChanged', | ||
| AgentConnectionStateChanged = 'agentConnectionStateChanged', | ||
| AgentConversationalStateChanged = 'agentConversationalStateChanged', | ||
| } | ||
|  | ||
| export type AgentCallbacks = { | ||
| [AgentEvent.VideoTrackChanged]: (newTrack: TrackReference | null) => void; | ||
| [AgentEvent.AudioTrackChanged]: (newTrack: TrackReference | null) => void; | ||
| [AgentEvent.AgentAttributesChanged]: (newAttributes: Record<string, string>) => void; | ||
| [AgentEvent.AgentConnectionStateChanged]: (newAgentConnectionState: AgentConnectionState) => void; | ||
| [AgentEvent.AgentConversationalStateChanged]: (newAgentConversationalState: AgentConversationalState) => void; | ||
| }; | ||
|  | ||
| /** | ||
| * Agent encapculates all agent state, normalizing some quirks around how LiveKit Agents work. | ||
| */ | ||
| export default class Agent extends (EventEmitter as new () => TypedEventEmitter<AgentCallbacks>) { | ||
| private room: Room; | ||
|  | ||
| connectionState: AgentConnectionState = 'disconnected'; | ||
| conversationalState: AgentConversationalState = 'disconnected'; | ||
|  | ||
| private agentParticipant: RemoteParticipant | null = null; | ||
| private workerParticipant: RemoteParticipant | null = null; // ref: https://docs.livekit.io/agents/integrations/avatar/#avatar-workers | ||
| audioTrack: TrackReference | null = null; | ||
| videoTrack: TrackReference | null = null; | ||
|  | ||
| attributes: Record<string, string> = {}; | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the defined attributes are primarily intended for internal usage. If anything, I think we should strip all internal  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, probably orthogonal to this PR 👍 | ||
|  | ||
| constructor(room: Room) { | ||
| super(); | ||
| this.room = room; | ||
|  | ||
| this.room.on(RoomEvent.ParticipantConnected, this.handleParticipantConnected); | ||
| this.room.on(RoomEvent.ParticipantDisconnected, this.handleParticipantDisconnected); | ||
| this.room.on(RoomEvent.ConnectionStateChanged, this.handleConnectionStateChanged); | ||
| this.room.localParticipant.on(ParticipantEvent.TrackPublished, this.handleLocalParticipantTrackPublished) | ||
|  | ||
| this.updateConnectionState(); | ||
| this.updateConversationalState(); | ||
| } | ||
|  | ||
| teardown() { | ||
| this.room.off(RoomEvent.ParticipantConnected, this.handleParticipantConnected); | ||
| this.room.off(RoomEvent.ParticipantDisconnected, this.handleParticipantDisconnected); | ||
| this.room.off(RoomEvent.ConnectionStateChanged, this.handleConnectionStateChanged); | ||
| this.room.localParticipant.off(ParticipantEvent.TrackPublished, this.handleLocalParticipantTrackPublished) | ||
| } | ||
|  | ||
| private handleParticipantConnected = () => { | ||
| this.updateParticipants(); | ||
| } | ||
| private handleParticipantDisconnected = () => { | ||
| this.updateParticipants(); | ||
| } | ||
|  | ||
| private handleConnectionStateChanged = () => { | ||
| this.updateConnectionState(); | ||
| this.updateConversationalState(); | ||
| } | ||
|  | ||
| private handleLocalParticipantTrackPublished = () => { | ||
| this.updateConversationalState(); | ||
| } | ||
|  | ||
| private updateParticipants() { | ||
| const newAgentParticipant = this.roomRemoteParticipants.find( | ||
| (p) => p.kind === ParticipantKind.AGENT && !(ParticipantAttributes.publishOnBehalf in p.attributes), | ||
| ) ?? null; | ||
| const newWorkerParticipant = newAgentParticipant ? ( | ||
| this.roomRemoteParticipants.find( | ||
| (p) => | ||
| p.kind === ParticipantKind.AGENT && p.attributes[ParticipantAttributes.publishOnBehalf] === newAgentParticipant.identity, | ||
| ) ?? null | ||
| ) : null; | ||
|  | ||
| const oldAgentParticipant = this.agentParticipant; | ||
| const oldWorkerParticipant = this.workerParticipant; | ||
| this.agentParticipant = newAgentParticipant; | ||
| this.workerParticipant = newWorkerParticipant; | ||
|  | ||
| // 1. Listen for attribute changes | ||
| if (oldAgentParticipant !== this.agentParticipant) { | ||
| oldAgentParticipant?.off(ParticipantEvent.AttributesChanged, this.handleAttributesChanged); | ||
|  | ||
| if (this.agentParticipant) { | ||
| this.agentParticipant.on(ParticipantEvent.AttributesChanged, this.handleAttributesChanged); | ||
| this.handleAttributesChanged(this.agentParticipant.attributes); | ||
| } | ||
| } | ||
|  | ||
| // 2. Listen for track updates | ||
| for (const event of participantTrackEvents) { | ||
| if (oldAgentParticipant !== this.agentParticipant) { | ||
| oldAgentParticipant?.off(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); | ||
| if (this.agentParticipant) { | ||
| this.agentParticipant.on(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); | ||
| this.handleUpdateTracks(); | ||
| } | ||
| } | ||
| if (oldWorkerParticipant !== this.workerParticipant) { | ||
| oldWorkerParticipant?.off(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); | ||
| if (this.workerParticipant) { | ||
| this.workerParticipant.on(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); | ||
| this.handleUpdateTracks(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|  | ||
| private handleUpdateTracks = () => { | ||
| const newVideoTrack = ( | ||
| this.agentTracks.find((t) => t.source === Track.Source.Camera) ?? | ||
| this.workerTracks.find((t) => t.source === Track.Source.Camera) ?? null | ||
| ); | ||
| if (this.videoTrack !== newVideoTrack) { | ||
| this.videoTrack = newVideoTrack; | ||
| this.emit(AgentEvent.VideoTrackChanged, newVideoTrack); | ||
| } | ||
|  | ||
| const newAudioTrack = ( | ||
| this.agentTracks.find((t) => t.source === Track.Source.Microphone) ?? | ||
| this.workerTracks.find((t) => t.source === Track.Source.Microphone) ?? null | ||
| ); | ||
| if (this.audioTrack !== newAudioTrack) { | ||
|         
                  1egoman marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| this.audioTrack = newAudioTrack; | ||
| this.emit(AgentEvent.AudioTrackChanged, newAudioTrack); | ||
| } | ||
| }; | ||
|  | ||
| private handleAttributesChanged = (attributes: Record<string, string>) => { | ||
| this.attributes = attributes; | ||
| this.emit(AgentEvent.AgentAttributesChanged, attributes); | ||
| this.updateConnectionState(); | ||
| this.updateConversationalState(); | ||
| }; | ||
|  | ||
| private updateConnectionState() { | ||
| let newConnectionState: AgentConnectionState; | ||
|  | ||
| const roomConnectionState = this.room.state; | ||
| if (roomConnectionState === ConnectionState.Disconnected) { | ||
| newConnectionState = 'disconnected'; | ||
| } else if ( | ||
| roomConnectionState === ConnectionState.Connecting || | ||
| !this.agentParticipant || | ||
| !this.attributes[ParticipantAttributes.state] | ||
| ) { | ||
| newConnectionState = 'connecting'; | ||
| } else { | ||
| newConnectionState = roomConnectionState; | ||
| } | ||
| console.log('!! CONNECTION STATE:', newConnectionState); | ||
|  | ||
| if (this.connectionState !== newConnectionState) { | ||
| this.connectionState = newConnectionState; | ||
| this.emit(AgentEvent.AgentConnectionStateChanged, newConnectionState); | ||
| } | ||
| } | ||
|  | ||
| private updateConversationalState() { | ||
| let newConversationalState: AgentConversationalState = 'disconnected'; | ||
|  | ||
| if (this.room.state !== ConnectionState.Disconnected) { | ||
| newConversationalState = 'initializing'; | ||
| } | ||
|  | ||
| // If the microphone preconnect buffer is active, then the state should be "listening" rather | ||
| // than "initializing" | ||
| const micTrack = this.room.localParticipant.getTrackPublication(Track.Source.Microphone); | ||
| if (micTrack) { | ||
| newConversationalState = 'listening'; | ||
| } | ||
|  | ||
| if (this.agentParticipant && this.attributes[ParticipantAttributes.state]) { | ||
| // ref: https://github.com/livekit/agents/blob/65170238db197f62f479eb7aaef1c0e18bfad6e7/livekit-agents/livekit/agents/voice/events.py#L97 | ||
| const agentState = this.attributes[ParticipantAttributes.state] as 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking'; | ||
| newConversationalState = agentState; | ||
| } | ||
|  | ||
| console.log('!! CONVERSATIONAL STATE:', newConversationalState); | ||
|  | ||
| if (this.conversationalState !== newConversationalState) { | ||
| this.conversationalState = newConversationalState; | ||
| this.emit(AgentEvent.AgentConversationalStateChanged, newConversationalState); | ||
| } | ||
| } | ||
|  | ||
| private get roomRemoteParticipants() { | ||
| return Array.from(this.room.remoteParticipants.values()); | ||
| } | ||
|  | ||
| private get agentTracks() { | ||
| if (!this.agentParticipant) { | ||
| return []; | ||
| } | ||
| return getParticipantTrackRefs( | ||
| this.agentParticipant, | ||
| { sources: [Track.Source.Microphone, Track.Source.Camera] } | ||
| ); | ||
| } | ||
|  | ||
| private get workerTracks() { | ||
| if (!this.workerParticipant) { | ||
| return []; | ||
| } | ||
| return getParticipantTrackRefs( | ||
| this.workerParticipant, | ||
| { sources: [Track.Source.Microphone, Track.Source.Camera] } | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any strong (or JS-specific) arguments why it couldn't be flattened into one "observable" object?
Agentsounds more likeParticipantvs an object trackingRoomevents.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW that was the intention, I want to avoid
AgentSessionbecoming large and having to deal with all the quirks of the agent participant / worker participant, the "on behalf of" stuff, etc.It sounds like you are proposing this become more of a general "state container" that would have a larger responsibility than it currently does?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, 2 main points here:
Agentshould not know that much about theRoom- looks inverted from OOP perspective to mePersonally I don't think
AgentSessionwill ever be "too big" on the client side (look at the agents impl then)