Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions packages/genui/a2ui-playground/examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# A2UI playground examples

Reference implementations that intentionally live **outside**
`@lynx-js/a2ui-reactlynx`. The package itself ships only:

- `<A2UI>` — the protocol-naive renderer.
- `MessageStore` — a pure raw-message buffer.
- The catalog + custom-component-author API.

Everything else — talking to an agent, chunking turns, theming the chat
shell — is the developer's choice. These examples show common shapes;
copy and adapt them.

## `io-mock/`

`createMockAgent(store, opts)` returns a driver that pushes a fixed
initial stream into the store and serves canned responses to user
actions. Used by the playground's `lynx-src/App.tsx` to exercise demos
without a real agent.

```ts
const store = createMessageStore();
const agent = createMockAgent(store, { initialMessages, actionMocks });
agent.start(); // streams initial messages into the buffer
agent.onAction(action); // pushes the canned response to a user action
```

## `io-sse/`

`createSseAgent(store, { url })` opens an SSE connection and pushes the
parsed `delta` / `complete` events into the store. Roughly the
implementation that used to live inside the package's `BaseClient`,
re-targeted at the dumb-buffer store.

```ts
const store = createMessageStore();
const agent = createSseAgent(store, { url: '/api/agent' });
await agent.send('hello'); // streams response into the buffer
await agent.onAction(action); // forwards a user action over SSE
```

## Multi-turn chat shell pattern

For chat UIs, give each turn (user prompt + agent response) its own
`MessageStore` and render one `<A2UI messageStore={turnStore}>` per
agent turn. The shell only tracks turns; the renderer handles
everything inside an agent turn.

```tsx
function Conversation({ catalogs, respond }) {
const [turns, setTurns] = useState([]);
const send = async (input) => {
const store = createMessageStore();
setTurns((t) => [
...t,
{ kind: 'user', content: input },
{ kind: 'agent', store },
]);
await respond(input, store);
};
return turns.map((t) =>
t.kind === 'user'
? <view key={...}><text>{t.content}</text></view>
: <A2UI key={...} messageStore={t.store} catalogs={catalogs} />
);
}
```

Each `<A2UI>` only sees a bounded buffer; history is just a list of
turns the shell maintains.
96 changes: 96 additions & 0 deletions packages/genui/a2ui-playground/examples/io-mock/mockAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2026 The Lynx Authors. All rights reserved.
// Licensed under the Apache License Version 2.0 that can be found in the
// LICENSE file in the root directory of this source tree.
//
// Reference mock IO module. Pushes a fixed initial stream into the store
// and serves canned responses to user actions. NOT shipped from
// `@lynx-js/a2ui-reactlynx` — copy as a starting point for tests / demos.
import type {
MessageStore,
ServerToClientMessage,
UserActionPayload,
} from '@lynx-js/a2ui-reactlynx';

export interface MockAgentOptions {
/** Streamed once after `start()`. */
initialMessages?: readonly ServerToClientMessage[];
/** Per-action response messages, keyed by action name. */
actionMocks?: Record<
string,
| readonly ServerToClientMessage[]
| ((ctx: UserActionPayload) => readonly ServerToClientMessage[])
>;
/** Delay between successive batches when streaming. */
delayMs?: number;
}

export interface MockAgent {
/**
* Begin streaming the initial messages. Idempotent — calling twice
* returns the original promise.
*/
start(): Promise<void>;
/** Forward a user action; pushes the canned response, if any. */
onAction(action: UserActionPayload): Promise<void>;
/** Stop streaming and discard any pending messages. */
stop(): void;
}

/**
* Build a mock agent driver bound to a `MessageStore`. The driver
* streams raw protocol messages into the store with a small delay
* between each, simulating an SSE-like server.
*/
export function createMockAgent(
store: MessageStore,
options: MockAgentOptions = {},
): MockAgent {
const { initialMessages, actionMocks = {}, delayMs = 800 } = options;
const abort = new AbortController();
let started: Promise<void> | null = null;

function sleep(ms: number): Promise<void> {
if (ms <= 0 || abort.signal.aborted) return Promise.resolve();
return new Promise<void>((resolve) => {
const onAbort = () => {
clearTimeout(timer);
resolve();
};
const timer = setTimeout(() => {
abort.signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
abort.signal.addEventListener('abort', onAbort, { once: true });
});
}

async function streamInto(
messages: readonly ServerToClientMessage[],
): Promise<void> {
for (const msg of messages) {
if (abort.signal.aborted) return;
store.push(msg);
if (delayMs > 0) {
await sleep(delayMs);
if (abort.signal.aborted) return;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

return {
start() {
if (started) return started;
started = streamInto(initialMessages ?? []);
return started;
},
async onAction(action) {
const mock = actionMocks[action.name];
if (!mock) return;
const stream = typeof mock === 'function' ? mock(action) : mock;
await streamInto(stream);
},
stop() {
abort.abort();
},
};
}
231 changes: 231 additions & 0 deletions packages/genui/a2ui-playground/examples/io-sse/sseAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2026 The Lynx Authors. All rights reserved.
// Licensed under the Apache License Version 2.0 that can be found in the
// LICENSE file in the root directory of this source tree.
//
// Reference SSE IO module. Opens an EventSource, parses `delta` /
// `complete` events, normalizes their payloads, and pushes the resulting
// raw protocol messages into the store. NOT shipped from
// `@lynx-js/a2ui-reactlynx` — copy and adapt the URL building, event
// names, and queueing for your agent.
import type {
A2UIClientEventMessage,
MessageStore,
ServerToClientMessage,
UserActionPayload,
} from '@lynx-js/a2ui-reactlynx';
import { normalizePayloadToMessages } from '@lynx-js/a2ui-reactlynx';

const MESSAGE_PROCESS_DELAY = 300;

interface TypedEventSource {
addEventListener(
type: string,
listener: (
event: { data?: unknown; target?: unknown; type?: string },
) => void,
): void;
close(): void;
readyState: number;
}

declare const lynx: {
EventSource?: new(url: string) => TypedEventSource;
} | undefined;

function buildSseParams(
message: A2UIClientEventMessage,
messageId: string,
): Record<string, string> {
const params: Record<string, string> = { messageId };
const anyMessage = message as Record<string, unknown>;

if (typeof message === 'string') {
params.text = message;
} else if (anyMessage) {
if (typeof anyMessage.text === 'string') {
params.text = anyMessage.text;
} else if (anyMessage.text) {
params.text = JSON.stringify(anyMessage.text);
} else if (anyMessage.userAction) {
const userAction = anyMessage.userAction as {
name: string;
context?: Record<string, unknown>;
};
const actionName = userAction.name || 'unknownAction';
const context = userAction.context ?? {};
params.text = `USER_ACTION: ${actionName}, Context: ${
JSON.stringify(context)
}`;
} else {
params.text = JSON.stringify(message);
}

if (typeof anyMessage.sessionId === 'string') {
params.sessionId = anyMessage.sessionId;
} else if (anyMessage.sessionId) {
params.sessionId = JSON.stringify(anyMessage.sessionId);
}
}

return params;
}

function randomId(prefix: string) {
return prefix + Date.now().toString(36)
+ Math.random().toString(36).slice(2, 10);
}

function toError(e: unknown): Error {
return e instanceof Error ? e : new Error(String(e));
}

export interface SseAgentOptions {
url: string;
}

export interface SseAgent {
/**
* Send an input to the agent and stream the response messages into the
* store. Returns when the SSE connection emits its `complete` event.
*/
send(input: A2UIClientEventMessage | string): Promise<void>;
/**
* Forward a user action — convenience over `send({ userAction })`.
*/
onAction(action: UserActionPayload): Promise<void>;
/** Abort any open connections. */
stop(): void;
}

export function createSseAgent(
store: MessageStore,
options: SseAgentOptions,
): SseAgent {
const { url: baseUrl } = options;
const abort = new AbortController();

const send = (input: A2UIClientEventMessage | string): Promise<void> =>
new Promise<void>((resolve, reject) => {
if (abort.signal.aborted) {
resolve();
return;
}
const messageId = randomId('task_');
const params = new URLSearchParams(buildSseParams(input, messageId));
const url = `${baseUrl}?${params.toString()}`;

const g = globalThis as Record<string, unknown>;
// eslint-disable-next-line n/no-unsupported-features/node-builtins
const NativeES = g.EventSource as
| (new(url: string) => TypedEventSource)
| undefined;
const EventSourceImpl = NativeES
?? (typeof lynx !== 'undefined' && lynx?.EventSource);
if (!EventSourceImpl) {
reject(new Error('No EventSource implementation available.'));
return;
}

const eventSource = new EventSourceImpl(url);
let settled = false;
const queue: ServerToClientMessage[][] = [];

const cleanup = () => {
eventSource.close();
abort.signal.removeEventListener('abort', onAbort);
queue.length = 0;
};
const succeed = () => {
if (settled) return;
settled = true;
cleanup();
resolve();
};
const fail = (e: unknown) => {
if (settled) return;
settled = true;
cleanup();
reject(toError(e));
};
const onAbort = () => succeed();
abort.signal.addEventListener('abort', onAbort, { once: true });

let isCompleted = false;
let hasReceivedDelta = false;
let isProcessing = false;

const flush = async () => {
if (isProcessing) return;
isProcessing = true;
while (queue.length > 0 && !settled) {
const batch = queue.shift();
if (batch && batch.length > 0) {
for (const msg of batch) {
msg.messageId ??= messageId;
store.push(msg);
}
}
await new Promise((r) => setTimeout(r, MESSAGE_PROCESS_DELAY));
}
isProcessing = false;
};

const ingestPayload = (raw: unknown) => {
let payload = raw;
if (typeof payload === 'string') {
try {
payload = JSON.parse(payload);
} catch {
/* leave as string */
}
}
if (typeof payload === 'string') {
try {
payload = JSON.parse(payload);
} catch {
/* leave as string */
}
}
const messages = normalizePayloadToMessages(payload);
if (messages.length > 0) {
queue.push(messages);
void flush();
}
};

eventSource.addEventListener('delta', (event) => {
try {
ingestPayload(event.data);
hasReceivedDelta = true;
} catch (e) {
fail(e);
}
});

eventSource.addEventListener('complete', (event) => {
if (isCompleted) return;
isCompleted = true;
try {
if (!hasReceivedDelta) ingestPayload(event.data);
} catch (e) {
fail(e);
return;
}
succeed();
});

eventSource.addEventListener('error', (event) => {
fail(new Error(`SSE error: ${JSON.stringify(event)}`));
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});

return {
send,
async onAction(action) {
await send({ userAction: action });
},
stop() {
abort.abort();
},
};
}
Loading
Loading