Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api';

### Database Schema

**8 Tables (all prefixed with `remote_agent_`):**
**9 Tables (all prefixed with `remote_agent_`):**
1. **`codebases`** - Repository metadata and commands (JSONB)
2. **`conversations`** - Track platform conversations with titles and soft-delete support
3. **`sessions`** - Track AI SDK sessions with resume capability
Expand All @@ -384,6 +384,7 @@ import type { DagNode, WorkflowDefinition } from '@/lib/api';
6. **`workflow_events`** - Step-level workflow event log (step transitions, artifacts, errors)
7. **`messages`** - Conversation message history with tool call metadata (JSONB)
8. **`codebase_env_vars`** - Per-project env vars injected into Claude SDK subprocess env (managed via Web UI or `env:` in config)
9. **`workflow_definitions`** - DB-backed workflow definitions (created/imported via API; takes priority over filesystem workflows of the same name)

**Key Patterns:**
- Conversation ID format: Platform-specific (`thread_ts`, `chat_id`, `user/repo#123`)
Expand Down Expand Up @@ -748,9 +749,11 @@ Pattern: Use `classifyIsolationError()` (from `@archon/isolation`) to map git er
**Workflow Management:**
- `GET /api/workflows` - List available workflows; optional `?cwd=`; returns `{ workflows: [...], errors?: [...] }`
- `POST /api/workflows/validate` - Validate a workflow definition in-memory (no save); body: `{ definition: object }`; returns `{ valid: boolean, errors?: string[] }`
- `GET /api/workflows/:name` - Fetch a single workflow by name; optional `?cwd=` query param; returns `{ workflow, filename, source: 'project' | 'bundled' }`
- `PUT /api/workflows/:name` - Save (create or update) a workflow YAML; body: `{ definition: object }`; validates before writing; requires `?cwd=` or registered codebase
- `DELETE /api/workflows/:name` - Delete a user-defined workflow; bundled defaults cannot be deleted
- `GET /api/workflows/:name` - Fetch a single workflow by name; optional `?cwd=` query param; returns `{ workflow, filename, source: 'project' | 'bundled' | 'db' }` (DB-stored workflows take highest priority)
- `PUT /api/workflows/:name` - Save (create or update) a workflow to the database; body: `{ definition: object }`; validates before saving; no `?cwd=` required
- `DELETE /api/workflows/:name` - Delete a DB-stored workflow; bundled defaults and filesystem-only workflows cannot be deleted
- `POST /api/workflows/import` - Import a workflow from raw YAML into the database; body: `{ yaml: string }`; returns `{ workflow, filename, source: 'db' }`
- `GET /api/workflows/:name/export` - Export a workflow as YAML text; tries DB first, then filesystem; returns `text/yaml`

**Workflow Run Lifecycle:**
- `POST /api/workflows/runs/{runId}/resume` - Mark a failed run as ready for auto-resume on next invocation
Expand Down
18 changes: 18 additions & 0 deletions migrations/022_workflow_definitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- 022: Add workflow definitions table for DB-backed workflow storage
CREATE TABLE IF NOT EXISTS remote_agent_workflow_definitions (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT,
definition TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'user',
codebase_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_workflow_definitions_name
ON remote_agent_workflow_definitions (name);

CREATE INDEX IF NOT EXISTS idx_workflow_definitions_codebase_id
ON remote_agent_workflow_definitions (codebase_id)
WHERE codebase_id IS NOT NULL;
22 changes: 21 additions & 1 deletion packages/cli/src/commands/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import { configureIsolation, getIsolationProvider } from '@archon/isolation';
import { createLogger, getArchonHome } from '@archon/paths';
import { createWorkflowDeps } from '@archon/core/workflows/store-adapter';
import { discoverWorkflowsWithConfig } from '@archon/workflows/workflow-discovery';
import { parseWorkflow } from '@archon/workflows/loader';
import * as workflowDefinitionsDb from '@archon/core/db/workflow-definitions';
import { resolveWorkflowName } from '@archon/workflows/router';
import { executeWorkflow } from '@archon/workflows/executor';
import {
getWorkflowEventEmitter,
type WorkflowEmitterEvent,
} from '@archon/workflows/event-emitter';
import type { WorkflowLoadResult } from '@archon/workflows/schemas/workflow';
import type { WorkflowLoadResult, WorkflowWithSource } from '@archon/workflows/schemas/workflow';
import type { WorkflowRun } from '@archon/workflows/schemas/workflow-run';
import {
approveWorkflow,
Expand Down Expand Up @@ -120,9 +122,27 @@ function renderWorkflowEvent(event: WorkflowEmitterEvent, verbose: boolean): voi
* Returns the WorkflowLoadResult with both workflows and errors.
*/
async function loadWorkflows(cwd: string): Promise<WorkflowLoadResult> {
const getDbWorkflows = async (): Promise<WorkflowWithSource[]> => {
const records = await workflowDefinitionsDb.listWorkflowDefinitions();
const results: WorkflowWithSource[] = [];
for (const record of records) {
const parsed = parseWorkflow(record.definition, `${record.name}.yaml`);
if (parsed.error) {
getLog().warn(
{ name: record.name, err: parsed.error.error },
'workflow.db_record_parse_failed'
);
continue;
}
results.push({ workflow: parsed.workflow, source: 'db' });
}
return results;
};

try {
return await discoverWorkflowsWithConfig(cwd, loadConfig, {
globalSearchPath: getArchonHome(),
getDbWorkflows,
});
} catch (error) {
const err = error as Error;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"./state/*": "./src/state/*.ts"
},
"scripts": {
"test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts",
"test": "bun test src/clients/codex-binary-guard.test.ts && bun test src/utils/codex-binary-resolver.test.ts && bun test src/utils/codex-binary-resolver-dev.test.ts && bun test src/clients/claude.test.ts src/clients/codex.test.ts src/clients/factory.test.ts && bun test src/handlers/command-handler.test.ts && bun test src/handlers/clone.test.ts && bun test src/db/adapters/postgres.test.ts && bun test src/db/adapters/sqlite.test.ts src/db/codebases.test.ts src/db/connection.test.ts src/db/conversations.test.ts src/db/env-vars.test.ts src/db/isolation-environments.test.ts src/db/messages.test.ts src/db/sessions.test.ts src/db/workflow-definitions.test.ts src/db/workflow-events.test.ts src/db/workflows.test.ts src/utils/defaults-copy.test.ts src/utils/worktree-sync.test.ts src/utils/conversation-lock.test.ts src/utils/credential-sanitizer.test.ts src/utils/port-allocation.test.ts src/utils/error.test.ts src/utils/error-formatter.test.ts src/utils/github-graphql.test.ts src/utils/env-allowlist.test.ts src/utils/env-leak-scanner.test.ts src/config/ src/state/ && bun test src/utils/path-validation.test.ts && bun test src/services/cleanup-service.test.ts && bun test src/services/title-generator.test.ts && bun test src/workflows/ && bun test src/operations/workflow-operations.test.ts && bun test src/operations/isolation-operations.test.ts && bun test src/orchestrator/orchestrator.test.ts && bun test src/orchestrator/orchestrator-agent.test.ts && bun test src/orchestrator/orchestrator-isolation.test.ts",
"type-check": "bun x tsc --noEmit",
"build": "echo 'No build needed - Bun runs TypeScript directly'"
},
Expand Down
16 changes: 16 additions & 0 deletions packages/core/src/db/adapters/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,18 @@ export class SqliteAdapter implements IDatabase {
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Workflow definitions table (DB-backed workflow storage)
CREATE TABLE IF NOT EXISTS remote_agent_workflow_definitions (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT,
definition TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'user',
codebase_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);

-- Indexes
CREATE INDEX IF NOT EXISTS idx_codebase_env_vars_codebase_id ON remote_agent_codebase_env_vars(codebase_id);
CREATE INDEX IF NOT EXISTS idx_conversations_platform ON remote_agent_conversations(platform_type, platform_conversation_id);
Expand All @@ -383,6 +395,10 @@ export class SqliteAdapter implements IDatabase {
CREATE INDEX IF NOT EXISTS idx_sessions_codebase ON remote_agent_sessions(codebase_id);
CREATE INDEX IF NOT EXISTS idx_isolation_env_status ON remote_agent_isolation_environments(status);

CREATE INDEX IF NOT EXISTS idx_workflow_definitions_name ON remote_agent_workflow_definitions(name);
CREATE INDEX IF NOT EXISTS idx_workflow_definitions_codebase_id
ON remote_agent_workflow_definitions(codebase_id) WHERE codebase_id IS NOT NULL;

-- From PG migration 009: staleness detection for running workflows
CREATE INDEX IF NOT EXISTS idx_workflow_runs_last_activity
ON remote_agent_workflow_runs(last_activity_at) WHERE status = 'running';
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * as codebaseDb from './codebases';
export * as sessionDb from './sessions';
export * as isolationEnvDb from './isolation-environments';
export * as workflowDb from './workflows';
export * as workflowDefinitionsDb from './workflow-definitions';

// Also export individual functions for direct imports
export * from './conversations';
Expand All @@ -27,3 +28,4 @@ export { SessionNotFoundError } from './sessions';
export * from './sessions';
export * from './isolation-environments';
export * from './workflows';
export * from './workflow-definitions';
152 changes: 152 additions & 0 deletions packages/core/src/db/workflow-definitions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { mock, describe, test, expect, beforeEach } from 'bun:test';
import { createQueryResult, mockPostgresDialect } from '../test/mocks/database';
import type { WorkflowDefinitionRecord } from './workflow-definitions';

const mockQuery = mock(() => Promise.resolve(createQueryResult([])));

mock.module('./connection', () => ({
pool: { query: mockQuery },
getDialect: () => mockPostgresDialect,
}));

import {
upsertWorkflowDefinition,
getWorkflowDefinition,
listWorkflowDefinitions,
deleteWorkflowDefinition,
} from './workflow-definitions';

const mockRecord: WorkflowDefinitionRecord = {
id: 'uuid-1',
name: 'my-workflow',
description: 'Test workflow',
definition: '{"name":"my-workflow","nodes":[]}',
source: 'user',
codebase_id: null,
created_at: '2026-01-01T00:00:00Z',
updated_at: '2026-01-01T00:00:00Z',
};

describe('workflow-definitions database', () => {
beforeEach(() => {
mockQuery.mockReset();
mockQuery.mockImplementation(() => Promise.resolve(createQueryResult([])));
});

describe('upsertWorkflowDefinition', () => {
test('inserts a new workflow definition and returns the record', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord]));

const result = await upsertWorkflowDefinition({
name: 'my-workflow',
description: 'Test workflow',
definition: '{"name":"my-workflow","nodes":[]}',
});

expect(result.name).toBe('my-workflow');
expect(result.source).toBe('user'); // default
expect(mockQuery).toHaveBeenCalledWith(
expect.stringContaining('ON CONFLICT (name) DO UPDATE SET'),
expect.arrayContaining(['my-workflow'])
);
});

test('uses provided source when specified as imported', async () => {
const importedRecord = { ...mockRecord, source: 'imported' as const };
mockQuery.mockResolvedValueOnce(createQueryResult([importedRecord]));

const result = await upsertWorkflowDefinition({
name: 'my-workflow',
definition: '{"name":"my-workflow","nodes":[]}',
source: 'imported',
});

expect(result.source).toBe('imported');
});

test('throws and logs when DB query fails', async () => {
mockQuery.mockRejectedValueOnce(new Error('DB connection lost'));

await expect(upsertWorkflowDefinition({ name: 'fail', definition: '{}' })).rejects.toThrow(
'DB connection lost'
);
});

test('throws when upsert returns no rows', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await expect(
upsertWorkflowDefinition({ name: 'my-workflow', definition: '{}' })
).rejects.toThrow('Upsert returned no rows');
});
});

describe('getWorkflowDefinition', () => {
test('returns null when workflow not found', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([]));

const result = await getWorkflowDefinition('nonexistent');
expect(result).toBeNull();
});

test('returns the record when found', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord]));

const result = await getWorkflowDefinition('my-workflow');
expect(result?.name).toBe('my-workflow');
expect(result?.source).toBe('user');
});

test('throws and logs when DB query fails', async () => {
mockQuery.mockRejectedValueOnce(new Error('Connection timeout'));

await expect(getWorkflowDefinition('my-workflow')).rejects.toThrow('Connection timeout');
});
});

describe('listWorkflowDefinitions', () => {
test('lists all without WHERE clause when codebaseId is omitted', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([mockRecord]));

const result = await listWorkflowDefinitions();

expect(result).toHaveLength(1);
expect(result[0]?.name).toBe('my-workflow');
const [sql] = mockQuery.mock.calls[0] as [string, unknown[]];
expect(sql).not.toContain('WHERE');
expect(sql).toContain('ORDER BY name ASC');
});

test('applies codebaseId filter when provided', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await listWorkflowDefinitions('codebase-1');

const [sql, params] = mockQuery.mock.calls[0] as [string, unknown[]];
expect(sql).toContain('WHERE (codebase_id = $1 OR codebase_id IS NULL)');
expect(params).toContain('codebase-1');
});

test('throws and logs when DB query fails', async () => {
mockQuery.mockRejectedValueOnce(new Error('Pool exhausted'));

await expect(listWorkflowDefinitions()).rejects.toThrow('Pool exhausted');
});
});

describe('deleteWorkflowDefinition', () => {
test('returns false when no rows were deleted', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([], 0));

const result = await deleteWorkflowDefinition('nonexistent');
expect(result).toBe(false);
});

test('returns true when row was deleted', async () => {
mockQuery.mockResolvedValueOnce(createQueryResult([], 1));

const result = await deleteWorkflowDefinition('my-workflow');
expect(result).toBe(true);
});
});
});
Loading
Loading