diff --git a/CLAUDE.md b/CLAUDE.md index f38cb29a98..e244ef7af6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -375,7 +375,7 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api'; ### Database Schema -**8 Tables (all prefixed with `remote_agent_`):** +**9 Tables (all prefixed with `remote_agent_`):** 1. **`codebases`** - Repository metadata and commands (JSONB) 2. **`conversations`** - Track platform conversations with titles and soft-delete support 3. **`sessions`** - Track AI SDK sessions with resume capability @@ -384,6 +384,7 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api'; 6. **`workflow_events`** - Step-level workflow event log (step transitions, artifacts, errors) 7. **`messages`** - Conversation message history with tool call metadata (JSONB) 8. **`codebase_env_vars`** - Per-project env vars injected into Claude SDK subprocess env (managed via Web UI or `env:` in config) +9. **`workflow_definitions`** - DB-backed workflow definitions (created/imported via API; takes priority over filesystem workflows of the same name) **Key Patterns:** - Conversation ID format: Platform-specific (`thread_ts`, `chat_id`, `user/repo#123`) @@ -748,9 +749,11 @@ Pattern: Use `classifyIsolationError()` (from `@archon/isolation`) to map git er **Workflow Management:** - `GET /api/workflows` - List available workflows; optional `?cwd=`; returns `{ workflows: [...], errors?: [...] }` - `POST /api/workflows/validate` - Validate a workflow definition in-memory (no save); body: `{ definition: object }`; returns `{ valid: boolean, errors?: string[] }` -- `GET /api/workflows/:name` - Fetch a single workflow by name; optional `?cwd=` query param; returns `{ workflow, filename, source: 'project' | 'bundled' }` -- `PUT /api/workflows/:name` - Save (create or update) a workflow YAML; body: `{ definition: object }`; validates before writing; requires `?cwd=` or registered codebase -- `DELETE /api/workflows/:name` - Delete a user-defined workflow; bundled defaults cannot be deleted +- `GET /api/workflows/:name` - Fetch a single workflow by name; optional `?cwd=` query param; returns `{ workflow, filename, source: 'project' | 'bundled' | 'db' }` (DB-stored workflows take highest priority) +- `PUT /api/workflows/:name` - Save (create or update) a workflow to the database; body: `{ definition: object }`; validates before saving; no `?cwd=` required +- `DELETE /api/workflows/:name` - Delete a DB-stored workflow; bundled defaults and filesystem-only workflows cannot be deleted +- `POST /api/workflows/import` - Import a workflow from raw YAML into the database; body: `{ yaml: string }`; returns `{ workflow, filename, source: 'db' }` +- `GET /api/workflows/:name/export` - Export a workflow as YAML text; tries DB first, then filesystem; returns `text/yaml` **Workflow Run Lifecycle:** - `POST /api/workflows/runs/{runId}/resume` - Mark a failed run as ready for auto-resume on next invocation diff --git a/migrations/022_workflow_definitions.sql b/migrations/022_workflow_definitions.sql new file mode 100644 index 0000000000..18daee1830 --- /dev/null +++ b/migrations/022_workflow_definitions.sql @@ -0,0 +1,18 @@ +-- 022: Add workflow definitions table for DB-backed workflow storage +CREATE TABLE IF NOT EXISTS remote_agent_workflow_definitions ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + description TEXT, + definition TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'user', + codebase_id TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_workflow_definitions_name + ON remote_agent_workflow_definitions (name); + +CREATE INDEX IF NOT EXISTS idx_workflow_definitions_codebase_id + ON remote_agent_workflow_definitions (codebase_id) + WHERE codebase_id IS NOT NULL; diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index 89dd5911e4..25c53518f9 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -13,13 +13,15 @@ import { configureIsolation, getIsolationProvider } from '@archon/isolation'; import { createLogger, getArchonHome } from '@archon/paths'; import { createWorkflowDeps } from '@archon/core/workflows/store-adapter'; import { discoverWorkflowsWithConfig } from '@archon/workflows/workflow-discovery'; +import { parseWorkflow } from '@archon/workflows/loader'; +import * as workflowDefinitionsDb from '@archon/core/db/workflow-definitions'; import { resolveWorkflowName } from '@archon/workflows/router'; import { executeWorkflow } from '@archon/workflows/executor'; import { getWorkflowEventEmitter, type WorkflowEmitterEvent, } from '@archon/workflows/event-emitter'; -import type { WorkflowLoadResult } from '@archon/workflows/schemas/workflow'; +import type { WorkflowLoadResult, WorkflowWithSource } from '@archon/workflows/schemas/workflow'; import type { WorkflowRun } from '@archon/workflows/schemas/workflow-run'; import { approveWorkflow, @@ -120,9 +122,27 @@ function renderWorkflowEvent(event: WorkflowEmitterEvent, verbose: boolean): voi * Returns the WorkflowLoadResult with both workflows and errors. */ async function loadWorkflows(cwd: string): Promise { + const getDbWorkflows = async (): Promise => { + const records = await workflowDefinitionsDb.listWorkflowDefinitions(); + const results: WorkflowWithSource[] = []; + for (const record of records) { + const parsed = parseWorkflow(record.definition, `${record.name}.yaml`); + if (parsed.error) { + getLog().warn( + { name: record.name, err: parsed.error.error }, + 'workflow.db_record_parse_failed' + ); + continue; + } + results.push({ workflow: parsed.workflow, source: 'db' }); + } + return results; + }; + try { return await discoverWorkflowsWithConfig(cwd, loadConfig, { globalSearchPath: getArchonHome(), + getDbWorkflows, }); } catch (error) { const err = error as Error; diff --git a/packages/core/package.json b/packages/core/package.json index d0d93635b6..2861ad4c4d 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -23,7 +23,7 @@ "./state/*": "./src/state/*.ts" }, "scripts": { - "test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts", + "test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-definitions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts", "type-check": "bun x tsc --noEmit", "build": "echo 'No build needed - Bun runs TypeScript directly'" }, diff --git a/packages/core/src/db/adapters/sqlite.ts b/packages/core/src/db/adapters/sqlite.ts index 2864e4fc43..9583d6a5af 100644 --- a/packages/core/src/db/adapters/sqlite.ts +++ b/packages/core/src/db/adapters/sqlite.ts @@ -363,6 +363,18 @@ export class SqliteAdapter implements IDatabase { created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); + -- Workflow definitions table (DB-backed workflow storage) + CREATE TABLE IF NOT EXISTS remote_agent_workflow_definitions ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + description TEXT, + definition TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'user', + codebase_id TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + -- Indexes CREATE INDEX IF NOT EXISTS idx_codebase_env_vars_codebase_id ON remote_agent_codebase_env_vars(codebase_id); CREATE INDEX IF NOT EXISTS idx_conversations_platform ON remote_agent_conversations(platform_type, platform_conversation_id); @@ -383,6 +395,10 @@ export class SqliteAdapter implements IDatabase { CREATE INDEX IF NOT EXISTS idx_sessions_codebase ON remote_agent_sessions(codebase_id); CREATE INDEX IF NOT EXISTS idx_isolation_env_status ON remote_agent_isolation_environments(status); + CREATE INDEX IF NOT EXISTS idx_workflow_definitions_name ON remote_agent_workflow_definitions(name); + CREATE INDEX IF NOT EXISTS idx_workflow_definitions_codebase_id + ON remote_agent_workflow_definitions(codebase_id) WHERE codebase_id IS NOT NULL; + -- From PG migration 009: staleness detection for running workflows CREATE INDEX IF NOT EXISTS idx_workflow_runs_last_activity ON remote_agent_workflow_runs(last_activity_at) WHERE status = 'running'; diff --git a/packages/core/src/db/index.ts b/packages/core/src/db/index.ts index 10a72a2412..2b07bd60b2 100644 --- a/packages/core/src/db/index.ts +++ b/packages/core/src/db/index.ts @@ -19,6 +19,7 @@ export * as codebaseDb from './codebases'; export * as sessionDb from './sessions'; export * as isolationEnvDb from './isolation-environments'; export * as workflowDb from './workflows'; +export * as workflowDefinitionsDb from './workflow-definitions'; // Also export individual functions for direct imports export * from './conversations'; @@ -27,3 +28,4 @@ export { SessionNotFoundError } from './sessions'; export * from './sessions'; export * from './isolation-environments'; export * from './workflows'; +export * from './workflow-definitions'; diff --git a/packages/core/src/db/workflow-definitions.test.ts b/packages/core/src/db/workflow-definitions.test.ts new file mode 100644 index 0000000000..150844523b --- /dev/null +++ b/packages/core/src/db/workflow-definitions.test.ts @@ -0,0 +1,152 @@ +import { mock, describe, test, expect, beforeEach } from 'bun:test'; +import { createQueryResult, mockPostgresDialect } from '../test/mocks/database'; +import type { WorkflowDefinitionRecord } from './workflow-definitions'; + +const mockQuery = mock(() => Promise.resolve(createQueryResult([]))); + +mock.module('./connection', () => ({ + pool: { query: mockQuery }, + getDialect: () => mockPostgresDialect, +})); + +import { + upsertWorkflowDefinition, + getWorkflowDefinition, + listWorkflowDefinitions, + deleteWorkflowDefinition, +} from './workflow-definitions'; + +const mockRecord: WorkflowDefinitionRecord = { + id: 'uuid-1', + name: 'my-workflow', + description: 'Test workflow', + definition: '{"name":"my-workflow","nodes":[]}', + source: 'user', + codebase_id: null, + created_at: '2026-01-01T00:00:00Z', + updated_at: '2026-01-01T00:00:00Z', +}; + +describe('workflow-definitions database', () => { + beforeEach(() => { + mockQuery.mockReset(); + mockQuery.mockImplementation(() => Promise.resolve(createQueryResult([]))); + }); + + describe('upsertWorkflowDefinition', () => { + test('inserts a new workflow definition and returns the record', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord])); + + const result = await upsertWorkflowDefinition({ + name: 'my-workflow', + description: 'Test workflow', + definition: '{"name":"my-workflow","nodes":[]}', + }); + + expect(result.name).toBe('my-workflow'); + expect(result.source).toBe('user'); // default + expect(mockQuery).toHaveBeenCalledWith( + expect.stringContaining('ON CONFLICT (name) DO UPDATE SET'), + expect.arrayContaining(['my-workflow']) + ); + }); + + test('uses provided source when specified as imported', async () => { + const importedRecord = { ...mockRecord, source: 'imported' as const }; + mockQuery.mockResolvedValueOnce(createQueryResult([importedRecord])); + + const result = await upsertWorkflowDefinition({ + name: 'my-workflow', + definition: '{"name":"my-workflow","nodes":[]}', + source: 'imported', + }); + + expect(result.source).toBe('imported'); + }); + + test('throws and logs when DB query fails', async () => { + mockQuery.mockRejectedValueOnce(new Error('DB connection lost')); + + await expect(upsertWorkflowDefinition({ name: 'fail', definition: '{}' })).rejects.toThrow( + 'DB connection lost' + ); + }); + + test('throws when upsert returns no rows', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + await expect( + upsertWorkflowDefinition({ name: 'my-workflow', definition: '{}' }) + ).rejects.toThrow('Upsert returned no rows'); + }); + }); + + describe('getWorkflowDefinition', () => { + test('returns null when workflow not found', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + const result = await getWorkflowDefinition('nonexistent'); + expect(result).toBeNull(); + }); + + test('returns the record when found', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord])); + + const result = await getWorkflowDefinition('my-workflow'); + expect(result?.name).toBe('my-workflow'); + expect(result?.source).toBe('user'); + }); + + test('throws and logs when DB query fails', async () => { + mockQuery.mockRejectedValueOnce(new Error('Connection timeout')); + + await expect(getWorkflowDefinition('my-workflow')).rejects.toThrow('Connection timeout'); + }); + }); + + describe('listWorkflowDefinitions', () => { + test('lists all without WHERE clause when codebaseId is omitted', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord])); + + const result = await listWorkflowDefinitions(); + + expect(result).toHaveLength(1); + expect(result[0]?.name).toBe('my-workflow'); + const [sql] = mockQuery.mock.calls[0] as [string, unknown[]]; + expect(sql).not.toContain('WHERE'); + expect(sql).toContain('ORDER BY name ASC'); + }); + + test('applies codebaseId filter when provided', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([])); + + await listWorkflowDefinitions('codebase-1'); + + const [sql, params] = mockQuery.mock.calls[0] as [string, unknown[]]; + expect(sql).toContain('WHERE (codebase_id = $1 OR codebase_id IS NULL)'); + expect(params).toContain('codebase-1'); + }); + + test('throws and logs when DB query fails', async () => { + mockQuery.mockRejectedValueOnce(new Error('Pool exhausted')); + + await expect(listWorkflowDefinitions()).rejects.toThrow('Pool exhausted'); + }); + }); + + describe('deleteWorkflowDefinition', () => { + test('returns false when no rows were deleted', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([], 0)); + + const result = await deleteWorkflowDefinition('nonexistent'); + expect(result).toBe(false); + }); + + test('returns true when row was deleted', async () => { + mockQuery.mockResolvedValueOnce(createQueryResult([], 1)); + + const result = await deleteWorkflowDefinition('my-workflow'); + expect(result).toBe(true); + }); + }); +}); diff --git a/packages/core/src/db/workflow-definitions.ts b/packages/core/src/db/workflow-definitions.ts new file mode 100644 index 0000000000..cd24f983e5 --- /dev/null +++ b/packages/core/src/db/workflow-definitions.ts @@ -0,0 +1,115 @@ +/** + * Database operations for workflow definitions (DB-backed storage). + * These are separate from workflow RUNS (packages/core/src/db/workflows.ts). + */ +import { pool, getDialect } from './connection'; +import { createLogger } from '@archon/paths'; + +let cachedLog: ReturnType | undefined; +function getLog(): ReturnType { + if (!cachedLog) cachedLog = createLogger('db.workflow-definitions'); + return cachedLog; +} + +export interface WorkflowDefinitionRecord { + id: string; + name: string; + description: string | null; + definition: string; // JSON string of parsed WorkflowDefinition + source: 'user' | 'imported'; + codebase_id: string | null; + created_at: string; + updated_at: string; +} + +export async function upsertWorkflowDefinition(data: { + name: string; + description?: string | null; + definition: string; // JSON.stringify(WorkflowDefinition) + source?: 'user' | 'imported'; + codebase_id?: string | null; +}): Promise { + const dialect = getDialect(); + let result: Awaited>>; + try { + result = await pool.query( + `INSERT INTO remote_agent_workflow_definitions + (id, name, description, definition, source, codebase_id, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, ${dialect.now()}, ${dialect.now()}) + ON CONFLICT (name) DO UPDATE SET + description = EXCLUDED.description, + definition = EXCLUDED.definition, + source = EXCLUDED.source, + codebase_id = EXCLUDED.codebase_id, + updated_at = ${dialect.now()} + RETURNING *`, + [ + dialect.generateUuid(), + data.name, + data.description ?? null, + data.definition, + data.source ?? 'user', + data.codebase_id ?? null, + ] + ); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, name: data.name }, 'workflow_definition.upsert_failed'); + throw err; + } + const row = result.rows[0]; + if (!row) { + const err = new Error('Upsert returned no rows'); + getLog().error({ name: data.name }, 'workflow_definition.upsert_no_rows'); + throw err; + } + getLog().info({ name: data.name }, 'workflow_definition.upsert_completed'); + return row; +} + +export async function getWorkflowDefinition( + name: string +): Promise { + try { + const result = await pool.query( + 'SELECT * FROM remote_agent_workflow_definitions WHERE name = $1', + [name] + ); + if (result.rows.length === 0) return null; + return result.rows[0]; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, name }, 'workflow_definition.get_failed'); + throw err; + } +} + +export async function listWorkflowDefinitions( + codebaseId?: string | null +): Promise { + let sql = 'SELECT * FROM remote_agent_workflow_definitions'; + const params: unknown[] = []; + if (codebaseId !== undefined) { + sql += ' WHERE (codebase_id = $1 OR codebase_id IS NULL)'; + params.push(codebaseId); + } + sql += ' ORDER BY name ASC'; + try { + const result = await pool.query(sql, params); + return [...result.rows]; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, codebaseId }, 'workflow_definition.list_failed'); + throw err; + } +} + +export async function deleteWorkflowDefinition(name: string): Promise { + const result = await pool.query('DELETE FROM remote_agent_workflow_definitions WHERE name = $1', [ + name, + ]); + if (result.rowCount > 0) { + getLog().info({ name }, 'workflow_definition.delete_completed'); + } + return result.rowCount > 0; +} diff --git a/packages/server/src/routes/api.ts b/packages/server/src/routes/api.ts index cfade2c012..0a4301c4ab 100644 --- a/packages/server/src/routes/api.ts +++ b/packages/server/src/routes/api.ts @@ -45,6 +45,7 @@ import { BUNDLED_IS_BINARY, } from '@archon/paths'; import { discoverWorkflowsWithConfig } from '@archon/workflows/workflow-discovery'; +import type { WorkflowWithSource } from '@archon/workflows/schemas/workflow'; import { parseWorkflow } from '@archon/workflows/loader'; import { isValidCommandName } from '@archon/workflows/command-validation'; import { BUNDLED_WORKFLOWS, BUNDLED_COMMANDS, isBinaryBuild } from '@archon/workflows/defaults'; @@ -66,6 +67,7 @@ import * as codebaseDb from '@archon/core/db/codebases'; import * as envVarDb from '@archon/core/db/env-vars'; import * as isolationEnvDb from '@archon/core/db/isolation-environments'; import * as workflowDb from '@archon/core/db/workflows'; +import * as workflowDefinitionsDb from '@archon/core/db/workflow-definitions'; import * as workflowEventDb from '@archon/core/db/workflow-events'; import * as messageDb from '@archon/core/db/messages'; import { errorSchema } from './schemas/common.schemas'; @@ -89,6 +91,8 @@ import { workflowRunsQuerySchema, approveWorkflowRunBodySchema, rejectWorkflowRunBodySchema, + importWorkflowBodySchema, + importWorkflowResponseSchema, } from './schemas/workflow.schemas'; import { conversationListResponseSchema, @@ -136,7 +140,7 @@ try { ); } -type WorkflowSource = 'project' | 'bundled'; +type WorkflowSource = 'project' | 'bundled' | 'db'; // ========================================================================= // OpenAPI route configs (module-scope — pure config, no runtime dependencies) @@ -189,6 +193,44 @@ const validateWorkflowRoute = createRoute({ }, }); +const importWorkflowRoute = createRoute({ + method: 'post', + path: '/api/workflows/import', + tags: ['Workflows'], + summary: 'Import a workflow from raw YAML text into the database', + request: { + body: { + content: { 'application/json': { schema: importWorkflowBodySchema } }, + required: true, + }, + }, + responses: { + 200: { + content: { 'application/json': { schema: importWorkflowResponseSchema } }, + description: 'Imported workflow', + }, + 400: jsonError('Invalid YAML'), + }, +}); + +const exportWorkflowRoute = createRoute({ + method: 'get', + path: '/api/workflows/{name}/export', + tags: ['Workflows'], + summary: 'Export a workflow as YAML text', + request: { + params: z.object({ name: z.string() }), + query: cwdQuerySchema, + }, + responses: { + 200: { + content: { 'text/yaml': { schema: z.string() } }, + description: 'YAML export', + }, + 404: jsonError('Not found'), + }, +}); + const getWorkflowRoute = createRoute({ method: 'get', path: '/api/workflows/{name}', @@ -1763,7 +1805,24 @@ export function registerApiRoutes( return c.json({ workflows: [] }); } - const result = await discoverWorkflowsWithConfig(workingDir, loadConfig); + const getDbWorkflows = async (): Promise => { + const records = await workflowDefinitionsDb.listWorkflowDefinitions(); + const results: WorkflowWithSource[] = []; + for (const record of records) { + const parsed = parseWorkflow(record.definition, `${record.name}.yaml`); + if (parsed.error) { + getLog().warn( + { name: record.name, err: parsed.error.error }, + 'workflow.db_record_parse_failed' + ); + continue; + } + results.push({ workflow: parsed.workflow, source: 'db' }); + } + return results; + }; + + const result = await discoverWorkflowsWithConfig(workingDir, loadConfig, { getDbWorkflows }); return c.json({ workflows: result.workflows.map(ws => ({ workflow: ws.workflow, source: ws.source })), errors: result.errors.length > 0 ? result.errors : undefined, @@ -2181,6 +2240,75 @@ export function registerApiRoutes( } }); + // POST /api/workflows/import - Import a workflow from raw YAML into the database + // MUST be registered before GET /api/workflows/:name so "import" is not treated as :name + registerOpenApiRoute(importWorkflowRoute, async c => { + const { yaml } = getValidatedBody(c, importWorkflowBodySchema); + const parsed = parseWorkflow(yaml, 'import.yaml'); + if (parsed.error) { + return apiError(c, 400, 'Invalid workflow YAML', parsed.error.error); + } + + try { + await workflowDefinitionsDb.upsertWorkflowDefinition({ + name: parsed.workflow.name, + description: parsed.workflow.description ?? null, + definition: JSON.stringify(parsed.workflow), + source: 'imported', + codebase_id: null, + }); + + return c.json({ + workflow: parsed.workflow, + filename: `${parsed.workflow.name}.yaml`, + source: 'db' as WorkflowSource, + }); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, name: parsed.workflow.name }, 'workflow.import_failed'); + return apiError(c, 500, 'Failed to import workflow'); + } + }); + + // GET /api/workflows/:name/export - Export a workflow as YAML text + registerOpenApiRoute(exportWorkflowRoute, async c => { + const name = c.req.param('name') ?? ''; + const cwd = c.req.query('cwd') ?? ''; + + // Try DB first (highest priority) + try { + const dbRecord = await workflowDefinitionsDb.getWorkflowDefinition(name); + if (dbRecord) { + const definition = JSON.parse(dbRecord.definition) as unknown; + const yaml = Bun.YAML.stringify(definition); + return new Response(yaml, { + headers: { 'Content-Type': 'text/yaml; charset=utf-8' }, + }); + } + } catch (err) { + getLog().error({ err, name }, 'workflow.export_db_lookup_failed'); + getLog().info({ name }, 'workflow.export_falling_back_to_filesystem'); + } + + // Fall back to filesystem discovery + try { + const result = await discoverWorkflowsWithConfig(cwd || getArchonHome(), loadConfig); + const entry = result.workflows.find(w => w.workflow.name === name); + if (!entry) { + return apiError(c, 404, `Workflow '${name}' not found`); + } + + const yaml = Bun.YAML.stringify(entry.workflow); + return new Response(yaml, { + headers: { 'Content-Type': 'text/yaml; charset=utf-8' }, + }); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, name }, 'workflow.export_failed'); + return apiError(c, 500, 'Failed to export workflow'); + } + }); + // GET /api/workflows/:name - Fetch a single workflow definition registerOpenApiRoute(getWorkflowRoute, async c => { const name = c.req.param('name') ?? ''; @@ -2202,6 +2330,25 @@ export function registerApiRoutes( const filename = `${name}.yaml`; + // 0. Try DB-stored workflow (highest priority) + try { + const dbRecord = await workflowDefinitionsDb.getWorkflowDefinition(name); + if (dbRecord) { + const result = parseWorkflow(dbRecord.definition, filename); + if (result.error) { + return apiError(c, 500, `DB workflow is invalid: ${result.error.error}`); + } + return c.json({ + workflow: result.workflow, + filename, + source: 'db' as WorkflowSource, + }); + } + } catch (err) { + getLog().error({ err, name }, 'workflow.db_lookup_failed'); + getLog().info({ name }, 'workflow.db_lookup_falling_back_to_filesystem'); + } + // 1. Try user-defined workflow in cwd if (workingDir) { const [workflowFolder] = getWorkflowFolderSearchPaths(); @@ -2264,30 +2411,16 @@ export function registerApiRoutes( } }); - // PUT /api/workflows/:name - Save (create or update) a workflow + // PUT /api/workflows/:name - Save (create or update) a workflow to database registerOpenApiRoute(saveWorkflowRoute, async c => { const name = c.req.param('name') ?? ''; if (!isValidCommandName(name)) { return apiError(c, 400, 'Invalid workflow name'); } - const cwd = c.req.query('cwd'); - let workingDir = cwd; - if (cwd) { - if (!(await validateCwd(cwd))) { - return apiError(c, 400, 'Invalid cwd: must match a registered codebase path'); - } - } else { - const codebases = await codebaseDb.listCodebases(); - if (codebases.length > 0) workingDir = codebases[0].default_cwd; - } - if (!workingDir) { - workingDir = getArchonHome(); - } - const { definition } = getValidatedBody(c, saveWorkflowBodySchema); - // Serialize and validate before writing + // Serialize and validate before saving let yamlContent: string; try { yamlContent = Bun.YAML.stringify(definition); @@ -2303,24 +2436,26 @@ export function registerApiRoutes( } try { - const [workflowFolder] = getWorkflowFolderSearchPaths(); - const dirPath = join(workingDir, workflowFolder); - await mkdir(dirPath, { recursive: true }); - const filePath = join(dirPath, `${name}.yaml`); - await writeFile(filePath, yamlContent, 'utf-8'); + await workflowDefinitionsDb.upsertWorkflowDefinition({ + name, + description: parsed.workflow.description ?? null, + definition: JSON.stringify(parsed.workflow), + source: 'user', + codebase_id: null, + }); return c.json({ workflow: parsed.workflow, filename: `${name}.yaml`, - source: 'project' as WorkflowSource, + source: 'db' as WorkflowSource, }); } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); - getLog().error({ err, name }, 'workflow.save_failed'); + getLog().error({ err, name }, 'workflow.definition_save_failed'); return apiError(c, 500, 'Failed to save workflow'); } }); - // DELETE /api/workflows/:name - Delete a user-defined workflow + // DELETE /api/workflows/:name - Delete a DB-stored workflow registerOpenApiRoute(deleteWorkflowRoute, async c => { const name = c.req.param('name') ?? ''; if (!isValidCommandName(name)) { @@ -2332,31 +2467,15 @@ export function registerApiRoutes( return apiError(c, 400, `Cannot delete bundled default workflow: ${name}`); } - const cwd = c.req.query('cwd'); - let workingDir = cwd; - if (cwd) { - if (!(await validateCwd(cwd))) { - return apiError(c, 400, 'Invalid cwd: must match a registered codebase path'); - } - } else { - const codebases = await codebaseDb.listCodebases(); - if (codebases.length > 0) workingDir = codebases[0].default_cwd; - } - if (!workingDir) { - workingDir = getArchonHome(); - } - - const [workflowFolder] = getWorkflowFolderSearchPaths(); - const filePath = join(workingDir, workflowFolder, `${name}.yaml`); - try { - await unlink(filePath); - return c.json({ deleted: true, name }); - } catch (err) { - if ((err as NodeJS.ErrnoException).code === 'ENOENT') { - return apiError(c, 404, `Workflow not found: ${name}`); + const deleted = await workflowDefinitionsDb.deleteWorkflowDefinition(name); + if (!deleted) { + return apiError(c, 404, `Workflow '${name}' not found in database`); } - getLog().error({ err, name }, 'workflow.delete_failed'); + return c.json({ deleted: true, name }); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + getLog().error({ err, name }, 'workflow.definition_delete_failed'); return apiError(c, 500, 'Failed to delete workflow'); } }); diff --git a/packages/server/src/routes/api.workflows.test.ts b/packages/server/src/routes/api.workflows.test.ts index e50b252640..e971b358ba 100644 --- a/packages/server/src/routes/api.workflows.test.ts +++ b/packages/server/src/routes/api.workflows.test.ts @@ -92,6 +92,26 @@ mock.module('@archon/core/db/workflows', () => ({})); mock.module('@archon/core/db/workflow-events', () => ({})); mock.module('@archon/core/db/messages', () => ({})); +const mockUpsertWorkflowDefinition = mock(async (data: { name: string }) => ({ + id: 'test-uuid', + name: data.name, + description: null, + definition: '{}', + source: 'user', + codebase_id: null, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), +})); +const mockGetWorkflowDefinition = mock(async () => null); +const mockListWorkflowDefinitions = mock(async () => []); +const mockDeleteWorkflowDefinition = mock(async () => false); +mock.module('@archon/core/db/workflow-definitions', () => ({ + upsertWorkflowDefinition: mockUpsertWorkflowDefinition, + getWorkflowDefinition: mockGetWorkflowDefinition, + listWorkflowDefinitions: mockListWorkflowDefinitions, + deleteWorkflowDefinition: mockDeleteWorkflowDefinition, +})); + const mockListCodebases = mock(async () => [{ default_cwd: '/tmp/project' }]); mock.module('@archon/core/db/codebases', () => ({ listCodebases: mockListCodebases, @@ -116,7 +136,11 @@ describe('GET /api/workflows', () => { expect(body.workflows[0]?.workflow.name).toBe('deploy'); expect(body.workflows[0]?.source).toBe('bundled'); expect(body.workflows.workflows).toBeUndefined(); - expect(mockDiscoverWorkflows).toHaveBeenCalledWith('/tmp/project', expect.any(Function)); + expect(mockDiscoverWorkflows).toHaveBeenCalledWith( + '/tmp/project', + expect.any(Function), + expect.objectContaining({ getDbWorkflows: expect.any(Function) }) + ); expect(body.errors).toBeDefined(); expect(Array.isArray(body.errors)).toBe(true); }); @@ -314,38 +338,29 @@ describe('PUT /api/workflows/:name', () => { expect(body.error).toContain('definition'); }); - test('falls back to getArchonHome() when no cwd and no codebases registered', async () => { - const testArchonHome = join(tmpdir(), `archon-home-test-${Date.now()}`); - process.env.ARCHON_HOME = testArchonHome; + test('saves workflow to database and returns source:db', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); - try { - const app = createTestApp(); - registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + mockParseWorkflow.mockReturnValueOnce({ + workflow: makeTestWorkflow({ name: 'my-workflow', description: 'test' }), + error: null, + }); - mockListCodebases.mockImplementationOnce(async () => []); - mockParseWorkflow.mockReturnValueOnce({ - workflow: makeTestWorkflow({ name: 'my-workflow', description: 'test' }), - error: null, - }); - - const response = await app.request('/api/workflows/my-workflow', { - method: 'PUT', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - definition: { - name: 'my-workflow', - description: 'test', - nodes: [{ id: 'n1', command: 'assist' }], - }, - }), - }); - expect(response.status).toBe(200); - const body = (await response.json()) as { workflow: object; source: string }; - expect(body.source).toBe('project'); - } finally { - delete process.env.ARCHON_HOME; - await rm(testArchonHome, { recursive: true, force: true }); - } + const response = await app.request('/api/workflows/my-workflow', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + definition: { + name: 'my-workflow', + description: 'test', + nodes: [{ id: 'n1', command: 'assist' }], + }, + }), + }); + expect(response.status).toBe(200); + const body = (await response.json()) as { workflow: object; source: string }; + expect(body.source).toBe('db'); }); test('returns 400 when definition fails validation', async () => { @@ -372,38 +387,31 @@ describe('PUT /api/workflows/:name', () => { expect(body.detail).toBeDefined(); }); - test('saves valid workflow and returns parsed workflow with source:project', async () => { - const testDir = join(tmpdir(), `wf-put-test-${Date.now()}`); - - try { - const app = createTestApp(); - registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + test('saves valid workflow to DB and returns parsed workflow with source:db', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); - mockListCodebases.mockImplementationOnce(async () => [{ default_cwd: testDir }]); - const response = await app.request(`/api/workflows/my-workflow?cwd=${testDir}`, { - method: 'PUT', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - definition: { - name: 'my-workflow', - description: 'Test', - nodes: [{ id: 'plan', command: 'plan' }], - }, - }), - }); + const response = await app.request('/api/workflows/my-workflow', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + definition: { + name: 'my-workflow', + description: 'Test', + nodes: [{ id: 'plan', command: 'plan' }], + }, + }), + }); - expect(response.status).toBe(200); - const body = (await response.json()) as { - workflow: { name: string }; - filename: string; - source: string; - }; - expect(body.workflow).toBeDefined(); - expect(body.filename).toBe('my-workflow.yaml'); - expect(body.source).toBe('project'); - } finally { - await rm(testDir, { recursive: true, force: true }); - } + expect(response.status).toBe(200); + const body = (await response.json()) as { + workflow: { name: string }; + filename: string; + source: string; + }; + expect(body.workflow).toBeDefined(); + expect(body.filename).toBe('my-workflow.yaml'); + expect(body.source).toBe('db'); }); }); @@ -432,44 +440,32 @@ describe('DELETE /api/workflows/:name', () => { expect(body.error).toContain('test-nonexistent-workflow-xyz'); }); - test('falls back to getArchonHome() when no cwd and no codebases, returns 404 for missing file', async () => { + test('returns 404 when workflow not found in database', async () => { const app = createTestApp(); registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); - mockListCodebases.mockImplementationOnce(async () => []); + mockDeleteWorkflowDefinition.mockResolvedValueOnce(false); - const response = await app.request('/api/workflows/nonexistent-no-cwd-test', { + const response = await app.request('/api/workflows/nonexistent-workflow', { method: 'DELETE', }); expect(response.status).toBe(404); const body = (await response.json()) as { error: string }; - expect(body.error).toContain('nonexistent-no-cwd-test'); + expect(body.error).toContain('nonexistent-workflow'); }); - test('removes existing workflow file and returns deleted:true', async () => { - const testDir = join(tmpdir(), `wf-del-test-${Date.now()}`); - const workflowDir = join(testDir, '.archon', 'workflows'); - await mkdir(workflowDir, { recursive: true }); - await writeFile( - join(workflowDir, 'to-delete.yaml'), - 'name: x\ndescription: y\nnodes:\n - id: z\n command: z\n' - ); - - try { - const app = createTestApp(); - registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + test('removes DB workflow and returns deleted:true', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); - mockListCodebases.mockImplementationOnce(async () => [{ default_cwd: testDir }]); - const response = await app.request(`/api/workflows/to-delete?cwd=${testDir}`, { - method: 'DELETE', - }); - expect(response.status).toBe(200); - const body = (await response.json()) as { deleted: boolean; name: string }; - expect(body.deleted).toBe(true); - expect(body.name).toBe('to-delete'); - } finally { - await rm(testDir, { recursive: true, force: true }); - } + mockDeleteWorkflowDefinition.mockResolvedValueOnce(true); + const response = await app.request('/api/workflows/to-delete', { + method: 'DELETE', + }); + expect(response.status).toBe(200); + const body = (await response.json()) as { deleted: boolean; name: string }; + expect(body.deleted).toBe(true); + expect(body.name).toBe('to-delete'); }); }); @@ -495,8 +491,8 @@ describe('GET /api/workflows - cwd validation', () => { }); }); -describe('PUT /api/workflows/:name - cwd validation', () => { - test('returns 400 when cwd is not a registered codebase path', async () => { +describe('PUT /api/workflows/:name - DB storage', () => { + test('ignores cwd param and saves to DB (cwd no longer relevant)', async () => { const app = createTestApp(); registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); @@ -505,23 +501,24 @@ describe('PUT /api/workflows/:name - cwd validation', () => { headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ definition: { name: 'my-workflow', description: 'test', nodes: [] } }), }); - expect(response.status).toBe(400); - const body = (await response.json()) as { error: string }; - expect(body.error).toContain('Invalid cwd'); + // PUT now saves to DB regardless of cwd + expect(response.status).toBe(200); + const body = (await response.json()) as { source: string }; + expect(body.source).toBe('db'); }); }); -describe('DELETE /api/workflows/:name - cwd validation', () => { - test('returns 400 when cwd is not a registered codebase path', async () => { +describe('DELETE /api/workflows/:name - DB storage', () => { + test('ignores cwd param and deletes from DB', async () => { const app = createTestApp(); registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + // Not found in DB → 404 + mockDeleteWorkflowDefinition.mockResolvedValueOnce(false); const response = await app.request('/api/workflows/some-workflow?cwd=/etc/secrets', { method: 'DELETE', }); - expect(response.status).toBe(400); - const body = (await response.json()) as { error: string }; - expect(body.error).toContain('Invalid cwd'); + expect(response.status).toBe(404); }); }); @@ -561,3 +558,229 @@ describe('GET /api/commands', () => { expect(archonAssist?.source).toBe('bundled'); }); }); + +describe('POST /api/workflows/import', () => { + test('imports valid YAML and returns source:db', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockParseWorkflow.mockReturnValueOnce({ + workflow: makeTestWorkflow({ name: 'imported-wf', description: 'from import' }), + error: null, + }); + + const response = await app.request('/api/workflows/import', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ yaml: 'name: imported-wf\ndescription: from import\nnodes: []' }), + }); + + expect(response.status).toBe(200); + const body = (await response.json()) as { source: string; workflow: { name: string } }; + expect(body.source).toBe('db'); + expect(body.workflow.name).toBe('imported-wf'); + expect(mockUpsertWorkflowDefinition).toHaveBeenCalledWith( + expect.objectContaining({ source: 'imported' }) + ); + }); + + test('returns 400 when YAML fails parsing', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockParseWorkflow.mockReturnValueOnce({ + workflow: null, + error: { filename: 'import.yaml', error: 'bad yaml', errorType: 'validation_error' as const }, + }); + + const response = await app.request('/api/workflows/import', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ yaml: 'not: valid: yaml: here' }), + }); + + expect(response.status).toBe(400); + }); + + test('returns 400 when yaml field is empty', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + const response = await app.request('/api/workflows/import', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ yaml: '' }), + }); + + expect(response.status).toBe(400); + }); + + test('returns 500 when DB upsert fails', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockParseWorkflow.mockReturnValueOnce({ + workflow: makeTestWorkflow({ name: 'my-wf', description: 'test' }), + error: null, + }); + mockUpsertWorkflowDefinition.mockRejectedValueOnce(new Error('DB connection lost')); + + const response = await app.request('/api/workflows/import', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ yaml: 'name: my-wf\nnodes: []' }), + }); + + expect(response.status).toBe(500); + }); +}); + +describe('GET /api/workflows/:name/export', () => { + test('returns YAML from DB when workflow exists in DB', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + const wf = makeTestWorkflow({ name: 'my-wf', description: 'test' }); + mockGetWorkflowDefinition.mockResolvedValueOnce({ + id: 'uuid-1', + name: 'my-wf', + description: 'test', + definition: JSON.stringify(wf), + source: 'user' as const, + codebase_id: null, + created_at: '2026-01-01T00:00:00Z', + updated_at: '2026-01-01T00:00:00Z', + }); + + const response = await app.request('/api/workflows/my-wf/export'); + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toContain('text/yaml'); + const yamlText = await response.text(); + expect(yamlText).toBeTruthy(); + }); + + test('returns 404 when workflow not found in DB or filesystem', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + // DB returns null (default mock already returns null) + // discoverWorkflowsWithConfig returns empty list + mockDiscoverWorkflows.mockResolvedValueOnce({ workflows: [], errors: [] }); + + const response = await app.request('/api/workflows/does-not-exist/export'); + expect(response.status).toBe(404); + }); + + test('falls back to filesystem when DB lookup returns null', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + // DB returns null (default mock) + // Filesystem has the workflow + mockDiscoverWorkflows.mockResolvedValueOnce({ + workflows: [ + makeTestWorkflowWithSource({ name: 'deploy', description: 'Deploy app' }, 'bundled'), + ], + errors: [], + }); + + const response = await app.request('/api/workflows/deploy/export'); + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toContain('text/yaml'); + }); +}); + +describe('GET /api/workflows/:name - DB source', () => { + test('returns workflow with source:db when found in database', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + const wf = makeTestWorkflow({ name: 'db-workflow', description: 'from db' }); + mockGetWorkflowDefinition.mockResolvedValueOnce({ + id: 'uuid-1', + name: 'db-workflow', + description: 'from db', + definition: JSON.stringify(wf), + source: 'user' as const, + codebase_id: null, + created_at: '2026-01-01T00:00:00Z', + updated_at: '2026-01-01T00:00:00Z', + }); + mockParseWorkflow.mockReturnValueOnce({ workflow: wf, error: null }); + + const response = await app.request('/api/workflows/db-workflow'); + expect(response.status).toBe(200); + const body = (await response.json()) as { source: string; workflow: { name: string } }; + expect(body.source).toBe('db'); + expect(body.workflow.name).toBe('db-workflow'); + }); + + test('returns 500 when DB record contains invalid workflow definition', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockGetWorkflowDefinition.mockResolvedValueOnce({ + id: 'uuid-1', + name: 'corrupt-workflow', + description: null, + definition: '{"corrupt":true}', + source: 'user' as const, + codebase_id: null, + created_at: '2026-01-01T00:00:00Z', + updated_at: '2026-01-01T00:00:00Z', + }); + mockParseWorkflow.mockReturnValueOnce({ + workflow: null, + error: { + filename: 'corrupt-workflow.yaml', + error: 'invalid schema', + errorType: 'validation_error' as const, + }, + }); + + const response = await app.request('/api/workflows/corrupt-workflow'); + expect(response.status).toBe(500); + const body = (await response.json()) as { error: string }; + expect(body.error).toContain('DB workflow is invalid'); + }); +}); + +describe('DELETE /api/workflows/:name - DB error handling', () => { + test('returns 500 when DB delete throws', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockDeleteWorkflowDefinition.mockRejectedValueOnce(new Error('DB connection lost')); + + const response = await app.request('/api/workflows/my-workflow', { method: 'DELETE' }); + expect(response.status).toBe(500); + const body = (await response.json()) as { error: string }; + expect(body.error).toContain('Failed to delete workflow'); + }); +}); + +describe('PUT /api/workflows/:name - mock call assertion', () => { + test('calls upsertWorkflowDefinition with name and source:user', async () => { + const app = createTestApp(); + registerApiRoutes(app, {} as WebAdapter, {} as ConversationLockManager); + + mockParseWorkflow.mockReturnValueOnce({ + workflow: makeTestWorkflow({ name: 'my-workflow', description: 'test' }), + error: null, + }); + mockUpsertWorkflowDefinition.mockClear(); + + await app.request('/api/workflows/my-workflow', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ definition: { name: 'my-workflow', description: 'test', nodes: [] } }), + }); + + expect(mockUpsertWorkflowDefinition).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'my-workflow', + source: 'user', + }) + ); + }); +}); diff --git a/packages/server/src/routes/schemas/workflow.schemas.ts b/packages/server/src/routes/schemas/workflow.schemas.ts index 40fb9497d1..bc937a11f9 100644 --- a/packages/server/src/routes/schemas/workflow.schemas.ts +++ b/packages/server/src/routes/schemas/workflow.schemas.ts @@ -17,8 +17,8 @@ export const workflowLoadErrorSchema = z }) .openapi('WorkflowLoadError'); -/** Workflow source — project-defined or bundled default. */ -export const workflowSourceSchema = z.enum(['project', 'bundled']).openapi('WorkflowSource'); +/** Workflow source — project-defined, bundled default, or database-stored. */ +export const workflowSourceSchema = z.enum(['project', 'bundled', 'db']).openapi('WorkflowSource'); /** A workflow entry in the list response, including its source. */ export const workflowListEntrySchema = z @@ -223,3 +223,21 @@ export const workflowRunsQuerySchema = z.object({ codebaseId: z.string().optional(), limit: z.string().optional(), }); + +// ========================================================================= +// Workflow import/export schemas +// ========================================================================= + +/** POST /api/workflows/import request body — raw YAML text. */ +export const importWorkflowBodySchema = z + .object({ yaml: z.string().min(1) }) + .openapi('ImportWorkflowBody'); + +/** POST /api/workflows/import response. */ +export const importWorkflowResponseSchema = z + .object({ + workflow: workflowDefinitionSchema, + filename: z.string(), + source: workflowSourceSchema, + }) + .openapi('ImportWorkflowResponse'); diff --git a/packages/workflows/src/schemas/workflow.ts b/packages/workflows/src/schemas/workflow.ts index 008ef19a8f..9ade7c7243 100644 --- a/packages/workflows/src/schemas/workflow.ts +++ b/packages/workflows/src/schemas/workflow.ts @@ -92,8 +92,8 @@ export type WorkflowExecutionResult = // WorkflowLoadError / WorkflowLoadResult — workflow discovery results // --------------------------------------------------------------------------- -/** Workflow origin — bundled default or project-defined. */ -export type WorkflowSource = 'bundled' | 'project'; +/** Workflow origin — bundled default, project-defined (filesystem), or database-stored. */ +export type WorkflowSource = 'bundled' | 'project' | 'db'; /** A workflow definition paired with its discovery source. */ export interface WorkflowWithSource { diff --git a/packages/workflows/src/workflow-discovery.ts b/packages/workflows/src/workflow-discovery.ts index bcd5d531ce..69ad4b3eb6 100644 --- a/packages/workflows/src/workflow-discovery.ts +++ b/packages/workflows/src/workflow-discovery.ts @@ -291,7 +291,7 @@ export async function discoverWorkflows( export async function discoverWorkflowsWithConfig( cwd: string, loadConfig: (cwd: string) => Promise<{ defaults?: { loadDefaultWorkflows?: boolean } }>, - options?: { globalSearchPath?: string } + options?: { globalSearchPath?: string; getDbWorkflows?: () => Promise } ): Promise { let loadDefaults = true; try { @@ -303,5 +303,39 @@ export async function discoverWorkflowsWithConfig( 'config_load_failed_using_default_workflow_discovery' ); } - return discoverWorkflows(cwd, { ...options, loadDefaults }); + const base = await discoverWorkflows(cwd, { ...options, loadDefaults }); + + if (!options?.getDbWorkflows) return base; + + let dbWorkflows: WorkflowWithSource[] = []; + try { + dbWorkflows = await options.getDbWorkflows(); + } catch (error) { + getLog().warn({ err: error as Error }, 'workflow.db_discovery_failed'); + return { + ...base, + errors: [ + ...base.errors, + { + filename: '', + error: `DB workflow discovery failed: ${(error as Error).message}`, + errorType: 'read_error' as const, + }, + ], + }; + } + + // Merge DB workflows last so they take highest priority. + // PUT /api/workflows/:name saves to DB; if the same workflow also exists on + // the filesystem (e.g., a committed YAML), the DB version wins — ensuring + // the API-managed version is always served. + const merged = new Map(); + for (const entry of base.workflows) { + merged.set(entry.workflow.name, entry); + } + for (const entry of dbWorkflows) { + merged.set(entry.workflow.name, entry); + } + + return { workflows: Array.from(merged.values()), errors: base.errors }; }