Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions packages/workflows/src/dag-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6079,3 +6079,161 @@ describe('shouldContinueStreamingForStatus', () => {
expect(shouldContinueStreamingForStatus('invalid-status')).toBe(false);
});
});

describe('executeDagWorkflow -- final status derivation', () => {
// Invariant: if ANY non-skipped node has failed status, the run must be
// marked 'failed' — never 'completed' — regardless of how many other nodes
// succeeded. This covers the anyFailed branch in executeDagWorkflow
// (dag-executor.ts ~line 2956), which had no direct test coverage.
let testDir: string;

beforeEach(async () => {
testDir = join(
tmpdir(),
`dag-status-test-${Date.now()}-${Math.random().toString(36).slice(2)}`
);
await mkdir(testDir, { recursive: true });

mockSendQueryDag.mockClear();
mockGetAgentProviderDag.mockClear();
mockSendQueryDag.mockImplementation(function* () {
yield { type: 'assistant', content: 'DAG AI response' };
yield { type: 'result', sessionId: 'dag-session-id' };
});
mockGetAgentProviderDag.mockImplementation(() => ({
sendQuery: mockSendQueryDag,
getType: () => 'claude',
getCapabilities: mockClaudeCapabilities,
}));
});

afterEach(async () => {
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// ignore cleanup errors
}
});

it('one success + one independent failure -> failWorkflowRun, not completeWorkflowRun', async () => {
const mockStore = createMockStore();
const mockDeps = createMockDeps(mockStore);
const platform = createMockPlatform();
const workflowRun = makeWorkflowRun('dag-status-run-1');

const nodes: DagNode[] = [
{ id: 'pass', bash: 'echo ok' } as BashNode,
{ id: 'fail', bash: 'exit 1' } as BashNode,
];

await executeDagWorkflow(
mockDeps,
platform,
'conv-status',
testDir,
{ name: 'status-test', nodes },
workflowRun,
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);

expect((mockStore.failWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(1);
expect((mockStore.completeWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(0);
expect(mockStore.failWorkflowRun).toHaveBeenCalledWith(
expect.anything(),
expect.stringContaining('fail')
);

// Confirm the failure message names the failing node
const sendMessage = platform.sendMessage as ReturnType<typeof mock>;
const messages = sendMessage.mock.calls.map((call: unknown[]) => call[1] as string);
const failMsg = messages.find((m: string) => m.includes('completed with failures'));
expect(failMsg).toBeDefined();
});

it('multiple successes + one failure -> failWorkflowRun, not completeWorkflowRun', async () => {
const mockStore = createMockStore();
const mockDeps = createMockDeps(mockStore);
const platform = createMockPlatform();
const workflowRun = makeWorkflowRun('dag-status-run-2');

const nodes: DagNode[] = [
{ id: 'a', bash: 'echo a' } as BashNode,
{ id: 'b', bash: 'echo b' } as BashNode,
{ id: 'c', bash: 'echo c' } as BashNode,
{ id: 'fail', bash: 'exit 1' } as BashNode,
];

await executeDagWorkflow(
mockDeps,
platform,
'conv-status',
testDir,
{ name: 'status-test-multi', nodes },
workflowRun,
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);

expect((mockStore.failWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(1);
expect((mockStore.completeWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(0);
expect(mockStore.failWorkflowRun).toHaveBeenCalledWith(
expect.anything(),
expect.stringContaining('fail')
);

const sendMessage = platform.sendMessage as ReturnType<typeof mock>;
const messages = sendMessage.mock.calls.map((call: unknown[]) => call[1] as string);
const failMsg = messages.find((m: string) => m.includes('completed with failures'));
expect(failMsg).toBeDefined();
});

it('trigger_rule: none_failed skips dependent node + anyFailed still marks run failed', async () => {
const mockStore = createMockStore();
const mockDeps = createMockDeps(mockStore);
const platform = createMockPlatform();
const workflowRun = makeWorkflowRun('dag-status-run-3');

// Layer 1: A and B run in parallel. B fails.
// Layer 2: C depends on B with trigger_rule: none_failed — so C is skipped.
// Expected: anyFailed=true (from B), so run must be marked failed even though C is only skipped.
const nodes: DagNode[] = [
{ id: 'a', bash: 'echo a' } as BashNode,
{ id: 'b', bash: 'exit 1' } as BashNode,
{ id: 'c', bash: 'echo c', depends_on: ['b'], trigger_rule: 'none_failed' } as BashNode,
];

await executeDagWorkflow(
mockDeps,
platform,
'conv-status',
testDir,
{ name: 'status-test-skip', nodes },
workflowRun,
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);

expect((mockStore.failWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(1);
expect((mockStore.completeWorkflowRun as ReturnType<typeof mock>).mock.calls.length).toBe(0);
expect(mockStore.failWorkflowRun).toHaveBeenCalledWith(
expect.anything(),
expect.stringContaining('b')
);
});
});
Loading