diff --git a/packages/genui/server/app/a2ui/_shared.ts b/packages/genui/server/app/a2ui/_shared.ts index f1e85d890c..3b2ed4a3ab 100644 --- a/packages/genui/server/app/a2ui/_shared.ts +++ b/packages/genui/server/app/a2ui/_shared.ts @@ -91,6 +91,14 @@ export interface InvalidMessages { error: string; } +export interface JsonBodyMetrics { + declaredByteLength?: number; + rawByteLength?: number; + readMs?: number; + parseMs?: number; + totalMs: number; +} + export function validateMessages( value: unknown, ): ValidatedMessages | InvalidMessages { @@ -235,40 +243,94 @@ export function validateConversation( export async function readJsonBodyWithLimit( req: Request, ): Promise< - | { ok: true; body: T } - | { ok: false; status: number; error: string } + | { ok: true; body: T; metrics: JsonBodyMetrics } + | { ok: false; status: number; error: string; metrics: JsonBodyMetrics } > { + const startedAt = performance.now(); const declaredLength = req.headers.get('content-length'); - if (declaredLength) { - const n = Number(declaredLength); - if (Number.isFinite(n) && n > MAX_BODY_BYTES) { - return { - ok: false, - status: 413, - error: `request body exceeds ${MAX_BODY_BYTES} bytes`, - }; - } + const declaredByteLength = declaredLength + ? Number(declaredLength) + : undefined; + if ( + declaredLength + && declaredByteLength !== undefined + && Number.isFinite(declaredByteLength) + && declaredByteLength > MAX_BODY_BYTES + ) { + return { + ok: false, + status: 413, + error: `request body exceeds ${MAX_BODY_BYTES} bytes`, + metrics: { + declaredByteLength, + totalMs: performance.now() - startedAt, + }, + }; } let raw: string; + const readStartedAt = performance.now(); try { raw = await req.text(); } catch { - return { ok: false, status: 400, error: 'failed to read request body' }; + const now = performance.now(); + return { + ok: false, + status: 400, + error: 'failed to read request body', + metrics: { + declaredByteLength, + readMs: now - readStartedAt, + totalMs: now - startedAt, + }, + }; } + const readEndedAt = performance.now(); const rawByteLength = Buffer.byteLength(raw, 'utf8'); if (rawByteLength > MAX_BODY_BYTES) { + const now = performance.now(); return { ok: false, status: 413, error: `request body exceeds ${MAX_BODY_BYTES} bytes`, + metrics: { + declaredByteLength, + rawByteLength, + readMs: readEndedAt - readStartedAt, + totalMs: now - startedAt, + }, }; } + const parseStartedAt = performance.now(); try { - return { ok: true, body: JSON.parse(raw) as T }; + const body = JSON.parse(raw) as T; + const now = performance.now(); + return { + ok: true, + body, + metrics: { + declaredByteLength, + rawByteLength, + readMs: readEndedAt - readStartedAt, + parseMs: now - parseStartedAt, + totalMs: now - startedAt, + }, + }; } catch { - return { ok: false, status: 400, error: 'invalid JSON body' }; + const now = performance.now(); + return { + ok: false, + status: 400, + error: 'invalid JSON body', + metrics: { + declaredByteLength, + rawByteLength, + readMs: readEndedAt - readStartedAt, + parseMs: now - parseStartedAt, + totalMs: now - startedAt, + }, + }; } } diff --git a/packages/genui/server/app/a2ui/action/stream/route.ts b/packages/genui/server/app/a2ui/action/stream/route.ts index 0879d56d5d..4a369bd52f 100644 --- a/packages/genui/server/app/a2ui/action/stream/route.ts +++ b/packages/genui/server/app/a2ui/action/stream/route.ts @@ -84,12 +84,30 @@ export function OPTIONS(req: Request) { } export async function POST(req: Request) { + const { log, requestId } = createStreamLogger('/a2ui/action/stream'); + log('request.received', { + contentLength: req.headers.get('content-length'), + }); + const decision = checkRateLimit(req); if (!decision.ok) { + log('rate_limit.rejected', { + retryAfterSec: decision.retryAfterSec, + remaining: decision.remaining, + resetAt: decision.resetAt, + }); return rateLimitSseResponse(req, decision); } + log('rate_limit.accepted', { + remaining: decision.remaining, + resetAt: decision.resetAt, + }); const parsed = await readJsonBodyWithLimit(req); + log(parsed.ok ? 'body.parsed' : 'body.rejected', { + ...parsed.metrics, + error: parsed.ok ? undefined : parsed.error, + }); if (!parsed.ok) { return jsonWithCors( req, @@ -99,7 +117,12 @@ export async function POST(req: Request) { } const body = parsed.body; + const validationStartedAt = performance.now(); if (!body.action || !body.action.name) { + log('action.rejected', { + durationMs: performance.now() - validationStartedAt, + error: 'action.name is required', + }); return jsonWithCors( req, { ok: false, error: 'action.name is required' }, @@ -108,6 +131,10 @@ export async function POST(req: Request) { } if (!body.surfaceId) { + log('action.rejected', { + durationMs: performance.now() - validationStartedAt, + error: 'surfaceId is required for action responses', + }); return jsonWithCors( req, { @@ -120,12 +147,19 @@ export async function POST(req: Request) { const validatedConversation = validateConversation(body.conversation); if (!validatedConversation.ok) { + log('conversation.rejected', { + durationMs: performance.now() - validationStartedAt, + error: validatedConversation.error, + }); return jsonWithCors( req, { ok: false, error: validatedConversation.error }, { status: validatedConversation.status }, ); } + log('request.validated', { + durationMs: performance.now() - validationStartedAt, + }); const service = getA2UIAgentService(); const payload = { @@ -134,6 +168,11 @@ export async function POST(req: Request) { }; const userContent = `A2UI_USER_ACTION: ${JSON.stringify(payload)}`; if (userContent.length > MAX_MESSAGE_CHARS) { + log('action.rejected', { + durationMs: performance.now() - validationStartedAt, + error: `synthesized user action exceeds ${MAX_MESSAGE_CHARS} characters`, + userContentLength: userContent.length, + }); return jsonWithCors( req, { @@ -149,17 +188,26 @@ export async function POST(req: Request) { content: userContent, }; - const opts = pickChatOptions(body); - const { log, requestId } = createStreamLogger('/a2ui/action/stream'); + const opts = { + ...pickChatOptions(body), + onPerformanceEvent: (event: string, details = {}) => { + log(event, details); + }, + }; log('request.accepted', { surfaceId: body.surfaceId, actionName: body.action.name, conversationHistoryCount: validatedConversation.conversation?.history.length ?? 0, + conversationHistoryChars: validatedConversation.conversation?.history + .reduce((total, message) => total + message.content.length, 0) ?? 0, dataModelKeyCount: validatedConversation.conversation ? Object.keys(validatedConversation.conversation.dataModel).length : 0, + dataModelChars: validatedConversation.conversation + ? JSON.stringify(validatedConversation.conversation.dataModel).length + : 0, userContentLength: userContent.length, model: opts.model, hasBaseURL: Boolean(opts.baseURL), @@ -174,20 +222,34 @@ export async function POST(req: Request) { }; try { + const connectStartedAt = performance.now(); + log('agent.connect.started'); const { textStream, finalize } = await service.streamAsAsyncIterable( [userMessage], opts, validatedConversation.conversation, ); + log('agent.connect.completed', { + durationMs: performance.now() - connectStartedAt, + }); const protocolParser = new A2UIProtocolMessageStreamParser(); const streamedMessages: unknown[] = []; let streamedText = ''; let chunkCount = 0; + let firstChunkLogged = false; log('upstream.stream.started'); for await (const chunk of textStream) { chunkCount += 1; + if (!firstChunkLogged) { + firstChunkLogged = true; + log('upstream.first_chunk', { + durationSinceConnectStartedMs: performance.now() + - connectStartedAt, + chunkLength: chunk.length, + }); + } streamedText += chunk; enqueue('delta', { text: chunk }); const newMessages = protocolParser.push(chunk); diff --git a/packages/genui/server/app/a2ui/stream/route.ts b/packages/genui/server/app/a2ui/stream/route.ts index 27ee424dee..a40c50c506 100644 --- a/packages/genui/server/app/a2ui/stream/route.ts +++ b/packages/genui/server/app/a2ui/stream/route.ts @@ -68,12 +68,30 @@ export function OPTIONS(req: Request) { } export async function POST(req: Request) { + const { log, requestId } = createStreamLogger('/a2ui/stream'); + log('request.received', { + contentLength: req.headers.get('content-length'), + }); + const decision = checkRateLimit(req); if (!decision.ok) { + log('rate_limit.rejected', { + retryAfterSec: decision.retryAfterSec, + remaining: decision.remaining, + resetAt: decision.resetAt, + }); return rateLimitSseResponse(req, decision); } + log('rate_limit.accepted', { + remaining: decision.remaining, + resetAt: decision.resetAt, + }); const parsed = await readJsonBodyWithLimit(req); + log(parsed.ok ? 'body.parsed' : 'body.rejected', { + ...parsed.metrics, + error: parsed.ok ? undefined : parsed.error, + }); if (!parsed.ok) { return jsonWithCors( req, @@ -83,8 +101,13 @@ export async function POST(req: Request) { } const body = parsed.body; + const validationStartedAt = performance.now(); const validated = validateMessages(body.messages); if (!validated.ok) { + log('messages.rejected', { + durationMs: performance.now() - validationStartedAt, + error: validated.error, + }); return jsonWithCors( req, { ok: false, error: validated.error }, @@ -94,23 +117,43 @@ export async function POST(req: Request) { const messages = validated.messages; const validatedConversation = validateConversation(body.conversation); if (!validatedConversation.ok) { + log('conversation.rejected', { + durationMs: performance.now() - validationStartedAt, + error: validatedConversation.error, + }); return jsonWithCors( req, { ok: false, error: validatedConversation.error }, { status: validatedConversation.status }, ); } - const opts = pickChatOptions(body); + log('request.validated', { + durationMs: performance.now() - validationStartedAt, + }); + const opts = { + ...pickChatOptions(body), + onPerformanceEvent: (event: string, details = {}) => { + log(event, details); + }, + }; const service = getA2UIAgentService(); - const { log, requestId } = createStreamLogger('/a2ui/stream'); log('request.accepted', { messageCount: messages.length, + messageChars: messages.reduce( + (total, message) => total + message.content.length, + 0, + ), conversationHistoryCount: validatedConversation.conversation?.history.length ?? 0, + conversationHistoryChars: validatedConversation.conversation?.history + .reduce((total, message) => total + message.content.length, 0) ?? 0, dataModelKeyCount: validatedConversation.conversation ? Object.keys(validatedConversation.conversation.dataModel).length : 0, + dataModelChars: validatedConversation.conversation + ? JSON.stringify(validatedConversation.conversation.dataModel).length + : 0, model: opts.model, hasBaseURL: Boolean(opts.baseURL), catalogId: opts.catalog?.id ?? BASIC_CATALOG.id, @@ -124,20 +167,34 @@ export async function POST(req: Request) { }; try { + const connectStartedAt = performance.now(); + log('agent.connect.started'); const { textStream, finalize } = await service.streamAsAsyncIterable( messages, opts, validatedConversation.conversation, ); + log('agent.connect.completed', { + durationMs: performance.now() - connectStartedAt, + }); const protocolParser = new A2UIProtocolMessageStreamParser(); const streamedMessages: unknown[] = []; let streamedText = ''; let chunkCount = 0; + let firstChunkLogged = false; log('upstream.stream.started'); for await (const chunk of textStream) { chunkCount += 1; + if (!firstChunkLogged) { + firstChunkLogged = true; + log('upstream.first_chunk', { + durationSinceConnectStartedMs: performance.now() + - connectStartedAt, + chunkLength: chunk.length, + }); + } streamedText += chunk; enqueue('delta', { text: chunk }); const newMessages = protocolParser.push(chunk); diff --git a/packages/genui/server/service/a2ui-agent.ts b/packages/genui/server/service/a2ui-agent.ts index 089846dd20..9daa220da9 100644 --- a/packages/genui/server/service/a2ui-agent.ts +++ b/packages/genui/server/service/a2ui-agent.ts @@ -32,6 +32,10 @@ export interface ChatOptions { model?: string | undefined; catalog?: A2UICatalog | undefined; maxRepairAttempts?: number | undefined; + onPerformanceEvent?: ( + event: string, + details?: Record, + ) => void; } export interface A2UIResponse { @@ -92,15 +96,26 @@ function buildDataModelSystemMessage( }; } +function sumContentChars(messages: ChatMessage[]): number { + return messages.reduce((total, message) => total + message.content.length, 0); +} + export default class A2UIAgentService { private agentCache = new Map>(); private getAgent(opts: ChatOptions): Promise { + const startedAt = performance.now(); const cacheKey = `${opts.baseURL ?? 'default'}:${opts.model ?? 'default'}:${ hashApiKey(opts.apiKey) }:${opts.catalog?.id ?? 'basic'}`; let cached = this.agentCache.get(cacheKey); - if (cached) return cached; + if (cached) { + opts.onPerformanceEvent?.('agent.cache.hit', { + durationMs: performance.now() - startedAt, + cacheSize: this.agentCache.size, + }); + return cached; + } cached = Promise.resolve( createA2UIAgent(pickDefined({ @@ -111,6 +126,10 @@ export default class A2UIAgentService { })).agent, ); this.agentCache.set(cacheKey, cached); + opts.onPerformanceEvent?.('agent.cache.miss', { + durationMs: performance.now() - startedAt, + cacheSize: this.agentCache.size, + }); return cached; } @@ -123,12 +142,27 @@ export default class A2UIAgentService { opts: ChatOptions = {}, ): Promise { const agent = await this.getAgent(opts); - return agent.stream( - this.toModelMessages(messages), + const modelMessagesStartedAt = performance.now(); + const modelMessages = this.toModelMessages(messages); + opts.onPerformanceEvent?.('agent.model_messages.built', { + durationMs: performance.now() - modelMessagesStartedAt, + messageCount: messages.length, + contentChars: sumContentChars(messages), + }); + + const streamStartedAt = performance.now(); + opts.onPerformanceEvent?.('agent.stream.invoke.started'); + const result = agent.stream( + modelMessages, pickDefined({ resourceId: opts.resourceId, }), ) as MastraStreamResult; + opts.onPerformanceEvent?.('agent.stream.invoke.completed', { + durationMs: performance.now() - streamStartedAt, + hasTextStream: Boolean(result.textStream), + }); + return result; } public async streamAsAsyncIterable( @@ -143,8 +177,20 @@ export default class A2UIAgentService { finishReason: unknown; }>; }> { + const buildConversationStartedAt = performance.now(); + const preparedMessages = buildConversationMessages(messages, conversation); + opts.onPerformanceEvent?.('agent.conversation.built', { + durationMs: performance.now() - buildConversationStartedAt, + inputMessageCount: messages.length, + conversationHistoryCount: conversation?.history.length ?? 0, + dataModelKeyCount: conversation + ? Object.keys(conversation.dataModel).length + : 0, + preparedMessageCount: preparedMessages.length, + preparedContentChars: sumContentChars(preparedMessages), + }); const streamResult: MastraStreamResult = await this.stream( - buildConversationMessages(messages, conversation), + preparedMessages, opts, ); const raw = streamResult.textStream;