Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .archon/workflows/maintainer/maintainer-review-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interactive: true # Required for the decline-approval gate

worktree:
enabled: false # Live checkout — needs to read .archon/maintainer-standup/
mutates_checkout: false # Read-only + per-run artifact writes; concurrent runs safe

nodes:
# ═══════════════════════════════════════════════════════════════
Expand Down
37 changes: 37 additions & 0 deletions packages/workflows/src/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,43 @@ describe('executeWorkflow', () => {
expect(sentMessage).toContain('--branch');
});

it('skips path-lock check when mutates_checkout is false', async () => {
const getActiveSpy = mock(async () =>
makeRun({ id: 'other-run', status: 'running' as const })
);
const store = makeStore({ getActiveWorkflowRunByPath: getActiveSpy });
const deps = makeDeps(store);
const result = await executeWorkflow(
deps,
makePlatform(),
'conv-1',
'/tmp',
makeWorkflow({ mutates_checkout: false }),
'test message',
'db-conv-1'
);
// Guard skipped: spy never called, run succeeds
expect(getActiveSpy).not.toHaveBeenCalled();
expect(result.workflowRunId).toBe('run-123');
});

it('still enforces path lock when mutates_checkout is true', async () => {
const otherRun = makeRun({ id: 'other-run-456', status: 'running' as const });
const store = makeStore({ getActiveWorkflowRunByPath: mock(async () => otherRun) });
const deps = makeDeps(store);
const result = await executeWorkflow(
deps,
makePlatform(),
'conv-1',
'/tmp',
makeWorkflow({ mutates_checkout: true }),
'test message',
'db-conv-1'
);
expect(result.success).toBe(false);
expect(result.error).toContain('already active');
});

it('still returns failure when guard self-cancel update throws (best-effort)', async () => {
const selfRun = makeRun({ id: 'self-run', status: 'pending' });
const otherRun = makeRun({ id: 'other-run', status: 'running' });
Expand Down
145 changes: 75 additions & 70 deletions packages/workflows/src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,92 +477,97 @@ export async function executeWorkflow(

// Path-lock guard: ensure no other workflow run holds this working_path.
//
// Skipped when `workflow.mutates_checkout` is false — the author asserts
// that concurrent runs will not race (e.g. all writes are per-run-scoped).
//
// Runs after workflowRun is finalized (pre-created, resumed, or freshly
// created) so we always have self-ID + started_at for the deterministic
// older-wins tiebreaker. The query treats `pending` rows older than 5 min
// as orphaned, so leaks from crashed dispatches or resume orphans don't
// permanently block the path.
try {
const activeWorkflow = await deps.store.getActiveWorkflowRunByPath(cwd, {
id: workflowRun.id,
startedAt: new Date(parseDbTimestamp(workflowRun.started_at)),
});
if (activeWorkflow) {
// The lock query found another active row that wins the older-wins
// tiebreaker. Mark our own row terminal so it falls out of the
// active set immediately — without this, our row sits as
// pending/running and blocks the path until the 5-min stale window
// (or never, if we'd already promoted it to running via resume).
if (workflow.mutates_checkout !== false) {
try {
const activeWorkflow = await deps.store.getActiveWorkflowRunByPath(cwd, {
id: workflowRun.id,
startedAt: new Date(parseDbTimestamp(workflowRun.started_at)),
});
if (activeWorkflow) {
// The lock query found another active row that wins the older-wins
// tiebreaker. Mark our own row terminal so it falls out of the
// active set immediately — without this, our row sits as
// pending/running and blocks the path until the 5-min stale window
// (or never, if we'd already promoted it to running via resume).
await deps.store
.updateWorkflowRun(workflowRun.id, { status: 'cancelled' })
.catch((cleanupErr: Error) => {
getLog().warn(
{ err: cleanupErr, workflowRunId: workflowRun?.id, cwd },
'workflow.guard_self_cancel_failed'
);
});

const elapsedMs = Date.now() - parseDbTimestamp(activeWorkflow.started_at);
const duration = formatDuration(elapsedMs);
const shortId = activeWorkflow.id.slice(0, 8);

// Status-aware copy. The lock query returns running, paused, and
// fresh-pending rows — telling the user to "wait for it to finish"
// is wrong for `paused` (waiting on user action via approve/reject).
let stateLine: string;
let actionLines: string;
if (activeWorkflow.status === 'paused') {
stateLine = `paused waiting for user input (${duration} since started, run \`${shortId}\`)`;
actionLines =
`• Approve it: \`/workflow approve ${shortId}\`\n` +
`• Reject it: \`/workflow reject ${shortId}\`\n` +
`• Cancel it: \`/workflow cancel ${shortId}\`\n` +
'• Use a different branch: `--branch <other>`';
} else {
const verb = activeWorkflow.status === 'pending' ? 'starting' : 'running';
stateLine = `${verb} ${duration}, run \`${shortId}\``;
actionLines =
'• Wait for it to finish: `/workflow status`\n' +
`• Cancel it: \`/workflow cancel ${shortId}\`\n` +
'• Use a different branch: `--branch <other>`';
}
await sendCriticalMessage(
platform,
conversationId,
`❌ **This worktree is in use** by \`${activeWorkflow.workflow_name}\` ` +
`(${stateLine}).\n${actionLines}`
);
return {
success: false,
error: `Workflow already active on this path (${activeWorkflow.status}): ${activeWorkflow.workflow_name}`,
};
}
} catch (error) {
const err = error as Error;
getLog().error(
{ err, conversationId, cwd, pendingRunId: workflowRun.id },
'db_active_workflow_check_failed'
);
// Release the lock token. workflowRun is finalized at this point
// (pre-created or resumed or freshly created) and would otherwise sit
// as pending/running, blocking the path. For pending the 5-min stale
// window would clear it eventually; for a row already promoted to
// running (e.g., resumed), nothing would clear it without manual
// intervention.
await deps.store
.updateWorkflowRun(workflowRun.id, { status: 'cancelled' })
.catch((cleanupErr: Error) => {
getLog().warn(
{ err: cleanupErr, workflowRunId: workflowRun?.id, cwd },
'workflow.guard_self_cancel_failed'
{ err: cleanupErr, workflowRunId: workflowRun?.id },
'workflow.guard_query_failure_cleanup_failed'
);
});

const elapsedMs = Date.now() - parseDbTimestamp(activeWorkflow.started_at);
const duration = formatDuration(elapsedMs);
const shortId = activeWorkflow.id.slice(0, 8);

// Status-aware copy. The lock query returns running, paused, and
// fresh-pending rows — telling the user to "wait for it to finish"
// is wrong for `paused` (waiting on user action via approve/reject).
let stateLine: string;
let actionLines: string;
if (activeWorkflow.status === 'paused') {
stateLine = `paused waiting for user input (${duration} since started, run \`${shortId}\`)`;
actionLines =
`• Approve it: \`/workflow approve ${shortId}\`\n` +
`• Reject it: \`/workflow reject ${shortId}\`\n` +
`• Cancel it: \`/workflow cancel ${shortId}\`\n` +
'• Use a different branch: `--branch <other>`';
} else {
const verb = activeWorkflow.status === 'pending' ? 'starting' : 'running';
stateLine = `${verb} ${duration}, run \`${shortId}\``;
actionLines =
'• Wait for it to finish: `/workflow status`\n' +
`• Cancel it: \`/workflow cancel ${shortId}\`\n` +
'• Use a different branch: `--branch <other>`';
}
await sendCriticalMessage(
platform,
conversationId,
`❌ **This worktree is in use** by \`${activeWorkflow.workflow_name}\` ` +
`(${stateLine}).\n${actionLines}`
'❌ **Workflow blocked**: Unable to verify if another workflow is running (database error). Please try again in a moment.'
);
return {
success: false,
error: `Workflow already active on this path (${activeWorkflow.status}): ${activeWorkflow.workflow_name}`,
};
return { success: false, error: 'Database error checking for active workflow' };
}
} catch (error) {
const err = error as Error;
getLog().error(
{ err, conversationId, cwd, pendingRunId: workflowRun.id },
'db_active_workflow_check_failed'
);
// Release the lock token. workflowRun is finalized at this point
// (pre-created or resumed or freshly created) and would otherwise sit
// as pending/running, blocking the path. For pending the 5-min stale
// window would clear it eventually; for a row already promoted to
// running (e.g., resumed), nothing would clear it without manual
// intervention.
await deps.store
.updateWorkflowRun(workflowRun.id, { status: 'cancelled' })
.catch((cleanupErr: Error) => {
getLog().warn(
{ err: cleanupErr, workflowRunId: workflowRun?.id },
'workflow.guard_query_failure_cleanup_failed'
);
});
await sendCriticalMessage(
platform,
conversationId,
'❌ **Workflow blocked**: Unable to verify if another workflow is running (database error). Please try again in a moment.'
);
return { success: false, error: 'Database error checking for active workflow' };
}

// Resolve external artifact and log directories
Expand Down
38 changes: 38 additions & 0 deletions packages/workflows/src/loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,44 @@ describe('Workflow Loader', () => {
expect(result.workflows[0].workflow.tags).toBeUndefined();
});

it('should parse mutates_checkout: false correctly', async () => {
const workflowDir = join(testDir, '.archon', 'workflows');
await mkdir(workflowDir, { recursive: true });
const yaml = `name: test\ndescription: read-only workflow\nmutates_checkout: false\nnodes:\n - id: n\n prompt: p\n`;
await writeFile(join(workflowDir, 'test.yaml'), yaml);
const result = await discoverWorkflows(testDir, { loadDefaults: false });
expect(result.workflows[0].workflow.mutates_checkout).toBe(false);
});

it('should parse mutates_checkout: true correctly', async () => {
const workflowDir = join(testDir, '.archon', 'workflows');
await mkdir(workflowDir, { recursive: true });
const yaml = `name: test\ndescription: explicit true\nmutates_checkout: true\nnodes:\n - id: n\n prompt: p\n`;
await writeFile(join(workflowDir, 'test.yaml'), yaml);
const result = await discoverWorkflows(testDir, { loadDefaults: false });
expect(result.workflows[0].workflow.mutates_checkout).toBe(true);
});

it('should omit mutates_checkout when not set', async () => {
const workflowDir = join(testDir, '.archon', 'workflows');
await mkdir(workflowDir, { recursive: true });
const yaml = `name: test\ndescription: no field\nnodes:\n - id: n\n prompt: p\n`;
await writeFile(join(workflowDir, 'test.yaml'), yaml);
const result = await discoverWorkflows(testDir, { loadDefaults: false });
expect(result.workflows[0].workflow.mutates_checkout).toBeUndefined();
});

it('should warn and omit mutates_checkout for invalid value', async () => {
const workflowDir = join(testDir, '.archon', 'workflows');
await mkdir(workflowDir, { recursive: true });
// YAML string "yes" is not a boolean — should be dropped and field omitted
const yaml = `name: test\ndescription: typo\nmutates_checkout: "yes"\nnodes:\n - id: n\n prompt: p\n`;
await writeFile(join(workflowDir, 'test.yaml'), yaml);
const result = await discoverWorkflows(testDir, { loadDefaults: false });
expect(result.workflows).toHaveLength(1);
expect(result.workflows[0].workflow.mutates_checkout).toBeUndefined();
});

it('should parse valid DAG workflow YAML', async () => {
const workflowDir = join(testDir, '.archon', 'workflows');
await mkdir(workflowDir, { recursive: true });
Expand Down
45 changes: 30 additions & 15 deletions packages/workflows/src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,27 @@ function parseDagNode(raw: unknown, index: number, errors: string[]): DagNode |
const node = result.data;

// Warn about AI-specific fields on non-AI nodes (runtime behavior, not schema errors)
let nodeType: string | undefined;
let aiFields: readonly string[] | undefined;
let nonAiNode: { type: string; fields: readonly string[] } | undefined;
if (isCancelNode(node)) {
nodeType = 'cancel';
aiFields = BASH_NODE_AI_FIELDS;
nonAiNode = { type: 'cancel', fields: BASH_NODE_AI_FIELDS };
} else if (isApprovalNode(node)) {
nodeType = 'approval';
aiFields = BASH_NODE_AI_FIELDS;
nonAiNode = { type: 'approval', fields: BASH_NODE_AI_FIELDS };
} else if (isLoopNode(node)) {
nodeType = 'loop';
aiFields = LOOP_NODE_AI_FIELDS;
nonAiNode = { type: 'loop', fields: LOOP_NODE_AI_FIELDS };
} else if (isScriptNode(node)) {
nodeType = 'script';
aiFields = SCRIPT_NODE_AI_FIELDS;
nonAiNode = { type: 'script', fields: SCRIPT_NODE_AI_FIELDS };
} else if ('bash' in node && typeof node.bash === 'string') {
nodeType = 'bash';
aiFields = BASH_NODE_AI_FIELDS;
nonAiNode = { type: 'bash', fields: BASH_NODE_AI_FIELDS };
}
if (nodeType !== undefined && aiFields !== undefined) {
const presentAiFields = aiFields.filter(f => (raw as Record<string, unknown>)[f] !== undefined);
if (nonAiNode) {
const presentAiFields = nonAiNode.fields.filter(
f => (raw as Record<string, unknown>)[f] !== undefined
);
if (presentAiFields.length > 0) {
getLog().warn({ id: node.id, fields: presentAiFields }, `${nodeType}_node_ai_fields_ignored`);
getLog().warn(
{ id: node.id, fields: presentAiFields },
`${nonAiNode.type}_node_ai_fields_ignored`
);
}
}

Expand Down Expand Up @@ -361,6 +360,21 @@ export function parseWorkflow(content: string, filename: string): ParseResult {
}
}

// Parse mutates_checkout — boolean, omitted means true (run the path-lock guard).
// Same parse/warn pattern as `interactive` (invalid non-boolean values are dropped).
// When false, the executor skips the path-lock guard and allows concurrent runs on the same checkout.
let mutatesCheckout: boolean | undefined;
if (raw.mutates_checkout !== undefined) {
if (typeof raw.mutates_checkout === 'boolean') {
mutatesCheckout = raw.mutates_checkout;
} else {
getLog().warn(
{ filename, value: raw.mutates_checkout },
'invalid_mutates_checkout_value_ignored'
);
}
}

// Parse optional tags — type-narrow, trim, and dedupe so authors can't
// ship ["GitLab", "GitLab ", "gitlab"] as three distinct values.
// An explicit empty array is preserved (suppresses keyword inference in the
Expand Down Expand Up @@ -390,6 +404,7 @@ export function parseWorkflow(content: string, filename: string): ParseResult {
webSearchMode,
additionalDirectories,
interactive,
...(mutatesCheckout !== undefined ? { mutates_checkout: mutatesCheckout } : {}),
nodes: dagNodes,
...(worktreePolicy ? { worktree: worktreePolicy } : {}),
...(tags !== undefined ? { tags } : {}),
Expand Down
7 changes: 7 additions & 0 deletions packages/workflows/src/schemas/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ export const workflowBaseSchema = z.object({
betas: z.array(z.string().min(1)).nonempty("'betas' must be a non-empty array").optional(),
sandbox: sandboxSettingsSchema.optional(),
worktree: workflowWorktreePolicySchema.optional(),
/**
* When `false`, the engine skips the path-exclusive lock for this workflow,
* allowing N concurrent runs on the same live checkout. The author asserts
* that concurrent runs will not race (e.g. all writes are per-run-scoped).
* Defaults to `true` (safe: serialize runs on the same path).
*/
mutates_checkout: z.boolean().optional(),
tags: z.array(z.string().min(1)).optional(),
});

Expand Down
Loading