diff --git a/assistant/src/__tests__/channel-approval-routes.test.ts b/assistant/src/__tests__/channel-approval-routes.test.ts index 389fc832077..d7ab72d12f9 100644 --- a/assistant/src/__tests__/channel-approval-routes.test.ts +++ b/assistant/src/__tests__/channel-approval-routes.test.ts @@ -48,6 +48,7 @@ import { } from '../memory/runs-store.js'; import type { PendingConfirmation } from '../memory/runs-store.js'; import * as channelDeliveryStore from '../memory/channel-delivery-store.js'; +import * as conversationStore from '../memory/conversation-store.js'; import type { RunOrchestrator } from '../runtime/run-orchestrator.js'; import { handleChannelInbound, isChannelApprovalsEnabled } from '../runtime/routes/channel-routes.js'; import * as gatewayClient from '../runtime/gateway-client.js'; @@ -80,6 +81,7 @@ function resetTables(): void { const db = getDb(); db.run('DELETE FROM message_runs'); db.run('DELETE FROM channel_inbound_events'); + db.run('DELETE FROM messages'); db.run('DELETE FROM conversations'); } @@ -701,9 +703,10 @@ describe('terminal state check before markProcessed', () => { process.env.CHANNEL_APPROVALS_ENABLED = 'true'; }); - test('does NOT markProcessed when run is not in terminal state after poll timeout', async () => { + test('markProcessed IS called even when run is not in terminal state after poll timeout', async () => { const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {}); const markSpy = spyOn(channelDeliveryStore, 'markProcessed'); + const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {}); const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined); const mockRun = { @@ -721,18 +724,11 @@ describe('terminal state check before markProcessed', () => { updatedAt: Date.now(), }; - // getRun always returns 'running' — the run never completes within the poll - const _orchestrator = { - submitDecision: mock(() => 'applied' as const), - getRun: mock(() => ({ ...mockRun, status: 'running' as const })), - startRun: mock(async () => mockRun), - } as unknown as RunOrchestrator; - - // To avoid the 5-minute timeout, we patch the poll constants by testing - // through a short scenario: since the poll loop checks Date.now() vs - // RUN_POLL_MAX_WAIT_MS (300_000), we can't easily test the full timeout. - // Instead, test the terminal check by having getRun return null (run - // disappeared), which causes the poll to break without a terminal status. + // getRun returns null — run disappeared, poll loop breaks, isTerminal = false. + // Even though the run is not terminal, the event is marked as processed + // because the run is still alive and waiting for an approval decision. + // Marking it as failed would cause the retry sweep to replay through + // processMessage and dead-letter the conversation. const orchNull = { submitDecision: mock(() => 'applied' as const), getRun: mock(() => null), @@ -745,16 +741,17 @@ describe('terminal state check before markProcessed', () => { // Wait for the background async to complete await new Promise((resolve) => setTimeout(resolve, 800)); - // markProcessed should NOT have been called because the run is not terminal - // (getRun returned null, so isTerminal = false) - const markCalls = markSpy.mock.calls; - // Filter for calls that would correspond to the approval path event - // (the init message from processChannelMessageInBackground could also call markProcessed, - // but that's the non-approval path) - expect(markCalls.length).toBe(0); + // markProcessed SHOULD have been called — the timeout path now marks as + // processed instead of failed, relying on the post-decision delivery in + // handleApprovalInterception to deliver the reply when the user decides. + expect(markSpy).toHaveBeenCalled(); + + // recordProcessingFailure should NOT have been called + expect(failureSpy).not.toHaveBeenCalled(); linkSpy.mockRestore(); markSpy.mockRestore(); + failureSpy.mockRestore(); deliverSpy.mockRestore(); }); @@ -982,15 +979,15 @@ describe('stale callback handling', () => { }); // ═══════════════════════════════════════════════════════════════════════════ -// 12. Timeout-to-retry lifecycle (WS-C) +// 12. Timeout marks event as processed (not failed) // ═══════════════════════════════════════════════════════════════════════════ -describe('poll timeout calls recordProcessingFailure', () => { +describe('poll timeout marks event as processed', () => { beforeEach(() => { process.env.CHANNEL_APPROVALS_ENABLED = 'true'; }); - test('records processing failure when run disappears (getRun returns null) before terminal state', async () => { + test('marks event as processed (not failed) when run disappears (getRun returns null) before terminal state', async () => { const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {}); const markSpy = spyOn(channelDeliveryStore, 'markProcessed'); const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {}); @@ -1024,17 +1021,13 @@ describe('poll timeout calls recordProcessingFailure', () => { // Wait for the background async to complete await new Promise((resolve) => setTimeout(resolve, 800)); - // recordProcessingFailure should have been called because the run is not terminal - expect(failureSpy).toHaveBeenCalled(); - const failureArgs = failureSpy.mock.calls[0]; - // First arg is the eventId (string), second is the error - expect(typeof failureArgs[0]).toBe('string'); - expect(failureArgs[1]).toBeInstanceOf(Error); - expect((failureArgs[1] as Error).message).toContain('Approval poll timeout'); - expect((failureArgs[1] as Error).message).toContain('300000'); + // markProcessed SHOULD have been called — the run is still alive, just + // waiting for approval. Marking as failed would cause the retry sweep to + // replay through processMessage and dead-letter the conversation. + expect(markSpy).toHaveBeenCalled(); - // markProcessed should NOT have been called - expect(markSpy).not.toHaveBeenCalled(); + // recordProcessingFailure should NOT have been called + expect(failureSpy).not.toHaveBeenCalled(); linkSpy.mockRestore(); markSpy.mockRestore(); @@ -1089,6 +1082,120 @@ describe('poll timeout calls recordProcessingFailure', () => { }); }); +// ═══════════════════════════════════════════════════════════════════════════ +// 12b. Post-decision delivery after poll timeout +// ═══════════════════════════════════════════════════════════════════════════ + +describe('post-decision delivery after poll timeout', () => { + beforeEach(() => { + process.env.CHANNEL_APPROVALS_ENABLED = 'true'; + }); + + test('delivers reply via callback after a late approval decision', async () => { + const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined); + + // Establish the conversation + const initReq = makeInboundRequest({ content: 'init' }); + const orchestrator = makeMockOrchestrator(); + await handleChannelInbound(initReq, noopProcessMessage, 'token', orchestrator); + + const db = getDb(); + const events = db.$client.prepare('SELECT conversation_id FROM channel_inbound_events').all() as Array<{ conversation_id: string }>; + const conversationId = events[0]?.conversation_id; + ensureConversation(conversationId!); + + // Create a pending run + const run = createRun(conversationId!); + setRunConfirmation(run.id, sampleConfirmation); + + // Add a mock assistant message so that deliverReplyViaCallback can find + // the final reply to deliver. + conversationStore.addMessage(conversationId!, 'assistant', 'Here is your result.'); + + // Now create a second orchestrator that simulates the run completing after + // the decision is applied (getRun returns completed after first call). + let getRunCallCount = 0; + const lateOrchestrator = { + submitDecision: mock(() => 'applied' as const), + getRun: mock(() => { + getRunCallCount++; + // First call returns needs_confirmation (decision just applied, resuming), + // subsequent calls return completed (run finished). + if (getRunCallCount <= 1) { + return { + id: run.id, + conversationId: conversationId!, + messageId: 'user-msg-late', + status: 'needs_confirmation' as const, + pendingConfirmation: null, + pendingSecret: null, + inputTokens: 0, + outputTokens: 0, + estimatedCost: 0, + error: null, + createdAt: Date.now(), + updatedAt: Date.now(), + }; + } + return { + id: run.id, + conversationId: conversationId!, + messageId: 'user-msg-late', + status: 'completed' as const, + pendingConfirmation: null, + pendingSecret: null, + inputTokens: 0, + outputTokens: 0, + estimatedCost: 0, + error: null, + createdAt: Date.now(), + updatedAt: Date.now(), + }; + }), + startRun: mock(async () => ({ + id: run.id, + conversationId: conversationId!, + messageId: 'user-msg-late', + status: 'running' as const, + pendingConfirmation: null, + pendingSecret: null, + inputTokens: 0, + outputTokens: 0, + estimatedCost: 0, + error: null, + createdAt: Date.now(), + updatedAt: Date.now(), + })), + } as unknown as RunOrchestrator; + + // Clear spy to only track calls from the decision + post-decision path + deliverSpy.mockClear(); + + // Send an approval decision — this simulates a late approval after the + // original poll has already timed out. + const req = makeInboundRequest({ + content: '', + callbackData: `apr:${run.id}:approve_once`, + }); + + const res = await handleChannelInbound(req, noopProcessMessage, 'token', lateOrchestrator); + const body = await res.json() as Record; + + expect(body.accepted).toBe(true); + expect(body.approval).toBe('decision_applied'); + + // Wait for the async post-decision delivery poll to complete. + // It polls every 500ms; the run becomes terminal on the second getRun call. + await new Promise((resolve) => setTimeout(resolve, 1500)); + + // deliverChannelReply should have been called by the post-decision + // delivery path (deliverReplyViaCallback uses deliverChannelReply). + expect(deliverSpy).toHaveBeenCalled(); + + deliverSpy.mockRestore(); + }); +}); + // ═══════════════════════════════════════════════════════════════════════════ // 13. sourceChannel is passed to orchestrator.startRun (WS-D) // ═══════════════════════════════════════════════════════════════════════════ diff --git a/assistant/src/runtime/routes/channel-routes.ts b/assistant/src/runtime/routes/channel-routes.ts index 2eb5170cc75..c3d921486cc 100644 --- a/assistant/src/runtime/routes/channel-routes.ts +++ b/assistant/src/runtime/routes/channel-routes.ts @@ -540,6 +540,10 @@ function processChannelMessageInBackground(params: BackgroundProcessingParams): const RUN_POLL_INTERVAL_MS = 500; const RUN_POLL_MAX_WAIT_MS = 300_000; // 5 minutes +/** Post-decision delivery poll: shorter window since the run should finish quickly after a decision. */ +const POST_DECISION_POLL_INTERVAL_MS = 500; +const POST_DECISION_POLL_MAX_WAIT_MS = 30_000; // 30 seconds + interface ApprovalProcessingParams { orchestrator: RunOrchestrator; conversationId: string; @@ -726,14 +730,18 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v } } } else { - const timeoutErr = new Error( - `Approval poll timeout: run did not reach terminal state within ${RUN_POLL_MAX_WAIT_MS}ms`, - ); + // The run is still alive (likely waiting for an approval decision) but + // the poll window has elapsed. Mark the event as processed rather than + // failed — the run will resume when the user clicks approve/reject, + // and `handleApprovalInterception` will deliver the reply via its own + // post-decision poll. Marking it failed would cause the generic retry + // sweep to replay through `processMessage`, which throws "Session is + // already processing a message" and dead-letters a valid conversation. log.warn( { runId: run.id, status: finalRun?.status, conversationId }, - 'Approval-path poll loop timed out before run reached terminal state', + 'Approval-path poll loop timed out before run reached terminal state; marking event as processed', ); - channelDeliveryStore.recordProcessingFailure(eventId, timeoutErr); + channelDeliveryStore.markProcessed(eventId); } } catch (err) { log.error({ err, conversationId }, 'Approval-aware channel message processing failed'); @@ -876,6 +884,19 @@ async function handleApprovalInterception( } catch (err) { log.error({ err, conversationId: guardianApproval.conversationId }, 'Failed to notify requester of guardian decision'); } + + // Schedule post-decision delivery to the requester's chat in case + // the original poll has already exited. + if (result.runId) { + schedulePostDecisionDelivery( + orchestrator, + result.runId, + guardianApproval.conversationId, + guardianApproval.requesterChatId, + replyCallbackUrl, + bearerToken, + ); + } } return { handled: true, type: 'guardian_decision_applied' }; @@ -992,9 +1013,21 @@ async function handleApprovalInterception( const result = handleChannelDecision(conversationId, decision, orchestrator); - // The decision is applied; the final reply will be delivered by the - // terminal run completion path in processChannelMessageWithApprovals. - // No immediate reply is sent here to avoid duplicate messages. + // Schedule a background poll for run terminal state and deliver the reply. + // This handles the case where the original poll in + // processChannelMessageWithApprovals has already exited due to timeout. + // If the original poll is still running and delivers first, the duplicate + // delivery is acceptable (gateway deduplicates or user sees a repeat). + if (result.applied && result.runId) { + schedulePostDecisionDelivery( + orchestrator, + result.runId, + conversationId, + externalChatId, + replyCallbackUrl, + bearerToken, + ); + } return { handled: true, type: 'decision_applied' }; } @@ -1025,6 +1058,46 @@ async function handleApprovalInterception( return { handled: true, type: 'reminder_sent' }; } +/** + * Fire-and-forget: after a decision is applied via `handleApprovalInterception`, + * poll the run briefly for terminal state and deliver the final reply. This + * handles the case where the original poll in `processChannelMessageWithApprovals` + * has already exited due to the 5-minute timeout. + * + * If the original poll already delivered the reply, delivering it again is + * acceptable — the gateway will deduplicate or the user sees a duplicate + * (better than seeing nothing). + */ +function schedulePostDecisionDelivery( + orchestrator: RunOrchestrator, + runId: string, + conversationId: string, + externalChatId: string, + replyCallbackUrl: string, + bearerToken?: string, +): void { + (async () => { + try { + const startTime = Date.now(); + while (Date.now() - startTime < POST_DECISION_POLL_MAX_WAIT_MS) { + await new Promise((resolve) => setTimeout(resolve, POST_DECISION_POLL_INTERVAL_MS)); + const current = orchestrator.getRun(runId); + if (!current) break; + if (current.status === 'completed' || current.status === 'failed') { + await deliverReplyViaCallback(conversationId, externalChatId, replyCallbackUrl, bearerToken); + return; + } + } + log.warn( + { runId, conversationId }, + 'Post-decision delivery poll timed out without run reaching terminal state', + ); + } catch (err) { + log.error({ err, runId, conversationId }, 'Post-decision delivery failed'); + } + })(); +} + async function deliverReplyViaCallback( conversationId: string, externalChatId: string,