Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion packages/core/src/db/messages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,30 @@ import { createQueryResult, mockPostgresDialect } from '../test/mocks/database';
import type { MessageRow } from './messages';

const mockQuery = mock(() => Promise.resolve(createQueryResult([])));
const mockGetDatabaseType = mock(() => 'postgresql' as const);

// Mock the connection module before importing the module under test
mock.module('./connection', () => ({
pool: {
query: mockQuery,
},
getDialect: () => mockPostgresDialect,
getDatabaseType: mockGetDatabaseType,
}));

import { addMessage, listMessages } from './messages';
// Mock @archon/paths to avoid lazy logger initialization issues in tests
mock.module('@archon/paths', () => ({
createLogger: mock(() => ({
fatal: mock(() => undefined),
error: mock(() => undefined),
warn: mock(() => undefined),
info: mock(() => undefined),
debug: mock(() => undefined),
trace: mock(() => undefined),
})),
}));

import { addMessage, listMessages, getRecentWorkflowResultMessages } from './messages';

describe('messages', () => {
beforeEach(() => {
Expand Down Expand Up @@ -121,4 +135,76 @@ describe('messages', () => {
expect(mockQuery).toHaveBeenCalledWith(expect.any(String), ['conv-456', 50]);
});
});

describe('getRecentWorkflowResultMessages', () => {
beforeEach(() => {
mockGetDatabaseType.mockClear();
});

test('uses PostgreSQL JSON extraction syntax when dbType is postgresql', async () => {
mockGetDatabaseType.mockReturnValueOnce('postgresql');
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await getRecentWorkflowResultMessages('conv-1');

const sql = mockQuery.mock.calls[0]?.[0] as string;
expect(sql).toContain("metadata->>'workflowResult'");
expect(sql).not.toContain('json_extract');
});

test('uses SQLite JSON extraction syntax when dbType is sqlite', async () => {
mockGetDatabaseType.mockReturnValueOnce('sqlite');
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await getRecentWorkflowResultMessages('conv-1');

const sql = mockQuery.mock.calls[0]?.[0] as string;
expect(sql).toContain("json_extract(metadata, '$.workflowResult')");
expect(sql).not.toContain("->>'" + 'workflowResult');
});

test('passes correct parameters: conversationId and limit', async () => {
mockGetDatabaseType.mockReturnValueOnce('postgresql');
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await getRecentWorkflowResultMessages('conv-42', 5);

expect(mockQuery).toHaveBeenCalledWith(expect.any(String), ['conv-42', 5]);
});

test('default limit is 3', async () => {
mockGetDatabaseType.mockReturnValueOnce('postgresql');
mockQuery.mockResolvedValueOnce(createQueryResult([]));

await getRecentWorkflowResultMessages('conv-1');

expect(mockQuery).toHaveBeenCalledWith(expect.any(String), ['conv-1', 3]);
});

test('returns empty array on query error (non-throwing contract)', async () => {
mockGetDatabaseType.mockReturnValueOnce('postgresql');
mockQuery.mockRejectedValueOnce(new Error('connection refused'));

const result = await getRecentWorkflowResultMessages('conv-1');

expect(result).toEqual([]);
});

test('returns rows from successful query', async () => {
const row: MessageRow = {
id: 'msg-1',
conversation_id: 'conv-1',
role: 'assistant',
content: 'Workflow summary here.',
metadata: '{"workflowResult":{"workflowName":"plan","runId":"run-1"}}',
created_at: '2026-01-01T00:00:00Z',
};
mockGetDatabaseType.mockReturnValueOnce('postgresql');
mockQuery.mockResolvedValueOnce(createQueryResult([row]));

const result = await getRecentWorkflowResultMessages('conv-1');

expect(result).toEqual([row]);
});
});
});
37 changes: 34 additions & 3 deletions packages/core/src/db/messages.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Database operations for conversation messages (Web UI history)
* Database operations for conversation messages (Web UI history and orchestrator prompt enrichment)
*/
import { pool, getDialect } from './connection';
import { pool, getDialect, getDatabaseType } from './connection';
import { createLogger } from '@archon/paths';

/** Lazy-initialized logger (deferred so test mocks can intercept createLogger) */
Expand All @@ -16,7 +16,7 @@ export interface MessageRow {
conversation_id: string;
role: 'user' | 'assistant';
content: string;
metadata: string; // JSON string - parsed by frontend
metadata: string; // JSON string - parsed by frontend and server-side (orchestrator prompt enrichment)
created_at: string;
}

Expand Down Expand Up @@ -64,3 +64,34 @@ export async function listMessages(
);
return result.rows;
}

/**
* Get recent messages with workflowResult metadata for a conversation.
* Used to inject workflow context into the orchestrator prompt.
* Non-throwing — returns empty array on error.
*/
export async function getRecentWorkflowResultMessages(
conversationId: string,
limit = 3
): Promise<readonly MessageRow[]> {
const dbType = getDatabaseType();
const metadataFilter =
dbType === 'postgresql'
? "(metadata->>'workflowResult') IS NOT NULL"
: "json_extract(metadata, '$.workflowResult') IS NOT NULL";
try {
const result = await pool.query<Pick<MessageRow, 'id' | 'content' | 'metadata'>>(
`SELECT id, content, metadata FROM remote_agent_messages
WHERE conversation_id = $1
AND ${metadataFilter}
ORDER BY created_at DESC
LIMIT $2`,
[conversationId, limit]
);
return result.rows as MessageRow[];
} catch (error) {
const err = error as Error;
getLog().warn({ err, conversationId }, 'db.workflow_result_messages_query_failed');
return [];
}
}
83 changes: 83 additions & 0 deletions packages/core/src/orchestrator/orchestrator-agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ mock.module('./orchestrator', () => ({
mock.module('./prompt-builder', () => ({
buildOrchestratorPrompt: mock(() => 'orchestrator system prompt'),
buildProjectScopedPrompt: mock(() => 'project scoped system prompt'),
formatWorkflowContextSection: mock((results: unknown[]) =>
results.length > 0 ? '## Recent Workflow Results\n\n...' : ''
),
}));

const mockGetRecentWorkflowResultMessages = mock(() => Promise.resolve([]));
mock.module('../db/messages', () => ({
addMessage: mock(() => Promise.resolve()),
listMessages: mock(() => Promise.resolve([])),
getRecentWorkflowResultMessages: mockGetRecentWorkflowResultMessages,
}));

mock.module('@archon/isolation', () => ({
Expand Down Expand Up @@ -1407,3 +1417,76 @@ describe('discoverAllWorkflows — merge repo workflows over global', () => {
expect(mockDiscoverWorkflowsWithConfig).toHaveBeenCalledTimes(2);
});
});

// ─── handleMessage — workflow context injection ───────────────────────────────

describe('handleMessage — workflow context injection', () => {
beforeEach(() => {
mockGetRecentWorkflowResultMessages.mockClear();
mockGetOrCreateConversation.mockReset();
mockListCodebases.mockReset();
mockDiscoverWorkflowsWithConfig.mockReset();
mockLogger.warn.mockClear();

mockGetOrCreateConversation.mockImplementation(() => Promise.resolve(makeConversation()));
mockListCodebases.mockImplementation(() => Promise.resolve([]));
mockDiscoverWorkflowsWithConfig.mockImplementation(() =>
Promise.resolve({ workflows: [], errors: [] })
);
mockGetRecentWorkflowResultMessages.mockImplementation(() => Promise.resolve([]));
});

test('calls getRecentWorkflowResultMessages for the conversation', async () => {
const platform = makePlatform();
await handleMessage(platform, 'conv-1', 'What happened?');

expect(mockGetRecentWorkflowResultMessages).toHaveBeenCalledWith('conv-1', 3);
});

test('does not throw when getRecentWorkflowResultMessages returns empty array', async () => {
mockGetRecentWorkflowResultMessages.mockResolvedValueOnce([]);
const platform = makePlatform();

await expect(handleMessage(platform, 'conv-1', 'Hello')).resolves.toBeUndefined();
});

test('handles malformed metadata JSON without throwing', async () => {
const badRow = {
id: 'msg-1',
conversation_id: 'conv-1',
role: 'assistant' as const,
content: 'Summary.',
metadata: 'not-valid-json',
created_at: '2026-01-01T00:00:00Z',
};
mockGetRecentWorkflowResultMessages.mockResolvedValueOnce([badRow]);
const platform = makePlatform();

await expect(
handleMessage(platform, 'conv-1', 'What did the workflow do?')
).resolves.toBeUndefined();
});

test('handles metadata with missing workflowResult key gracefully', async () => {
const rowNoWorkflowResult = {
id: 'msg-2',
conversation_id: 'conv-1',
role: 'assistant' as const,
content: 'Summary.',
metadata: '{"someOtherKey":"value"}',
created_at: '2026-01-01T00:00:00Z',
};
mockGetRecentWorkflowResultMessages.mockResolvedValueOnce([rowNoWorkflowResult]);
const platform = makePlatform();

await expect(handleMessage(platform, 'conv-1', 'Follow-up')).resolves.toBeUndefined();
});

test('continues without workflow context when outer fetch throws', async () => {
mockGetRecentWorkflowResultMessages.mockRejectedValueOnce(new Error('unexpected'));
const platform = makePlatform();

// Non-critical path — must not block message handling
await expect(handleMessage(platform, 'conv-1', 'Hello')).resolves.toBeUndefined();
});
});
64 changes: 60 additions & 4 deletions packages/core/src/orchestrator/orchestrator-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ import type { MergedConfig } from '../config/config-types';
import { generateAndSetTitle } from '../services/title-generator';
import { validateAndResolveIsolation, dispatchBackgroundWorkflow } from './orchestrator';
import { IsolationBlockedError } from '@archon/isolation';
import { buildOrchestratorPrompt, buildProjectScopedPrompt } from './prompt-builder';
import {
buildOrchestratorPrompt,
buildProjectScopedPrompt,
formatWorkflowContextSection,
} from './prompt-builder';
import type { WorkflowResultContext } from './prompt-builder';
import * as messageDb from '../db/messages';
import * as workflowDb from '../db/workflows';
import * as workflowEventDb from '../db/workflow-events';
import type { ApprovalContext } from '@archon/workflows/schemas/workflow-run';
Expand Down Expand Up @@ -451,7 +457,8 @@ function buildFullPrompt(
message: string,
issueContext: string | undefined,
threadContext: string | undefined,
attachedFiles?: AttachedFile[]
attachedFiles?: AttachedFile[],
workflowContext?: string
): string {
const scopedCodebase = conversation.codebase_id
? codebases.find(c => c.id === conversation.codebase_id)
Expand All @@ -471,19 +478,29 @@ function buildFullPrompt(
.join('\n')
: '';

const workflowContextSuffix = workflowContext ? '\n\n---\n\n' + workflowContext : '';

if (threadContext) {
return (
systemPrompt +
'\n\n---\n\n## Thread Context (previous messages)\n\n' +
threadContext +
workflowContextSuffix +
'\n\n---\n\n## Current Request\n\n' +
message +
contextSuffix +
fileSuffix
);
}

return systemPrompt + '\n\n---\n\n## User Message\n\n' + message + contextSuffix + fileSuffix;
return (
systemPrompt +
workflowContextSuffix +
'\n\n---\n\n## User Message\n\n' +
message +
contextSuffix +
fileSuffix
);
}

// ─── Main Handler ───────────────────────────────────────────────────────────
Expand Down Expand Up @@ -731,14 +748,53 @@ export async function handleMessage(
});
}

// Build workflow context for follow-up awareness
let workflowContext: string | undefined;
try {
const recentResultMessages = await messageDb.getRecentWorkflowResultMessages(
conversation.id,
3
);
if (recentResultMessages.length > 0) {
const workflowResults: WorkflowResultContext[] = recentResultMessages.map(msg => {
let workflowName = 'unknown';
let runId = 'unknown';
try {
const parsed =
typeof msg.metadata === 'string' ? JSON.parse(msg.metadata) : msg.metadata;
const meta = parsed as {
workflowResult?: { workflowName?: string; runId?: string };
};
workflowName = meta.workflowResult?.workflowName ?? 'unknown';
runId = meta.workflowResult?.runId ?? 'unknown';
} catch (metaErr) {
// Malformed metadata — use defaults
getLog().warn(
{ err: metaErr as Error, conversationId, messageId: msg.id },
'orchestrator.workflow_result_metadata_parse_failed'
);
}
return { workflowName, runId, summary: msg.content };
});
workflowContext = formatWorkflowContextSection(workflowResults);
}
} catch (error) {
getLog().warn(
{ err: error as Error, conversationId },
'orchestrator.workflow_context_fetch_failed'
);
// Non-critical — continue without context
}

const fullPrompt = buildFullPrompt(
conversation,
codebases,
workflows,
message,
issueContext,
threadContext,
attachedFiles
attachedFiles,
workflowContext
);
const cwd = getArchonWorkspacesPath();

Expand Down
Loading
Loading