Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 140 additions & 33 deletions assistant/src/__tests__/channel-approval-routes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
} from '../memory/runs-store.js';
import type { PendingConfirmation } from '../memory/runs-store.js';
import * as channelDeliveryStore from '../memory/channel-delivery-store.js';
import * as conversationStore from '../memory/conversation-store.js';
import type { RunOrchestrator } from '../runtime/run-orchestrator.js';
import { handleChannelInbound, isChannelApprovalsEnabled } from '../runtime/routes/channel-routes.js';
import * as gatewayClient from '../runtime/gateway-client.js';
Expand Down Expand Up @@ -80,6 +81,7 @@ function resetTables(): void {
const db = getDb();
db.run('DELETE FROM message_runs');
db.run('DELETE FROM channel_inbound_events');
db.run('DELETE FROM messages');
db.run('DELETE FROM conversations');
}

Expand Down Expand Up @@ -701,9 +703,10 @@ describe('terminal state check before markProcessed', () => {
process.env.CHANNEL_APPROVALS_ENABLED = 'true';
});

test('does NOT markProcessed when run is not in terminal state after poll timeout', async () => {
test('markProcessed IS called even when run is not in terminal state after poll timeout', async () => {
const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {});
const markSpy = spyOn(channelDeliveryStore, 'markProcessed');
const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {});
const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined);

const mockRun = {
Expand All @@ -721,18 +724,11 @@ describe('terminal state check before markProcessed', () => {
updatedAt: Date.now(),
};

// getRun always returns 'running' — the run never completes within the poll
const _orchestrator = {
submitDecision: mock(() => 'applied' as const),
getRun: mock(() => ({ ...mockRun, status: 'running' as const })),
startRun: mock(async () => mockRun),
} as unknown as RunOrchestrator;

// To avoid the 5-minute timeout, we patch the poll constants by testing
// through a short scenario: since the poll loop checks Date.now() vs
// RUN_POLL_MAX_WAIT_MS (300_000), we can't easily test the full timeout.
// Instead, test the terminal check by having getRun return null (run
// disappeared), which causes the poll to break without a terminal status.
// getRun returns null — run disappeared, poll loop breaks, isTerminal = false.
// Even though the run is not terminal, the event is marked as processed
// because the run is still alive and waiting for an approval decision.
// Marking it as failed would cause the retry sweep to replay through
// processMessage and dead-letter the conversation.
const orchNull = {
submitDecision: mock(() => 'applied' as const),
getRun: mock(() => null),
Expand All @@ -745,16 +741,17 @@ describe('terminal state check before markProcessed', () => {
// Wait for the background async to complete
await new Promise((resolve) => setTimeout(resolve, 800));

// markProcessed should NOT have been called because the run is not terminal
// (getRun returned null, so isTerminal = false)
const markCalls = markSpy.mock.calls;
// Filter for calls that would correspond to the approval path event
// (the init message from processChannelMessageInBackground could also call markProcessed,
// but that's the non-approval path)
expect(markCalls.length).toBe(0);
// markProcessed SHOULD have been called — the timeout path now marks the
// event as processed instead of failed, relying on the post-decision delivery
// in handleApprovalInterception to deliver the reply when the user decides.
expect(markSpy).toHaveBeenCalled();

// recordProcessingFailure should NOT have been called
expect(failureSpy).not.toHaveBeenCalled();

linkSpy.mockRestore();
markSpy.mockRestore();
failureSpy.mockRestore();
deliverSpy.mockRestore();
});

Expand Down Expand Up @@ -982,15 +979,15 @@ describe('stale callback handling', () => {
});

// ═══════════════════════════════════════════════════════════════════════════
// 12. Timeout-to-retry lifecycle (WS-C)
// 12. Timeout marks event as processed (not failed)
// ═══════════════════════════════════════════════════════════════════════════

describe('poll timeout calls recordProcessingFailure', () => {
describe('poll timeout marks event as processed', () => {
beforeEach(() => {
process.env.CHANNEL_APPROVALS_ENABLED = 'true';
});

test('records processing failure when run disappears (getRun returns null) before terminal state', async () => {
test('marks event as processed (not failed) when run disappears (getRun returns null) before terminal state', async () => {
const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {});
const markSpy = spyOn(channelDeliveryStore, 'markProcessed');
const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {});
Expand Down Expand Up @@ -1024,17 +1021,13 @@ describe('poll timeout calls recordProcessingFailure', () => {
// Wait for the background async to complete
await new Promise((resolve) => setTimeout(resolve, 800));

// recordProcessingFailure should have been called because the run is not terminal
expect(failureSpy).toHaveBeenCalled();
const failureArgs = failureSpy.mock.calls[0];
// First arg is the eventId (string), second is the error
expect(typeof failureArgs[0]).toBe('string');
expect(failureArgs[1]).toBeInstanceOf(Error);
expect((failureArgs[1] as Error).message).toContain('Approval poll timeout');
expect((failureArgs[1] as Error).message).toContain('300000');
// markProcessed SHOULD have been called — the run is still alive, just
// waiting for approval. Marking as failed would cause the retry sweep to
// replay through processMessage and dead-letter the conversation.
expect(markSpy).toHaveBeenCalled();

// markProcessed should NOT have been called
expect(markSpy).not.toHaveBeenCalled();
// recordProcessingFailure should NOT have been called
expect(failureSpy).not.toHaveBeenCalled();

linkSpy.mockRestore();
markSpy.mockRestore();
Expand Down Expand Up @@ -1089,6 +1082,120 @@ describe('poll timeout calls recordProcessingFailure', () => {
});
});

// ═══════════════════════════════════════════════════════════════════════════
// 12b. Post-decision delivery after poll timeout
// ═══════════════════════════════════════════════════════════════════════════

describe('post-decision delivery after poll timeout', () => {
  beforeEach(() => {
    process.env.CHANNEL_APPROVALS_ENABLED = 'true';
  });

  /**
   * Simulates a late approval: the decision arrives through the callback path
   * and the run only reports `completed` on a later `getRun` call. The
   * post-decision poll is then expected to deliver the reply through
   * `gatewayClient.deliverChannelReply`.
   */
  test('delivers reply via callback after a late approval decision', async () => {
    const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined);

    // try/finally guarantees the spy is restored even when an assertion or an
    // awaited call throws, so a failure here cannot leak the mock into later tests.
    try {
      // Establish the conversation
      const initReq = makeInboundRequest({ content: 'init' });
      const orchestrator = makeMockOrchestrator();
      await handleChannelInbound(initReq, noopProcessMessage, 'token', orchestrator);

      const db = getDb();
      const events = db.$client
        .prepare('SELECT conversation_id FROM channel_inbound_events')
        .all() as Array<{ conversation_id: string }>;
      const conversationId = events[0]?.conversation_id;
      // Fail with a clear message instead of pushing `undefined` through
      // non-null assertions into the stores below.
      if (!conversationId) {
        throw new Error('Expected the init request to record an inbound event with a conversation id');
      }
      ensureConversation(conversationId);

      // Create a pending run
      const run = createRun(conversationId);
      setRunConfirmation(run.id, sampleConfirmation);

      // Add a mock assistant message so that deliverReplyViaCallback can find
      // the final reply to deliver.
      conversationStore.addMessage(conversationId, 'assistant', 'Here is your result.');

      // Builds a fresh run snapshot for the given status; every other field is
      // identical across the states this test walks through.
      const runSnapshot = (status: 'needs_confirmation' | 'running' | 'completed') => ({
        id: run.id,
        conversationId,
        messageId: 'user-msg-late',
        status,
        pendingConfirmation: null,
        pendingSecret: null,
        inputTokens: 0,
        outputTokens: 0,
        estimatedCost: 0,
        error: null,
        createdAt: Date.now(),
        updatedAt: Date.now(),
      });

      // Second orchestrator that simulates the run completing after the
      // decision is applied (getRun returns completed after the first call).
      let getRunCallCount = 0;
      const lateOrchestrator = {
        submitDecision: mock(() => 'applied' as const),
        getRun: mock(() => {
          getRunCallCount++;
          // First call returns needs_confirmation (decision just applied, resuming),
          // subsequent calls return completed (run finished).
          return runSnapshot(getRunCallCount <= 1 ? 'needs_confirmation' : 'completed');
        }),
        startRun: mock(async () => runSnapshot('running')),
      } as unknown as RunOrchestrator;

      // Clear spy to only track calls from the decision + post-decision path
      deliverSpy.mockClear();

      // Send an approval decision — this simulates a late approval after the
      // original poll has already timed out.
      const req = makeInboundRequest({
        content: '',
        callbackData: `apr:${run.id}:approve_once`,
      });

      const res = await handleChannelInbound(req, noopProcessMessage, 'token', lateOrchestrator);
      const body = await res.json() as Record<string, unknown>;

      expect(body.accepted).toBe(true);
      expect(body.approval).toBe('decision_applied');

      // Wait for the async post-decision delivery poll to complete.
      // It polls every 500ms; the run becomes terminal on the second getRun call.
      await new Promise((resolve) => setTimeout(resolve, 1500));

      // deliverChannelReply should have been called by the post-decision
      // delivery path (deliverReplyViaCallback uses deliverChannelReply).
      expect(deliverSpy).toHaveBeenCalled();
    } finally {
      deliverSpy.mockRestore();
    }
  });
});

// ═══════════════════════════════════════════════════════════════════════════
// 13. sourceChannel is passed to orchestrator.startRun (WS-D)
// ═══════════════════════════════════════════════════════════════════════════
Expand Down
89 changes: 81 additions & 8 deletions assistant/src/runtime/routes/channel-routes.ts
Comment thread
noanflaherty marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ function processChannelMessageInBackground(params: BackgroundProcessingParams):
const RUN_POLL_INTERVAL_MS = 500;
const RUN_POLL_MAX_WAIT_MS = 300_000; // 5 minutes

/**
 * Post-decision delivery poll: shorter window since the run should finish
 * quickly after a decision. Both values are read by the poll loop in
 * `schedulePostDecisionDelivery`.
 */
// Delay, in milliseconds, awaited before each `getRun` check in that loop.
const POST_DECISION_POLL_INTERVAL_MS = 500;
// Total time, in milliseconds, the loop keeps polling before it logs a timeout warning and gives up.
const POST_DECISION_POLL_MAX_WAIT_MS = 30_000; // 30 seconds
Comment thread
noanflaherty marked this conversation as resolved.

interface ApprovalProcessingParams {
orchestrator: RunOrchestrator;
conversationId: string;
Expand Down Expand Up @@ -726,14 +730,18 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v
}
}
} else {
const timeoutErr = new Error(
`Approval poll timeout: run did not reach terminal state within ${RUN_POLL_MAX_WAIT_MS}ms`,
);
// The run is still alive (likely waiting for an approval decision) but
// the poll window has elapsed. Mark the event as processed rather than
// failed — the run will resume when the user clicks approve/reject,
// and `handleApprovalInterception` will deliver the reply via its own
// post-decision poll. Marking it failed would cause the generic retry
// sweep to replay through `processMessage`, which throws "Session is
// already processing a message" and dead-letters a valid conversation.
log.warn(
{ runId: run.id, status: finalRun?.status, conversationId },
'Approval-path poll loop timed out before run reached terminal state',
'Approval-path poll loop timed out before run reached terminal state; marking event as processed',
);
channelDeliveryStore.recordProcessingFailure(eventId, timeoutErr);
channelDeliveryStore.markProcessed(eventId);
Comment thread
noanflaherty marked this conversation as resolved.
}
} catch (err) {
log.error({ err, conversationId }, 'Approval-aware channel message processing failed');
Expand Down Expand Up @@ -876,6 +884,19 @@ async function handleApprovalInterception(
} catch (err) {
log.error({ err, conversationId: guardianApproval.conversationId }, 'Failed to notify requester of guardian decision');
}

// Schedule post-decision delivery to the requester's chat in case
// the original poll has already exited.
if (result.runId) {
schedulePostDecisionDelivery(
orchestrator,
result.runId,
guardianApproval.conversationId,
guardianApproval.requesterChatId,
replyCallbackUrl,
bearerToken,
);
}
}

return { handled: true, type: 'guardian_decision_applied' };
Expand Down Expand Up @@ -992,9 +1013,21 @@ async function handleApprovalInterception(

const result = handleChannelDecision(conversationId, decision, orchestrator);

// The decision is applied; the final reply will be delivered by the
// terminal run completion path in processChannelMessageWithApprovals.
// No immediate reply is sent here to avoid duplicate messages.
// Schedule a background poll for run terminal state and deliver the reply.
// This handles the case where the original poll in
// processChannelMessageWithApprovals has already exited due to timeout.
// If the original poll is still running and delivers first, the duplicate
// delivery is acceptable (gateway deduplicates or user sees a repeat).
if (result.applied && result.runId) {
schedulePostDecisionDelivery(
orchestrator,
result.runId,
conversationId,
externalChatId,
replyCallbackUrl,
bearerToken,
);
}

return { handled: true, type: 'decision_applied' };
}
Expand Down Expand Up @@ -1025,6 +1058,46 @@ async function handleApprovalInterception(
return { handled: true, type: 'reminder_sent' };
}

/**
 * Fire-and-forget: after a decision is applied via `handleApprovalInterception`,
 * poll the run briefly for terminal state and deliver the final reply. This
 * handles the case where the original poll in `processChannelMessageWithApprovals`
 * has already exited due to the 5-minute timeout.
 *
 * The poll sleeps `POST_DECISION_POLL_INTERVAL_MS` before each check and stops
 * as soon as one of the following happens:
 *  - the run reports `completed` or `failed`: the reply is delivered via
 *    `deliverReplyViaCallback`;
 *  - `orchestrator.getRun` returns nothing: a warning is logged and nothing is
 *    delivered;
 *  - `POST_DECISION_POLL_MAX_WAIT_MS` elapses: a timeout warning is logged.
 *
 * If the original poll already delivered the reply, delivering it again is
 * treated as acceptable — a duplicate is better than the user seeing nothing.
 *
 * @param orchestrator - Source of run state; only `getRun` is used here.
 * @param runId - Run whose terminal state is awaited.
 * @param conversationId - Conversation passed through to the reply delivery.
 * @param externalChatId - Chat passed through to the reply delivery.
 * @param replyCallbackUrl - Callback URL passed through to the reply delivery.
 * @param bearerToken - Optional token passed through to the reply delivery.
 */
function schedulePostDecisionDelivery(
  orchestrator: RunOrchestrator,
  runId: string,
  conversationId: string,
  externalChatId: string,
  replyCallbackUrl: string,
  bearerToken?: string,
): void {
  // `void` marks the promise as intentionally un-awaited; the try/catch below
  // ensures it never rejects, so there is no unhandled rejection to observe.
  void (async () => {
    try {
      const startTime = Date.now();
      while (Date.now() - startTime < POST_DECISION_POLL_MAX_WAIT_MS) {
        await new Promise((resolve) => setTimeout(resolve, POST_DECISION_POLL_INTERVAL_MS));
        const current = orchestrator.getRun(runId);
        if (!current) {
          // The run is gone, which is not a timeout. Log it as its own case so
          // the timeout warning below is not emitted with a misleading cause.
          log.warn(
            { runId, conversationId },
            'Post-decision delivery poll aborted: run no longer exists',
          );
          return;
        }
        if (current.status === 'completed' || current.status === 'failed') {
          await deliverReplyViaCallback(conversationId, externalChatId, replyCallbackUrl, bearerToken);
          return;
        }
      }
      log.warn(
        { runId, conversationId },
        'Post-decision delivery poll timed out without run reaching terminal state',
      );
    } catch (err) {
      log.error({ err, runId, conversationId }, 'Post-decision delivery failed');
    }
  })();
}

async function deliverReplyViaCallback(
conversationId: string,
externalChatId: string,
Expand Down
Loading