From 919b2ae3743346a2085ad3352062c5ea0bf70b09 Mon Sep 17 00:00:00 2001 From: Luis Erlacher Date: Sun, 12 Apr 2026 23:03:36 -0300 Subject: [PATCH 1/2] feat: add quality gate engine and node state store - Quality gate engine independently verifies agent claims (test results, type checks, lint) by running commands and parsing output - Node state store persists validated node state to DB, replacing event-replay resume with explicit validated state - Gates integrated into DAG executor: P0/P1 block progression, P2 warns - Loop nodes get real feedback (test names, error counts) on gate failure - Test output parsers for vitest/jest/bun test, tsc, and eslint - API endpoints for gate results and node state summaries - SSE events for gate lifecycle (started/passed/failed/blocked) Co-Authored-By: Claude Opus 4.6 (1M context) --- migrations/022_node_states.sql | 38 +++ packages/core/src/db/adapters/sqlite.ts | 35 +++ packages/core/src/db/node-states.ts | 190 +++++++++++++ .../core/src/workflows/store-adapter.test.ts | 25 +- packages/core/src/workflows/store-adapter.ts | 7 + .../src/adapters/web/workflow-bridge.ts | 44 +++ packages/server/src/routes/api.ts | 102 +++++++ .../server/src/routes/schemas/gate.schemas.ts | 90 ++++++ packages/workflows/package.json | 5 +- packages/workflows/src/dag-executor.test.ts | 22 +- packages/workflows/src/dag-executor.ts | 246 ++++++++++++++-- packages/workflows/src/event-emitter.ts | 45 ++- .../workflows/src/executor-preamble.test.ts | 6 + packages/workflows/src/executor.test.ts | 6 + packages/workflows/src/executor.ts | 10 +- packages/workflows/src/gates/engine.test.ts | 267 ++++++++++++++++++ packages/workflows/src/gates/engine.ts | 263 +++++++++++++++++ packages/workflows/src/gates/index.ts | 2 + packages/workflows/src/gates/parsers.test.ts | 192 +++++++++++++ packages/workflows/src/gates/parsers.ts | 178 ++++++++++++ packages/workflows/src/schemas/dag-node.ts | 29 +- packages/workflows/src/schemas/gate.ts | 100 +++++++ packages/workflows/src/schemas/index.ts | 21 ++ .../workflows/src/script-node-deps.test.ts | 6 + packages/workflows/src/store.ts | 73 ++++- 25 files changed, 1955 insertions(+), 47 deletions(-) create mode 100644 migrations/022_node_states.sql create mode 100644 packages/core/src/db/node-states.ts create mode 100644 packages/server/src/routes/schemas/gate.schemas.ts create mode 100644 packages/workflows/src/gates/engine.test.ts create mode 100644 packages/workflows/src/gates/engine.ts create mode 100644 packages/workflows/src/gates/index.ts create mode 100644 packages/workflows/src/gates/parsers.test.ts create mode 100644 packages/workflows/src/gates/parsers.ts create mode 100644 packages/workflows/src/schemas/gate.ts diff --git a/migrations/022_node_states.sql b/migrations/022_node_states.sql new file mode 100644 index 0000000000..81078b9827 --- /dev/null +++ b/migrations/022_node_states.sql @@ -0,0 +1,38 @@ +-- Node state store: persists validated node execution state +CREATE TABLE IF NOT EXISTS remote_agent_node_states ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + workflow_run_id UUID NOT NULL REFERENCES remote_agent_workflow_runs(id) ON DELETE CASCADE, + node_id VARCHAR(255) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + output TEXT DEFAULT '', + output_validated BOOLEAN DEFAULT FALSE, + gate_results JSONB DEFAULT '[]', + attempt_count INTEGER DEFAULT 0, + started_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + completed_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(workflow_run_id, node_id) +); + +CREATE INDEX IF NOT EXISTS idx_node_states_run_id + ON remote_agent_node_states(workflow_run_id); +CREATE INDEX IF NOT EXISTS idx_node_states_status + ON remote_agent_node_states(status); + +-- Test result evidence: stores actual test run data per node +CREATE TABLE IF NOT EXISTS remote_agent_test_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + node_state_id UUID NOT NULL REFERENCES remote_agent_node_states(id) ON DELETE CASCADE, + suite_name VARCHAR(255) NOT NULL, + total INTEGER NOT NULL DEFAULT 0, + passed INTEGER NOT NULL DEFAULT 0, + failed INTEGER NOT NULL DEFAULT 0, + skipped INTEGER NOT NULL DEFAULT 0, + failures JSONB DEFAULT '[]', + stdout TEXT DEFAULT '', + exit_code INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_test_results_node_state + ON remote_agent_test_results(node_state_id); diff --git a/packages/core/src/db/adapters/sqlite.ts b/packages/core/src/db/adapters/sqlite.ts index 2864e4fc43..df395364cc 100644 --- a/packages/core/src/db/adapters/sqlite.ts +++ b/packages/core/src/db/adapters/sqlite.ts @@ -353,6 +353,37 @@ export class SqliteAdapter implements IDatabase { created_at TEXT DEFAULT (datetime('now')) ); + -- Node states table (validated node execution state) + CREATE TABLE IF NOT EXISTS remote_agent_node_states ( + id TEXT PRIMARY KEY, + workflow_run_id TEXT NOT NULL REFERENCES remote_agent_workflow_runs(id) ON DELETE CASCADE, + node_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + output TEXT DEFAULT '', + output_validated INTEGER DEFAULT 0, + gate_results TEXT DEFAULT '[]', + attempt_count INTEGER DEFAULT 0, + started_at TEXT DEFAULT (datetime('now')), + completed_at TEXT, + updated_at TEXT DEFAULT (datetime('now')), + UNIQUE(workflow_run_id, node_id) + ); + + -- Test results table (test run evidence per node) + CREATE TABLE IF NOT EXISTS remote_agent_test_results ( + id TEXT PRIMARY KEY, + node_state_id TEXT NOT NULL REFERENCES remote_agent_node_states(id) ON DELETE CASCADE, + suite_name TEXT NOT NULL, + total INTEGER NOT NULL DEFAULT 0, + passed INTEGER NOT NULL DEFAULT 0, + failed INTEGER NOT NULL DEFAULT 0, + skipped INTEGER NOT NULL DEFAULT 0, + failures TEXT DEFAULT '[]', + stdout TEXT DEFAULT '', + exit_code INTEGER NOT NULL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')) + ); + -- Messages table (conversation history for Web UI) CREATE TABLE IF NOT EXISTS remote_agent_messages ( id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), @@ -383,6 +414,10 @@ export class SqliteAdapter implements IDatabase { CREATE INDEX IF NOT EXISTS idx_sessions_codebase ON remote_agent_sessions(codebase_id); CREATE INDEX IF NOT EXISTS idx_isolation_env_status ON remote_agent_isolation_environments(status); + CREATE INDEX IF NOT EXISTS idx_node_states_run_id ON remote_agent_node_states(workflow_run_id); + CREATE INDEX IF NOT EXISTS idx_node_states_status ON remote_agent_node_states(status); + CREATE INDEX IF NOT EXISTS idx_test_results_node_state ON remote_agent_test_results(node_state_id); + -- From PG migration 009: staleness detection for running workflows CREATE INDEX IF NOT EXISTS idx_workflow_runs_last_activity ON remote_agent_workflow_runs(last_activity_at) WHERE status = 'running'; diff --git a/packages/core/src/db/node-states.ts b/packages/core/src/db/node-states.ts new file mode 100644 index 0000000000..f204a1d1d0 --- /dev/null +++ b/packages/core/src/db/node-states.ts @@ -0,0 +1,190 @@ +/** + * Node state persistence — CRUD operations for the remote_agent_node_states + * and remote_agent_test_results tables. + */ +import type { NodeStateRow, TestResultRow } from '@archon/workflows/store'; +import type { GateResult } from '@archon/workflows/schemas/gate'; +import type { NodeState } from '@archon/workflows/schemas/workflow-run'; +import { pool, getDialect } from './connection'; +import { createLogger } from '@archon/paths'; + +let cachedLog: ReturnType | undefined; +function getLog(): ReturnType { + if (!cachedLog) cachedLog = createLogger('db.node-states'); + return cachedLog; +} + +/** + * Upsert a node state row. Fire-and-forget: catches all errors internally. + */ +export async function upsertNodeState(data: { + workflow_run_id: string; + node_id: string; + status: NodeState; + output?: string; + output_validated?: boolean; + gate_results?: GateResult[]; + attempt_count?: number; +}): Promise { + try { + const dialect = getDialect(); + const id = dialect.generateUuid(); + const gateResultsJson = JSON.stringify(data.gate_results ?? []); + const outputValidated = data.output_validated ?? false; + + await pool.query( + `INSERT INTO remote_agent_node_states + (id, workflow_run_id, node_id, status, output, output_validated, gate_results, attempt_count) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT(workflow_run_id, node_id) DO UPDATE SET + status = $4, + output = $5, + output_validated = $6, + gate_results = $7, + attempt_count = $8, + completed_at = CASE WHEN $4 IN ('completed', 'failed') THEN ${dialect.now()} ELSE remote_agent_node_states.completed_at END, + updated_at = ${dialect.now()}`, + [ + id, + data.workflow_run_id, + data.node_id, + data.status, + data.output ?? '', + outputValidated, + gateResultsJson, + data.attempt_count ?? 0, + ] + ); + } catch (error) { + getLog().error( + { err: error as Error, runId: data.workflow_run_id, nodeId: data.node_id }, + 'db.node_state_upsert_failed' + ); + // Fire-and-forget: never throw + } +} + +/** + * Get a single node state by composite key. + */ +export async function getNodeState( + workflowRunId: string, + nodeId: string +): Promise { + const result = await pool.query( + `SELECT * FROM remote_agent_node_states + WHERE workflow_run_id = $1 AND node_id = $2`, + [workflowRunId, nodeId] + ); + const row = result.rows[0]; + if (!row) return null; + return normalizeNodeStateRow(row); +} + +/** + * Get all node states for a workflow run, ordered by start time. + */ +export async function getNodeStates(workflowRunId: string): Promise { + const result = await pool.query( + `SELECT * FROM remote_agent_node_states + WHERE workflow_run_id = $1 + ORDER BY started_at ASC`, + [workflowRunId] + ); + return [...result.rows].map(normalizeNodeStateRow); +} + +/** + * Return a map of nodeId → output for all validated completed nodes. + * Preferred over getCompletedDagNodeOutputs for new runs. + */ +export async function getValidatedNodeOutputs(workflowRunId: string): Promise> { + const result = await pool.query<{ node_id: string; output: string }>( + `SELECT node_id, output FROM remote_agent_node_states + WHERE workflow_run_id = $1 AND status = 'completed' AND output_validated = $2`, + [workflowRunId, true] + ); + const outputs = new Map(); + for (const row of result.rows) { + outputs.set(row.node_id, row.output); + } + return outputs; +} + +/** + * Create a test result row. Fire-and-forget: catches all errors internally. + */ +export async function createTestResult(data: { + node_state_id: string; + suite_name: string; + total: number; + passed: number; + failed: number; + skipped: number; + failures?: { name: string; message: string }[]; + stdout: string; + exit_code: number; +}): Promise { + try { + const dialect = getDialect(); + const id = dialect.generateUuid(); + const failuresJson = JSON.stringify(data.failures ?? []); + + await pool.query( + `INSERT INTO remote_agent_test_results + (id, node_state_id, suite_name, total, passed, failed, skipped, failures, stdout, exit_code) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + id, + data.node_state_id, + data.suite_name, + data.total, + data.passed, + data.failed, + data.skipped, + failuresJson, + data.stdout, + data.exit_code, + ] + ); + } catch (error) { + getLog().error( + { err: error as Error, nodeStateId: data.node_state_id }, + 'db.test_result_create_failed' + ); + // Fire-and-forget: never throw + } +} + +/** + * Get all test results for a node state, ordered by creation time. + */ +export async function getTestResults(nodeStateId: string): Promise { + const result = await pool.query( + `SELECT * FROM remote_agent_test_results + WHERE node_state_id = $1 + ORDER BY created_at ASC`, + [nodeStateId] + ); + return [...result.rows].map(normalizeTestResultRow); +} + +// ─── Normalization helpers ────────────────────────────────────────────────── + +function normalizeNodeStateRow(row: NodeStateRow): NodeStateRow { + return { + ...row, + // SQLite stores booleans as INTEGER (0/1); coerce via intermediate unknown + output_validated: Boolean(row.output_validated as unknown), + // JSON fields may arrive as strings from SQLite + gate_results: + typeof row.gate_results === 'string' ? JSON.parse(row.gate_results) : row.gate_results, + }; +} + +function normalizeTestResultRow(row: TestResultRow): TestResultRow { + return { + ...row, + failures: typeof row.failures === 'string' ? JSON.parse(row.failures) : row.failures, + }; +} diff --git a/packages/core/src/workflows/store-adapter.test.ts b/packages/core/src/workflows/store-adapter.test.ts index 0501a88000..f0f59dc166 100644 --- a/packages/core/src/workflows/store-adapter.test.ts +++ b/packages/core/src/workflows/store-adapter.test.ts @@ -44,8 +44,23 @@ mock.module('../db/codebases', () => ({ getCodebase: mockGetCodebase, })); -mock.module('../clients/factory', () => ({ - getAssistantClient: mock(() => ({})), +const mockUpsertNodeState = mock(() => Promise.resolve()); +const mockGetNodeState = mock(() => Promise.resolve(null)); +const mockGetNodeStates = mock(() => Promise.resolve([])); +const mockGetValidatedNodeOutputs = mock(() => Promise.resolve(new Map())); +const mockCreateTestResult = mock(() => Promise.resolve()); +const mockGetTestResults = mock(() => Promise.resolve([])); +mock.module('../db/node-states', () => ({ + upsertNodeState: mockUpsertNodeState, + getNodeState: mockGetNodeState, + getNodeStates: mockGetNodeStates, + getValidatedNodeOutputs: mockGetValidatedNodeOutputs, + createTestResult: mockCreateTestResult, + getTestResults: mockGetTestResults, +})); + +mock.module('../providers/factory', () => ({ + getAgentProvider: mock(() => ({})), })); mock.module('../config/config-loader', () => ({ @@ -73,6 +88,12 @@ describe('createWorkflowStore', () => { 'cancelWorkflowRun', 'createWorkflowEvent', 'getCompletedDagNodeOutputs', + 'upsertNodeState', + 'getNodeState', + 'getNodeStates', + 'getValidatedNodeOutputs', + 'createTestResult', + 'getTestResults', 'getCodebase', 'getCodebaseEnvVars', ]; diff --git a/packages/core/src/workflows/store-adapter.ts b/packages/core/src/workflows/store-adapter.ts index 0bf8683fb8..962d749851 100644 --- a/packages/core/src/workflows/store-adapter.ts +++ b/packages/core/src/workflows/store-adapter.ts @@ -8,6 +8,7 @@ import type { WorkflowRunStatus } from '@archon/workflows/schemas/workflow-run'; import type { MergedConfig } from '../config/config-types'; import * as workflowDb from '../db/workflows'; import * as workflowEventDb from '../db/workflow-events'; +import * as nodeStateDb from '../db/node-states'; import * as codebaseDb from '../db/codebases'; import * as envVarDb from '../db/env-vars'; import { getAssistantClient } from '../clients/factory'; @@ -57,6 +58,12 @@ export function createWorkflowStore(): IWorkflowStore { } }, getCompletedDagNodeOutputs: workflowEventDb.getCompletedDagNodeOutputs, + upsertNodeState: nodeStateDb.upsertNodeState, + getNodeState: nodeStateDb.getNodeState, + getNodeStates: nodeStateDb.getNodeStates, + getValidatedNodeOutputs: nodeStateDb.getValidatedNodeOutputs, + createTestResult: nodeStateDb.createTestResult, + getTestResults: nodeStateDb.getTestResults, getCodebase: codebaseDb.getCodebase, getCodebaseEnvVars: envVarDb.getCodebaseEnvVars, }; diff --git a/packages/server/src/adapters/web/workflow-bridge.ts b/packages/server/src/adapters/web/workflow-bridge.ts index 78f7465a9f..7daf9bdc97 100644 --- a/packages/server/src/adapters/web/workflow-bridge.ts +++ b/packages/server/src/adapters/web/workflow-bridge.ts @@ -149,6 +149,50 @@ export function mapWorkflowEvent(event: WorkflowEmitterEvent): string | null { timestamp: Date.now(), }); + case 'gate_started': + return JSON.stringify({ + type: 'gate_started', + runId: event.runId, + nodeId: event.nodeId, + gateName: event.gateName, + gateType: event.gateType, + severity: event.severity, + timestamp: Date.now(), + }); + + case 'gate_passed': + return JSON.stringify({ + type: 'gate_passed', + runId: event.runId, + nodeId: event.nodeId, + gateName: event.gateName, + gateType: event.gateType, + evidence: event.evidence, + timestamp: Date.now(), + }); + + case 'gate_failed': + return JSON.stringify({ + type: 'gate_failed', + runId: event.runId, + nodeId: event.nodeId, + gateName: event.gateName, + gateType: event.gateType, + severity: event.severity, + evidence: event.evidence, + timestamp: Date.now(), + }); + + case 'gate_blocked': + return JSON.stringify({ + type: 'gate_blocked', + runId: event.runId, + nodeId: event.nodeId, + gateName: event.gateName, + message: event.message, + timestamp: Date.now(), + }); + default: { const exhaustiveCheck: never = event; getLog().warn( diff --git a/packages/server/src/routes/api.ts b/packages/server/src/routes/api.ts index cfade2c012..53ddf5aea3 100644 --- a/packages/server/src/routes/api.ts +++ b/packages/server/src/routes/api.ts @@ -68,6 +68,7 @@ import * as isolationEnvDb from '@archon/core/db/isolation-environments'; import * as workflowDb from '@archon/core/db/workflows'; import * as workflowEventDb from '@archon/core/db/workflow-events'; import * as messageDb from '@archon/core/db/messages'; +import * as nodeStateDb from '@archon/core/db/node-states'; import { errorSchema } from './schemas/common.schemas'; import { updateCheckResponseSchema } from './schemas/system.schemas'; import { @@ -90,6 +91,11 @@ import { approveWorkflowRunBodySchema, rejectWorkflowRunBodySchema, } from './schemas/workflow.schemas'; +import { + runSummaryResponseSchema, + storeGateResultBodySchema, + storeGateResultResponseSchema, +} from './schemas/gate.schemas'; import { conversationListResponseSchema, listConversationsQuerySchema, @@ -851,6 +857,48 @@ const getUpdateCheckRoute = createRoute({ }, }); +// ========================================================================= +// Gate & node state route definitions +// ========================================================================= + +const getRunSummaryRoute = createRoute({ + method: 'get', + path: '/api/workflows/runs/{runId}/summary', + tags: ['Workflows'], + summary: 'Get run summary with node states, gate results, and test evidence', + request: { params: z.object({ runId: z.string() }) }, + responses: { + 200: { + content: { 'application/json': { schema: runSummaryResponseSchema } }, + description: 'Run summary', + }, + 404: jsonError('Not found'), + 500: jsonError('Server error'), + }, +}); + +const storeGateResultRoute = createRoute({ + method: 'post', + path: '/api/workflows/runs/{runId}/nodes/{nodeId}/gate-result', + tags: ['Workflows'], + summary: 'Store a gate execution result for a node', + request: { + params: z.object({ runId: z.string(), nodeId: z.string() }), + body: { + content: { 'application/json': { schema: storeGateResultBodySchema } }, + required: true, + }, + }, + responses: { + 200: { + content: { 'application/json': { schema: storeGateResultResponseSchema } }, + description: 'Gate result stored', + }, + 404: jsonError('Not found'), + 500: jsonError('Server error'), + }, +}); + /** * Register all /api/* routes on the Hono app. */ @@ -2153,6 +2201,60 @@ export function registerApiRoutes( } }); + // GET /api/workflows/runs/:runId/summary - Run summary with node states + gate results + registerOpenApiRoute(getRunSummaryRoute, async c => { + try { + const runId = c.req.param('runId') ?? ''; + const run = await workflowDb.getWorkflowRun(runId); + if (!run) { + return apiError(c, 404, 'Workflow run not found'); + } + const nodeStates = await nodeStateDb.getNodeStates(runId); + const nodes = await Promise.all( + nodeStates.map(async ns => ({ + nodeState: ns, + testResults: await nodeStateDb.getTestResults(ns.id), + })) + ); + return c.json({ runId, nodes }); + } catch (error) { + getLog().error({ err: error }, 'get_run_summary_failed'); + return apiError(c, 500, 'Failed to get run summary'); + } + }); + + // POST /api/workflows/runs/:runId/nodes/:nodeId/gate-result - Store gate result + registerOpenApiRoute(storeGateResultRoute, async c => { + try { + const runId = c.req.param('runId') ?? ''; + const nodeId = c.req.param('nodeId') ?? ''; + const run = await workflowDb.getWorkflowRun(runId); + if (!run) { + return apiError(c, 404, 'Workflow run not found'); + } + const body = getValidatedBody(c, storeGateResultBodySchema); + + // Get or create node state, then append gate result. + // Cast body to GateResult: Zod validation applies .default([]) for failures, + // so the runtime value always has failures populated. The type mismatch is + // due to Zod input vs output types when using .default(). + const gateResult = body as import('@archon/workflows/schemas/gate').GateResult; + const nodeState = await nodeStateDb.getNodeState(runId, nodeId); + const existingResults = nodeState?.gate_results ?? []; + await nodeStateDb.upsertNodeState({ + workflow_run_id: runId, + node_id: nodeId, + status: nodeState?.status ?? 'running', + output: nodeState?.output, + gate_results: [...existingResults, gateResult], + }); + return c.json({ success: true }); + } catch (error) { + getLog().error({ err: error }, 'store_gate_result_failed'); + return apiError(c, 500, 'Failed to store gate result'); + } + }); + // POST /api/workflows/validate - Validate a workflow definition without saving // MUST be registered before GET /api/workflows/:name so "validate" is not treated as :name registerOpenApiRoute(validateWorkflowRoute, async c => { diff --git a/packages/server/src/routes/schemas/gate.schemas.ts b/packages/server/src/routes/schemas/gate.schemas.ts new file mode 100644 index 0000000000..b31d6ebf27 --- /dev/null +++ b/packages/server/src/routes/schemas/gate.schemas.ts @@ -0,0 +1,90 @@ +/** + * Zod schemas for gate-related API endpoints. + */ +import { z } from '@hono/zod-openapi'; +import { gateResultSchema } from '@archon/workflows/schemas/gate'; + +/** A single gate result with OpenAPI metadata. */ +export const gateResultResponseSchema = gateResultSchema.openapi('GateResult'); + +/** A node state row returned by the API. */ +export const nodeStateResponseSchema = z + .object({ + id: z.string(), + workflow_run_id: z.string(), + node_id: z.string(), + status: z.string(), + output: z.string(), + output_validated: z.boolean(), + gate_results: z.array(gateResultResponseSchema), + attempt_count: z.number(), + started_at: z.string(), + completed_at: z.string().nullable(), + updated_at: z.string(), + }) + .openapi('NodeState'); + +/** Test result evidence for a single node. */ +export const testResultResponseSchema = z + .object({ + id: z.string(), + node_state_id: z.string(), + suite_name: z.string(), + total: z.number(), + passed: z.number(), + failed: z.number(), + skipped: z.number(), + failures: z.array(z.object({ name: z.string(), message: z.string() })), + stdout: z.string(), + exit_code: z.number(), + created_at: z.string(), + }) + .openapi('TestResult'); + +/** Per-node summary with gate results and test evidence. */ +export const nodeSummarySchema = z + .object({ + nodeState: nodeStateResponseSchema, + testResults: z.array(testResultResponseSchema), + }) + .openapi('NodeSummary'); + +/** GET /api/workflows/runs/:runId/summary response. */ +export const runSummaryResponseSchema = z + .object({ + runId: z.string(), + nodes: z.array(nodeSummarySchema), + }) + .openapi('RunSummary'); + +/** POST /api/workflows/runs/:runId/nodes/:nodeId/gate-result request body. + * Uses a standalone schema (not derived from gateResultSchema) to avoid + * Zod input/output type mismatches from .default() fields. */ +export const storeGateResultBodySchema = z + .object({ + gateId: z.string(), + gateName: z.string(), + gateType: z.enum(['test-suite', 'typecheck', 'lint', 'custom']), + severity: z.enum(['p0', 'p1', 'p2']), + passed: z.boolean(), + evidence: z.object({ + stdout: z.string(), + exitCode: z.number().int(), + parsedResults: z + .object({ + total: z.number(), + passed: z.number(), + failed: z.number(), + skipped: z.number(), + failures: z.array(z.object({ name: z.string(), message: z.string() })).default([]), + }) + .optional(), + }), + error: z.string().optional(), + }) + .openapi('StoreGateResultBody'); + +/** POST /api/workflows/runs/:runId/nodes/:nodeId/gate-result response. */ +export const storeGateResultResponseSchema = z + .object({ success: z.boolean() }) + .openapi('StoreGateResultResponse'); diff --git a/packages/workflows/package.json b/packages/workflows/package.json index 0b6f7e38ff..53b649f6b7 100644 --- a/packages/workflows/package.json +++ b/packages/workflows/package.json @@ -16,10 +16,11 @@ "./defaults": "./src/defaults/bundled-defaults.ts", "./validator": "./src/validator.ts", "./utils/tool-formatter": "./src/utils/tool-formatter.ts", - "./test-utils": "./src/test-utils.ts" + "./test-utils": "./src/test-utils.ts", + "./gates": "./src/gates/index.ts" }, "scripts": { - "test": "bun test src/dag-executor.test.ts && bun test src/loader.test.ts && bun test src/logger.test.ts && bun test src/condition-evaluator.test.ts && bun test src/event-emitter.test.ts && bun test src/executor-shared.test.ts && bun test src/executor.test.ts && bun test src/executor-preamble.test.ts && bun test src/defaults/ src/model-validation.test.ts src/router.test.ts src/utils/ src/hooks.test.ts && bun test src/validation-parser.test.ts src/schemas.test.ts src/command-validation.test.ts && bun test src/validator.test.ts && bun test src/script-discovery.test.ts && bun test src/runtime-check.test.ts && bun test src/script-node-deps.test.ts", + "test": "bun test src/dag-executor.test.ts && bun test src/loader.test.ts && bun test src/logger.test.ts && bun test src/condition-evaluator.test.ts && bun test src/event-emitter.test.ts && bun test src/executor-shared.test.ts && bun test src/executor.test.ts && bun test src/executor-preamble.test.ts && bun test src/defaults/ src/model-validation.test.ts src/router.test.ts src/utils/ src/hooks.test.ts && bun test src/validation-parser.test.ts src/schemas.test.ts src/command-validation.test.ts && bun test src/validator.test.ts && bun test src/script-discovery.test.ts && bun test src/runtime-check.test.ts && bun test src/script-node-deps.test.ts && bun test src/gates/parsers.test.ts && bun test src/gates/engine.test.ts", "type-check": "bun x tsc --noEmit" }, "dependencies": { diff --git a/packages/workflows/src/dag-executor.test.ts b/packages/workflows/src/dag-executor.test.ts index 150ea4eeb7..52a97a00f4 100644 --- a/packages/workflows/src/dag-executor.test.ts +++ b/packages/workflows/src/dag-executor.test.ts @@ -88,6 +88,12 @@ function createMockStore(): IWorkflowStore { cancelWorkflowRun: mock(() => Promise.resolve()), createWorkflowEvent: mock(() => Promise.resolve()), getCompletedDagNodeOutputs: mock(() => Promise.resolve(new Map())), + upsertNodeState: mock(() => Promise.resolve()), + getNodeState: mock(() => Promise.resolve(null)), + getNodeStates: mock(() => Promise.resolve([])), + getValidatedNodeOutputs: mock(() => Promise.resolve(new Map())), + createTestResult: mock(() => Promise.resolve()), + getTestResults: mock(() => Promise.resolve([])), getCodebase: mock(() => Promise.resolve(null)), getCodebaseEnvVars: mock(() => Promise.resolve({})), }; @@ -635,15 +641,11 @@ describe('substituteNodeOutputRefs', () => { expect(substituteNodeOutputRefs('Result: $a.output', outputs)).toBe('Result: hello'); }); - it('unknown node ref resolves to empty string and logs a warning', () => { - mockLogFn.mockClear(); + it('unknown node ref throws in non-bash context', () => { const outputs = new Map(); - expect(substituteNodeOutputRefs('Result: $missing.output', outputs)).toBe('Result: '); - const warnCalls = mockLogFn.mock.calls.filter( - (call: unknown[]) => call[1] === 'dag_node_output_ref_unknown_node' + expect(() => substituteNodeOutputRefs('Result: $missing.output', outputs)).toThrow( + /Node 'missing' referenced in output substitution but not found/ ); - expect(warnCalls.length).toBe(1); - expect(warnCalls[0][0]).toEqual(expect.objectContaining({ nodeId: 'missing' })); }); it('dot notation extracts JSON field', () => { @@ -651,9 +653,11 @@ describe('substituteNodeOutputRefs', () => { expect(substituteNodeOutputRefs('Fix $a.output.type issue', outputs)).toBe('Fix BUG issue'); }); - it('dot notation on invalid JSON returns empty string', () => { + it('dot notation on invalid JSON throws in non-bash context', () => { const outputs = new Map([['a', makeOutput('completed', 'not-json')]]); - expect(substituteNodeOutputRefs('$a.output.field', outputs)).toBe(''); + expect(() => substituteNodeOutputRefs('$a.output.field', outputs)).toThrow( + /Node 'a' output is not valid JSON/ + ); }); }); diff --git a/packages/workflows/src/dag-executor.ts b/packages/workflows/src/dag-executor.ts index 5427c1974f..d78f5840f4 100644 --- a/packages/workflows/src/dag-executor.ts +++ b/packages/workflows/src/dag-executor.ts @@ -32,6 +32,7 @@ import type { EffortLevel, ThinkingConfig, SandboxSettings, + GateResult, } from './schemas'; import { isBashNode, @@ -46,6 +47,7 @@ import { createLogger } from '@archon/paths'; import { getWorkflowEventEmitter } from './event-emitter'; import { evaluateCondition } from './condition-evaluator'; import { isClaudeModel, isModelCompatible } from './model-validation'; +import { executeGates, formatGateFailureFeedback } from './gates'; import { logNodeStart, logNodeComplete, @@ -202,8 +204,14 @@ export function substituteNodeOutputRefs( (match, nodeId: string, field: string | undefined) => { const nodeOutput = nodeOutputs.get(nodeId); if (!nodeOutput) { - getLog().warn({ nodeId, match }, 'dag_node_output_ref_unknown_node'); - return escapedForBash ? "''" : ''; + if (escapedForBash) { + // Bash scripts handle empty strings; return safely to avoid breaking shell evaluation. + getLog().warn({ nodeId, match }, 'dag_node_output_ref_unknown_node'); + return "''"; + } + throw new Error( + `Node '${nodeId}' referenced in output substitution but not found in node outputs` + ); } if (!field) { return escapedForBash ? shellQuote(nodeOutput.output) : nodeOutput.output; @@ -218,11 +226,22 @@ export function substituteNodeOutputRefs( if (typeof value === 'number' || typeof value === 'boolean') return String(value); return escapedForBash ? "''" : ''; // objects, null, undefined, symbol, bigint → empty } catch (jsonErr) { - getLog().warn( - { nodeId, field, outputPreview: nodeOutput.output.slice(0, 100), err: jsonErr as Error }, - 'dag_node_output_ref_json_parse_failed' + if (escapedForBash) { + // Bash scripts handle empty strings; return safely to avoid breaking shell evaluation. + getLog().warn( + { + nodeId, + field, + outputPreview: nodeOutput.output.slice(0, 100), + err: jsonErr as Error, + }, + 'dag_node_output_ref_json_parse_failed' + ); + return "''"; + } + throw new Error( + `Node '${nodeId}' output is not valid JSON; cannot extract field '${field}'. Raw output: ${nodeOutput.output.slice(0, 200)}` ); - return escapedForBash ? "''" : ''; } } ); @@ -1071,35 +1090,34 @@ async function executeNodeInternal( } getLog().debug({ nodeId: node.id, streamingMode }, 'dag.structured_output_override'); } else if (provider === 'codex') { - // Codex returns structured output inline in agent_message text - // (already accumulated in nodeOutputText). Validate it is valid JSON - // so downstream $nodeId.output.field references can parse it. + // Codex returns structured output inline in agent_message text. + // Validate it is valid JSON — fail the node if not, since output_format + // implies downstream nodes depend on structured data. try { JSON.parse(nodeOutputText); getLog().debug({ nodeId: node.id }, 'dag.codex_structured_output_valid_json'); } catch { - getLog().warn( + getLog().error( { nodeId: node.id, outputPreview: nodeOutputText.slice(0, 200) }, 'dag.codex_structured_output_not_json' ); - await safeSendMessage( - platform, - conversationId, - `Warning: Node '${node.id}' requested output_format but Codex returned non-JSON output. Downstream conditions referencing \`$${node.id}.output.field\` may not evaluate correctly.`, - nodeContext - ); + const failMsg = `Node '${node.id}' requested output_format but Codex returned non-JSON output. Node failed.`; + await safeSendMessage(platform, conversationId, failMsg, nodeContext); + lastNodeCancelCheck.delete(`${workflowRun.id}:${node.id}`); + lastNodeActivityUpdate.delete(`${workflowRun.id}:${node.id}`); + return { state: 'failed', output: nodeOutputText, error: failMsg }; } } else { - getLog().warn( + // SDK did not return structured_output — output_format contract violated. + getLog().error( { nodeId: node.id, workflowRunId: workflowRun.id }, 'dag.structured_output_missing' ); - await safeSendMessage( - platform, - conversationId, - `Warning: Node '${node.id}' requested output_format but the SDK did not return structured output. Downstream conditions may not evaluate correctly.`, - nodeContext - ); + const failMsg = `Node '${node.id}' requested output_format but the SDK did not return structured output. Node failed.`; + await safeSendMessage(platform, conversationId, failMsg, nodeContext); + lastNodeCancelCheck.delete(`${workflowRun.id}:${node.id}`); + lastNodeActivityUpdate.delete(`${workflowRun.id}:${node.id}`); + return { state: 'failed', output: nodeOutputText, error: failMsg }; } } @@ -1745,6 +1763,9 @@ async function executeLoopNode( let loopTotalCostUsd: number | undefined; let loopFinalStopReason: string | undefined; let loopTotalNumTurns: number | undefined; + let loopGateFailureCount = 0; + let loopGateFeedback = ''; + let gateRejectedCompletion = false; const resolvedOptions = buildLoopNodeOptions(workflowProvider, workflowModel, config); // Helper to log event store errors consistently @@ -1754,6 +1775,7 @@ async function executeLoopNode( for (let i = startIteration; i <= loop.max_iterations; i++) { const iterationStart = Date.now(); + gateRejectedCompletion = false; // Check for non-running status between iterations (cancellation, deletion, or future: pause) const runStatus = await deps.store.getWorkflowRunStatus(workflowRun.id); @@ -1815,7 +1837,13 @@ async function executeLoopNode( issueContext, i === startIteration ? loopUserInput : '' ); - const finalPrompt = substituteNodeOutputRefs(substitutedPrompt, nodeOutputs); + let finalPrompt = substituteNodeOutputRefs(substitutedPrompt, nodeOutputs); + + // Inject gate failure feedback from a prior iteration (if gates rejected COMPLETE signal) + if (loopGateFeedback) { + finalPrompt += loopGateFeedback; + loopGateFeedback = ''; // consume once + } const iterationOptions: WorkflowAssistantOptions | undefined = { ...resolvedOptions, @@ -2058,13 +2086,76 @@ async function executeLoopNode( durationMs: duration, }); - // Completion signal detected — exit the loop. + // Completion signal detected — verify with gates (if configured) before accepting. // For interactive loops: only honor the signal when the AI had user input to evaluate // (i.e., this is a resume iteration with loopUserInput). On the first iteration of a // fresh interactive loop, the user hasn't seen anything yet — always gate first. // For non-interactive loops: the AI signals task completion at any point. const interactiveFirstRun = loop.interactive && !isLoopResume; if (completionDetected && !interactiveFirstRun) { + // Run quality gates if configured on this loop node + if (node.gates && node.gates.length > 0) { + try { + const gateOutcome = await executeGates(node.gates, cwd, workflowRun.id, node.id); + if (gateOutcome.blocked) { + // Gate verification failed — reject COMPLETE signal and continue looping. + // Inject real failure feedback into the next iteration. + const feedback = formatGateFailureFeedback(gateOutcome.results); + getLog().warn( + { + nodeId: node.id, + iteration: i, + blockedGates: gateOutcome.results.filter(r => !r.passed).length, + }, + 'loop_node.gate_rejected_completion' + ); + await safeSendMessage( + platform, + conversationId, + `Gate verification rejected COMPLETE signal for loop \`${node.id}\` (iteration ${String(i)}):\n${feedback}`, + msgContext + ); + // Track consecutive gate failures for escalation + loopGateFailureCount = (loopGateFailureCount ?? 0) + 1; + const maxGateRetries = Math.max(...node.gates.map(g => g.maxRetries ?? 3)); + if (loopGateFailureCount > maxGateRetries) { + // Escalate to human with gate evidence + const escalateMsg = + `Loop \`${node.id}\` failed gate verification ${String(loopGateFailureCount)} times. ` + + `Pausing for human review.\n\n${feedback}`; + await safeSendMessage(platform, conversationId, escalateMsg, msgContext); + await deps.store.pauseWorkflowRun(workflowRun.id, { + nodeId: node.id, + message: `Gate verification failed ${String(loopGateFailureCount)} times: ${feedback.slice(0, 500)}`, + type: 'interactive_loop', + iteration: i, + sessionId: currentSessionId, + }); + getWorkflowEventEmitter().emit({ + type: 'approval_pending', + runId: workflowRun.id, + nodeId: node.id, + message: `Gate escalation after ${String(loopGateFailureCount)} failures`, + }); + return { state: 'completed', output: lastIterationOutput, costUsd: loopTotalCostUsd }; + } + // Inject gate feedback as additional context for the next iteration + loopGateFeedback = `\n\nGATE VERIFICATION FAILED — Your COMPLETE signal was rejected. Fix these issues:\n${feedback}\n\nRe-run the failing checks and try again.`; + gateRejectedCompletion = true; + continue; + } + // Gates passed — reset failure counter + loopGateFailureCount = 0; + } catch (gateErr) { + getLog().error( + { err: gateErr as Error, nodeId: node.id }, + 'loop_node.gate_execution_error' + ); + // Gate execution error doesn't block — accept the COMPLETE signal + } + } + } + if (completionDetected && !interactiveFirstRun && !gateRejectedCompletion) { await safeSendMessage( platform, conversationId, @@ -2843,13 +2934,116 @@ export async function executeDagWorkflow( }) ); - // Process layer results — store all outputs, track failures + // Process layer results — store all outputs, run gates, persist node state, track failures let layerHadFailure = false; for (const result of layerResults) { if (result.status === 'fulfilled') { const { nodeId, output } = result.value; if (output.costUsd !== undefined) totalCostUsd += output.costUsd; + + // Find the node definition to check for gates + const nodeDef = layer.find(n => n.id === nodeId); + + // Run quality gates for completed AI nodes (command, prompt — loop handles its own gates) + let gateResults: GateResult[] | undefined; + if ( + output.state === 'completed' && + nodeDef && + 'gates' in nodeDef && + nodeDef.gates && + nodeDef.gates.length > 0 && + !isLoopNode(nodeDef) // loop nodes handle gates internally via Part D + ) { + try { + const gateOutcome = await executeGates(nodeDef.gates, cwd, workflowRun.id, nodeId); + gateResults = gateOutcome.results; + if (gateOutcome.blocked) { + // p0/p1 gate failed — downgrade the node to failed + const feedback = formatGateFailureFeedback(gateOutcome.results); + getLog().warn( + { + nodeId, + runId: workflowRun.id, + blockedGates: gateOutcome.results.filter(r => !r.passed).length, + }, + 'dag.node_gate_blocked' + ); + await safeSendMessage( + platform, + conversationId, + `Gate verification failed for node \`${nodeId}\`:\n${feedback}`, + { workflowId: workflowRun.id, nodeName: nodeId } + ); + // Override the output to failed + nodeOutputs.set(nodeId, { + state: 'failed', + output: output.output, + error: `Quality gate blocked: ${feedback.slice(0, 300)}`, + }); + layerHadFailure = true; + + // Persist node state as failed with gate results + deps.store + .upsertNodeState({ + workflow_run_id: workflowRun.id, + node_id: nodeId, + status: 'failed', + output: output.output, + output_validated: false, + gate_results: gateResults, + }) + .catch((err: Error) => { + getLog().error({ err, nodeId }, 'dag.node_state_upsert_failed'); + }); + continue; + } + } catch (gateErr) { + getLog().error({ err: gateErr as Error, nodeId }, 'dag.gate_execution_error'); + // Gate execution error doesn't block the node — treat as ungated + } + } + nodeOutputs.set(nodeId, output); + + // Persist node state (fire-and-forget) + if (output.state === 'completed') { + deps.store + .upsertNodeState({ + workflow_run_id: workflowRun.id, + node_id: nodeId, + status: 'completed', + output: output.output, + output_validated: true, + gate_results: gateResults, + }) + .catch((err: Error) => { + getLog().error({ err, nodeId }, 'dag.node_state_upsert_failed'); + }); + } else if (output.state === 'failed') { + deps.store + .upsertNodeState({ + workflow_run_id: workflowRun.id, + node_id: nodeId, + status: 'failed', + output: output.output, + output_validated: false, + gate_results: gateResults, + }) + .catch((err: Error) => { + getLog().error({ err, nodeId }, 'dag.node_state_upsert_failed'); + }); + } else if (output.state === 'skipped') { + deps.store + .upsertNodeState({ + workflow_run_id: workflowRun.id, + node_id: nodeId, + status: 'skipped', + }) + .catch((err: Error) => { + getLog().error({ err, nodeId }, 'dag.node_state_upsert_failed'); + }); + } + if (output.state === 'completed' && !isParallelLayer && output.sessionId !== undefined) { lastSequentialSessionId = output.sessionId; } diff --git a/packages/workflows/src/event-emitter.ts b/packages/workflows/src/event-emitter.ts index 700ba75a6d..7ab078ee47 100644 --- a/packages/workflows/src/event-emitter.ts +++ b/packages/workflows/src/event-emitter.ts @@ -142,6 +142,45 @@ interface WorkflowCancelledEvent { reason: string; } +interface GateStartedEvent { + type: 'gate_started'; + runId: string; + nodeId: string; + gateName: string; + gateType: string; + severity: string; +} + +interface GatePassedEvent { + type: 'gate_passed'; + runId: string; + nodeId: string; + gateName: string; + gateType: string; + evidence: { + exitCode: number; + parsedResults?: { total: number; passed: number; failed: number; skipped: number }; + }; +} + +interface GateFailedEvent { + type: 'gate_failed'; + runId: string; + nodeId: string; + gateName: string; + gateType: string; + severity: string; + evidence: { exitCode: number; stdout: string; parsedResults?: object }; +} + +interface GateBlockedEvent { + type: 'gate_blocked'; + runId: string; + nodeId: string; + gateName: string; + message: string; +} + export type WorkflowEmitterEvent = | WorkflowStartedEvent | WorkflowCompletedEvent @@ -157,7 +196,11 @@ export type WorkflowEmitterEvent = | ToolStartedEvent | ToolCompletedEvent | ApprovalPendingEvent - | WorkflowCancelledEvent; + | WorkflowCancelledEvent + | GateStartedEvent + | GatePassedEvent + | GateFailedEvent + | GateBlockedEvent; // --------------------------------------------------------------------------- // Emitter class diff --git a/packages/workflows/src/executor-preamble.test.ts b/packages/workflows/src/executor-preamble.test.ts index fd2b44ec3b..b2a80dc5f2 100644 --- a/packages/workflows/src/executor-preamble.test.ts +++ b/packages/workflows/src/executor-preamble.test.ts @@ -90,6 +90,12 @@ function makeStore(overrides: Partial = {}): IWorkflowStore { findResumableRun: mock(async () => null), getCompletedDagNodeOutputs: mock(async () => new Map()), resumeWorkflowRun: mock(async () => makeRun()), + upsertNodeState: mock(async () => {}), + getNodeState: mock(async () => null), + getNodeStates: mock(async () => []), + getValidatedNodeOutputs: mock(async () => new Map()), + createTestResult: mock(async () => {}), + getTestResults: mock(async () => []), getCodebase: mock(async () => null), getCodebaseEnvVars: mock(async () => ({})), ...overrides, diff --git a/packages/workflows/src/executor.test.ts b/packages/workflows/src/executor.test.ts index 0a91ac8299..125247c5b9 100644 --- a/packages/workflows/src/executor.test.ts +++ b/packages/workflows/src/executor.test.ts @@ -74,6 +74,12 @@ function makeStore(overrides: Partial = {}): IWorkflowStore { findResumableRun: mock(async () => null), getCompletedDagNodeOutputs: mock(async () => new Map()), resumeWorkflowRun: mock(async () => makeRun()), + upsertNodeState: mock(async () => {}), + getNodeState: mock(async () => null), + getNodeStates: mock(async () => []), + getValidatedNodeOutputs: mock(async () => new Map()), + createTestResult: mock(async () => {}), + getTestResults: mock(async () => []), getCodebase: mock(async () => null), getCodebaseEnvVars: mock(async () => ({})), ...overrides, diff --git a/packages/workflows/src/executor.ts b/packages/workflows/src/executor.ts index e87ea9065b..c2c28449f7 100644 --- a/packages/workflows/src/executor.ts +++ b/packages/workflows/src/executor.ts @@ -365,10 +365,15 @@ export async function executeWorkflow( // Step 2: Activate the resume — propagate as error if this fails if (resumableRun) { - // Load completed node outputs from the prior run's events. + // Load completed node outputs — prefer validated node states over event-replay. let priorNodes: Map; try { - priorNodes = await deps.store.getCompletedDagNodeOutputs(resumableRun.id); + // Try validated node states first (new runs persist validated state) + priorNodes = await deps.store.getValidatedNodeOutputs(resumableRun.id); + if (priorNodes.size === 0) { + // Fall back to event-replay for runs that predate node_states table + priorNodes = await deps.store.getCompletedDagNodeOutputs(resumableRun.id); + } } catch (error) { const err = error as Error; getLog().warn( @@ -381,7 +386,6 @@ export async function executeWorkflow( 'workflow.dag_resume_node_outputs_failed' ); // Intentional: fall back to empty map (fresh start) if prior node outputs can't be loaded. - // getCompletedDagNodeOutputs threw unexpectedly — safe to degrade rather than abort the run. priorNodes = new Map(); await safeSendMessage( platform, diff --git a/packages/workflows/src/gates/engine.test.ts b/packages/workflows/src/gates/engine.test.ts new file mode 100644 index 0000000000..96c4107f32 --- /dev/null +++ b/packages/workflows/src/gates/engine.test.ts @@ -0,0 +1,267 @@ +import { describe, expect, test, mock, beforeEach } from 'bun:test'; +import type { WorkflowEmitterEvent } from '../event-emitter'; + +// --------------------------------------------------------------------------- +// Mock logger FIRST (before all other mocks) +// --------------------------------------------------------------------------- + +const mockLogFn = mock(() => {}); +const mockLogger = { + info: mockLogFn, + warn: mockLogFn, + error: mockLogFn, + debug: mockLogFn, + trace: mockLogFn, + fatal: mockLogFn, + child: mock(() => mockLogger), +}; + +mock.module('@archon/paths', () => ({ + createLogger: mock(() => mockLogger), +})); + +// --------------------------------------------------------------------------- +// Mock @archon/git for execFileAsync +// --------------------------------------------------------------------------- + +const mockExecFileAsync = mock( + async ( + _cmd: string, + _args: string[], + _opts: object + ): Promise<{ stdout: string; stderr: string }> => ({ + stdout: '', + stderr: '', + }) +); + +mock.module('@archon/git', () => ({ + execFileAsync: mockExecFileAsync, +})); + +// --------------------------------------------------------------------------- +// Mock event emitter +// --------------------------------------------------------------------------- + +const emittedEvents: WorkflowEmitterEvent[] = []; +mock.module('../event-emitter', () => ({ + getWorkflowEventEmitter: () => ({ + emit: (event: WorkflowEmitterEvent) => { + emittedEvents.push(event); + }, + }), +})); + +// --------------------------------------------------------------------------- +// Import AFTER mocks +// --------------------------------------------------------------------------- + +import { executeGate, executeGates, formatGateFailureFeedback } from './engine'; +import type { QualityGateConfig, GateResult } from '../schemas/gate'; + +beforeEach(() => { + mockExecFileAsync.mockReset(); + mockLogFn.mockReset(); + emittedEvents.length = 0; +}); + +describe('executeGate', () => { + test('returns passed for exit 0 command', async () => { + mockExecFileAsync.mockResolvedValueOnce({ + stdout: 'Tests 5 passed (5)\n', + stderr: '', + }); + + const config: QualityGateConfig = { type: 'test-suite', severity: 'p1', maxRetries: 0 }; + const result = await executeGate(config, '/tmp/test'); + + expect(result.passed).toBe(true); + expect(result.evidence.exitCode).toBe(0); + expect(result.evidence.parsedResults?.total).toBe(5); + expect(result.evidence.parsedResults?.passed).toBe(5); + }); + + test('returns failed for non-zero exit', async () => { + const err = new Error('Command failed') as Error & { + stdout: string; + stderr: string; + status: number; + }; + err.stdout = 'Tests 1 passed | 2 failed (3)\n'; + err.stderr = ''; + err.status = 1; + mockExecFileAsync.mockRejectedValueOnce(err); + + const config: QualityGateConfig = { type: 'test-suite', severity: 'p0', maxRetries: 0 }; + const result = await executeGate(config, '/tmp/test'); + + expect(result.passed).toBe(false); + expect(result.evidence.exitCode).toBe(1); + expect(result.evidence.parsedResults?.failed).toBe(2); + expect(result.error).toContain('exit code 1'); + }); + + test('handles timeout', async () => { + const err = new Error('timed out') as Error & { + code: string; + stdout: string; + stderr: string; + status: number; + }; + err.code = 'ERR_CHILD_PROCESS_TIMEOUT'; + err.stdout = ''; + err.stderr = ''; + err.status = 1; + mockExecFileAsync.mockRejectedValueOnce(err); + + const config: QualityGateConfig = { type: 'typecheck', severity: 'p1', maxRetries: 0 }; + const result = await executeGate(config, '/tmp/test'); + + expect(result.passed).toBe(false); + expect(result.error).toContain('timed out'); + }); + + test('uses custom command when provided', async () => { + mockExecFileAsync.mockResolvedValueOnce({ stdout: '', stderr: '' }); + + const config: QualityGateConfig = { + type: 'custom', + name: 'my-check', + command: 'npm run my-check', + severity: 'p2', + maxRetries: 0, + }; + await executeGate(config, '/tmp/test'); + + expect(mockExecFileAsync).toHaveBeenCalledWith( + 'bash', + ['-c', 'npm run my-check'], + expect.objectContaining({ cwd: '/tmp/test' }) + ); + }); +}); + +describe('executeGates', () => { + test('runs all gates and returns aggregate', async () => { + // First gate passes + mockExecFileAsync.mockResolvedValueOnce({ stdout: 'Tests 5 passed (5)', stderr: '' }); + // Second gate fails + const err = new Error('fail') as Error & { stdout: string; stderr: string; status: number }; + err.stdout = ''; + err.stderr = 'error'; + err.status = 1; + mockExecFileAsync.mockRejectedValueOnce(err); + + const gates: QualityGateConfig[] = [ + { type: 'test-suite', severity: 'p1', maxRetries: 0 }, + { type: 'typecheck', severity: 'p0', maxRetries: 0 }, + ]; + + const { results, blocked } = await executeGates(gates, '/tmp/test', 'run-1', 'node-1'); + + expect(results.length).toBe(2); + expect(results[0]!.passed).toBe(true); + expect(results[1]!.passed).toBe(false); + expect(blocked).toBe(true); + }); + + test('p2 failure does not block', async () => { + // p0 passes + mockExecFileAsync.mockResolvedValueOnce({ stdout: '', stderr: '' }); + // p2 fails + const err = new Error('fail') as Error & { stdout: string; stderr: string; status: number }; + err.stdout = ''; + err.stderr = ''; + err.status = 1; + mockExecFileAsync.mockRejectedValueOnce(err); + + const gates: QualityGateConfig[] = [ + { type: 'typecheck', severity: 'p0', maxRetries: 0 }, + { type: 'lint', severity: 'p2', maxRetries: 0 }, + ]; + + const { results, blocked } = await executeGates(gates, '/tmp/test', 'run-1', 'node-1'); + + expect(results.length).toBe(2); + expect(blocked).toBe(false); + }); + + test('emits gate events', async () => { + mockExecFileAsync.mockResolvedValueOnce({ stdout: '', stderr: '' }); + + const gates: QualityGateConfig[] = [{ type: 'typecheck', severity: 'p1', maxRetries: 0 }]; + + await executeGates(gates, '/tmp/test', 'run-1', 'node-1'); + + const started = emittedEvents.find(e => e.type === 'gate_started'); + const passed = emittedEvents.find(e => e.type === 'gate_passed'); + expect(started).toBeTruthy(); + expect(passed).toBeTruthy(); + }); + + test('emits gate_blocked for p1 failure', async () => { + const err = new Error('fail') as Error & { stdout: string; stderr: string; status: number }; + err.stdout = ''; + err.stderr = ''; + err.status = 1; + mockExecFileAsync.mockRejectedValueOnce(err); + + const gates: QualityGateConfig[] = [{ type: 'lint', severity: 'p1', maxRetries: 0 }]; + + await executeGates(gates, '/tmp/test', 'run-1', 'node-1'); + + const blockedEvent = emittedEvents.find(e => e.type === 'gate_blocked'); + expect(blockedEvent).toBeTruthy(); + }); +}); + +describe('formatGateFailureFeedback', () => { + test('formats failed gate results', () => { + const results: GateResult[] = [ + { + gateId: 'test-suite:test-suite', + gateName: 'test-suite', + gateType: 'test-suite', + severity: 'p1', + passed: false, + evidence: { + stdout: 'Tests 1 passed | 2 failed (3)', + exitCode: 1, + parsedResults: { + total: 3, + passed: 1, + failed: 2, + skipped: 0, + failures: [ + { name: 'should work', message: 'Expected true' }, + { name: 'should also work', message: '' }, + ], + }, + }, + error: 'Gate command failed with exit code 1', + }, + ]; + + const feedback = formatGateFailureFeedback(results); + expect(feedback).toContain('Quality gate verification failed'); + expect(feedback).toContain('test-suite'); + expect(feedback).toContain('1 passed'); + expect(feedback).toContain('2 failed'); + expect(feedback).toContain('should work'); + }); + + test('returns empty string when all gates pass', () => { + const results: GateResult[] = [ + { + gateId: 'typecheck:typecheck', + gateName: 'typecheck', + gateType: 'typecheck', + severity: 'p1', + passed: true, + evidence: { stdout: '', exitCode: 0 }, + }, + ]; + + expect(formatGateFailureFeedback(results)).toBe(''); + }); +}); diff --git a/packages/workflows/src/gates/engine.ts b/packages/workflows/src/gates/engine.ts new file mode 100644 index 0000000000..da159b80dc --- /dev/null +++ b/packages/workflows/src/gates/engine.ts @@ -0,0 +1,263 @@ +/** + * Gate execution engine — runs verification commands and parses real output. + * + * Gates execute independently of the AI agent, providing a trust boundary + * between agent claims and actual verification results. + */ +import { execFileAsync } from '@archon/git'; +import { createLogger } from '@archon/paths'; +import type { QualityGateConfig, GateResult, TestResults } from '../schemas/gate'; +import { getWorkflowEventEmitter } from '../event-emitter'; +import { parseTestOutput, parseTypecheckOutput, parseLintOutput } from './parsers'; + +let cachedLog: ReturnType | undefined; +function getLog(): ReturnType { + if (!cachedLog) cachedLog = createLogger('gate.engine'); + return cachedLog; +} + +// --------------------------------------------------------------------------- +// Built-in gate registry +// --------------------------------------------------------------------------- + +interface BuiltInGate { + command: string; + parser: ((stdout: string) => TestResults) | null; +} + +const BUILT_IN_GATES: Record = { + 'test-suite': { command: 'bun run test', parser: parseTestOutput }, + typecheck: { command: 'bun run type-check', parser: null }, + lint: { command: 'bun run lint', parser: null }, +}; + +// --------------------------------------------------------------------------- +// Single gate execution +// --------------------------------------------------------------------------- + +const GATE_TIMEOUT_MS = 120_000; + +/** + * Execute a single quality gate by running its command and parsing output. + */ +export async function executeGate(config: QualityGateConfig, cwd: string): Promise { + const builtIn = BUILT_IN_GATES[config.type]; + const command = config.command ?? builtIn?.command ?? config.type; + const gateName = config.name ?? config.type; + const gateId = `${config.type}:${gateName}`; + + try { + const { stdout, stderr } = await execFileAsync('bash', ['-c', command], { + cwd, + timeout: GATE_TIMEOUT_MS, + }); + + const combinedOutput = stdout + (stderr ? '\n' + stderr : ''); + + // Parse results based on gate type + let parsedResults: TestResults | undefined; + if (config.type === 'test-suite') { + parsedResults = parseTestOutput(combinedOutput); + } + + return { + gateId, + gateName, + gateType: config.type, + severity: config.severity, + passed: true, + evidence: { + stdout: combinedOutput.slice(0, 10_000), + exitCode: 0, + parsedResults, + }, + }; + } catch (error) { + const err = error as Error & { + code?: string; + stdout?: string; + stderr?: string; + status?: number; + }; + const stdout = (err.stdout ?? '') + (err.stderr ? '\n' + err.stderr : ''); + const exitCode = err.status ?? 1; + + // Parse results even from failed commands + let parsedResults: TestResults | undefined; + if (config.type === 'test-suite') { + parsedResults = parseTestOutput(stdout); + } + + // Determine if this is a timeout + const isTimeout = + err.code === 'ERR_CHILD_PROCESS_TIMEOUT' || err.message?.includes('timed out'); + + return { + gateId, + gateName, + gateType: config.type, + severity: config.severity, + passed: false, + evidence: { + stdout: stdout.slice(0, 10_000), + exitCode, + parsedResults, + }, + error: isTimeout + ? `Gate timed out after ${GATE_TIMEOUT_MS}ms` + : `Gate command failed with exit code ${exitCode}`, + }; + } +} + +// --------------------------------------------------------------------------- +// Multi-gate execution +// --------------------------------------------------------------------------- + +/** + * Execute all gates for a node sequentially. Emits events for each gate. + * Returns aggregate results and whether any p0/p1 gate blocked the node. + */ +export async function executeGates( + gates: QualityGateConfig[], + cwd: string, + runId: string, + nodeId: string +): Promise<{ results: GateResult[]; blocked: boolean }> { + const emitter = getWorkflowEventEmitter(); + const results: GateResult[] = []; + let blocked = false; + + for (const gate of gates) { + const gateName = gate.name ?? gate.type; + + emitter.emit({ + type: 'gate_started', + runId, + nodeId, + gateName, + gateType: gate.type, + severity: gate.severity, + }); + + const result = await executeGate(gate, cwd); + results.push(result); + + if (result.passed) { + emitter.emit({ + type: 'gate_passed', + runId, + nodeId, + gateName: result.gateName, + gateType: result.gateType, + evidence: { + exitCode: result.evidence.exitCode, + parsedResults: result.evidence.parsedResults + ? { + total: result.evidence.parsedResults.total, + passed: result.evidence.parsedResults.passed, + failed: result.evidence.parsedResults.failed, + skipped: result.evidence.parsedResults.skipped, + } + : undefined, + }, + }); + getLog().info({ runId, nodeId, gateName, gateType: gate.type }, 'gate.execute_passed'); + } else { + const isBlocking = gate.severity === 'p0' || gate.severity === 'p1'; + + emitter.emit({ + type: 'gate_failed', + runId, + nodeId, + gateName: result.gateName, + gateType: result.gateType, + severity: gate.severity, + evidence: { + exitCode: result.evidence.exitCode, + stdout: result.evidence.stdout.slice(0, 2000), + parsedResults: result.evidence.parsedResults, + }, + }); + + if (isBlocking) { + blocked = true; + emitter.emit({ + type: 'gate_blocked', + runId, + nodeId, + gateName: result.gateName, + message: `${gate.severity.toUpperCase()} gate "${gateName}" failed — node blocked`, + }); + getLog().warn( + { runId, nodeId, gateName, severity: gate.severity, exitCode: result.evidence.exitCode }, + 'gate.execute_blocked' + ); + } else { + getLog().warn( + { runId, nodeId, gateName, severity: gate.severity }, + 'gate.execute_failed_non_blocking' + ); + } + } + } + + return { results, blocked }; +} + +// --------------------------------------------------------------------------- +// Feedback formatting +// --------------------------------------------------------------------------- + +/** + * Format failed gate results as human-readable feedback for loop injection. + * Includes actual test failure names and error counts. + */ +export function formatGateFailureFeedback(results: GateResult[]): string { + const failedGates = results.filter(r => !r.passed); + if (failedGates.length === 0) return ''; + + const lines: string[] = ['Quality gate verification failed:']; + + for (const gate of failedGates) { + lines.push(`\n[${gate.severity.toUpperCase()}] ${gate.gateName} (${gate.gateType}):`); + + if (gate.error) { + lines.push(` Error: ${gate.error}`); + } + + const parsed = gate.evidence.parsedResults; + if (parsed) { + lines.push( + ` Results: ${parsed.passed} passed, ${parsed.failed} failed, ${parsed.skipped} skipped (${parsed.total} total)` + ); + if (parsed.failures.length > 0) { + lines.push(' Failed tests:'); + for (const f of parsed.failures.slice(0, 10)) { + lines.push(` - ${f.name}${f.message ? ': ' + f.message : ''}`); + } + if (parsed.failures.length > 10) { + lines.push(` ... and ${parsed.failures.length - 10} more`); + } + } + } else if (gate.gateType === 'typecheck') { + const tc = parseTypecheckOutput(gate.evidence.stdout); + if (tc.errors > 0) { + lines.push(` TypeScript errors: ${tc.errors}`); + } + } else if (gate.gateType === 'lint') { + const lint = parseLintOutput(gate.evidence.stdout); + if (lint.errors > 0 || lint.warnings > 0) { + lines.push(` Lint: ${lint.errors} errors, ${lint.warnings} warnings`); + } + } + + // Include truncated stdout for context + if (gate.evidence.stdout.length > 0) { + const truncated = gate.evidence.stdout.slice(-500); + lines.push(` Output (last 500 chars):\n${truncated}`); + } + } + + return lines.join('\n'); +} diff --git a/packages/workflows/src/gates/index.ts b/packages/workflows/src/gates/index.ts new file mode 100644 index 0000000000..edbfd7112e --- /dev/null +++ b/packages/workflows/src/gates/index.ts @@ -0,0 +1,2 @@ +export { executeGate, executeGates, formatGateFailureFeedback } from './engine'; +export { parseTestOutput, parseVitestOutput, parseJestOutput } from './parsers'; diff --git a/packages/workflows/src/gates/parsers.test.ts b/packages/workflows/src/gates/parsers.test.ts new file mode 100644 index 0000000000..89a57abe8c --- /dev/null +++ b/packages/workflows/src/gates/parsers.test.ts @@ -0,0 +1,192 @@ +import { describe, expect, test } from 'bun:test'; +import { + parseVitestOutput, + parseJestOutput, + parseTestOutput, + parseTypecheckOutput, + parseLintOutput, +} from './parsers'; + +describe('parseVitestOutput', () => { + test('parses vitest passing output', () => { + const stdout = ` + ✓ src/gates/parsers.test.ts (5 tests) 12ms + Tests 5 passed (5) + Duration 1.23s +`; + const result = parseVitestOutput(stdout); + expect(result.total).toBe(5); + expect(result.passed).toBe(5); + expect(result.failed).toBe(0); + expect(result.skipped).toBe(0); + }); + + test('parses vitest mixed output', () => { + const stdout = ` + ✓ src/a.test.ts (3 tests) 10ms + ✗ src/b.test.ts (2 tests) 5ms + Tests 3 passed | 2 failed | 1 skipped (6) +`; + const result = parseVitestOutput(stdout); + expect(result.total).toBe(6); + expect(result.passed).toBe(3); + expect(result.failed).toBe(2); + expect(result.skipped).toBe(1); + }); + + test('parses vitest all failing', () => { + const stdout = ` + Tests 3 failed (3) +`; + const result = parseVitestOutput(stdout); + expect(result.total).toBe(3); + expect(result.passed).toBe(0); + expect(result.failed).toBe(3); + }); + + test('parses bun test output', () => { + const stdout = ` +bun test v1.2.0 + +src/test.ts: +✓ should work [2.50ms] +✗ should fail [1.20ms] + +3 pass, 1 fail, 0 skip +`; + const result = parseVitestOutput(stdout); + expect(result.total).toBe(4); + expect(result.passed).toBe(3); + expect(result.failed).toBe(1); + }); + + test('returns zeros for unparseable input', () => { + const result = parseVitestOutput('random output with no test results'); + expect(result.total).toBe(0); + expect(result.passed).toBe(0); + expect(result.failed).toBe(0); + expect(result.skipped).toBe(0); + expect(result.failures).toEqual([]); + }); + + test('extracts failure names from FAIL lines', () => { + const stdout = ` +FAIL src/broken.test.ts > should work +FAIL src/broken.test.ts > should also work +Tests 1 passed | 2 failed (3) +`; + const result = parseVitestOutput(stdout); + expect(result.failures.length).toBe(2); + expect(result.failures[0]!.name).toContain('src/broken.test.ts'); + }); +}); + +describe('parseJestOutput', () => { + test('parses jest passing output', () => { + const stdout = ` +Test Suites: 3 passed, 3 total +Tests: 10 passed, 10 total +`; + const result = parseJestOutput(stdout); + expect(result.total).toBe(10); + expect(result.passed).toBe(10); + expect(result.failed).toBe(0); + }); + + test('parses jest mixed output', () => { + const stdout = ` +Test Suites: 1 failed, 2 passed, 3 total +Tests: 2 failed, 8 passed, 10 total +`; + const result = parseJestOutput(stdout); + expect(result.total).toBe(10); + expect(result.passed).toBe(8); + expect(result.failed).toBe(2); + }); + + test('returns zeros for unparseable input', () => { + const result = parseJestOutput('no jest output here'); + expect(result.total).toBe(0); + }); +}); + +describe('parseTestOutput', () => { + test('auto-detects vitest format', () => { + const result = parseTestOutput('Tests 5 passed (5)'); + expect(result.total).toBe(5); + expect(result.passed).toBe(5); + }); + + test('auto-detects jest format', () => { + const result = parseTestOutput('Tests: 3 passed, 3 total'); + expect(result.total).toBe(3); + expect(result.passed).toBe(3); + }); + + test('returns zeros for empty input', () => { + const result = parseTestOutput(''); + expect(result.total).toBe(0); + }); + + test('returns zeros for garbage input', () => { + const result = parseTestOutput('lorem ipsum dolor sit amet'); + expect(result.total).toBe(0); + }); +}); + +describe('parseTypecheckOutput', () => { + test('parses clean typecheck', () => { + const result = parseTypecheckOutput(''); + expect(result.errors).toBe(0); + expect(result.warnings).toBe(0); + }); + + test('counts TypeScript errors', () => { + const stdout = ` +src/foo.ts(10,5): error TS2322: Type 'string' is not assignable to type 'number'. +src/bar.ts(20,3): error TS2345: Argument of type 'string' is not assignable to parameter of type 'number'. +Found 2 errors. +`; + const result = parseTypecheckOutput(stdout); + expect(result.errors).toBe(2); + }); + + test('uses Found N errors summary', () => { + const stdout = 'Found 15 errors in 5 files.\n'; + const result = parseTypecheckOutput(stdout); + expect(result.errors).toBe(15); + }); +}); + +describe('parseLintOutput', () => { + test('parses clean lint', () => { + const result = parseLintOutput(''); + expect(result.errors).toBe(0); + expect(result.warnings).toBe(0); + }); + + test('parses ESLint summary', () => { + const stdout = ` +/src/foo.ts + 10:5 error Unexpected any @typescript-eslint/no-explicit-any + 15:1 warning Unexpected console statement no-console + +✖ 2 problems (1 error, 1 warning) +`; + const result = parseLintOutput(stdout); + expect(result.errors).toBe(1); + expect(result.warnings).toBe(1); + }); + + test('counts individual error/warning lines', () => { + const stdout = ` +/src/a.ts + 1:1 error some error + 2:1 error another error + 3:1 warning a warning +`; + const result = parseLintOutput(stdout); + expect(result.errors).toBe(2); + expect(result.warnings).toBe(1); + }); +}); diff --git a/packages/workflows/src/gates/parsers.ts b/packages/workflows/src/gates/parsers.ts new file mode 100644 index 0000000000..8c6b21ae9b --- /dev/null +++ b/packages/workflows/src/gates/parsers.ts @@ -0,0 +1,178 @@ +/** + * Test output parsers for quality gates. + * + * Pure functions that extract structured test results from stdout of + * vitest/bun test, jest, typecheck, and lint commands. + */ +import type { TestResults } from '../schemas/gate'; + +// --------------------------------------------------------------------------- +// Vitest / Bun test parser +// --------------------------------------------------------------------------- + +/** + * Parse vitest or bun test stdout into structured test results. + * + * Handles formats: + * - Vitest summary: `Tests 3 passed | 1 failed | 1 skipped (5)` + * - Bun test: `3 pass, 1 fail, 1 skip` or `3 pass\n0 fail` + * - Individual FAIL lines for failure extraction + */ +export function parseVitestOutput(stdout: string): TestResults { + const result: TestResults = { total: 0, passed: 0, failed: 0, skipped: 0, failures: [] }; + + // Try vitest summary format: "Tests 3 passed | 1 failed | 1 skipped (5)" + const vitestSummary = + /Tests\s+(?:(\d+)\s+passed)?(?:\s*\|\s*)?(?:(\d+)\s+failed)?(?:\s*\|\s*)?(?:(\d+)\s+skipped)?/.exec( + stdout + ); + if (vitestSummary) { + result.passed = parseInt(vitestSummary[1] ?? '0', 10); + result.failed = parseInt(vitestSummary[2] ?? '0', 10); + result.skipped = parseInt(vitestSummary[3] ?? '0', 10); + result.total = result.passed + result.failed + result.skipped; + extractFailures(stdout, result); + return result; + } + + // Try bun test format: "3 pass, 1 fail" or "3 pass\n0 fail" + const passMatch = /(\d+)\s+pass/.exec(stdout); + const failMatch = /(\d+)\s+fail/.exec(stdout); + const skipMatch = /(\d+)\s+skip/.exec(stdout); + if (passMatch || failMatch) { + result.passed = parseInt(passMatch?.[1] ?? '0', 10); + result.failed = parseInt(failMatch?.[1] ?? '0', 10); + result.skipped = parseInt(skipMatch?.[1] ?? '0', 10); + result.total = result.passed + result.failed + result.skipped; + extractFailures(stdout, result); + return result; + } + + return result; +} + +// --------------------------------------------------------------------------- +// Jest parser +// --------------------------------------------------------------------------- + +/** + * Parse jest stdout into structured test results. + * + * Handles format: "Tests: 1 failed, 3 passed, 4 total" + */ +export function parseJestOutput(stdout: string): TestResults { + const result: TestResults = { total: 0, passed: 0, failed: 0, skipped: 0, failures: [] }; + + // Jest summary: "Tests: 1 failed, 3 passed, 4 total" + const jestTests = + /Tests:\s+(?:(\d+)\s+failed,?\s*)?(?:(\d+)\s+skipped,?\s*)?(?:(\d+)\s+passed,?\s*)?(\d+)\s+total/.exec( + stdout + ); + if (jestTests) { + result.failed = parseInt(jestTests[1] ?? '0', 10); + result.skipped = parseInt(jestTests[2] ?? '0', 10); + result.passed = parseInt(jestTests[3] ?? '0', 10); + result.total = parseInt(jestTests[4] ?? '0', 10); + extractFailures(stdout, result); + return result; + } + + return result; +} + +// --------------------------------------------------------------------------- +// Auto-detect parser +// --------------------------------------------------------------------------- + +/** + * Auto-detect test framework and parse stdout. + * Tries vitest/bun format first, then jest. Returns zeros if unparseable. + */ +export function parseTestOutput(stdout: string): TestResults { + // Try vitest/bun first + const vitest = parseVitestOutput(stdout); + if (vitest.total > 0) return vitest; + + // Try jest + const jest = parseJestOutput(stdout); + if (jest.total > 0) return jest; + + return { total: 0, passed: 0, failed: 0, skipped: 0, failures: [] }; +} + +// --------------------------------------------------------------------------- +// Typecheck parser +// --------------------------------------------------------------------------- + +/** + * Parse TypeScript type-check output for error/warning counts. + */ +export function parseTypecheckOutput(stdout: string): { errors: number; warnings: number } { + let errors = 0; + let warnings = 0; + + // Count "error TS" occurrences + const errorMatches = stdout.match(/error TS\d+/g); + if (errorMatches) errors = errorMatches.length; + + // Count "Found N errors" summary + const foundErrors = /Found (\d+) errors?/.exec(stdout); + if (foundErrors) errors = parseInt(foundErrors[1], 10); + + // Warnings are rare in tsc but count them + const warningMatches = stdout.match(/warning TS\d+/g); + if (warningMatches) warnings = warningMatches.length; + + return { errors, warnings }; +} + +// --------------------------------------------------------------------------- +// Lint parser +// --------------------------------------------------------------------------- + +/** + * Parse ESLint output for error/warning counts. + */ +export function parseLintOutput(stdout: string): { errors: number; warnings: number } { + let errors = 0; + let warnings = 0; + + // ESLint summary: "X problems (Y errors, Z warnings)" + const eslintSummary = /(\d+)\s+problems?\s*\((\d+)\s+errors?,\s*(\d+)\s+warnings?\)/.exec(stdout); + if (eslintSummary) { + errors = parseInt(eslintSummary[2], 10); + warnings = parseInt(eslintSummary[3], 10); + return { errors, warnings }; + } + + // Fallback: count individual error/warning lines + const errorLines = stdout.match(/\d+:\d+\s+error\s/g); + const warningLines = stdout.match(/\d+:\d+\s+warning\s/g); + if (errorLines) errors = errorLines.length; + if (warningLines) warnings = warningLines.length; + + return { errors, warnings }; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Extract failure names from test output (FAIL lines). + */ +function extractFailures(stdout: string, result: TestResults): void { + // Match "FAIL" lines or "✗" lines with test names + const failLines = stdout.match(/(?:FAIL|✗|✘|×)\s+(.+)/g); + if (failLines) { + for (const line of failLines.slice(0, 20)) { + const nameMatch = /(?:FAIL|✗|✘|×)\s+(.+)/.exec(line); + if (nameMatch) { + result.failures.push({ + name: nameMatch[1].trim(), + message: '', + }); + } + } + } +} diff --git a/packages/workflows/src/schemas/dag-node.ts b/packages/workflows/src/schemas/dag-node.ts index 82bd90ac86..84bdafb4d9 100644 --- a/packages/workflows/src/schemas/dag-node.ts +++ b/packages/workflows/src/schemas/dag-node.ts @@ -14,6 +14,7 @@ import { z } from '@hono/zod-openapi'; import { stepRetryConfigSchema } from './retry'; import { loopNodeConfigSchema } from './loop'; import { workflowNodeHooksSchema } from './hooks'; +import { nodeGatesConfigSchema } from './gate'; import { isValidCommandName } from '../command-validation'; import { isModelCompatible } from '../model-validation'; @@ -146,6 +147,7 @@ export type DagNodeBase = z.infer; export const commandNodeSchema = dagNodeBaseSchema.extend({ command: z.string(), + gates: nodeGatesConfigSchema.optional(), }); /** DAG node that runs a named command from .archon/commands/ */ @@ -160,6 +162,7 @@ export type CommandNode = z.infer & { export const promptNodeSchema = dagNodeBaseSchema.extend({ prompt: z.string(), + gates: nodeGatesConfigSchema.optional(), }); /** DAG node with an inline prompt (no command file) */ @@ -220,6 +223,7 @@ export type ScriptNode = z.infer & { */ export const loopNodeSchema = dagNodeBaseSchema.extend({ loop: loopNodeConfigSchema, + gates: nodeGatesConfigSchema.optional(), }); /** DAG node that runs an AI prompt in a loop until a completion condition is met */ @@ -354,6 +358,8 @@ export const dagNodeSchema = dagNodeBaseSchema deps: z.array(z.string().min(1, 'each dep must be a non-empty string')).optional(), // Bash/Script shared timeout: z.number().optional(), + // Quality gates (command, prompt, loop nodes only — ignored on bash/script) + gates: nodeGatesConfigSchema.optional(), }) .superRefine((data, ctx) => { const id = data.id.trim(); @@ -535,11 +541,28 @@ export const dagNodeSchema = dagNodeBaseSchema ...(data.sandbox !== undefined ? { sandbox: data.sandbox } : {}), }; + // Gates — applicable to command, prompt, and loop nodes (not bash/script) + const gatesField = { + ...(data.gates !== undefined ? { gates: data.gates } : {}), + }; + if (data.command !== undefined && data.command.trim().length > 0) { - return { ...base, ...shared, ...aiOnly, command: data.command.trim() } as CommandNode; + return { + ...base, + ...shared, + ...aiOnly, + ...gatesField, + command: data.command.trim(), + } as CommandNode; } if (data.prompt !== undefined && data.prompt.trim().length > 0) { - return { ...base, ...shared, ...aiOnly, prompt: data.prompt.trim() } as PromptNode; + return { + ...base, + ...shared, + ...aiOnly, + ...gatesField, + prompt: data.prompt.trim(), + } as PromptNode; } if (data.bash !== undefined && data.bash.trim().length > 0) { return { @@ -569,7 +592,7 @@ export const dagNodeSchema = dagNodeBaseSchema } // loop — guaranteed by superRefine to be defined at this point if (!data.loop) throw new Error('unreachable: loop must be defined after superRefine'); - return { ...base, loop: data.loop } as LoopNode; + return { ...base, ...gatesField, loop: data.loop } as LoopNode; }) .openapi('DagNode'); diff --git a/packages/workflows/src/schemas/gate.ts b/packages/workflows/src/schemas/gate.ts new file mode 100644 index 0000000000..66c60e2de4 --- /dev/null +++ b/packages/workflows/src/schemas/gate.ts @@ -0,0 +1,100 @@ +/** + * Zod schemas for quality gate configuration and results. + * + * Gates independently verify agent claims (test results, type checks, lint) + * by running real commands and parsing their output. + */ +import { z } from '@hono/zod-openapi'; + +// --------------------------------------------------------------------------- +// Gate severity — determines whether a failure blocks the node +// --------------------------------------------------------------------------- + +export const gateSeveritySchema = z.enum(['p0', 'p1', 'p2']).openapi('GateSeverity'); + +export type GateSeverity = z.infer; + +// --------------------------------------------------------------------------- +// Gate type — built-in gate categories +// --------------------------------------------------------------------------- + +export const gateTypeSchema = z + .enum(['test-suite', 'typecheck', 'lint', 'custom']) + .openapi('GateType'); + +export type GateType = z.infer; + +// --------------------------------------------------------------------------- +// Test results — parsed from command stdout +// --------------------------------------------------------------------------- + +export const testFailureSchema = z.object({ + name: z.string(), + message: z.string(), +}); + +export const testResultsSchema = z + .object({ + total: z.number().int().nonnegative(), + passed: z.number().int().nonnegative(), + failed: z.number().int().nonnegative(), + skipped: z.number().int().nonnegative(), + failures: z.array(testFailureSchema).default([]), + }) + .openapi('TestResults'); + +export type TestResults = z.infer; + +// --------------------------------------------------------------------------- +// Gate evidence — captured from command execution +// --------------------------------------------------------------------------- + +export const gateEvidenceSchema = z + .object({ + stdout: z.string(), + exitCode: z.number().int(), + parsedResults: testResultsSchema.optional(), + }) + .openapi('GateEvidence'); + +export type GateEvidence = z.infer; + +// --------------------------------------------------------------------------- +// Gate result — outcome of a single gate execution +// --------------------------------------------------------------------------- + +export const gateResultSchema = z + .object({ + gateId: z.string(), + gateName: z.string(), + gateType: gateTypeSchema, + severity: gateSeveritySchema, + passed: z.boolean(), + evidence: gateEvidenceSchema, + error: z.string().optional(), + }) + .openapi('GateResult'); + +export type GateResult = z.infer; + +// --------------------------------------------------------------------------- +// Quality gate configuration — defined per-node in workflow YAML +// --------------------------------------------------------------------------- + +export const qualityGateConfigSchema = z.object({ + type: gateTypeSchema, + name: z.string().optional(), + command: z.string().optional(), + severity: gateSeveritySchema.default('p1'), + maxRetries: z.number().int().nonnegative().default(0), +}); + +export type QualityGateConfig = z.infer; + +// --------------------------------------------------------------------------- +// Node gates config — array of gates for a single node +// --------------------------------------------------------------------------- + +export const nodeGatesConfigSchema = z.array(qualityGateConfigSchema); + +export type NodeGatesConfig = z.infer; diff --git a/packages/workflows/src/schemas/index.ts b/packages/workflows/src/schemas/index.ts index 3fe10b562d..204c439946 100644 --- a/packages/workflows/src/schemas/index.ts +++ b/packages/workflows/src/schemas/index.ts @@ -25,6 +25,27 @@ export { } from './hooks'; export type { WorkflowHookEvent, WorkflowHookMatcher, WorkflowNodeHooks } from './hooks'; +// Gate configuration +export { + gateSeveritySchema, + gateTypeSchema, + testResultsSchema, + testFailureSchema, + gateEvidenceSchema, + gateResultSchema, + qualityGateConfigSchema, + nodeGatesConfigSchema, +} from './gate'; +export type { + GateSeverity, + GateType, + TestResults, + GateEvidence, + GateResult, + QualityGateConfig, + NodeGatesConfig, +} from './gate'; + // DAG node types export { triggerRuleSchema, diff --git a/packages/workflows/src/script-node-deps.test.ts b/packages/workflows/src/script-node-deps.test.ts index 5387daf029..8ba4bcf5ea 100644 --- a/packages/workflows/src/script-node-deps.test.ts +++ b/packages/workflows/src/script-node-deps.test.ts @@ -99,6 +99,12 @@ function createMockStore(): IWorkflowStore { cancelWorkflowRun: mock(() => Promise.resolve()), createWorkflowEvent: mock(() => Promise.resolve()), getCompletedDagNodeOutputs: mock(() => Promise.resolve(new Map())), + upsertNodeState: mock(() => Promise.resolve()), + getNodeState: mock(() => Promise.resolve(null)), + getNodeStates: mock(() => Promise.resolve([])), + getValidatedNodeOutputs: mock(() => Promise.resolve(new Map())), + createTestResult: mock(() => Promise.resolve()), + getTestResults: mock(() => Promise.resolve([])), getCodebase: mock(() => Promise.resolve(null)), getCodebaseEnvVars: mock(() => Promise.resolve({})), }; diff --git a/packages/workflows/src/store.ts b/packages/workflows/src/store.ts index da4e832093..70c9100615 100644 --- a/packages/workflows/src/store.ts +++ b/packages/workflows/src/store.ts @@ -5,7 +5,13 @@ * Implementations live in @archon/core (backed by the real DB); * the workflow engine depends only on this narrow interface. */ -import type { WorkflowRun, WorkflowRunStatus, ApprovalContext } from './schemas'; +import type { + WorkflowRun, + WorkflowRunStatus, + ApprovalContext, + NodeState, + GateResult, +} from './schemas'; export const WORKFLOW_EVENT_TYPES = [ 'workflow_started', @@ -30,6 +36,36 @@ export const WORKFLOW_EVENT_TYPES = [ export type WorkflowEventType = (typeof WORKFLOW_EVENT_TYPES)[number]; +/** Row shape for the remote_agent_node_states table. */ +export interface NodeStateRow { + id: string; + workflow_run_id: string; + node_id: string; + status: NodeState; + output: string; + output_validated: boolean; + gate_results: GateResult[]; + attempt_count: number; + started_at: string; + completed_at: string | null; + updated_at: string; +} + +/** Row shape for the remote_agent_test_results table. */ +export interface TestResultRow { + id: string; + node_state_id: string; + suite_name: string; + total: number; + passed: number; + failed: number; + skipped: number; + failures: { name: string; message: string }[]; + stdout: string; + exit_code: number; + created_at: string; +} + export interface IWorkflowStore { // Run lifecycle createWorkflowRun(data: { @@ -80,6 +116,41 @@ export interface IWorkflowStore { */ getCompletedDagNodeOutputs(workflowRunId: string): Promise>; + // Node state persistence + upsertNodeState(data: { + workflow_run_id: string; + node_id: string; + status: NodeState; + output?: string; + output_validated?: boolean; + gate_results?: GateResult[]; + attempt_count?: number; + }): Promise; + + getNodeState(workflowRunId: string, nodeId: string): Promise; + getNodeStates(workflowRunId: string): Promise; + + /** + * Return a map of nodeId → output for all validated completed nodes. + * Preferred over getCompletedDagNodeOutputs for new runs (validated state). + */ + getValidatedNodeOutputs(workflowRunId: string): Promise>; + + // Test result persistence + createTestResult(data: { + node_state_id: string; + suite_name: string; + total: number; + passed: number; + failed: number; + skipped: number; + failures?: { name: string; message: string }[]; + stdout: string; + exit_code: number; + }): Promise; + + getTestResults(nodeStateId: string): Promise; + // Per-codebase env vars for workflow node injection getCodebaseEnvVars(codebaseId: string): Promise>; From fb19d7dbaaf025947a378d69be64a11136d126ac Mon Sep 17 00:00:00 2001 From: Luis Erlacher Date: Sun, 12 Apr 2026 23:38:51 -0300 Subject: [PATCH 2/2] fix: address review findings for quality gate engine PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code fixes: - H1: Gate execution errors now warn user + emit gate_failed event (DAG nodes) - M1: Same fix for loop node gate execution errors (consistency) - M2: Remove double error swallowing in node-states.ts — let callers .catch() - L2: Remove dead `?? 3` fallback (Zod default is 0) - L5: Add safe JSON parsing with fallback for corrupted DB rows Tests added: - H2: 14 unit tests for node-states.ts DB CRUD, normalization, and edge cases Documentation updated: - H4: Add `gates` to CLAUDE.md DAG node options - H5: Update table count 8→10, add node_states + test_results - H6: Add summary + gate-result API endpoints to CLAUDE.md - H7: Update docs-web database.md (table count, new tables, migrations) - H8: Add new endpoints to docs-web api.md - H9: Add Quality Gates section to authoring-workflows guide - M5: Add gates/ subdirectory to CLAUDE.md architecture tree Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 9 +- packages/core/package.json | 2 +- packages/core/src/db/node-states.test.ts | 374 ++++++++++++++++++ packages/core/src/db/node-states.ts | 129 +++--- .../docs/guides/authoring-workflows.md | 68 +++- .../src/content/docs/reference/api.md | 2 + .../src/content/docs/reference/database.md | 14 +- packages/workflows/src/dag-executor.ts | 36 +- 8 files changed, 557 insertions(+), 77 deletions(-) create mode 100644 packages/core/src/db/node-states.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index f38cb29a98..c4af55b68f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -293,6 +293,7 @@ packages/ │ ├── event-emitter.ts # Workflow observability events │ ├── logger.ts # JSONL file logger │ ├── validator.ts # Resource validation (command files, MCP configs, skill dirs) +│ ├── gates/ # Quality gate engine, parsers, and built-in gate definitions │ ├── defaults/ # Bundled default commands and workflows │ └── utils/ # Variable substitution, tool formatting, execution utilities ├── git/ # @archon/git - Git operations (no @archon/core dep) @@ -375,7 +376,7 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api'; ### Database Schema -**8 Tables (all prefixed with `remote_agent_`):** +**10 Tables (all prefixed with `remote_agent_`):** 1. **`codebases`** - Repository metadata and commands (JSONB) 2. **`conversations`** - Track platform conversations with titles and soft-delete support 3. **`sessions`** - Track AI SDK sessions with resume capability @@ -384,6 +385,8 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api'; 6. **`workflow_events`** - Step-level workflow event log (step transitions, artifacts, errors) 7. **`messages`** - Conversation message history with tool call metadata (JSONB) 8. **`codebase_env_vars`** - Per-project env vars injected into Claude SDK subprocess env (managed via Web UI or `env:` in config) +9. **`node_states`** - Per-node execution state for DAG resume and quality gate results +10. **`test_results`** - Parsed test suite results linked to node states (gate evidence) **Key Patterns:** - Conversation ID format: Platform-specific (`thread_ts`, `chat_id`, `user/repo#123`) @@ -681,7 +684,7 @@ async function createSession(conversationId: string, codebaseId: string) { 2. **Workflows** (YAML-based): - Stored in `.archon/workflows/` (searched recursively) - Multi-step AI execution chains, discovered at runtime - - **`nodes:` (DAG format)**: Nodes with explicit `depends_on` edges; independent nodes in the same topological layer run concurrently. Node types: `command:` (named command file), `prompt:` (inline prompt), `bash:` (shell script, stdout captured as `$nodeId.output`, no AI), `loop:` (iterative AI prompt until completion signal), `approval:` (human gate; pauses until user approves or rejects; `capture_response: true` stores the user's comment as `$.output` for downstream nodes, default false), `script:` (inline TypeScript/Python or named script from `.archon/scripts/`, runs via `bun` or `uv`, stdout captured as `$nodeId.output`, no AI, supports `deps:` for dependency installation and `timeout:` in ms, requires `runtime: bun` or `runtime: uv`) . Supports `when:` conditions, `trigger_rule` join semantics, `$nodeId.output` substitution, `output_format` for structured JSON output (Claude and Codex), `allowed_tools`/`denied_tools` for per-node tool restrictions (Claude only), `hooks` for per-node SDK hook callbacks (Claude only), `mcp` for per-node MCP server config files (Claude only, env vars expanded at execution time), and `skills` for per-node skill preloading via AgentDefinition wrapping (Claude only), and `effort`/`thinking`/`maxBudgetUsd`/`systemPrompt`/`fallbackModel`/`betas`/`sandbox` for Claude SDK advanced options (Claude only, also settable at workflow level) + - **`nodes:` (DAG format)**: Nodes with explicit `depends_on` edges; independent nodes in the same topological layer run concurrently. Node types: `command:` (named command file), `prompt:` (inline prompt), `bash:` (shell script, stdout captured as `$nodeId.output`, no AI), `loop:` (iterative AI prompt until completion signal), `approval:` (human gate; pauses until user approves or rejects; `capture_response: true` stores the user's comment as `$.output` for downstream nodes, default false), `script:` (inline TypeScript/Python or named script from `.archon/scripts/`, runs via `bun` or `uv`, stdout captured as `$nodeId.output`, no AI, supports `deps:` for dependency installation and `timeout:` in ms, requires `runtime: bun` or `runtime: uv`) . Supports `when:` conditions, `trigger_rule` join semantics, `$nodeId.output` substitution, `output_format` for structured JSON output (Claude and Codex), `allowed_tools`/`denied_tools` for per-node tool restrictions (Claude only), `hooks` for per-node SDK hook callbacks (Claude only), `mcp` for per-node MCP server config files (Claude only, env vars expanded at execution time), and `skills` for per-node skill preloading via AgentDefinition wrapping (Claude only), `gates` for per-node quality gate definitions (bash commands run after node completion; `severity`, `type`, `maxRetries`), and `effort`/`thinking`/`maxBudgetUsd`/`systemPrompt`/`fallbackModel`/`betas`/`sandbox` for Claude SDK advanced options (Claude only, also settable at workflow level) - Provider inherited from `.archon/config.yaml` unless explicitly set; per-node `provider` and `model` overrides supported - Model and options can be set per workflow or inherited from config defaults - `interactive: true` at the workflow level forces foreground execution on web (required for approval-gate workflows in the web UI) @@ -756,6 +759,8 @@ Pattern: Use `classifyIsolationError()` (from `@archon/isolation`) to map git er - `POST /api/workflows/runs/{runId}/resume` - Mark a failed run as ready for auto-resume on next invocation - `POST /api/workflows/runs/{runId}/abandon` - Abandon a non-terminal run (marks as cancelled) - `DELETE /api/workflows/runs/{runId}` - Delete a terminal workflow run and its events +- `GET /api/workflows/runs/{runId}/summary` - Get run summary including node states and gate results +- `POST /api/workflows/runs/{runId}/gate-result` - Store a gate result for a node (appends to existing results) **Codebases:** - `GET /api/codebases` / `GET /api/codebases/:id` - List / fetch codebases diff --git a/packages/core/package.json b/packages/core/package.json index d0d93635b6..3ef3454fb7 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -23,7 +23,7 @@ "./state/*": "./src/state/*.ts" }, "scripts": { - "test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts", + "test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/db/node-states.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts", "type-check": "bun x tsc --noEmit", "build": "echo 'No build needed - Bun runs TypeScript directly'" }, diff --git a/packages/core/src/db/node-states.test.ts b/packages/core/src/db/node-states.test.ts new file mode 100644 index 0000000000..d5298b925d --- /dev/null +++ b/packages/core/src/db/node-states.test.ts @@ -0,0 +1,374 @@ +import { mock, describe, test, expect, beforeEach } from 'bun:test'; +import { createQueryResult, mockPostgresDialect } from '../test/mocks/database'; + +const mockQuery = mock(() => Promise.resolve(createQueryResult([]))); + +// Mock logger before import +const mockLogger = { + fatal: mock(() => undefined), + error: mock(() => undefined), + warn: mock(() => undefined), + info: mock(() => undefined), + debug: mock(() => undefined), + trace: mock(() => undefined), +}; +mock.module('@archon/paths', () => ({ createLogger: mock(() => mockLogger) })); + +mock.module('./connection', () => ({ + pool: { query: mockQuery }, + getDialect: () => mockPostgresDialect, +})); + +import { + upsertNodeState, + getNodeState, + getNodeStates, + getValidatedNodeOutputs, + createTestResult, + getTestResults, +} from './node-states'; + +describe('node-states', () => { + beforeEach(() => { + mockQuery.mockClear(); + mockLogger.warn.mockClear(); + }); + + describe('upsertNodeState', () => { + test('inserts with all fields', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + await upsertNodeState({ + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + output: 'result text', + output_validated: true, + gate_results: [ + { name: 'lint', passed: true, severity: 'p1', exitCode: 0, stdout: '', type: 'builtin' }, + ], + attempt_count: 1, + }); + + expect(mockQuery).toHaveBeenCalledTimes(1); + const [sql, params] = mockQuery.mock.calls[0] as [string, unknown[]]; + expect(sql).toContain('INSERT INTO remote_agent_node_states'); + expect(sql).toContain('ON CONFLICT'); + expect(params[1]).toBe('run-1'); + expect(params[2]).toBe('node-a'); + expect(params[3]).toBe('completed'); + expect(params[4]).toBe('result text'); + expect(params[5]).toBe(true); + expect(params[6]).toBe( + JSON.stringify([ + { name: 'lint', passed: true, severity: 'p1', exitCode: 0, stdout: '', type: 'builtin' }, + ]) + ); + expect(params[7]).toBe(1); + }); + + test('defaults optional fields', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + await upsertNodeState({ + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'skipped', + }); + + const [, params] = mockQuery.mock.calls[0] as [string, unknown[]]; + expect(params[4]).toBe(''); // output defaults to '' + expect(params[5]).toBe(false); // output_validated defaults to false + expect(params[6]).toBe('[]'); // gate_results defaults to [] + expect(params[7]).toBe(0); // attempt_count defaults to 0 + }); + + test('throws on DB error (callers use fire-and-forget catch)', async () => { + mockQuery.mockRejectedValueOnce(new Error('connection lost')); + + await expect( + upsertNodeState({ + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + }) + ).rejects.toThrow('connection lost'); + }); + }); + + describe('getNodeState', () => { + test('returns normalized row when found', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'ns-1', + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + output: 'hello', + output_validated: 1, // SQLite integer + gate_results: '[]', // SQLite string + attempt_count: 0, + started_at: '2026-01-01T00:00:00Z', + completed_at: '2026-01-01T00:01:00Z', + updated_at: '2026-01-01T00:01:00Z', + }, + ]) + ); + + const result = await getNodeState('run-1', 'node-a'); + + expect(result).not.toBeNull(); + expect(result!.output_validated).toBe(true); // Coerced from 1 + expect(result!.gate_results).toEqual([]); // Parsed from string + }); + + test('returns null when not found', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + const result = await getNodeState('run-1', 'node-missing'); + + expect(result).toBeNull(); + }); + + test('normalizes SQLite boolean 0 to false', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'ns-2', + workflow_run_id: 'run-1', + node_id: 'node-b', + status: 'failed', + output: '', + output_validated: 0, // SQLite false + gate_results: JSON.stringify([{ name: 'test', passed: false }]), + attempt_count: 1, + started_at: '2026-01-01T00:00:00Z', + completed_at: null, + updated_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const result = await getNodeState('run-1', 'node-b'); + + expect(result!.output_validated).toBe(false); + expect(result!.gate_results).toEqual([{ name: 'test', passed: false }]); + }); + }); + + describe('getNodeStates', () => { + test('returns all rows normalized', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'ns-1', + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + output: 'out-a', + output_validated: 1, + gate_results: '[]', + attempt_count: 0, + started_at: '2026-01-01T00:00:00Z', + completed_at: '2026-01-01T00:01:00Z', + updated_at: '2026-01-01T00:01:00Z', + }, + { + id: 'ns-2', + workflow_run_id: 'run-1', + node_id: 'node-b', + status: 'failed', + output: 'out-b', + output_validated: 0, + gate_results: '[]', + attempt_count: 1, + started_at: '2026-01-01T00:00:00Z', + completed_at: null, + updated_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const results = await getNodeStates('run-1'); + + expect(results).toHaveLength(2); + expect(results[0].output_validated).toBe(true); + expect(results[1].output_validated).toBe(false); + }); + }); + + describe('getValidatedNodeOutputs', () => { + test('returns map of nodeId to output for validated completed nodes', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { node_id: 'node-a', output: 'output-a' }, + { node_id: 'node-b', output: 'output-b' }, + ]) + ); + + const outputs = await getValidatedNodeOutputs('run-1'); + + expect(outputs).toBeInstanceOf(Map); + expect(outputs.get('node-a')).toBe('output-a'); + expect(outputs.get('node-b')).toBe('output-b'); + expect(outputs.size).toBe(2); + }); + + test('returns empty map when no validated nodes', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + const outputs = await getValidatedNodeOutputs('run-1'); + + expect(outputs.size).toBe(0); + }); + }); + + describe('createTestResult', () => { + test('inserts with all fields', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + await createTestResult({ + node_state_id: 'ns-1', + suite_name: 'unit-tests', + total: 10, + passed: 8, + failed: 1, + skipped: 1, + failures: [{ name: 'test-x', message: 'assertion failed' }], + stdout: 'test output', + exit_code: 1, + }); + + expect(mockQuery).toHaveBeenCalledTimes(1); + const [sql, params] = mockQuery.mock.calls[0] as [string, unknown[]]; + expect(sql).toContain('INSERT INTO remote_agent_test_results'); + expect(params[1]).toBe('ns-1'); + expect(params[2]).toBe('unit-tests'); + expect(params[3]).toBe(10); + expect(params[7]).toBe(JSON.stringify([{ name: 'test-x', message: 'assertion failed' }])); + }); + + test('throws on DB error (callers use fire-and-forget catch)', async () => { + mockQuery.mockRejectedValueOnce(new Error('disk full')); + + await expect( + createTestResult({ + node_state_id: 'ns-1', + suite_name: 'tests', + total: 1, + passed: 0, + failed: 1, + skipped: 0, + stdout: '', + exit_code: 1, + }) + ).rejects.toThrow('disk full'); + }); + }); + + describe('getTestResults', () => { + test('returns normalized rows', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'tr-1', + node_state_id: 'ns-1', + suite_name: 'tests', + total: 5, + passed: 5, + failed: 0, + skipped: 0, + failures: '[]', // SQLite string + stdout: 'all passed', + exit_code: 0, + created_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const results = await getTestResults('ns-1'); + + expect(results).toHaveLength(1); + expect(results[0].failures).toEqual([]); // Parsed from string + }); + }); + + describe('normalization edge cases', () => { + test('handles corrupted JSON in gate_results gracefully', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'ns-1', + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + output: '', + output_validated: 1, + gate_results: 'not-valid-json{', + attempt_count: 0, + started_at: '2026-01-01T00:00:00Z', + completed_at: null, + updated_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const result = await getNodeState('run-1', 'node-a'); + + expect(result!.gate_results).toEqual([]); // Fallback to empty array + expect(mockLogger.warn).toHaveBeenCalledTimes(1); + }); + + test('handles corrupted JSON in failures gracefully', async () => { + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'tr-1', + node_state_id: 'ns-1', + suite_name: 'tests', + total: 1, + passed: 0, + failed: 1, + skipped: 0, + failures: '{broken', + stdout: '', + exit_code: 1, + created_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const results = await getTestResults('ns-1'); + + expect(results[0].failures).toEqual([]); // Fallback + expect(mockLogger.warn).toHaveBeenCalledTimes(1); + }); + + test('passes through already-parsed gate_results array', async () => { + const gateResults = [{ name: 'lint', passed: true }]; + mockQuery.mockResolvedValueOnce( + createQueryResult([ + { + id: 'ns-1', + workflow_run_id: 'run-1', + node_id: 'node-a', + status: 'completed', + output: '', + output_validated: true, // Already boolean (PostgreSQL) + gate_results: gateResults, // Already parsed (PostgreSQL) + attempt_count: 0, + started_at: '2026-01-01T00:00:00Z', + completed_at: null, + updated_at: '2026-01-01T00:00:00Z', + }, + ]) + ); + + const result = await getNodeState('run-1', 'node-a'); + + expect(result!.output_validated).toBe(true); + expect(result!.gate_results).toEqual(gateResults); + }); + }); +}); diff --git a/packages/core/src/db/node-states.ts b/packages/core/src/db/node-states.ts index f204a1d1d0..0e64ea7fbf 100644 --- a/packages/core/src/db/node-states.ts +++ b/packages/core/src/db/node-states.ts @@ -15,7 +15,7 @@ function getLog(): ReturnType { } /** - * Upsert a node state row. Fire-and-forget: catches all errors internally. + * Upsert a node state row. Throws on DB error — callers use fire-and-forget `.catch()`. */ export async function upsertNodeState(data: { workflow_run_id: string; @@ -26,42 +26,34 @@ export async function upsertNodeState(data: { gate_results?: GateResult[]; attempt_count?: number; }): Promise { - try { - const dialect = getDialect(); - const id = dialect.generateUuid(); - const gateResultsJson = JSON.stringify(data.gate_results ?? []); - const outputValidated = data.output_validated ?? false; + const dialect = getDialect(); + const id = dialect.generateUuid(); + const gateResultsJson = JSON.stringify(data.gate_results ?? []); + const outputValidated = data.output_validated ?? false; - await pool.query( - `INSERT INTO remote_agent_node_states - (id, workflow_run_id, node_id, status, output, output_validated, gate_results, attempt_count) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT(workflow_run_id, node_id) DO UPDATE SET - status = $4, - output = $5, - output_validated = $6, - gate_results = $7, - attempt_count = $8, - completed_at = CASE WHEN $4 IN ('completed', 'failed') THEN ${dialect.now()} ELSE remote_agent_node_states.completed_at END, - updated_at = ${dialect.now()}`, - [ - id, - data.workflow_run_id, - data.node_id, - data.status, - data.output ?? '', - outputValidated, - gateResultsJson, - data.attempt_count ?? 0, - ] - ); - } catch (error) { - getLog().error( - { err: error as Error, runId: data.workflow_run_id, nodeId: data.node_id }, - 'db.node_state_upsert_failed' - ); - // Fire-and-forget: never throw - } + await pool.query( + `INSERT INTO remote_agent_node_states + (id, workflow_run_id, node_id, status, output, output_validated, gate_results, attempt_count) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT(workflow_run_id, node_id) DO UPDATE SET + status = $4, + output = $5, + output_validated = $6, + gate_results = $7, + attempt_count = $8, + completed_at = CASE WHEN $4 IN ('completed', 'failed') THEN ${dialect.now()} ELSE remote_agent_node_states.completed_at END, + updated_at = ${dialect.now()}`, + [ + id, + data.workflow_run_id, + data.node_id, + data.status, + data.output ?? '', + outputValidated, + gateResultsJson, + data.attempt_count ?? 0, + ] + ); } /** @@ -112,7 +104,7 @@ export async function getValidatedNodeOutputs(workflowRunId: string): Promise { - try { - const dialect = getDialect(); - const id = dialect.generateUuid(); - const failuresJson = JSON.stringify(data.failures ?? []); + const dialect = getDialect(); + const id = dialect.generateUuid(); + const failuresJson = JSON.stringify(data.failures ?? []); - await pool.query( - `INSERT INTO remote_agent_test_results - (id, node_state_id, suite_name, total, passed, failed, skipped, failures, stdout, exit_code) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, - [ - id, - data.node_state_id, - data.suite_name, - data.total, - data.passed, - data.failed, - data.skipped, - failuresJson, - data.stdout, - data.exit_code, - ] - ); - } catch (error) { - getLog().error( - { err: error as Error, nodeStateId: data.node_state_id }, - 'db.test_result_create_failed' - ); - // Fire-and-forget: never throw - } + await pool.query( + `INSERT INTO remote_agent_test_results + (id, node_state_id, suite_name, total, passed, failed, skipped, failures, stdout, exit_code) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + id, + data.node_state_id, + data.suite_name, + data.total, + data.passed, + data.failed, + data.skipped, + failuresJson, + data.stdout, + data.exit_code, + ] + ); } /** @@ -171,20 +155,29 @@ export async function getTestResults(nodeStateId: string): Promise(value: unknown, fallback: T): T { + if (typeof value !== 'string') return value as T; + try { + return JSON.parse(value) as T; + } catch { + getLog().warn({ raw: value.slice(0, 200) }, 'db.json_parse_failed'); + return fallback; + } +} + function normalizeNodeStateRow(row: NodeStateRow): NodeStateRow { return { ...row, // SQLite stores booleans as INTEGER (0/1); coerce via intermediate unknown output_validated: Boolean(row.output_validated as unknown), // JSON fields may arrive as strings from SQLite - gate_results: - typeof row.gate_results === 'string' ? JSON.parse(row.gate_results) : row.gate_results, + gate_results: safeJsonParse(row.gate_results, []), }; } function normalizeTestResultRow(row: TestResultRow): TestResultRow { return { ...row, - failures: typeof row.failures === 'string' ? JSON.parse(row.failures) : row.failures, + failures: safeJsonParse(row.failures, []), }; } diff --git a/packages/docs-web/src/content/docs/guides/authoring-workflows.md b/packages/docs-web/src/content/docs/guides/authoring-workflows.md index 6481aefac7..4ba99b9a97 100644 --- a/packages/docs-web/src/content/docs/guides/authoring-workflows.md +++ b/packages/docs-web/src/content/docs/guides/authoring-workflows.md @@ -1105,6 +1105,69 @@ Before deploying a workflow: --- +## Quality Gates + +Quality gates run bash commands after a node completes to verify its output meets quality standards. If a gate fails with severity `p0` or `p1`, the node is downgraded to `failed` and downstream nodes are blocked. + +### Basic Usage + +```yaml +nodes: + - id: implement + command: execute + gates: + - name: lint + type: builtin + command: "bun run lint" + severity: p1 + - name: type-check + type: builtin + command: "bun run type-check" + severity: p1 +``` + +### Gate Properties + +| Property | Required | Default | Description | +|----------|----------|---------|-------------| +| `name` | yes | — | Unique name for the gate | +| `type` | no | `custom` | `builtin` (built-in parser) or `custom` | +| `command` | no | — | Bash command to run (exit code 0 = pass) | +| `severity` | no | `p1` | `p0` (critical), `p1` (high), `p2` (medium), `p3` (low) — only `p0`/`p1` block the node | +| `maxRetries` | no | `0` | Max retries before escalating (loop nodes only) | + +### Built-in Gate Types + +Built-in gates (`type: builtin`) automatically parse test runner output: + +- **vitest** / **jest**: Parses `Tests: X passed, Y failed` patterns +- **bun**: Parses `X pass, Y fail` patterns + +### Loop Node Gates + +For loop nodes, gates verify the `COMPLETE` signal. If a gate fails, the loop continues with feedback injected into the next iteration. After `maxRetries` consecutive failures, the loop escalates to human review. + +```yaml +nodes: + - id: iterate-fix + loop: + until: "COMPLETE" + max_iterations: 10 + prompt: "Fix the issues and signal COMPLETE when done." + gates: + - name: tests + type: builtin + command: "bun run test" + severity: p1 + maxRetries: 3 +``` + +### Gate Execution Errors + +If a gate command fails to execute (e.g., missing binary, permission denied), the node proceeds with a warning. A `gate_failed` event is emitted for observability. The node is **not** blocked — only actual gate verification failures block nodes. + +--- + ## Summary 1. **Workflows orchestrate commands** — YAML files defining a DAG of execution nodes @@ -1124,5 +1187,6 @@ Before deploying a workflow: 15. **`systemPrompt`** — override the default system prompt per node (Claude only) 16. **`sandbox`** — OS-level filesystem/network restrictions per node or workflow (Claude only) 17. **Loop nodes** — use `loop:` within a DAG node for iterative execution until completion signal -18. **Defaults as templates** — browse `.archon/workflows/defaults/` for real examples to copy and modify -19. **Test thoroughly** — each command, the artifact flow, and edge cases +18. **Quality gates** — `gates:` run bash commands post-completion to verify output quality; `p0`/`p1` failures block the node +19. **Defaults as templates** — browse `.archon/workflows/defaults/` for real examples to copy and modify +20. **Test thoroughly** — each command, the artifact flow, and edge cases diff --git a/packages/docs-web/src/content/docs/reference/api.md b/packages/docs-web/src/content/docs/reference/api.md index 0e2fa8aa37..c27ada7e2e 100644 --- a/packages/docs-web/src/content/docs/reference/api.md +++ b/packages/docs-web/src/content/docs/reference/api.md @@ -273,6 +273,8 @@ Only user-defined workflows can be deleted. Bundled defaults cannot be removed. | POST | `/api/workflows/runs/{runId}/approve` | Approve a paused workflow | | POST | `/api/workflows/runs/{runId}/reject` | Reject a paused workflow | | DELETE | `/api/workflows/runs/{runId}` | Delete a terminal run and its events | +| GET | `/api/workflows/runs/{runId}/summary` | Get run summary with node states and gate results | +| POST | `/api/workflows/runs/{runId}/gate-result` | Store a gate result for a node | #### Run a Workflow diff --git a/packages/docs-web/src/content/docs/reference/database.md b/packages/docs-web/src/content/docs/reference/database.md index 6cab854622..4c65b676e1 100644 --- a/packages/docs-web/src/content/docs/reference/database.md +++ b/packages/docs-web/src/content/docs/reference/database.md @@ -119,7 +119,7 @@ psql $DATABASE_URL -c "\dt" ## Schema Overview -The database has 8 tables, all prefixed with `remote_agent_`: +The database has 10 tables, all prefixed with `remote_agent_`: 1. **`remote_agent_codebases`** - Repository metadata - Commands stored as JSONB: `{command_name: {path, description}}` @@ -160,6 +160,16 @@ The database has 8 tables, all prefixed with `remote_agent_`: - Injected into Claude SDK subprocess environment at execution time - Managed via Web UI Settings panel; `env:` in `.archon/config.yaml` for CLI users +9. **`remote_agent_node_states`** - Per-node execution state + - Tracks status, output, and gate results per DAG node + - Enables validated resume (skip completed nodes) + - Upsert on `(workflow_run_id, node_id)` composite key + +10. **`remote_agent_test_results`** - Parsed test suite results + - Linked to node states via foreign key + - Stores test counts (total, passed, failed, skipped) and failure details + - Gate evidence for quality gate verification + ## Migration List | Migration | Description | @@ -185,3 +195,5 @@ The database has 8 tables, all prefixed with `remote_agent_`: | `018_fix_workflow_status_default.sql` | Fix workflow status default value | | `019_workflow_resume_path.sql` | Workflow resume path support | | `020_codebase_env_vars.sql` | Per-project environment variables | +| `021_add_allow_env_keys_to_codebases.sql` | Allow env keys consent flag | +| `022_node_states.sql` | Node states and test results tables for quality gates | diff --git a/packages/workflows/src/dag-executor.ts b/packages/workflows/src/dag-executor.ts index d78f5840f4..1813b1a7e9 100644 --- a/packages/workflows/src/dag-executor.ts +++ b/packages/workflows/src/dag-executor.ts @@ -2117,7 +2117,7 @@ async function executeLoopNode( ); // Track consecutive gate failures for escalation loopGateFailureCount = (loopGateFailureCount ?? 0) + 1; - const maxGateRetries = Math.max(...node.gates.map(g => g.maxRetries ?? 3)); + const maxGateRetries = Math.max(...node.gates.map(g => g.maxRetries)); if (loopGateFailureCount > maxGateRetries) { // Escalate to human with gate evidence const escalateMsg = @@ -2147,11 +2147,26 @@ async function executeLoopNode( // Gates passed — reset failure counter loopGateFailureCount = 0; } catch (gateErr) { + const errMsg = (gateErr as Error).message; getLog().error( { err: gateErr as Error, nodeId: node.id }, 'loop_node.gate_execution_error' ); - // Gate execution error doesn't block — accept the COMPLETE signal + await safeSendMessage( + platform, + conversationId, + `Warning: Gate execution failed for loop node '${node.id}': ${errMsg}. Accepting COMPLETE signal without gate verification.`, + msgContext + ); + getWorkflowEventEmitter().emit({ + type: 'gate_failed', + runId: workflowRun.id, + nodeId: node.id, + gateName: 'gate-engine', + gateType: 'execution-error', + severity: 'p2', + evidence: { exitCode: -1, stdout: errMsg }, + }); } } } @@ -2998,8 +3013,23 @@ export async function executeDagWorkflow( continue; } } catch (gateErr) { + const errMsg = (gateErr as Error).message; getLog().error({ err: gateErr as Error, nodeId }, 'dag.gate_execution_error'); - // Gate execution error doesn't block the node — treat as ungated + await safeSendMessage( + platform, + conversationId, + `Warning: Gate execution failed for node '${nodeId}': ${errMsg}. Node proceeding without gate verification.`, + { workflowId: workflowRun.id, nodeName: nodeId } + ); + getWorkflowEventEmitter().emit({ + type: 'gate_failed', + runId: workflowRun.id, + nodeId, + gateName: 'gate-engine', + gateType: 'execution-error', + severity: 'p2', + evidence: { exitCode: -1, stdout: errMsg }, + }); } }