diff --git a/assistant/src/__tests__/channel-approval-routes.test.ts b/assistant/src/__tests__/channel-approval-routes.test.ts index f535110c999..361aa8e714a 100644 --- a/assistant/src/__tests__/channel-approval-routes.test.ts +++ b/assistant/src/__tests__/channel-approval-routes.test.ts @@ -712,7 +712,7 @@ describe('terminal state check before markProcessed', () => { process.env.CHANNEL_APPROVALS_ENABLED = 'true'; }); - test('markProcessed IS called even when run is not in terminal state after poll timeout', async () => { + test('records processing failure when run disappears (non-approval non-terminal state)', async () => { const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {}); const markSpy = spyOn(channelDeliveryStore, 'markProcessed'); const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {}); @@ -733,11 +733,9 @@ describe('terminal state check before markProcessed', () => { updatedAt: Date.now(), }; - // getRun returns null — run disappeared, poll loop breaks, isTerminal = false. - // Even though the run is not terminal, the event is marked as processed - // because the run is still alive and waiting for an approval decision. - // Marking it as failed would cause the retry sweep to replay through - // processMessage and dead-letter the conversation. + // getRun returns null — run disappeared, poll loop breaks. Since the run + // is not in needs_confirmation, it falls through to recordProcessingFailure + // so the retry/dead-letter machinery can handle it. const orchNull = { submitDecision: mock(() => 'applied' as const), getRun: mock(() => null), @@ -750,13 +748,12 @@ describe('terminal state check before markProcessed', () => { // Wait for the background async to complete await new Promise((resolve) => setTimeout(resolve, 800)); - // markProcessed SHOULD have been called — the timeout path now marks as - // processed instead of failed, relying on the post-decision delivery in - // handleApprovalInterception to deliver the reply when the user decides. - expect(markSpy).toHaveBeenCalled(); + // recordProcessingFailure SHOULD have been called because the run is + // not in needs_confirmation (it disappeared — status is null). + expect(failureSpy).toHaveBeenCalled(); - // recordProcessingFailure should NOT have been called - expect(failureSpy).not.toHaveBeenCalled(); + // markProcessed should NOT have been called + expect(markSpy).not.toHaveBeenCalled(); linkSpy.mockRestore(); markSpy.mockRestore(); @@ -988,15 +985,15 @@ describe('stale callback handling', () => { }); // ═══════════════════════════════════════════════════════════════════════════ -// 12. Timeout marks event as processed (not failed) +// 12. Timeout handling: needs_confirmation marks processed, other states fail // ═══════════════════════════════════════════════════════════════════════════ -describe('poll timeout marks event as processed', () => { +describe('poll timeout handling by run state', () => { beforeEach(() => { process.env.CHANNEL_APPROVALS_ENABLED = 'true'; }); - test('marks event as processed (not failed) when run disappears (getRun returns null) before terminal state', async () => { + test('records processing failure when run disappears (getRun returns null) before terminal state', async () => { const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {}); const markSpy = spyOn(channelDeliveryStore, 'markProcessed'); const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {}); @@ -1017,7 +1014,8 @@ describe('poll timeout marks event as processed', () => { updatedAt: Date.now(), }; - // getRun returns null — run disappeared, poll loop breaks, isTerminal = false + // getRun returns null — run disappeared, poll loop breaks. Since the run + // is not in needs_confirmation, it records a processing failure. const orchestrator = { submitDecision: mock(() => 'applied' as const), getRun: mock(() => null), @@ -1030,9 +1028,57 @@ describe('poll timeout marks event as processed', () => { // Wait for the background async to complete await new Promise((resolve) => setTimeout(resolve, 800)); - // markProcessed SHOULD have been called — the run is still alive, just - // waiting for approval. Marking as failed would cause the retry sweep to - // replay through processMessage and dead-letter the conversation. + // recordProcessingFailure SHOULD have been called — the run disappeared + // and is not in needs_confirmation, so the retry machinery should handle it. + expect(failureSpy).toHaveBeenCalled(); + + // markProcessed should NOT have been called + expect(markSpy).not.toHaveBeenCalled(); + + linkSpy.mockRestore(); + markSpy.mockRestore(); + failureSpy.mockRestore(); + deliverSpy.mockRestore(); + }); + + test('marks event as processed when run is in needs_confirmation state after poll timeout', async () => { + const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {}); + const markSpy = spyOn(channelDeliveryStore, 'markProcessed'); + const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {}); + const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined); + + const mockRun = { + id: 'run-needs-confirm', + conversationId: 'conv-1', + messageId: 'user-msg-202', + status: 'running' as const, + pendingConfirmation: null, + pendingSecret: null, + inputTokens: 0, + outputTokens: 0, + estimatedCost: 0, + error: null, + createdAt: Date.now(), + updatedAt: Date.now(), + }; + + // getRun returns needs_confirmation — run is waiting for approval decision. + // The event should be marked as processed because the post-decision delivery + // in handleApprovalInterception will deliver the reply when the user decides. + const orchestrator = { + submitDecision: mock(() => 'applied' as const), + getRun: mock(() => ({ ...mockRun, status: 'needs_confirmation' as const })), + startRun: mock(async () => mockRun), + } as unknown as RunOrchestrator; + + const req = makeInboundRequest({ content: 'hello needs_confirm' }); + await handleChannelInbound(req, noopProcessMessage, 'token', orchestrator); + + // Wait for the background async to complete + await new Promise((resolve) => setTimeout(resolve, 800)); + + // markProcessed SHOULD have been called — the run is waiting for approval, + // and the post-decision delivery path will handle the final reply. expect(markSpy).toHaveBeenCalled(); // recordProcessingFailure should NOT have been called diff --git a/assistant/src/runtime/routes/channel-routes.ts b/assistant/src/runtime/routes/channel-routes.ts index f503807aa74..8ef788985e6 100644 --- a/assistant/src/runtime/routes/channel-routes.ts +++ b/assistant/src/runtime/routes/channel-routes.ts @@ -548,9 +548,10 @@ function processChannelMessageInBackground(params: BackgroundProcessingParams): const RUN_POLL_INTERVAL_MS = 500; const RUN_POLL_MAX_WAIT_MS = 300_000; // 5 minutes -/** Post-decision delivery poll: shorter window since the run should finish quickly after a decision. */ +/** Post-decision delivery poll: uses the same budget as the main poll since + * this is the only delivery path for late approvals after the main poll exits. */ const POST_DECISION_POLL_INTERVAL_MS = 500; -const POST_DECISION_POLL_MAX_WAIT_MS = 30_000; // 30 seconds +const POST_DECISION_POLL_MAX_WAIT_MS = RUN_POLL_MAX_WAIT_MS; interface ApprovalProcessingParams { orchestrator: RunOrchestrator; @@ -732,9 +733,7 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v } // Only mark processed and deliver the final reply when the run has - // actually reached a terminal state. If the poll loop timed out while - // the run is still in progress, leave the event unprocessed so it can - // be retried or dead-lettered. + // actually reached a terminal state. const finalRun = orchestrator.getRun(run.id); const isTerminal = finalRun?.status === 'completed' || finalRun?.status === 'failed'; @@ -760,19 +759,31 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v updateApprovalDecision(approvalReq.id, { status: outcomeStatus }); } } - } else { - // The run is still alive (likely waiting for an approval decision) but - // the poll window has elapsed. Mark the event as processed rather than - // failed — the run will resume when the user clicks approve/reject, - // and `handleApprovalInterception` will deliver the reply via its own + } else if (finalRun?.status === 'needs_confirmation') { + // The run is waiting for an approval decision but the poll window has + // elapsed. Mark the event as processed rather than failed — the run + // will resume when the user clicks approve/reject, and + // `handleApprovalInterception` will deliver the reply via its own // post-decision poll. Marking it failed would cause the generic retry // sweep to replay through `processMessage`, which throws "Session is // already processing a message" and dead-letters a valid conversation. log.warn( - { runId: run.id, status: finalRun?.status, conversationId }, - 'Approval-path poll loop timed out before run reached terminal state; marking event as processed', + { runId: run.id, status: finalRun.status, conversationId }, + 'Approval-path poll loop timed out while run awaits approval decision; marking event as processed', ); channelDeliveryStore.markProcessed(eventId); + } else { + // The run is in a non-terminal, non-approval state (e.g. running, + // needs_secret, or disappeared). Record a processing failure so the + // retry/dead-letter machinery can handle it. + const timeoutErr = new Error( + `Approval poll timeout: run did not reach terminal state within ${RUN_POLL_MAX_WAIT_MS}ms (status: ${finalRun?.status ?? 'null'})`, + ); + log.warn( + { runId: run.id, status: finalRun?.status, conversationId }, + 'Approval-path poll loop timed out before run reached terminal state', + ); + channelDeliveryStore.recordProcessingFailure(eventId, timeoutErr); } } catch (err) { log.error({ err, conversationId }, 'Approval-aware channel message processing failed');