Skip to content

Commit

Permalink
Add worker, fill out CLI, + various lints
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Apr 12, 2024
1 parent 9e47658 commit ba8711f
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 57 deletions.
2 changes: 1 addition & 1 deletion agents/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import dts from 'bun-plugin-dts';

await Bun.build({
entrypoints: ['./src/index.ts', './src/tts/index.ts', './src/stt/index.ts'],
entrypoints: ['./src/index.ts', './src/tts/index.ts', './src/stt/index.ts', './src/cli.ts'],
outdir: './dist',
target: 'bun', // https://github.com/oven-sh/bun/blob/main/src/bundler/bundle_v2.zig#L2667
sourcemap: 'external',
Expand Down
6 changes: 4 additions & 2 deletions agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
},
"devDependencies": {
"@types/bun": "latest",
"@types/ws": "^8.5.10",
"bun-plugin-dts": "^0.2.1",
"eslint": "^8.57.0",
"eslint-config-prettier": "^9.1.0",
Expand All @@ -25,10 +26,11 @@
"typescript": "^5.0.0"
},
"dependencies": {
"@livekit/protocol": "^1.12.0",
"@livekit/protocol": "^1.13.0",
"commander": "^12.0.0",
"livekit-server-sdk": "^2.1.2",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0"
"pino-pretty": "^11.0.0",
"ws": "^8.16.0"
}
}
107 changes: 70 additions & 37 deletions agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,77 @@
//
// SPDX-License-Identifier: Apache-2.0

import { version } from './index';
import { version } from '.';
import { Option, Command } from 'commander';
import { WorkerOptions, Worker } from './worker';
import { EventEmitter } from 'events';
import { log } from './log';

const program = new Command();
program
.name('agents')
.description('LiveKit Agents CLI')
.version(version)
.addOption(
new Option('--log-level', 'Set the logging level').choices([
'DEBUG',
'INFO',
'WARNING',
'ERROR',
'CRITICAL',
]),
);

program
.command('start')
.description('Start the worker')
.addOption(
new Option('--url <string>', 'LiveKit server or Cloud project websocket URL')
.makeOptionMandatory(true)
.env('LIVEKIT_URL'),
)
.addOption(
new Option('--api-key <string>', "LiveKit server or Cloud project's API key")
.makeOptionMandatory(true)
.env('LIVEKIT_API_KEY'),
)
.addOption(
new Option('--api-secret <string>', "LiveKit server or Cloud project's API secret")
.makeOptionMandatory(true)
.env('LIVEKIT_API_SECRET'),
)
.action(() => {
return;
type CliArgs = {
opts: WorkerOptions;
logLevel: string;
production: boolean;
watch: boolean;
event?: EventEmitter;
};

const runWorker = async (args: CliArgs) => {
log.level = args.logLevel;
const worker = new Worker(args.opts);

process.on('SIGINT', async () => {
await worker.close();
process.exit(130); // SIGINT exit code
});

program.parse();
try {
await worker.run();
} catch {
log.fatal('worker failed');
}
};

export const runApp = (opts: WorkerOptions) => {
const program = new Command()
.name('agents')
.description('LiveKit Agents CLI')
.version(version)
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('trace'),
)
.addOption(
new Option('--url <string>', 'LiveKit server or Cloud project websocket URL')
.makeOptionMandatory(true)
.env('LIVEKIT_URL'),
)
.addOption(
new Option('--api-key <string>', "LiveKit server or Cloud project's API key")
.makeOptionMandatory(true)
.env('LIVEKIT_API_KEY'),
)
.addOption(
new Option('--api-secret <string>', "LiveKit server or Cloud project's API secret")
.makeOptionMandatory(true)
.env('LIVEKIT_API_SECRET'),
)

program
.command('start')
.description('Start the worker in production mode')
.action(() => {
const options = program.optsWithGlobals()
opts.wsURL = options.url || opts.wsURL;
opts.apiKey = options.apiKey || opts.apiKey;
opts.apiSecret = options.apiSecret || opts.apiSecret;
runWorker({
opts,
production: true,
watch: false,
logLevel: options.logLevel,
});
});

program.parse();
};
3 changes: 3 additions & 0 deletions agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
export * from './vad';
export * from './plugin';
export * from './version';
export * from './job_context';
export * from './job_request';
export * from './worker';
9 changes: 5 additions & 4 deletions agents/src/ipc/job_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import { runJob } from './job_main';
import { EventEmitter } from 'events';
import { log } from '../log';

const START_TIMEOUT = 90;
const PING_INTERVAL = 5;
const PING_TIMEOUT = 90;
const HIGH_PING_THRESHOLD = 10; // milliseconds
const START_TIMEOUT = 90 * 1000;
const PING_INTERVAL = 5 * 1000;
const PING_TIMEOUT = 90 * 1000;
const HIGH_PING_THRESHOLD = 10;

export class JobProcess {
#job: Job;
Expand Down Expand Up @@ -103,6 +103,7 @@ export class JobProcess {
const delay = Date.now() - msg.timestamp;
if (delay > HIGH_PING_THRESHOLD) {
this.logger.warn(`job is unresponsive (${delay}ms delay)`);
// @ts-expect-error: this actually works fine types/bun doesn't have a typedecl for it yet
pongTimeout.refresh();
}
} else if (msg instanceof UserExit || msg instanceof ShutdownResponse) {
Expand Down
2 changes: 1 addition & 1 deletion agents/src/ipc/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class StartJobRequest implements Message {

export class StartJobResponse implements Message {
static MSG_ID = 1;
err: Error | undefined;
err?: Error;

get MSG_ID(): number {
return StartJobResponse.MSG_ID;
Expand Down
2 changes: 1 addition & 1 deletion agents/src/job_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'events';
export class JobContext {
#job: Job;
#room: Room;
#publisher: RemoteParticipant | undefined;
#publisher?: RemoteParticipant;
tx: EventEmitter;

constructor(
Expand Down
8 changes: 2 additions & 6 deletions agents/src/job_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0

import { JobContext } from './job_context';
import { VideoGrant } from 'livekit-server-sdk';
import { Job, ParticipantInfo, Room } from '@livekit/protocol';
import { log } from './log';
import { EventEmitter } from 'events';
Expand Down Expand Up @@ -35,16 +34,15 @@ export type AcceptData = {
entry: AgentEntry;
autoSubscribe: AutoSubscribe;
autoDisconnect: AutoDisconnect;
grants: VideoGrant;
name: string;
identity: string;
metadata: string;
assign: EventEmitter;
};

type AvailRes = {
export type AvailRes = {
avail: boolean;
data: AcceptData | undefined;
data?: AcceptData;
};

export class JobRequest {
Expand Down Expand Up @@ -91,7 +89,6 @@ export class JobRequest {
entry: AgentEntry,
autoSubscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
autoDisconnect: AutoDisconnect = AutoDisconnect.ROOM_EMPTY,
grants: VideoGrant,
name: string = '',
identity: string = '',
metadata: string = '',
Expand All @@ -110,7 +107,6 @@ export class JobRequest {
entry,
autoSubscribe,
autoDisconnect,
grants,
name,
identity,
metadata,
Expand Down
2 changes: 1 addition & 1 deletion agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class StreamAdapterWrapper extends SpeechStream {
stt: STT;
vadStream: VADStream;
eventQueue: (SpeechEvent | undefined)[];
language: string | undefined;
language?: string;
task: {
run: Promise<void>;
cancel: () => void;
Expand Down
2 changes: 1 addition & 1 deletion agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export abstract class STT {
this.#streamingSupported = streamingSupported;
}

abstract recognize(buffer: AudioBuffer, language: string | undefined): Promise<SpeechEvent>;
abstract recognize(buffer: AudioBuffer, language?: string): Promise<SpeechEvent>;

abstract stream(language: string | undefined): SpeechStream;

Expand Down
2 changes: 1 addition & 1 deletion agents/src/tokenize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface SegmentedSentence {
}

export abstract class SentenceTokenizer {
abstract tokenize(text: string, language: string | undefined): SegmentedSentence[];
abstract tokenize(text: string, language?: string): SegmentedSentence[];
abstract stream(language: string | undefined): SentenceStream;
}

Expand Down
4 changes: 2 additions & 2 deletions agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export enum SynthesisEventType {

export class SynthesisEvent {
type: SynthesisEventType;
audio: SynthesizedAudio | undefined;
audio?: SynthesizedAudio;

constructor(type: SynthesisEventType, audio: SynthesizedAudio | undefined = undefined) {
this.type = type;
Expand All @@ -26,7 +26,7 @@ export class SynthesisEvent {
}

export abstract class SynthesizeStream implements IterableIterator<SynthesisEvent> {
abstract pushText(token: string | undefined): void;
abstract pushText(token?: string): void;

markSegmentEnd() {
this.pushText(undefined);
Expand Down
Loading

0 comments on commit ba8711f

Please sign in to comment.