Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions packages/genui/server/app/a2ui/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ export interface InvalidMessages {
error: string;
}

export interface JsonBodyMetrics {
declaredByteLength?: number;
rawByteLength?: number;
readMs?: number;
parseMs?: number;
totalMs: number;
}

export function validateMessages(
value: unknown,
): ValidatedMessages | InvalidMessages {
Expand Down Expand Up @@ -235,40 +243,94 @@ export function validateConversation(
export async function readJsonBodyWithLimit<T>(
req: Request,
): Promise<
| { ok: true; body: T }
| { ok: false; status: number; error: string }
| { ok: true; body: T; metrics: JsonBodyMetrics }
| { ok: false; status: number; error: string; metrics: JsonBodyMetrics }
> {
const startedAt = performance.now();
const declaredLength = req.headers.get('content-length');
if (declaredLength) {
const n = Number(declaredLength);
if (Number.isFinite(n) && n > MAX_BODY_BYTES) {
return {
ok: false,
status: 413,
error: `request body exceeds ${MAX_BODY_BYTES} bytes`,
};
}
const declaredByteLength = declaredLength
? Number(declaredLength)
: undefined;
if (
declaredLength
&& declaredByteLength !== undefined
&& Number.isFinite(declaredByteLength)
&& declaredByteLength > MAX_BODY_BYTES
) {
return {
ok: false,
status: 413,
error: `request body exceeds ${MAX_BODY_BYTES} bytes`,
metrics: {
declaredByteLength,
totalMs: performance.now() - startedAt,
},
};
}

let raw: string;
const readStartedAt = performance.now();
try {
raw = await req.text();
} catch {
return { ok: false, status: 400, error: 'failed to read request body' };
const now = performance.now();
return {
ok: false,
status: 400,
error: 'failed to read request body',
metrics: {
declaredByteLength,
readMs: now - readStartedAt,
totalMs: now - startedAt,
},
};
}
const readEndedAt = performance.now();

const rawByteLength = Buffer.byteLength(raw, 'utf8');
if (rawByteLength > MAX_BODY_BYTES) {
const now = performance.now();
return {
ok: false,
status: 413,
error: `request body exceeds ${MAX_BODY_BYTES} bytes`,
metrics: {
declaredByteLength,
rawByteLength,
readMs: readEndedAt - readStartedAt,
totalMs: now - startedAt,
},
};
}

const parseStartedAt = performance.now();
try {
return { ok: true, body: JSON.parse(raw) as T };
const body = JSON.parse(raw) as T;
const now = performance.now();
return {
ok: true,
body,
metrics: {
declaredByteLength,
rawByteLength,
readMs: readEndedAt - readStartedAt,
parseMs: now - parseStartedAt,
totalMs: now - startedAt,
},
};
} catch {
return { ok: false, status: 400, error: 'invalid JSON body' };
const now = performance.now();
return {
ok: false,
status: 400,
error: 'invalid JSON body',
metrics: {
declaredByteLength,
rawByteLength,
readMs: readEndedAt - readStartedAt,
parseMs: now - parseStartedAt,
totalMs: now - startedAt,
},
};
}
}
66 changes: 64 additions & 2 deletions packages/genui/server/app/a2ui/action/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,30 @@ export function OPTIONS(req: Request) {
}

export async function POST(req: Request) {
const { log, requestId } = createStreamLogger('/a2ui/action/stream');
log('request.received', {
contentLength: req.headers.get('content-length'),
});

const decision = checkRateLimit(req);
if (!decision.ok) {
log('rate_limit.rejected', {
retryAfterSec: decision.retryAfterSec,
remaining: decision.remaining,
resetAt: decision.resetAt,
});
return rateLimitSseResponse(req, decision);
}
log('rate_limit.accepted', {
remaining: decision.remaining,
resetAt: decision.resetAt,
});

const parsed = await readJsonBodyWithLimit<A2UIActionStreamBody>(req);
log(parsed.ok ? 'body.parsed' : 'body.rejected', {
...parsed.metrics,
error: parsed.ok ? undefined : parsed.error,
});
if (!parsed.ok) {
return jsonWithCors(
req,
Expand All @@ -99,7 +117,12 @@ export async function POST(req: Request) {
}
const body = parsed.body;

const validationStartedAt = performance.now();
if (!body.action || !body.action.name) {
log('action.rejected', {
durationMs: performance.now() - validationStartedAt,
error: 'action.name is required',
});
return jsonWithCors(
req,
{ ok: false, error: 'action.name is required' },
Expand All @@ -108,6 +131,10 @@ export async function POST(req: Request) {
}

if (!body.surfaceId) {
log('action.rejected', {
durationMs: performance.now() - validationStartedAt,
error: 'surfaceId is required for action responses',
});
return jsonWithCors(
req,
{
Expand All @@ -120,12 +147,19 @@ export async function POST(req: Request) {

const validatedConversation = validateConversation(body.conversation);
if (!validatedConversation.ok) {
log('conversation.rejected', {
durationMs: performance.now() - validationStartedAt,
error: validatedConversation.error,
});
return jsonWithCors(
req,
{ ok: false, error: validatedConversation.error },
{ status: validatedConversation.status },
);
}
log('request.validated', {
durationMs: performance.now() - validationStartedAt,
});

const service = getA2UIAgentService();
const payload = {
Expand All @@ -134,6 +168,11 @@ export async function POST(req: Request) {
};
const userContent = `A2UI_USER_ACTION: ${JSON.stringify(payload)}`;
if (userContent.length > MAX_MESSAGE_CHARS) {
log('action.rejected', {
durationMs: performance.now() - validationStartedAt,
error: `synthesized user action exceeds ${MAX_MESSAGE_CHARS} characters`,
userContentLength: userContent.length,
});
return jsonWithCors(
req,
{
Expand All @@ -149,17 +188,26 @@ export async function POST(req: Request) {
content: userContent,
};

const opts = pickChatOptions(body);
const { log, requestId } = createStreamLogger('/a2ui/action/stream');
const opts = {
...pickChatOptions(body),
onPerformanceEvent: (event: string, details = {}) => {
log(event, details);
},
};

log('request.accepted', {
surfaceId: body.surfaceId,
actionName: body.action.name,
conversationHistoryCount: validatedConversation.conversation?.history.length
?? 0,
conversationHistoryChars: validatedConversation.conversation?.history
.reduce((total, message) => total + message.content.length, 0) ?? 0,
dataModelKeyCount: validatedConversation.conversation
? Object.keys(validatedConversation.conversation.dataModel).length
: 0,
dataModelChars: validatedConversation.conversation
? JSON.stringify(validatedConversation.conversation.dataModel).length
: 0,
userContentLength: userContent.length,
model: opts.model,
hasBaseURL: Boolean(opts.baseURL),
Expand All @@ -174,20 +222,34 @@ export async function POST(req: Request) {
};

try {
const connectStartedAt = performance.now();
log('agent.connect.started');
const { textStream, finalize } = await service.streamAsAsyncIterable(
[userMessage],
opts,
validatedConversation.conversation,
);
log('agent.connect.completed', {
durationMs: performance.now() - connectStartedAt,
});
const protocolParser = new A2UIProtocolMessageStreamParser();
const streamedMessages: unknown[] = [];
let streamedText = '';
let chunkCount = 0;
let firstChunkLogged = false;

log('upstream.stream.started');

for await (const chunk of textStream) {
chunkCount += 1;
if (!firstChunkLogged) {
firstChunkLogged = true;
log('upstream.first_chunk', {
durationSinceConnectStartedMs: performance.now()
- connectStartedAt,
chunkLength: chunk.length,
});
}
streamedText += chunk;
enqueue('delta', { text: chunk });
const newMessages = protocolParser.push(chunk);
Expand Down
Loading
Loading