From fac7af5091a783e42d1a10108b420da03356d944 Mon Sep 17 00:00:00 2001 From: siddseethepalli Date: Tue, 24 Feb 2026 19:54:35 +0000 Subject: [PATCH] feat: add circuit breaker to gateway runtime client Co-Authored-By: Claude --- .../routes/twilio-connect-action-webhook.ts | 8 +- .../src/http/routes/twilio-status-webhook.ts | 8 +- .../src/http/routes/twilio-voice-webhook.ts | 8 +- gateway/src/index.ts | 7 + gateway/src/runtime/client.ts | 264 +++++++++++++++--- 5 files changed, 246 insertions(+), 49 deletions(-) diff --git a/gateway/src/http/routes/twilio-connect-action-webhook.ts b/gateway/src/http/routes/twilio-connect-action-webhook.ts index ea3cd9f4a1f..1ac85af0e3e 100644 --- a/gateway/src/http/routes/twilio-connect-action-webhook.ts +++ b/gateway/src/http/routes/twilio-connect-action-webhook.ts @@ -1,6 +1,6 @@ import type { GatewayConfig } from "../../config.js"; import { getLogger } from "../../logger.js"; -import { forwardTwilioConnectActionWebhook } from "../../runtime/client.js"; +import { CircuitBreakerOpenError, forwardTwilioConnectActionWebhook } from "../../runtime/client.js"; import { validateTwilioWebhookRequest } from "../../twilio/validate-webhook.js"; const log = getLogger("twilio-connect-action-webhook"); @@ -20,6 +20,12 @@ export function createTwilioConnectActionWebhookHandler(config: GatewayConfig) { headers: runtimeResponse.headers, }); } catch (err) { + if (err instanceof CircuitBreakerOpenError) { + return Response.json( + { error: "Service temporarily unavailable" }, + { status: 503, headers: { "Retry-After": String(err.retryAfterSecs) } }, + ); + } log.error({ err }, "Failed to forward Twilio connect-action webhook to runtime"); return Response.json({ error: "Internal server error" }, { status: 502 }); } diff --git a/gateway/src/http/routes/twilio-status-webhook.ts b/gateway/src/http/routes/twilio-status-webhook.ts index 26fe56384f7..7610df51f4e 100644 --- a/gateway/src/http/routes/twilio-status-webhook.ts +++ b/gateway/src/http/routes/twilio-status-webhook.ts @@ -1,6 +1,6 @@ import type { GatewayConfig } from "../../config.js"; import { getLogger } from "../../logger.js"; -import { forwardTwilioStatusWebhook } from "../../runtime/client.js"; +import { CircuitBreakerOpenError, forwardTwilioStatusWebhook } from "../../runtime/client.js"; import { validateTwilioWebhookRequest } from "../../twilio/validate-webhook.js"; const log = getLogger("twilio-status-webhook"); @@ -23,6 +23,12 @@ export function createTwilioStatusWebhookHandler(config: GatewayConfig) { headers: runtimeResponse.headers, }); } catch (err) { + if (err instanceof CircuitBreakerOpenError) { + return Response.json( + { error: "Service temporarily unavailable" }, + { status: 503, headers: { "Retry-After": String(err.retryAfterSecs) } }, + ); + } log.error({ err }, "Failed to forward Twilio status webhook to runtime"); return Response.json({ error: "Internal server error" }, { status: 502 }); } diff --git a/gateway/src/http/routes/twilio-voice-webhook.ts b/gateway/src/http/routes/twilio-voice-webhook.ts index f88edd46993..0bd70746998 100644 --- a/gateway/src/http/routes/twilio-voice-webhook.ts +++ b/gateway/src/http/routes/twilio-voice-webhook.ts @@ -1,6 +1,6 @@ import type { GatewayConfig } from "../../config.js"; import { getLogger } from "../../logger.js"; -import { forwardTwilioVoiceWebhook } from "../../runtime/client.js"; +import { CircuitBreakerOpenError, forwardTwilioVoiceWebhook } from "../../runtime/client.js"; import { resolveAssistant, resolveAssistantByPhoneNumber, isRejection } from "../../routing/resolve-assistant.js"; import { validateTwilioWebhookRequest } from "../../twilio/validate-webhook.js"; @@ -69,6 +69,12 @@ export function createTwilioVoiceWebhookHandler(config: GatewayConfig) { headers: runtimeResponse.headers, }); } catch (err) { + if (err instanceof CircuitBreakerOpenError) { + return Response.json( + { error: "Service temporarily unavailable" }, + { status: 503, headers: { "Retry-After": String(err.retryAfterSecs) } }, + ); + } log.error({ err }, "Failed to forward Twilio voice webhook to runtime"); return Response.json({ error: "Internal server error" }, { status: 502 }); } diff --git a/gateway/src/index.ts b/gateway/src/index.ts index 08e3b36e735..129efeb639b 100644 --- a/gateway/src/index.ts +++ b/gateway/src/index.ts @@ -17,6 +17,7 @@ import { createWhatsAppDeliverHandler } from "./http/routes/whatsapp-deliver.js" import { createOAuthCallbackHandler } from "./http/routes/oauth-callback.js"; import { createPairingProxyHandler } from "./http/routes/pairing-proxy.js"; import { getLogger, initLogger } from "./logger.js"; +import { CircuitBreakerOpenError } from "./runtime/client.js"; import { buildSchema } from "./schema.js"; import { callTelegramApi } from "./telegram/api.js"; import { reconcileTelegramWebhook } from "./telegram/webhook-manager.js"; @@ -64,6 +65,12 @@ function main() { port: config.port, websocket: getRelayWebsocketHandlers(), error(err) { + if (err instanceof CircuitBreakerOpenError) { + return Response.json( + { error: "Service temporarily unavailable — runtime is unreachable" }, + { status: 503, headers: { "Retry-After": String(err.retryAfterSecs) } }, + ); + } log.error({ err }, "Unhandled gateway error"); return Response.json({ error: "Internal server error" }, { status: 500 }); }, diff --git a/gateway/src/runtime/client.ts b/gateway/src/runtime/client.ts index 6548ebc675e..4955c057265 100644 --- a/gateway/src/runtime/client.ts +++ b/gateway/src/runtime/client.ts @@ -5,6 +5,94 @@ import { getLogger } from "../logger.js"; const log = getLogger("runtime-client"); +// ── Circuit breaker ────────────────────────────────────────────────── + +const enum CircuitState { + CLOSED = 0, + OPEN = 1, + HALF_OPEN = 2, +} + +const CB_FAILURE_THRESHOLD = 5; +const CB_COOLDOWN_MS = 30_000; + +/** + * Thrown when the circuit breaker is open. Callers should return 503 + * with a Retry-After header derived from `retryAfterSecs`. + */ +export class CircuitBreakerOpenError extends Error { + readonly retryAfterSecs: number; + constructor(retryAfterSecs: number) { + super("Circuit breaker is open — runtime is unavailable"); + this.name = "CircuitBreakerOpenError"; + this.retryAfterSecs = retryAfterSecs; + } +} + +let cbState: CircuitState = CircuitState.CLOSED; +let cbConsecutiveFailures = 0; +let cbOpenedAt = 0; + +function cbRetryAfterSecs(): number { + const elapsed = Date.now() - cbOpenedAt; + return Math.max(1, Math.ceil((CB_COOLDOWN_MS - elapsed) / 1000)); +} + +/** + * Check the circuit before making a request. Throws if open. + * Returns true when this is a half-open probe (caller must record outcome). + */ +function cbBeforeRequest(): boolean { + if (cbState === CircuitState.CLOSED) return false; + + if (cbState === CircuitState.OPEN) { + if (Date.now() - cbOpenedAt >= CB_COOLDOWN_MS) { + cbState = CircuitState.HALF_OPEN; + log.info("Circuit breaker entering HALF_OPEN — allowing probe request"); + return true; + } + throw new CircuitBreakerOpenError(cbRetryAfterSecs()); + } + + // HALF_OPEN: only one probe in flight; reject additional requests + throw new CircuitBreakerOpenError(cbRetryAfterSecs()); +} + +function cbOnSuccess(): void { + if (cbState !== CircuitState.CLOSED) { + log.info("Circuit breaker closing — runtime recovered"); + } + cbState = CircuitState.CLOSED; + cbConsecutiveFailures = 0; +} + +function cbOnFailure(): void { + cbConsecutiveFailures++; + + if (cbState === CircuitState.HALF_OPEN) { + cbState = CircuitState.OPEN; + cbOpenedAt = Date.now(); + log.warn({ failures: cbConsecutiveFailures }, "Circuit breaker re-opening after failed probe"); + return; + } + + if (cbConsecutiveFailures >= CB_FAILURE_THRESHOLD) { + cbState = CircuitState.OPEN; + cbOpenedAt = Date.now(); + log.warn( + { failures: cbConsecutiveFailures }, + "Circuit breaker opening — runtime appears down", + ); + } +} + +/** Exported for testing — resets circuit breaker to initial state. */ +export function _resetCircuitBreaker(): void { + cbState = CircuitState.CLOSED; + cbConsecutiveFailures = 0; + cbOpenedAt = 0; +} + /** * Header name used to prove a request originated from the gateway. * The value is the dedicated gateway-origin secret (or the bearer token as @@ -92,6 +180,8 @@ export async function forwardToRuntime( payload: RuntimeInboundPayload, options?: ForwardOptions, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/assistants/${encodeURIComponent(assistantId)}/channels/inbound`; const extraHeaders: Record = { "Content-Type": "application/json" }; @@ -122,6 +212,8 @@ export async function forwardToRuntime( { status: response.status, body, assistantId }, "Runtime returned client error, not retrying", ); + // 4xx = client error, not a daemon outage — don't trip the breaker + cbOnSuccess(); throw new Error(`Runtime returned ${response.status}: ${body}`); } @@ -140,6 +232,7 @@ export async function forwardToRuntime( { assistantId, eventId: result.eventId, duplicate: result.duplicate }, "Runtime forward succeeded", ); + cbOnSuccess(); return result; } catch (err) { if ( @@ -156,6 +249,7 @@ export async function forwardToRuntime( } } + cbOnFailure(); throw lastError ?? new Error("Runtime forward failed after retries"); } @@ -165,19 +259,30 @@ export async function resetConversation( sourceChannel: ChannelId, externalChatId: string, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/assistants/${encodeURIComponent(assistantId)}/channels/conversation`; - const response = await fetchImpl(url, { - method: "DELETE", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify({ sourceChannel, externalChatId }), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "DELETE", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify({ sourceChannel, externalChatId }), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } if (!response.ok) { const body = await response.text(); + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); throw new Error(`Reset conversation failed (${response.status}): ${body}`); } + + cbOnSuccess(); } export type UploadAttachmentInput = { @@ -195,19 +300,29 @@ export async function downloadAttachment( assistantId: string, attachmentId: string, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/assistants/${encodeURIComponent(assistantId)}/attachments/${encodeURIComponent(attachmentId)}`; - const response = await fetchImpl(url, { - method: "GET", - headers: runtimeHeaders(config), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "GET", + headers: runtimeHeaders(config), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } if (!response.ok) { const body = await response.text(); + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); throw new Error(`Attachment download failed (${response.status}): ${body}`); } + cbOnSuccess(); return (await response.json()) as RuntimeAttachmentPayload; } @@ -219,19 +334,29 @@ export async function downloadAttachmentById( config: GatewayConfig, attachmentId: string, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/attachments/${encodeURIComponent(attachmentId)}`; - const response = await fetchImpl(url, { - method: "GET", - headers: runtimeHeaders(config), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "GET", + headers: runtimeHeaders(config), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } if (!response.ok) { const body = await response.text(); + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); throw new Error(`Attachment download failed (${response.status}): ${body}`); } + cbOnSuccess(); return (await response.json()) as RuntimeAttachmentPayload; } @@ -257,20 +382,29 @@ export async function forwardTwilioVoiceWebhook( originalUrl: string, assistantId?: string, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/internal/twilio/voice-webhook`; - const response = await fetchImpl(url, { - method: "POST", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify({ params, originalUrl, assistantId }), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "POST", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify({ params, originalUrl, assistantId }), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } const body = await response.text(); const headers: Record = {}; const contentType = response.headers.get("content-type"); if (contentType) headers["Content-Type"] = contentType; + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); return { status: response.status, body, headers }; } @@ -281,20 +415,29 @@ export async function forwardTwilioStatusWebhook( config: GatewayConfig, params: Record, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/internal/twilio/status`; - const response = await fetchImpl(url, { - method: "POST", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify({ params }), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "POST", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify({ params }), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } const body = await response.text(); const headers: Record = {}; const contentType = response.headers.get("content-type"); if (contentType) headers["Content-Type"] = contentType; + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); return { status: response.status, body, headers }; } @@ -305,20 +448,29 @@ export async function forwardTwilioConnectActionWebhook( config: GatewayConfig, params: Record, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/internal/twilio/connect-action`; - const response = await fetchImpl(url, { - method: "POST", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify({ params }), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "POST", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify({ params }), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } const body = await response.text(); const headers: Record = {}; const contentType = response.headers.get("content-type"); if (contentType) headers["Content-Type"] = contentType; + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); return { status: response.status, body, headers }; } @@ -327,14 +479,22 @@ export async function uploadAttachment( assistantId: string, input: UploadAttachmentInput, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/assistants/${encodeURIComponent(assistantId)}/attachments`; - const response = await fetchImpl(url, { - method: "POST", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify(input), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "POST", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify(input), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } if (!response.ok) { const body = await response.text(); @@ -342,13 +502,16 @@ export async function uploadAttachment( // extension, missing fields). Distinguish from transient 5xx/network errors // so callers can decide whether to skip or propagate. if (response.status >= 400 && response.status < 500) { + cbOnSuccess(); throw new AttachmentValidationError( `Attachment rejected (${response.status}): ${body}`, ); } + cbOnFailure(); throw new Error(`Attachment upload failed (${response.status}): ${body}`); } + cbOnSuccess(); return (await response.json()) as UploadAttachmentResponse; } @@ -370,15 +533,24 @@ export async function forwardOAuthCallback( code?: string, error?: string, ): Promise { + cbBeforeRequest(); + const url = `${config.assistantRuntimeBaseUrl}/v1/internal/oauth/callback`; - const response = await fetchImpl(url, { - method: "POST", - headers: runtimeHeaders(config, { "Content-Type": "application/json" }), - body: JSON.stringify({ state, code, error }), - signal: AbortSignal.timeout(config.runtimeTimeoutMs), - }); + let response: Response; + try { + response = await fetchImpl(url, { + method: "POST", + headers: runtimeHeaders(config, { "Content-Type": "application/json" }), + body: JSON.stringify({ state, code, error }), + signal: AbortSignal.timeout(config.runtimeTimeoutMs), + }); + } catch (err) { + cbOnFailure(); + throw err; + } const body = await response.text(); + if (response.status >= 500) cbOnFailure(); else cbOnSuccess(); return { status: response.status, body }; }