Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion assistant/src/calls/guardian-action-sweep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export function sweepExpiredGuardianActions(
addMessage(
delivery.destinationConversationId,
'assistant',
JSON.stringify('This guardian question has expired without a response.'),
JSON.stringify([{ type: 'text', text: 'This guardian question has expired without a response.' }]),
);
} else if (delivery.destinationChatId) {
// External channel — send expiry notice
Expand Down
8 changes: 5 additions & 3 deletions assistant/src/calls/guardian-dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { getUserConsultationTimeoutMs } from './call-constants.js';
import { getOrCreateConversation } from '../memory/conversation-key-store.js';
import { addMessage } from '../memory/conversation-store.js';
import type { CallPendingQuestion } from './types.js';
import { readHttpToken } from '../util/platform.js';
import type { ServerMessage } from '../daemon/ipc-contract.js';

const log = getLogger('guardian-dispatch');
Expand Down Expand Up @@ -123,7 +124,7 @@ export async function dispatchGuardianQuestion(params: GuardianDispatchParams):
addMessage(
macConversationId,
'assistant',
JSON.stringify(`Your assistant needs your input during a phone call.\n\nQuestion: ${request.questionText}\n\nReply to this message with your answer.`),
JSON.stringify([{ type: 'text', text: `Your assistant needs your input during a phone call.\n\nQuestion: ${request.questionText}\n\nReply to this message with your answer.` }]),
);

// Emit IPC event for the mac client with the server-created conversation
Expand All @@ -146,7 +147,7 @@ export async function dispatchGuardianQuestion(params: GuardianDispatchParams):
destinationExternalUserId: dest.externalUserId,
});
// External channel — POST to gateway
void deliverToExternalChannel(delivery.id, dest.channel, dest.chatId!, request.questionText, request.requestCode, assistantId);
void deliverToExternalChannel(delivery.id, dest.channel, dest.chatId!, request.questionText, request.requestCode, assistantId, readHttpToken() ?? undefined);
}
}
} catch (err) {
Expand All @@ -161,6 +162,7 @@ async function deliverToExternalChannel(
questionText: string,
requestCode: string,
assistantId: string,
bearerToken?: string,
): Promise<void> {
const gatewayBase = getGatewayBaseUrl();
const deliverUrl = `${gatewayBase}/deliver/${channel}`;
Expand All @@ -178,7 +180,7 @@ async function deliverToExternalChannel(
chatId,
text: messageText,
assistantId,
});
}, bearerToken);
updateDeliveryStatus(deliveryId, 'sent');
log.info({ deliveryId, channel, chatId }, 'External guardian delivery sent');
} catch (err) {
Expand Down
13 changes: 12 additions & 1 deletion assistant/src/calls/relay-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ export interface RelayWebSocketData {
/** Active relay connections keyed by callSessionId. */
export const activeRelayConnections = new Map<string, RelayConnection>();

/** Module-level broadcast function, set by the HTTP server during startup. */
let globalBroadcast: ((msg: import('../daemon/ipc-contract.js').ServerMessage) => void) | undefined;

/** Register a broadcast function so RelayConnection can forward IPC events. */
export function setRelayBroadcast(fn: (msg: import('../daemon/ipc-contract.js').ServerMessage) => void): void {
globalBroadcast = fn;
}

// ── RelayConnection ──────────────────────────────────────────────────

/**
Expand Down Expand Up @@ -336,7 +344,10 @@ export class RelayConnection {
});

// Create and attach the LLM-driven orchestrator
const orchestrator = new CallOrchestrator(this.callSessionId, this, session?.task ?? null);
const orchestrator = new CallOrchestrator(this.callSessionId, this, session?.task ?? null, {
broadcast: globalBroadcast,
assistantId: session?.assistantId ?? 'self',
});
this.setOrchestrator(orchestrator);

// Check if callee verification is enabled
Expand Down
2 changes: 2 additions & 0 deletions assistant/src/daemon/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { loadConfig } from '../config/loader.js';
import { ensurePromptFiles } from '../config/system-prompt.js';
import { loadPrebuiltHtml } from '../home-base/prebuilt/seed.js';
import { DaemonServer } from './server.js';
import { setRelayBroadcast } from '../calls/relay-server.js';
import { listWorkItems, updateWorkItem } from '../work-items/work-item-store.js';
import { getLogger, initLogger } from '../util/logger.js';
import { DaemonError } from '../util/errors.js';
Expand Down Expand Up @@ -467,6 +468,7 @@ export async function runDaemon(): Promise<void> {
try {
log.info({ port, hostname }, 'Daemon startup: starting runtime HTTP server');
await runtimeHttp.start();
setRelayBroadcast((msg) => server.broadcast(msg));
server.setHttpPort(port);
log.info({ port, hostname }, 'Daemon startup: runtime HTTP server listening');
} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions assistant/src/memory/guardian-action-store.ts
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 cancelGuardianActionRequest only cancels 'pending' deliveries, missing 'sent' deliveries

The PR correctly fixed expireGuardianActionRequest to use inArray(['pending', 'sent']) so that deliveries already in 'sent' status are expired. However, cancelGuardianActionRequest at guardian-action-store.ts:293-300 has the exact same pre-existing bug that was not fixed: it only cancels deliveries with status = 'pending', leaving deliveries in 'sent' status untouched.

Root Cause and Impact

When a guardian action request is cancelled, cancelGuardianActionRequest updates delivery rows but only those with eq(guardianActionDeliveries.status, 'pending'). In the dispatch flow (guardian-dispatch.ts:139-140), mac deliveries are immediately set to 'sent' via updateDeliveryStatus(delivery.id, 'sent'), and external channel deliveries are set to 'sent' after successful HTTP POST. This means most active deliveries will be in 'sent' status by the time a cancellation occurs.

As a result, cancelling a guardian action request leaves sent deliveries in 'sent' status instead of 'cancelled'. Any logic that checks delivery status to determine if a response should still be accepted would incorrectly treat these as active deliveries. The same reasoning that motivated the inArray(['pending', 'sent']) fix for expireGuardianActionRequest applies equally here.

(Refers to lines 293-300)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* answer resolves the request and all other deliveries are marked answered.
*/

import { and, eq, lt } from 'drizzle-orm';
import { and, eq, lt, inArray } from 'drizzle-orm';
import { v4 as uuid } from 'uuid';
import { getDb } from './db.js';
import {
Expand Down Expand Up @@ -235,7 +235,7 @@ export function expireGuardianActionRequest(id: string): void {
.where(
and(
eq(guardianActionDeliveries.requestId, id),
eq(guardianActionDeliveries.status, 'pending'),
inArray(guardianActionDeliveries.status, ['pending', 'sent']),
),
)
.run();
Expand Down
28 changes: 26 additions & 2 deletions assistant/src/runtime/routes/channel-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,32 @@ export async function handleChannelInbound(
if (matchedDelivery) {
const request = getGuardianActionRequest(matchedDelivery.requestId);
if (request) {
// Attempt to deliver the answer to the call first. Only resolve
// the guardian action request if answerCall succeeds, so that a
// failed delivery (e.g. pending question timed out) leaves the
// request pending for retry from another channel.
const answerResult = await answerCall({ callSessionId: request.callSessionId, answer: answerText });

if (!('ok' in answerResult) || !answerResult.ok) {
const errorMsg = 'error' in answerResult ? answerResult.error : 'Unknown error';
log.warn({ callSessionId: request.callSessionId, error: errorMsg }, 'answerCall failed for guardian answer');
try {
await deliverChannelReply(replyCallbackUrl, {
chatId: externalChatId,
text: 'Failed to deliver your answer to the call. Please try again.',
assistantId,
}, bearerToken);
} catch (deliverErr) {
log.error({ err: deliverErr, externalChatId }, 'Failed to deliver guardian answer failure notice');
}
return Response.json({
accepted: true,
duplicate: false,
eventId: result.eventId,
guardianAnswer: 'answer_failed',
});
}

const resolved = resolveGuardianActionRequest(
request.id,
answerText,
Expand All @@ -640,8 +666,6 @@ export async function handleChannelInbound(
);

if (resolved) {
// Route the answer to the voice call
void answerCall({ callSessionId: request.callSessionId, answer: answerText });
return Response.json({
accepted: true,
duplicate: false,
Expand Down
Loading