diff --git a/assistant/src/calls/guardian-action-sweep.ts b/assistant/src/calls/guardian-action-sweep.ts index 1e5a00063a0..59109446731 100644 --- a/assistant/src/calls/guardian-action-sweep.ts +++ b/assistant/src/calls/guardian-action-sweep.ts @@ -58,7 +58,7 @@ export function sweepExpiredGuardianActions( addMessage( delivery.destinationConversationId, 'assistant', - JSON.stringify('This guardian question has expired without a response.'), + JSON.stringify([{ type: 'text', text: 'This guardian question has expired without a response.' }]), ); } else if (delivery.destinationChatId) { // External channel — send expiry notice diff --git a/assistant/src/calls/guardian-dispatch.ts b/assistant/src/calls/guardian-dispatch.ts index 9f4e7cc0927..21414806509 100644 --- a/assistant/src/calls/guardian-dispatch.ts +++ b/assistant/src/calls/guardian-dispatch.ts @@ -21,6 +21,7 @@ import { getUserConsultationTimeoutMs } from './call-constants.js'; import { getOrCreateConversation } from '../memory/conversation-key-store.js'; import { addMessage } from '../memory/conversation-store.js'; import type { CallPendingQuestion } from './types.js'; +import { readHttpToken } from '../util/platform.js'; import type { ServerMessage } from '../daemon/ipc-contract.js'; const log = getLogger('guardian-dispatch'); @@ -123,7 +124,7 @@ export async function dispatchGuardianQuestion(params: GuardianDispatchParams): addMessage( macConversationId, 'assistant', - JSON.stringify(`Your assistant needs your input during a phone call.\n\nQuestion: ${request.questionText}\n\nReply to this message with your answer.`), + JSON.stringify([{ type: 'text', text: `Your assistant needs your input during a phone call.\n\nQuestion: ${request.questionText}\n\nReply to this message with your answer.` }]), ); // Emit IPC event for the mac client with the server-created conversation @@ -146,7 +147,7 @@ export async function dispatchGuardianQuestion(params: GuardianDispatchParams): destinationExternalUserId: dest.externalUserId, }); // External channel — POST to gateway - void deliverToExternalChannel(delivery.id, dest.channel, dest.chatId!, request.questionText, request.requestCode, assistantId); + void deliverToExternalChannel(delivery.id, dest.channel, dest.chatId!, request.questionText, request.requestCode, assistantId, readHttpToken() ?? undefined); } } } catch (err) { @@ -161,6 +162,7 @@ async function deliverToExternalChannel( questionText: string, requestCode: string, assistantId: string, + bearerToken?: string, ): Promise { const gatewayBase = getGatewayBaseUrl(); const deliverUrl = `${gatewayBase}/deliver/${channel}`; @@ -178,7 +180,7 @@ async function deliverToExternalChannel( chatId, text: messageText, assistantId, - }); + }, bearerToken); updateDeliveryStatus(deliveryId, 'sent'); log.info({ deliveryId, channel, chatId }, 'External guardian delivery sent'); } catch (err) { diff --git a/assistant/src/calls/relay-server.ts b/assistant/src/calls/relay-server.ts index e56c0d564a3..7708c3991ff 100644 --- a/assistant/src/calls/relay-server.ts +++ b/assistant/src/calls/relay-server.ts @@ -109,6 +109,14 @@ export interface RelayWebSocketData { /** Active relay connections keyed by callSessionId. */ export const activeRelayConnections = new Map(); +/** Module-level broadcast function, set by the HTTP server during startup. */ +let globalBroadcast: ((msg: import('../daemon/ipc-contract.js').ServerMessage) => void) | undefined; + +/** Register a broadcast function so RelayConnection can forward IPC events. */ +export function setRelayBroadcast(fn: (msg: import('../daemon/ipc-contract.js').ServerMessage) => void): void { + globalBroadcast = fn; +} + // ── RelayConnection ────────────────────────────────────────────────── /** @@ -336,7 +344,10 @@ export class RelayConnection { }); // Create and attach the LLM-driven orchestrator - const orchestrator = new CallOrchestrator(this.callSessionId, this, session?.task ?? null); + const orchestrator = new CallOrchestrator(this.callSessionId, this, session?.task ?? null, { + broadcast: globalBroadcast, + assistantId: session?.assistantId ?? 'self', + }); this.setOrchestrator(orchestrator); // Check if callee verification is enabled diff --git a/assistant/src/daemon/lifecycle.ts b/assistant/src/daemon/lifecycle.ts index e7f5505c285..e4d94a61779 100644 --- a/assistant/src/daemon/lifecycle.ts +++ b/assistant/src/daemon/lifecycle.ts @@ -24,6 +24,7 @@ import { loadConfig } from '../config/loader.js'; import { ensurePromptFiles } from '../config/system-prompt.js'; import { loadPrebuiltHtml } from '../home-base/prebuilt/seed.js'; import { DaemonServer } from './server.js'; +import { setRelayBroadcast } from '../calls/relay-server.js'; import { listWorkItems, updateWorkItem } from '../work-items/work-item-store.js'; import { getLogger, initLogger } from '../util/logger.js'; import { DaemonError } from '../util/errors.js'; @@ -467,6 +468,7 @@ export async function runDaemon(): Promise { try { log.info({ port, hostname }, 'Daemon startup: starting runtime HTTP server'); await runtimeHttp.start(); + setRelayBroadcast((msg) => server.broadcast(msg)); server.setHttpPort(port); log.info({ port, hostname }, 'Daemon startup: runtime HTTP server listening'); } catch (err) { diff --git a/assistant/src/memory/guardian-action-store.ts b/assistant/src/memory/guardian-action-store.ts index aa335a6e884..23cfe0d9510 100644 --- a/assistant/src/memory/guardian-action-store.ts +++ b/assistant/src/memory/guardian-action-store.ts @@ -7,7 +7,7 @@ * answer resolves the request and all other deliveries are marked answered. */ -import { and, eq, lt } from 'drizzle-orm'; +import { and, eq, lt, inArray } from 'drizzle-orm'; import { v4 as uuid } from 'uuid'; import { getDb } from './db.js'; import { @@ -235,7 +235,7 @@ export function expireGuardianActionRequest(id: string): void { .where( and( eq(guardianActionDeliveries.requestId, id), - eq(guardianActionDeliveries.status, 'pending'), + inArray(guardianActionDeliveries.status, ['pending', 'sent']), ), ) .run(); diff --git a/assistant/src/runtime/routes/channel-routes.ts b/assistant/src/runtime/routes/channel-routes.ts index 7982593d19c..ceab9c06cd0 100644 --- a/assistant/src/runtime/routes/channel-routes.ts +++ b/assistant/src/runtime/routes/channel-routes.ts @@ -632,6 +632,32 @@ export async function handleChannelInbound( if (matchedDelivery) { const request = getGuardianActionRequest(matchedDelivery.requestId); if (request) { + // Attempt to deliver the answer to the call first. Only resolve + // the guardian action request if answerCall succeeds, so that a + // failed delivery (e.g. pending question timed out) leaves the + // request pending for retry from another channel. + const answerResult = await answerCall({ callSessionId: request.callSessionId, answer: answerText }); + + if (!('ok' in answerResult) || !answerResult.ok) { + const errorMsg = 'error' in answerResult ? answerResult.error : 'Unknown error'; + log.warn({ callSessionId: request.callSessionId, error: errorMsg }, 'answerCall failed for guardian answer'); + try { + await deliverChannelReply(replyCallbackUrl, { + chatId: externalChatId, + text: 'Failed to deliver your answer to the call. Please try again.', + assistantId, + }, bearerToken); + } catch (deliverErr) { + log.error({ err: deliverErr, externalChatId }, 'Failed to deliver guardian answer failure notice'); + } + return Response.json({ + accepted: true, + duplicate: false, + eventId: result.eventId, + guardianAnswer: 'answer_failed', + }); + } + const resolved = resolveGuardianActionRequest( request.id, answerText, @@ -640,8 +666,6 @@ export async function handleChannelInbound( ); if (resolved) { - // Route the answer to the voice call - void answerCall({ callSessionId: request.callSessionId, answer: answerText }); return Response.json({ accepted: true, duplicate: false,