Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/providers/src/community/pi/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import type { ProviderCapabilities } from '../../types';
* envInjection covers both auth-key passthrough (setRuntimeApiKey for mapped
* provider env vars) and bash tool subprocess env (BashSpawnHook merges the
* caller's env over Pi's inherited baseline), matching Claude/Codex semantics.
*
* structuredOutput is best-effort (not SDK-enforced like Claude/Codex): the
* provider appends a "JSON only" instruction + the schema to the prompt and
* the event bridge parses the final assistant transcript on agent_end.
* Reliable on instruction-following models (GPT-5, Claude, Gemini 2.x,
* recent Qwen Coder, DeepSeek V3); parse failures degrade via the
* dag-executor's existing dag.structured_output_missing path.
*/
export const PI_CAPABILITIES: ProviderCapabilities = {
sessionResume: true,
Expand All @@ -16,7 +23,7 @@ export const PI_CAPABILITIES: ProviderCapabilities = {
skills: true,
agents: false,
toolRestrictions: true,
structuredOutput: false,
structuredOutput: true,
envInjection: true,
costControl: false,
effortControl: true,
Expand Down
54 changes: 54 additions & 0 deletions packages/providers/src/community/pi/event-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
buildResultChunk,
mapPiEvent,
serializeToolResult,
tryParseStructuredOutput,
usageToTokens,
} from './event-bridge';

Expand Down Expand Up @@ -367,3 +368,56 @@ describe('mapPiEvent', () => {
).toEqual([]);
});
});

// ─── tryParseStructuredOutput ──────────────────────────────────────────────

describe('tryParseStructuredOutput', () => {
test('parses clean JSON object', () => {
expect(tryParseStructuredOutput('{"name":"alpha","count":3}')).toEqual({
name: 'alpha',
count: 3,
});
});

test('parses JSON with surrounding whitespace', () => {
expect(tryParseStructuredOutput(' \n{"ok":true}\n ')).toEqual({ ok: true });
});

test('strips ```json fences', () => {
const fenced = '```json\n{"area":"web","confidence":0.9}\n```';
expect(tryParseStructuredOutput(fenced)).toEqual({ area: 'web', confidence: 0.9 });
});

test('strips bare ``` fences', () => {
expect(tryParseStructuredOutput('```\n{"ok":1}\n```')).toEqual({ ok: 1 });
});

test('parses JSON arrays', () => {
expect(tryParseStructuredOutput('[1,2,3]')).toEqual([1, 2, 3]);
});

test('returns undefined on empty string', () => {
expect(tryParseStructuredOutput('')).toBeUndefined();
expect(tryParseStructuredOutput(' ')).toBeUndefined();
});

test('returns undefined when model wraps JSON in prose', () => {
// Realistic failure mode — model ignores "JSON only" instruction and adds
// explanatory text before/after. Caller degrades via the executor's
// missing-structured-output warning path.
const prose =
'Here is the JSON you requested:\n{"ok":true}\nLet me know if you need anything else.';
expect(tryParseStructuredOutput(prose)).toBeUndefined();
});

test('returns undefined on malformed JSON', () => {
expect(tryParseStructuredOutput('{not valid}')).toBeUndefined();
expect(tryParseStructuredOutput('{"unclosed":')).toBeUndefined();
});

test('preserves backticks inside JSON string values', () => {
// Fence stripper matches only at start/end; inner backticks must survive.
const withBackticks = '{"code":"run `npm test`"}';
expect(tryParseStructuredOutput(withBackticks)).toEqual({ code: 'run `npm test`' });
});
});
62 changes: 59 additions & 3 deletions packages/providers/src/community/pi/event-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,33 @@ export function buildResultChunk(messages: readonly unknown[]): MessageChunk {
return chunk;
}

/**
* Attempt to parse a Pi assistant transcript as the structured-output JSON
* requested via `outputFormat`. Handles two common model failure modes:
* - trailing/leading whitespace (always stripped)
* - markdown code fences (```json ... ``` or bare ``` ... ```) that models
* emit despite the "no code fences" instruction in the prompt
*
* Returns the parsed value on success, `undefined` on any failure. Callers
* treat `undefined` as "structured output unavailable" and degrade via the
* dag-executor's existing missing-structured-output warning.
*/
export function tryParseStructuredOutput(text: string): unknown {
const trimmed = text.trim();
if (trimmed.length === 0) return undefined;
// Strip ```json / ``` fences if present. Match only at boundaries so we
// don't mangle JSON strings that legitimately contain backticks.
const cleaned = trimmed
.replace(/^```(?:json)?\s*\n?/i, '')
.replace(/\n?\s*```\s*$/, '')
.trim();
try {
return JSON.parse(cleaned);
} catch {
return undefined;
}
}

/**
* Pure mapper from Pi's `AgentSessionEvent` → zero-or-more Archon `MessageChunk`s.
*
Expand Down Expand Up @@ -243,13 +270,22 @@ export type BridgeQueueItem =
export async function* bridgeSession(
session: AgentSession,
prompt: string,
abortSignal?: AbortSignal
abortSignal?: AbortSignal,
jsonSchema?: Record<string, unknown>
): AsyncGenerator<MessageChunk> {
const queue = new AsyncQueue<BridgeQueueItem>();
// Best-effort structured-output buffer. Only accumulates when the caller
// requested a JSON schema; otherwise stays empty and the terminal chunk
// passes through untouched.
const wantsStructured = jsonSchema !== undefined;
Comment on lines +273 to +280
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate parsed JSON against jsonSchema before attaching it.

Line 341 only parses JSON, and Line 343 treats any valid JSON as structured output. If the schema requires { area: string } but the model returns {"ok":true}, the executor will suppress dag.structured_output_missing and downstream field refs can silently degrade. Treat schema mismatches like parse failures.

Also applies to: 340-343

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/providers/src/community/pi/event-bridge.ts` around lines 273 - 280,
When jsonSchema is provided (wantsStructured) validate the parsed JSON against
that schema before attaching it as structured output: after parsing the chunk
JSON (the code around the parsing at lines ~340-343) run a schema validation
step against jsonSchema and if validation fails treat it like a parse failure—do
not set/attach the structured output field on the MessageChunk and follow the
same error/suppression flow as parse errors; use the existing variables
(wantsStructured, jsonSchema, MessageChunk, BridgeQueueItem) and existing error
handling path so schema mismatches are not silently accepted.

let assistantBuffer = '';

const unsubscribe = session.subscribe((event: AgentSessionEvent) => {
try {
for (const chunk of mapPiEvent(event)) {
if (wantsStructured && chunk.type === 'assistant') {
assistantBuffer += chunk.content;
}
queue.push({ kind: 'chunk', chunk });
}
} catch (err) {
Expand Down Expand Up @@ -291,8 +327,28 @@ export async function* bridgeSession(
// Pi's session.sessionId is always a UUID (even for in-memory); we emit
// it unconditionally and let the caller decide whether resume is
// meaningful (capability-gated at the registry level).
if (item.chunk.type === 'result' && session.sessionId) {
yield { ...item.chunk, sessionId: session.sessionId };
if (item.chunk.type === 'result') {
let terminal: MessageChunk = item.chunk;
if (session.sessionId) {
terminal = { ...terminal, sessionId: session.sessionId };
}
// Best-effort structured output: parse the accumulated assistant
// transcript as JSON and attach. On parse failure, leave it off —
// the dag-executor's existing dag.structured_output_missing path
// warns and downstream $node.output.field refs degrade to '' instead
// of propagating bogus data.
if (wantsStructured) {
const parsed = tryParseStructuredOutput(assistantBuffer);
if (parsed !== undefined) {
terminal = { ...terminal, structuredOutput: parsed };
} else {
getLog().warn(
{ bufferLength: assistantBuffer.length },
'pi.event-bridge.structured_output_parse_failed'
);
}
}
yield terminal;
} else {
yield item.chunk;
}
Expand Down
139 changes: 138 additions & 1 deletion packages/providers/src/community/pi/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,11 @@ describe('PiProvider', () => {
expect(caps.skills).toBe(true);
expect(caps.sessionResume).toBe(true);
expect(caps.envInjection).toBe(true);
// Best-effort structured output via prompt engineering (not SDK-enforced).
expect(caps.structuredOutput).toBe(true);
// Still false:
expect(caps.mcp).toBe(false);
expect(caps.hooks).toBe(false);
expect(caps.structuredOutput).toBe(false);
});

test('nodeConfig.skills with unknown name yields system warning, does not abort', async () => {
Expand Down Expand Up @@ -1047,4 +1048,140 @@ describe('PiProvider', () => {
);
expect(systemChunks.some(c => c.content.includes('sonnet-5 not available'))).toBe(true);
});

// ─── structured output (best-effort JSON via prompt engineering) ──────

// Script an assistant text_delta followed by agent_end so the bridge has
// buffered content to parse when outputFormat is set.
function scriptedAssistantThenEnd(text: string): FakeEvent[] {
return [
{
type: 'message_update',
message: { role: 'assistant' } as never,
assistantMessageEvent: {
type: 'text_delta',
contentIndex: 0,
delta: text,
partial: { role: 'assistant' } as never,
},
},
...scriptedAgentEnd(),
];
}

test('outputFormat: schema is appended to prompt as JSON instruction', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAgentEnd());

await consume(
new PiProvider().sendQuery('Summarize this bug.', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
outputFormat: {
type: 'json_schema',
schema: { type: 'object', properties: { area: { type: 'string' } } },
},
})
);

// Prompt should now contain the original instruction + the schema hint.
expect(mockPrompt).toHaveBeenCalled();
const [sentPrompt] = mockPrompt.mock.calls[0] as [string];
expect(sentPrompt).toContain('Summarize this bug.');
expect(sentPrompt).toContain('Respond with ONLY a JSON object');
expect(sentPrompt).toContain('"area"');
});

test('outputFormat: absent → prompt passed through unchanged', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAgentEnd());

await consume(
new PiProvider().sendQuery('do a thing', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
})
);

const [sentPrompt] = mockPrompt.mock.calls[0] as [string];
expect(sentPrompt).toBe('do a thing');
expect(sentPrompt).not.toContain('JSON');
});

test('outputFormat: result chunk carries parsed structuredOutput on clean JSON', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAssistantThenEnd('{"area":"web","confidence":0.9}'));

const { chunks } = await consume(
new PiProvider().sendQuery('classify', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
outputFormat: {
type: 'json_schema',
schema: { type: 'object' },
},
})
);

const result = chunks.find(
(c): c is { type: 'result'; structuredOutput?: unknown } =>
typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result'
);
expect(result).toBeDefined();
expect(result?.structuredOutput).toEqual({ area: 'web', confidence: 0.9 });
});

test('outputFormat: fenced JSON (```json ... ```) still parses', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAssistantThenEnd('```json\n{"ok":true}\n```'));

const { chunks } = await consume(
new PiProvider().sendQuery('x', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
outputFormat: { type: 'json_schema', schema: {} },
})
);

const result = chunks.find(
(c): c is { type: 'result'; structuredOutput?: unknown } =>
typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result'
);
expect(result?.structuredOutput).toEqual({ ok: true });
});

test('outputFormat: prose-wrapped JSON → no structuredOutput, degrades cleanly', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAssistantThenEnd('Here is the JSON:\n{"ok":true}\nHope this helps!'));

const { chunks, error } = await consume(
new PiProvider().sendQuery('x', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
outputFormat: { type: 'json_schema', schema: {} },
})
);

// No crash — downstream degradation is the executor's job via its
// existing dag.structured_output_missing warning path.
expect(error).toBeUndefined();
const result = chunks.find(
(c): c is { type: 'result'; structuredOutput?: unknown } =>
typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result'
);
expect(result).toBeDefined();
expect(result?.structuredOutput).toBeUndefined();
});

test('no outputFormat → structuredOutput never set even if assistant emits JSON', async () => {
process.env.GEMINI_API_KEY = 'sk-test';
resetScript(scriptedAssistantThenEnd('{"accidental":"json"}'));

const { chunks } = await consume(
new PiProvider().sendQuery('x', '/tmp', undefined, {
model: 'google/gemini-2.5-pro',
})
);

const result = chunks.find(
(c): c is { type: 'result'; structuredOutput?: unknown } =>
typeof c === 'object' && c !== null && (c as { type?: string }).type === 'result'
);
expect(result?.structuredOutput).toBeUndefined();
});
});
44 changes: 42 additions & 2 deletions packages/providers/src/community/pi/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ function lookupPiModel(provider: string, modelId: string): Model<Api> | undefine
);
}

/**
* Append a "respond with JSON matching this schema" instruction to the user
* prompt so Pi-backed models produce parseable structured output. Pi's SDK
* has no JSON-mode equivalent to Claude's outputFormat or Codex's
* outputSchema, so this is a best-effort fallback: the event bridge parses
* the assistant transcript on agent_end. Models that reliably follow
* instruction (GPT-5, Claude, Gemini 2.x, recent Qwen Coder, DeepSeek V3)
* return clean JSON; models that don't produce a parse failure, which the
* executor surfaces via the existing dag.structured_output_missing warning.
*/
export function augmentPromptForJsonSchema(
prompt: string,
schema: Record<string, unknown>
): string {
return `${prompt}

---

CRITICAL: Respond with ONLY a JSON object matching the schema below. No prose before or after the JSON. No markdown code fences. Just the raw JSON object as your final message.

Schema:
${JSON.stringify(schema, null, 2)}`;
Comment on lines +77 to +88
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid forcing object-shaped output for every schema.

Line 85 asks for a JSON object, but JSON Schema can describe arrays/scalars and tryParseStructuredOutput already accepts arrays. This can steer Pi models away from valid non-object schemas; use schema-root-neutral wording.

Suggested prompt wording
-CRITICAL: Respond with ONLY a JSON object matching the schema below. No prose before or after the JSON. No markdown code fences. Just the raw JSON object as your final message.
+CRITICAL: Respond with ONLY valid JSON matching the schema below. No prose before or after the JSON. No markdown code fences. Just the raw JSON as your final message.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/providers/src/community/pi/provider.ts` around lines 77 - 88, The
prompt in augmentPromptForJsonSchema forces an "object" response which conflicts
with schemas that may be arrays or scalars and with tryParseStructuredOutput
which accepts non-objects; update the wording to be schema-root-neutral (e.g.,
say "Respond with ONLY a JSON value that matches the schema below" or similar),
keep the constraints "No prose before or after... No markdown code fences", and
ensure the function augmentPromptForJsonSchema returns the revised instruction
string so models can return arrays/scalars as valid JSON matching the provided
schema.

}

/**
* Pi community provider — wraps `@mariozechner/pi-coding-agent`'s full
* coding-agent harness. Each `sendQuery()` call creates a fresh session
Expand Down Expand Up @@ -257,10 +281,26 @@ export class PiProvider implements IAgentProvider {
yield { type: 'system', content: `⚠️ ${modelFallbackMessage}` };
}

// 5. Bridge callback-based events to the async generator contract.
// 5. Structured output (best-effort). Pi has no SDK-level JSON schema
// mode the way Claude and Codex do, so we implement it via prompt
// engineering: append the schema + "JSON only, no fences" instruction,
// and have the bridge parse the accumulated assistant text on
// agent_end. Parse failures degrade gracefully — the executor's
// existing dag.structured_output_missing warning path handles them.
const outputFormat = requestOptions?.outputFormat;
const effectivePrompt = outputFormat
? augmentPromptForJsonSchema(prompt, outputFormat.schema)
: prompt;

// 6. Bridge callback-based events to the async generator contract.
// bridgeSession owns dispose() and abort wiring.
try {
yield* bridgeSession(session, prompt, requestOptions?.abortSignal);
yield* bridgeSession(
session,
effectivePrompt,
requestOptions?.abortSignal,
outputFormat?.schema
);
getLog().info({ piProvider: parsed.provider }, 'pi.prompt_completed');
} catch (err) {
getLog().error({ err, piProvider: parsed.provider }, 'pi.prompt_failed');
Expand Down
Loading
Loading