Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions assistant/src/__tests__/channel-approval-routes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ describe('terminal state check before markProcessed', () => {
process.env.CHANNEL_APPROVALS_ENABLED = 'true';
});

test('markProcessed IS called even when run is not in terminal state after poll timeout', async () => {
test('records processing failure when run disappears (non-approval non-terminal state)', async () => {
const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {});
const markSpy = spyOn(channelDeliveryStore, 'markProcessed');
const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {});
Expand All @@ -733,11 +733,9 @@ describe('terminal state check before markProcessed', () => {
updatedAt: Date.now(),
};

// getRun returns null — run disappeared, poll loop breaks, isTerminal = false.
// Even though the run is not terminal, the event is marked as processed
// because the run is still alive and waiting for an approval decision.
// Marking it as failed would cause the retry sweep to replay through
// processMessage and dead-letter the conversation.
// getRun returns null — run disappeared, poll loop breaks. Since the run
// is not in needs_confirmation, it falls through to recordProcessingFailure
// so the retry/dead-letter machinery can handle it.
const orchNull = {
submitDecision: mock(() => 'applied' as const),
getRun: mock(() => null),
Expand All @@ -750,13 +748,12 @@ describe('terminal state check before markProcessed', () => {
// Wait for the background async to complete
await new Promise((resolve) => setTimeout(resolve, 800));

// markProcessed SHOULD have been called — the timeout path now marks as
// processed instead of failed, relying on the post-decision delivery in
// handleApprovalInterception to deliver the reply when the user decides.
expect(markSpy).toHaveBeenCalled();
// recordProcessingFailure SHOULD have been called because the run is
// not in needs_confirmation (it disappeared — getRun returns null, so there is no status).
expect(failureSpy).toHaveBeenCalled();

// recordProcessingFailure should NOT have been called
expect(failureSpy).not.toHaveBeenCalled();
// markProcessed should NOT have been called
expect(markSpy).not.toHaveBeenCalled();

linkSpy.mockRestore();
markSpy.mockRestore();
Expand Down Expand Up @@ -988,15 +985,15 @@ describe('stale callback handling', () => {
});

// ═══════════════════════════════════════════════════════════════════════════
// 12. Timeout marks event as processed (not failed)
// 12. Timeout handling: needs_confirmation marks processed, other states fail
// ═══════════════════════════════════════════════════════════════════════════

describe('poll timeout marks event as processed', () => {
describe('poll timeout handling by run state', () => {
beforeEach(() => {
process.env.CHANNEL_APPROVALS_ENABLED = 'true';
});

test('marks event as processed (not failed) when run disappears (getRun returns null) before terminal state', async () => {
test('records processing failure when run disappears (getRun returns null) before terminal state', async () => {
const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {});
const markSpy = spyOn(channelDeliveryStore, 'markProcessed');
const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {});
Expand All @@ -1017,7 +1014,8 @@ describe('poll timeout marks event as processed', () => {
updatedAt: Date.now(),
};

// getRun returns null — run disappeared, poll loop breaks, isTerminal = false
// getRun returns null — run disappeared, poll loop breaks. Since the run
// is not in needs_confirmation, it records a processing failure.
const orchestrator = {
submitDecision: mock(() => 'applied' as const),
getRun: mock(() => null),
Expand All @@ -1030,9 +1028,57 @@ describe('poll timeout marks event as processed', () => {
// Wait for the background async to complete
await new Promise((resolve) => setTimeout(resolve, 800));

// markProcessed SHOULD have been called — the run is still alive, just
// waiting for approval. Marking as failed would cause the retry sweep to
// replay through processMessage and dead-letter the conversation.
// recordProcessingFailure SHOULD have been called — the run disappeared
// and is not in needs_confirmation, so the retry machinery should handle it.
expect(failureSpy).toHaveBeenCalled();

// markProcessed should NOT have been called
expect(markSpy).not.toHaveBeenCalled();

linkSpy.mockRestore();
markSpy.mockRestore();
failureSpy.mockRestore();
deliverSpy.mockRestore();
});

test('marks event as processed when run is in needs_confirmation state after poll timeout', async () => {
const linkSpy = spyOn(channelDeliveryStore, 'linkMessage').mockImplementation(() => {});
const markSpy = spyOn(channelDeliveryStore, 'markProcessed');
const failureSpy = spyOn(channelDeliveryStore, 'recordProcessingFailure').mockImplementation(() => {});
const deliverSpy = spyOn(gatewayClient, 'deliverChannelReply').mockResolvedValue(undefined);

const mockRun = {
id: 'run-needs-confirm',
conversationId: 'conv-1',
messageId: 'user-msg-202',
status: 'running' as const,
pendingConfirmation: null,
pendingSecret: null,
inputTokens: 0,
outputTokens: 0,
estimatedCost: 0,
error: null,
createdAt: Date.now(),
updatedAt: Date.now(),
};

// getRun returns a run whose status is needs_confirmation — it is waiting for an approval decision.
// The event should be marked as processed because the post-decision delivery
// in handleApprovalInterception will deliver the reply when the user decides.
const orchestrator = {
submitDecision: mock(() => 'applied' as const),
getRun: mock(() => ({ ...mockRun, status: 'needs_confirmation' as const })),
startRun: mock(async () => mockRun),
} as unknown as RunOrchestrator;

const req = makeInboundRequest({ content: 'hello needs_confirm' });
await handleChannelInbound(req, noopProcessMessage, 'token', orchestrator);

// Wait for the background async to complete
await new Promise((resolve) => setTimeout(resolve, 800));
Comment thread
noanflaherty marked this conversation as resolved.

// markProcessed SHOULD have been called — the run is waiting for approval,
// and the post-decision delivery path will handle the final reply.
expect(markSpy).toHaveBeenCalled();

// recordProcessingFailure should NOT have been called
Expand Down
35 changes: 23 additions & 12 deletions assistant/src/runtime/routes/channel-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,10 @@ function processChannelMessageInBackground(params: BackgroundProcessingParams):
const RUN_POLL_INTERVAL_MS = 500;
const RUN_POLL_MAX_WAIT_MS = 300_000; // 5 minutes

/** Post-decision delivery poll: shorter window since the run should finish quickly after a decision. */
/** Post-decision delivery poll: uses the same budget as the main poll since
* this is the only delivery path for late approvals after the main poll exits. */
const POST_DECISION_POLL_INTERVAL_MS = 500;
const POST_DECISION_POLL_MAX_WAIT_MS = 30_000; // 30 seconds
const POST_DECISION_POLL_MAX_WAIT_MS = RUN_POLL_MAX_WAIT_MS;

interface ApprovalProcessingParams {
orchestrator: RunOrchestrator;
Expand Down Expand Up @@ -732,9 +733,7 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v
}

// Only mark processed and deliver the final reply when the run has
// actually reached a terminal state. If the poll loop timed out while
// the run is still in progress, leave the event unprocessed so it can
// be retried or dead-lettered.
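// actually reached a terminal state (completed or failed). A run still in
// needs_confirmation is marked processed without delivering a reply, and any
// other non-terminal state is recorded as a processing failure (see below).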
const finalRun = orchestrator.getRun(run.id);
const isTerminal = finalRun?.status === 'completed' || finalRun?.status === 'failed';

Expand All @@ -760,19 +759,31 @@ function processChannelMessageWithApprovals(params: ApprovalProcessingParams): v
updateApprovalDecision(approvalReq.id, { status: outcomeStatus });
}
}
} else {
// The run is still alive (likely waiting for an approval decision) but
// the poll window has elapsed. Mark the event as processed rather than
// failed — the run will resume when the user clicks approve/reject,
// and `handleApprovalInterception` will deliver the reply via its own
} else if (finalRun?.status === 'needs_confirmation') {
// The run is waiting for an approval decision but the poll window has
Comment thread
noanflaherty marked this conversation as resolved.
// elapsed. Mark the event as processed rather than failed — the run
// will resume when the user clicks approve/reject, and
// `handleApprovalInterception` will deliver the reply via its own
// post-decision poll. Marking it failed would cause the generic retry
// sweep to replay through `processMessage`, which throws "Session is
// already processing a message" and dead-letters a valid conversation.
log.warn(
{ runId: run.id, status: finalRun?.status, conversationId },
'Approval-path poll loop timed out before run reached terminal state; marking event as processed',
{ runId: run.id, status: finalRun.status, conversationId },
'Approval-path poll loop timed out while run awaits approval decision; marking event as processed',
);
channelDeliveryStore.markProcessed(eventId);
} else {
// The run is in a non-terminal, non-approval state (e.g. running,
// needs_secret, or disappeared). Record a processing failure so the
// retry/dead-letter machinery can handle it.
const timeoutErr = new Error(
`Approval poll timeout: run did not reach terminal state within ${RUN_POLL_MAX_WAIT_MS}ms (status: ${finalRun?.status ?? 'null'})`,
);
log.warn(
{ runId: run.id, status: finalRun?.status, conversationId },
'Approval-path poll loop timed out before run reached terminal state',
);
channelDeliveryStore.recordProcessingFailure(eventId, timeoutErr);
}
} catch (err) {
log.error({ err, conversationId }, 'Approval-aware channel message processing failed');
Expand Down
Loading