diff --git a/charts/lobu/templates/deployment.yaml b/charts/lobu/templates/deployment.yaml index 9fddaf5a4..fa43e8cb5 100644 --- a/charts/lobu/templates/deployment.yaml +++ b/charts/lobu/templates/deployment.yaml @@ -103,6 +103,19 @@ spec: - name: PUBLIC_WEB_URL value: {{ printf "https://%s" (first .Values.ingress.hosts) | quote }} {{- end }} + {{- $workerSmoke := .Values.releaseGates.smokeTest.workerSmoke | default dict }} + {{- if and $workerSmoke (hasKey $workerSmoke "enabled") $workerSmoke.enabled }} + # Pin SMOKE_TEST_ALLOWED_HOST to the in-cluster app Service DNS + # name so /api/internal/smoke/dispatch refuses any request + # whose Host header is not the cluster-internal service. The + # smoke Job hits this exact hostname via its curl URL; public + # ingress traffic always carries the operator's external host + # in Host, so this is the second layer of ingress-bypass + # defense (the first is the x-forwarded-* refusal in the + # route handler). + - name: SMOKE_TEST_ALLOWED_HOST + value: {{ printf "%s-app" (include "lobu.fullname" .) | quote }} + {{- end }} {{- if .Values.embeddings.service.port }} {{- if .Values.embeddings.serviceUrl }} - name: EMBEDDINGS_SERVICE_URL diff --git a/charts/lobu/templates/smoke-test-job.yaml b/charts/lobu/templates/smoke-test-job.yaml index 7625bb8f5..32c00d227 100644 --- a/charts/lobu/templates/smoke-test-job.yaml +++ b/charts/lobu/templates/smoke-test-job.yaml @@ -1,4 +1,8 @@ {{- if .Values.releaseGates.smokeTest.enabled }} +{{- $workerSmoke := .Values.releaseGates.smokeTest.workerSmoke | default dict -}} +{{- $workerSmokeEnabled := and $workerSmoke (hasKey $workerSmoke "enabled") $workerSmoke.enabled -}} +{{- $workerSmokeTimeout := default 90 $workerSmoke.timeoutSeconds | int -}} +{{- $totalTimeout := add (.Values.releaseGates.smokeTest.timeoutSeconds | int) (ternary $workerSmokeTimeout 0 $workerSmokeEnabled) -}} apiVersion: batch/v1 kind: Job metadata: @@ -11,7 +15,7 @@ metadata: "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded spec: backoffLimit: 0 - activeDeadlineSeconds: {{ add (.Values.releaseGates.smokeTest.timeoutSeconds | int) 30 }} + activeDeadlineSeconds: {{ add $totalTimeout 30 }} template: metadata: labels: @@ -27,6 +31,19 @@ spec: - name: smoke-test image: {{ include "lobu.appImage" . }} imagePullPolicy: {{ .Values.image.pullPolicy }} + {{- if $workerSmokeEnabled }} + {{- $secretName := include "lobu.secretName" . }} + {{- if $secretName }} + envFrom: + - secretRef: + # Phase 3 needs SMOKE_TEST_TOKEN from the deployment Secret + # to authenticate against /api/internal/smoke/dispatch. + # DATABASE_URL also comes from the same Secret on production + # installs (chart-managed secrets.create path also exposes it + # via the inline `env` block below). + name: {{ $secretName }} + {{- end }} + {{- end }} env: {{- if and .Values.secrets.create (hasKey .Values.secrets.stringData "DATABASE_URL") }} - name: DATABASE_URL @@ -34,6 +51,27 @@ spec: {{- end }} - name: REQUIRED_SCHEMA value: {{ join "," (default (list) .Values.releaseGates.smokeTest.requiredSchema) | quote }} + {{- if $workerSmokeEnabled }} + - name: WORKER_SMOKE_ENABLED + value: "1" + # NOTE: the smoke agentId and organizationId are NOT passed + # to the dispatch endpoint — the gateway pins them + # server-side from SMOKE_TEST_AGENT_ID / SMOKE_TEST_ORG_ID + # in the deployment Secret so a leaked SMOKE_TEST_TOKEN + # cannot target a real tenant. The chart values below are + # only here so operators can keep the chart-side knobs in + # sync with what they configure in the Secret. + - name: WORKER_SMOKE_CONV_PREFIX + value: {{ default "smoke-" $workerSmoke.conversationIdPrefix | quote }} + - name: WORKER_SMOKE_TIMEOUT + value: {{ $workerSmokeTimeout | quote }} + - name: WORKER_SMOKE_INTERVAL + value: {{ default 3 $workerSmoke.intervalSeconds | int | quote }} + - name: WORKER_SMOKE_RELEASE + value: {{ .Release.Name | quote }} + - name: WORKER_SMOKE_REVISION + value: {{ .Release.Revision | quote }} + {{- end }} command: - /bin/bash - -ec @@ -121,4 +159,96 @@ spec: await sql.end({ timeout: 1 }).catch(() => {}); } NODE + + if [ -z "${WORKER_SMOKE_ENABLED:-}" ]; then + echo "workerSmoke disabled — skipping phase 3" + exit 0 + fi + if [ -z "${SMOKE_TEST_TOKEN:-}" ]; then + echo "SMOKE_TEST_TOKEN env not set — cannot run worker smoke" >&2 + exit 1 + fi + + conv_id="${WORKER_SMOKE_CONV_PREFIX}${WORKER_SMOKE_RELEASE}-${WORKER_SMOKE_REVISION}" + dispatch_url="http://{{ include "lobu.fullname" . }}-app:{{ .Values.service.port }}/api/internal/smoke/dispatch" + echo "phase 3: dispatching smoke run to $dispatch_url (conversation_id=$conv_id)" + + http_status=$(curl -sS --max-time 10 -o /tmp/lobu-smoke-dispatch -w '%{http_code}' \ + -H "Authorization: Bearer ${SMOKE_TEST_TOKEN}" \ + -H 'Content-Type: application/json' \ + -X POST "$dispatch_url" \ + --data "$(printf '{"conversationId":"%s","messageText":"smoke ping"}' \ + "${conv_id}")" \ + || echo "000") + + if [ "${http_status}" != "200" ]; then + echo "smoke dispatch returned HTTP ${http_status}" >&2 + cat /tmp/lobu-smoke-dispatch >&2 || true + exit 1 + fi + + run_id=$(node --input-type=module <<'NODE' + import fs from "node:fs"; + try { + const body = JSON.parse(fs.readFileSync("/tmp/lobu-smoke-dispatch", "utf8")); + if (typeof body.runId === "number" && body.runId > 0) { + process.stdout.write(String(body.runId)); + } else { + process.exit(2); + } + } catch { + process.exit(3); + } + NODE + ) + if [ -z "${run_id}" ]; then + echo "smoke dispatch did not return a runId" >&2 + cat /tmp/lobu-smoke-dispatch >&2 || true + exit 1 + fi + echo "smoke dispatch enqueued run_id=${run_id}, polling for completion" + + deadline=$((SECONDS + WORKER_SMOKE_TIMEOUT)) + while true; do + terminal=$(WORKER_SMOKE_RUN_ID="${run_id}" node --input-type=module <<'NODE' + import postgres from "postgres"; + const sql = postgres(process.env.DATABASE_URL, { + max: 1, + connect_timeout: 10, + idle_timeout: 1, + onnotice: () => {}, + }); + try { + const rows = await sql` + SELECT terminal_status + FROM public.agent_transcript_snapshot + WHERE run_id = ${Number(process.env.WORKER_SMOKE_RUN_ID)} + LIMIT 1 + `; + if (rows.length > 0) { + process.stdout.write(String(rows[0].terminal_status || "")); + } + } finally { + await sql.end({ timeout: 1 }).catch(() => {}); + } + NODE + ) + + if [ "${terminal}" = "completed" ]; then + echo "worker smoke run completed (run_id=${run_id})" + exit 0 + fi + + if [ -n "${terminal}" ] && [ "${terminal}" != "completed" ]; then + echo "worker smoke run terminated with status='${terminal}' (run_id=${run_id})" >&2 + exit 1 + fi + + if [ "$SECONDS" -ge "$deadline" ]; then + echo "worker smoke timed out after ${WORKER_SMOKE_TIMEOUT}s waiting for run_id=${run_id}" >&2 + exit 1 + fi + + sleep "${WORKER_SMOKE_INTERVAL}" + done {{- end }} diff --git a/charts/lobu/values.yaml b/charts/lobu/values.yaml index 35f771947..625385950 100644 --- a/charts/lobu/values.yaml +++ b/charts/lobu/values.yaml @@ -260,3 +260,45 @@ releaseGates: - device_workers.organization_id - connections.device_worker_id - connections.organization_id + + # Phase 3 of the smoke job — drive an actual worker run end-to-end. + # + # When enabled, the Job POSTs to the internal /api/internal/smoke/dispatch + # endpoint, which inserts a synthetic chat_message run. The runs-queue + # MessageConsumer in the app pod claims it, spawns a worker subprocess, + # the worker runs, and on terminal cleanup writes a row into + # `agent_transcript_snapshot`. The Job polls that row and fails the + # deploy if `terminal_status='completed'` doesn't materialise inside + # `workerSmokeTimeoutSeconds`. This makes the recurring class of + # "gateway boots fine but workers can't process a single message" + # regressions un-shippable. + # + # Default OFF: the chart cannot preprovision the synthetic agent for + # you. Operators MUST add three keys to the deployment Secret before + # enabling, AND preprovision a matching agent row: + # + # 1. Generate a random token (≥32 chars) and add to the Secret: + # SMOKE_TEST_TOKEN= + # SMOKE_TEST_AGENT_ID= + # SMOKE_TEST_ORG_ID= + # The gateway PINS the smoke agentId + organizationId from the + # env at dispatch time — caller-supplied values are ignored. + # This makes it structurally impossible for a leaked + # SMOKE_TEST_TOKEN to dispatch a synthetic run against a real + # tenant's agent. + # 2. Preprovision the synthetic agent. The simplest path is + # `lobu apply` against a dedicated "smoke" org/agent project + # whose only agent.id matches `SMOKE_TEST_AGENT_ID`. + # 3. Bump this stanza to `enabled: true` and roll the chart. + workerSmoke: + enabled: false + # The smoke Job appends a release-scoped suffix so each helm + # upgrade gets its own conversation id within this prefix. + conversationIdPrefix: "smoke-" + # How long to wait for the snapshot row to appear with + # `terminal_status='completed'`. Worker spawn + LLM round-trip + + # snapshot POST is ~10-30s on the prod cluster; 90s leaves + # headroom for a cold image pull on the worker subprocess. + timeoutSeconds: 90 + # DB poll interval inside the Job. + intervalSeconds: 3 diff --git a/packages/server/src/gateway/__tests__/smoke-dispatch.test.ts b/packages/server/src/gateway/__tests__/smoke-dispatch.test.ts new file mode 100644 index 000000000..9228970c5 --- /dev/null +++ b/packages/server/src/gateway/__tests__/smoke-dispatch.test.ts @@ -0,0 +1,361 @@ +/** + * Unit tests for the internal smoke-dispatch endpoint. + * + * Covers the three-layer auth contract: + * 1. SMOKE_TEST_TOKEN bearer (constant-time compare) + * 2. Ingress-bypass: any x-forwarded-* header → 403 + * 3. Server-pinned smoke namespace from SMOKE_TEST_AGENT_ID / + * SMOKE_TEST_ORG_ID env (caller-supplied agentId/organizationId + * are silently ignored) + * + * Plus input validation (conversationId required + prefix-checked), + * runs INSERT happy path, and idempotency. + * + * The end-to-end "real worker actually processes the synthetic run" + * path is gated by the Helm post-upgrade smoke Job at deploy time, not + * by these unit tests. + */ + +import { + afterEach, + beforeAll, + beforeEach, + describe, + expect, + test, +} from "bun:test"; +import { Hono } from "hono"; +import { getDb } from "../../db/client.js"; +import { createSmokeRoutes } from "../routes/internal/smoke.js"; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from "./helpers/db-setup.js"; + +const SMOKE_TOKEN = "test-smoke-token-deadbeef-cafef00d-feedface"; +const SMOKE_AGENT_ID = "smoke-test"; +const SMOKE_ORG_ID = "smoke-org"; + +const savedEnv: Record = {}; + +function snapshotEnv() { + savedEnv.SMOKE_TEST_TOKEN = process.env.SMOKE_TEST_TOKEN; + savedEnv.SMOKE_TEST_AGENT_ID = process.env.SMOKE_TEST_AGENT_ID; + savedEnv.SMOKE_TEST_ORG_ID = process.env.SMOKE_TEST_ORG_ID; + savedEnv.SMOKE_TEST_ALLOWED_HOST = process.env.SMOKE_TEST_ALLOWED_HOST; +} + +function restoreEnv() { + for (const k of Object.keys(savedEnv)) { + const v = savedEnv[k]; + if (v === undefined) delete process.env[k]; + else process.env[k] = v; + } +} + +beforeAll(async () => { + await ensurePgliteForGatewayTests(); +}); + +beforeEach(async () => { + await resetTestDatabase(); + snapshotEnv(); + process.env.SMOKE_TEST_TOKEN = SMOKE_TOKEN; + process.env.SMOKE_TEST_AGENT_ID = SMOKE_AGENT_ID; + process.env.SMOKE_TEST_ORG_ID = SMOKE_ORG_ID; +}); + +afterEach(() => { + restoreEnv(); +}); + +function mountSmoke(): Hono { + const app = new Hono(); + app.route("/api/internal/smoke", createSmokeRoutes()); + return app; +} + +async function dispatch( + app: Hono, + body: Record, + token: string | null = SMOKE_TOKEN, + extraHeaders: Record = {} +): Promise { + const headers: Record = { + "content-type": "application/json", + ...extraHeaders, + }; + if (token !== null) headers.authorization = `Bearer ${token}`; + return app.request("/api/internal/smoke/dispatch", { + method: "POST", + headers, + body: JSON.stringify(body), + }); +} + +describe("smoke dispatch auth", () => { + test("rejects missing bearer", async () => { + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }, null); + expect(res.status).toBe(401); + }); + + test("rejects wrong-length token", async () => { + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-x" }, + "short" + ); + expect(res.status).toBe(401); + }); + + test("rejects same-length but wrong token", async () => { + const wrong = SMOKE_TOKEN.slice(0, -1) + "X"; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }, wrong); + expect(res.status).toBe(401); + }); + + test("503 when SMOKE_TEST_TOKEN unset", async () => { + delete process.env.SMOKE_TEST_TOKEN; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }); + expect(res.status).toBe(503); + }); + + test("503 when SMOKE_TEST_TOKEN empty string", async () => { + process.env.SMOKE_TEST_TOKEN = ""; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }); + expect(res.status).toBe(503); + }); + + test("503 when SMOKE_TEST_AGENT_ID unset", async () => { + delete process.env.SMOKE_TEST_AGENT_ID; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }); + expect(res.status).toBe(503); + }); + + test("503 when SMOKE_TEST_ORG_ID unset", async () => { + delete process.env.SMOKE_TEST_ORG_ID; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-x" }); + expect(res.status).toBe(503); + }); +}); + +describe("smoke dispatch Host allowlist", () => { + test("accepts request when SMOKE_TEST_ALLOWED_HOST unset (no Host check)", async () => { + delete process.env.SMOKE_TEST_ALLOWED_HOST; + const res = await dispatch(mountSmoke(), { conversationId: "smoke-no-host" }); + expect(res.status).toBe(200); + }); + + test("rejects request whose Host does not match SMOKE_TEST_ALLOWED_HOST", async () => { + process.env.SMOKE_TEST_ALLOWED_HOST = "release-name-lobu-app"; + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-bad-host" }, + SMOKE_TOKEN, + { host: "app.lobu.ai" } + ); + expect(res.status).toBe(403); + }); + + test("accepts request whose Host matches SMOKE_TEST_ALLOWED_HOST exactly", async () => { + process.env.SMOKE_TEST_ALLOWED_HOST = "release-name-lobu-app"; + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-good-host" }, + SMOKE_TOKEN, + { host: "release-name-lobu-app" } + ); + expect(res.status).toBe(200); + }); + + test("accepts request whose Host carries a port suffix", async () => { + process.env.SMOKE_TEST_ALLOWED_HOST = "release-name-lobu-app"; + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-port-host" }, + SMOKE_TOKEN, + { host: "release-name-lobu-app:8787" } + ); + expect(res.status).toBe(200); + }); + + test("accepts request whose Host carries the cluster.local FQDN suffix", async () => { + process.env.SMOKE_TEST_ALLOWED_HOST = "release-name-lobu-app"; + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-fqdn-host" }, + SMOKE_TOKEN, + { host: "release-name-lobu-app.my-ns.svc.cluster.local" } + ); + expect(res.status).toBe(200); + }); +}); + +describe("smoke dispatch ingress-bypass defense", () => { + test("rejects request with x-forwarded-for", async () => { + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-ingress" }, + SMOKE_TOKEN, + { "x-forwarded-for": "203.0.113.1" } + ); + expect(res.status).toBe(403); + }); + + test("rejects request with forwarded header", async () => { + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-ingress" }, + SMOKE_TOKEN, + { forwarded: "for=203.0.113.1" } + ); + expect(res.status).toBe(403); + }); + + test("rejects request with x-real-ip", async () => { + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-ingress" }, + SMOKE_TOKEN, + { "x-real-ip": "203.0.113.1" } + ); + expect(res.status).toBe(403); + }); + + test("rejects request with x-forwarded-host", async () => { + const res = await dispatch( + mountSmoke(), + { conversationId: "smoke-ingress" }, + SMOKE_TOKEN, + { "x-forwarded-host": "evil.example.com" } + ); + expect(res.status).toBe(403); + }); +}); + +describe("smoke dispatch input validation", () => { + test("rejects missing conversationId", async () => { + const res = await dispatch(mountSmoke(), {}); + expect(res.status).toBe(400); + }); + + test("rejects conversationId without smoke- prefix", async () => { + const res = await dispatch(mountSmoke(), { + conversationId: "production-conv-id", + }); + expect(res.status).toBe(400); + const body = await res.json(); + expect(body.error).toContain("smoke-"); + }); + + test("rejects invalid JSON", async () => { + const res = await mountSmoke().request("/api/internal/smoke/dispatch", { + method: "POST", + headers: { + authorization: `Bearer ${SMOKE_TOKEN}`, + "content-type": "application/json", + }, + body: "not-json", + }); + expect(res.status).toBe(400); + }); +}); + +describe("smoke dispatch insert + namespace pinning", () => { + test("inserts chat_message run with env-pinned agentId/organizationId", async () => { + const res = await dispatch(mountSmoke(), { + conversationId: "smoke-release-1", + messageText: "hello", + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { runId: number; idempotencyKey: string }; + expect(typeof body.runId).toBe("number"); + expect(body.runId).toBeGreaterThan(0); + expect(body.idempotencyKey).toBe("smoke:smoke-release-1"); + + const sql = getDb(); + const rows = await sql<{ + run_type: string; + queue_name: string; + status: string; + idempotency_key: string; + action_input: Record; + }>` + SELECT run_type, queue_name, status, idempotency_key, action_input + FROM public.runs + WHERE id = ${body.runId} + `; + expect(rows.length).toBe(1); + const row = rows[0]!; + expect(row.run_type).toBe("chat_message"); + expect(row.queue_name).toBe("messages"); + expect(row.status).toBe("pending"); + expect(row.idempotency_key).toBe("smoke:smoke-release-1"); + expect(row.action_input.agentId).toBe(SMOKE_AGENT_ID); + expect(row.action_input.organizationId).toBe(SMOKE_ORG_ID); + expect(row.action_input.conversationId).toBe("smoke-release-1"); + expect(row.action_input.platform).toBe("smoke"); + expect(row.action_input.messageText).toBe("hello"); + }); + + test("caller-supplied agentId/organizationId in body are silently ignored", async () => { + // A leaked SMOKE_TEST_TOKEN trying to target a real tenant — the + // body fields must be ignored, and the env-pinned smoke namespace + // is the one that lands in the runs row. + const res = await mountSmoke().request("/api/internal/smoke/dispatch", { + method: "POST", + headers: { + authorization: `Bearer ${SMOKE_TOKEN}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + agentId: "real-tenant-agent", + organizationId: "real-tenant-org", + conversationId: "smoke-attempt", + }), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { runId: number }; + const sql = getDb(); + const rows = await sql<{ action_input: Record }>` + SELECT action_input FROM public.runs WHERE id = ${body.runId} + `; + expect(rows[0]!.action_input.agentId).toBe(SMOKE_AGENT_ID); + expect(rows[0]!.action_input.organizationId).toBe(SMOKE_ORG_ID); + }); + + test("default messageText when omitted", async () => { + const res = await dispatch(mountSmoke(), { + conversationId: "smoke-default-msg", + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { runId: number }; + const sql = getDb(); + const rows = await sql<{ action_input: Record }>` + SELECT action_input FROM public.runs WHERE id = ${body.runId} + `; + expect(rows[0]!.action_input.messageText).toBe("smoke-test ping"); + }); + + test("idempotent: second dispatch with same conv returns same runId", async () => { + const first = await dispatch(mountSmoke(), { + conversationId: "smoke-idem", + }); + expect(first.status).toBe(200); + const firstBody = (await first.json()) as { runId: number }; + + const second = await dispatch(mountSmoke(), { + conversationId: "smoke-idem", + }); + expect(second.status).toBe(200); + const secondBody = (await second.json()) as { runId: number }; + expect(secondBody.runId).toBe(firstBody.runId); + + const sql = getDb(); + const rows = await sql` + SELECT COUNT(*)::int AS cnt FROM public.runs + WHERE idempotency_key = 'smoke:smoke-idem' + `; + expect((rows[0] as { cnt: number }).cnt).toBe(1); + }); +}); diff --git a/packages/server/src/gateway/routes/internal/smoke.ts b/packages/server/src/gateway/routes/internal/smoke.ts new file mode 100644 index 000000000..a5a063805 --- /dev/null +++ b/packages/server/src/gateway/routes/internal/smoke.ts @@ -0,0 +1,326 @@ +/** + * Internal smoke-test dispatch endpoint. + * + * POST /api/internal/smoke/dispatch + * + * Inserts a synthetic chat_message run into `public.runs`. The runs-queue + * MessageConsumer (running in the same app pod) claims it, spawns the + * worker subprocess, the worker runs end-to-end, and on terminal cleanup + * writes a row to `agent_transcript_snapshot`. The Helm post-upgrade + * smoke-test Job polls that row to gate the release: if the snapshot + * doesn't materialize with `terminal_status='completed'` within the + * configured window, Helm rolls the release back. + * + * Why a dedicated endpoint and not "just call the public chat API": + * - The public Agent API requires an OAuth bearer / PAT from a real + * authenticated user — we can't easily mint one from inside a Helm + * post-upgrade Job without per-cluster bootstrap. + * - Real chat connections (Telegram/Slack) require platform-side + * secrets and webhook configuration that an in-cluster smoke Job + * shouldn't carry. + * - The path under test is the worker spawn + run completion pipeline, + * not the platform ingress. Synthesising the message directly into + * the runs queue exercises everything from MessageConsumer onward, + * which is exactly the surface that has been silently broken across + * the recent regressions (Phase 5 env flip, runs denormalize/revert, + * JobEventSchema, action_input JSONB shape). + * + * Auth model (four layers — all must pass): + * 1. Bearer token equal to `process.env.SMOKE_TEST_TOKEN` + * (constant-time compare). The token is loaded into the deployment + * Secret and into the smoke-test Job's envFrom; ingress doesn't + * proxy it. + * 2. The request MUST NOT carry any `x-forwarded-*` / `forwarded` / + * `x-real-ip` header. A standards-compliant reverse proxy or + * ingress controller (nginx, istio, traefik, etc.) adds at least + * one of these; the in-cluster smoke Job hits the + * `-app` Service via cluster DNS, which never traverses + * ingress, so the headers are absent on legitimate calls. + * 3. The Host header MUST start with the in-cluster app service + * hostname (`-lobu-app`) — set explicitly by the smoke + * Job's curl URL. Public ingress traffic always carries the + * operator's external hostname (e.g. `app.lobu.ai`) in Host, so + * this rejects requests routed through ingress even if (2) is + * somehow bypassed by a non-compliant proxy. The required Host + * prefix is operator-configurable via `SMOKE_TEST_ALLOWED_HOST` + * env (defaults to "" — empty means "any Host accepted", which is + * only safe when (2) is honoured; operators are encouraged to set + * this). + * 4. If `SMOKE_TEST_TOKEN` / `SMOKE_TEST_AGENT_ID` / `SMOKE_TEST_ORG_ID` + * are unset OR empty the endpoint returns 503 so the check fails + * closed: a deployment with partial configuration will fail its + * smoke gate. + * + * Isolation guarantees (server-pinned, not client-supplied): + * - The synthetic agent + organization identifiers come from + * `process.env.SMOKE_TEST_AGENT_ID` and `process.env.SMOKE_TEST_ORG_ID` + * — caller-supplied values are ignored. This makes it structurally + * impossible for a leaked token to trigger runs on a real tenant's + * agent: even with the token, the caller can only dispatch against + * the env-configured smoke namespace. + * - The conversationId is caller-supplied so each release gets a unique + * run id, but must carry the configured `smoke-` prefix. + * - The chat_message row is tagged with `idempotency_key= + * smoke:` so repeated calls within a deployment do + * not flood the queue. + * + * The actual snapshot poll lives in the smoke-test Job's shell loop + * (charts/lobu/templates/smoke-test-job.yaml) — keeping the polling out + * of this handler avoids tying up a Hono request for the full timeout + * window. + */ + +import { timingSafeEqual } from "node:crypto"; +import { createLogger } from "@lobu/core"; +import { Hono } from "hono"; +import { getDb } from "../../../db/client.js"; + +const logger = createLogger("smoke-dispatch"); + +interface SmokeDispatchBody { + conversationId?: string; + messageText?: string; +} + +function compareTokens(provided: string, expected: string): boolean { + if (provided.length !== expected.length) return false; + try { + return timingSafeEqual(Buffer.from(provided), Buffer.from(expected)); + } catch { + return false; + } +} + +/** + * A request that passed through an ingress / reverse proxy carries at + * least one of these headers. The smoke Job hits the in-cluster + * `-app` Service via cluster DNS — direct ClusterIP → Pod, no + * ingress hop — so none of these headers are set on a legitimate call. + * Reject any request that carries one: that's a clear sign the route was + * reached through public ingress, which is never how the smoke Job + * speaks to the app. + */ +const FORWARDED_HEADERS = [ + "x-forwarded-for", + "x-forwarded-host", + "x-forwarded-proto", + "x-forwarded-port", + "x-forwarded-server", + "x-real-ip", + "forwarded", +]; + +export function createSmokeRoutes(): Hono { + const app = new Hono(); + + app.post("/dispatch", async (c) => { + const expected = process.env.SMOKE_TEST_TOKEN ?? ""; + const smokeAgentId = (process.env.SMOKE_TEST_AGENT_ID ?? "").trim(); + const smokeOrgId = (process.env.SMOKE_TEST_ORG_ID ?? "").trim(); + if (expected.length === 0 || smokeAgentId === "" || smokeOrgId === "") { + // Fail closed: an operator that hasn't configured every piece of + // the smoke trio (token + agent + org) cannot satisfy the gate. + // Returning 503 here also makes the smoke Job's curl fail before + // it can ever land a synthetic run in a partially-configured + // tenant. + return c.json( + { + error: + "Smoke dispatch disabled (SMOKE_TEST_TOKEN/SMOKE_TEST_AGENT_ID/SMOKE_TEST_ORG_ID unset)", + }, + 503 + ); + } + + // Ingress-bypass defense (layer A — forwarded-headers). + // A request that came through ingress carries x-forwarded-* headers; + // the in-cluster smoke Job never does. + for (const h of FORWARDED_HEADERS) { + if (c.req.header(h)) { + logger.warn( + `Smoke dispatch refused: ${h} header present (request came through ingress)` + ); + return c.json({ error: "Forwarded request refused" }, 403); + } + } + + // Ingress-bypass defense (layer B — Host header). + // When the operator configures SMOKE_TEST_ALLOWED_HOST, the request's + // Host header must start with that value. The chart wires this to + // `-lobu-app` so a request that came through public ingress + // (Host: app.lobu.ai or similar) is rejected even if some + // non-compliant proxy stripped the x-forwarded-* headers. + const allowedHost = (process.env.SMOKE_TEST_ALLOWED_HOST ?? "").trim(); + if (allowedHost !== "") { + const rawHost = (c.req.header("host") ?? "").toLowerCase(); + // Strip any : suffix before comparing — operators set the + // unsuffixed service DNS name, but curl sends ":". + const hostPart = rawHost.split(":")[0] ?? ""; + const expected = allowedHost.toLowerCase(); + if (hostPart !== expected && !hostPart.startsWith(`${expected}.`)) { + logger.warn( + `Smoke dispatch refused: Host '${rawHost}' does not match SMOKE_TEST_ALLOWED_HOST '${allowedHost}'` + ); + return c.json({ error: "Host header refused" }, 403); + } + } + + const auth = c.req.header("authorization") ?? ""; + if (!auth.startsWith("Bearer ")) { + return c.json({ error: "Missing bearer token" }, 401); + } + const provided = auth.substring(7); + if (!compareTokens(provided, expected)) { + return c.json({ error: "Invalid smoke token" }, 401); + } + + let body: SmokeDispatchBody; + try { + body = (await c.req.json()) as SmokeDispatchBody; + } catch { + return c.json({ error: "Invalid JSON body" }, 400); + } + + // agentId + organizationId are server-pinned from env, NOT caller- + // supplied. The previous draft accepted them in the body, which a + // codex review correctly flagged as convention-only isolation — a + // leaked token could then trigger runs on real tenants. Here we + // ignore the body fields entirely and force the smoke namespace. + const agentId = smokeAgentId; + const organizationId = smokeOrgId; + const conversationId = body.conversationId?.trim(); + const messageText = body.messageText?.trim() || "smoke-test ping"; + + if (!conversationId) { + return c.json({ error: "conversationId is required" }, 400); + } + + // Defence-in-depth: the synthetic conversationId must carry the + // `smoke-` prefix the chart sets. Operators that override the prefix + // can adjust this guard locally; production smoke runs always use + // the default. This trades a tiny amount of operator flexibility for + // an unambiguous audit trail. + if (!conversationId.startsWith("smoke-")) { + return c.json( + { + error: "conversationId must start with 'smoke-' for safety", + }, + 400 + ); + } + + const idempotencyKey = `smoke:${conversationId}`; + const messageId = `smoke-msg-${Date.now()}`; + + // Synthetic MessagePayload. Mirrors the shape that + // message-handler-bridge.ts builds for a real platform inbound, but + // with platform="smoke" and a minimal platformMetadata block so + // the chat-response bridge has nothing real to deliver to. + const payload = { + platform: "smoke", + userId: "smoke-user", + botId: "smoke", + conversationId, + teamId: "smoke", + agentId, + organizationId, + messageId, + messageText, + channelId: conversationId, + platformMetadata: { + agentId, + chatId: conversationId, + senderId: "smoke-user", + responseChannel: conversationId, + responseId: messageId, + responseThreadId: conversationId, + }, + agentOptions: {}, + }; + + const sql = getDb(); + + // Insert directly into `public.runs` to mirror what RunsQueue.send + // does — going through queueProducer would also work, but the + // direct INSERT keeps this handler dependency-free and avoids + // requiring the gateway to be fully initialised at smoke time. The + // pg_notify wakeup is what cues MessageConsumer to claim the row. + try { + const result = await sql<{ id: number | string }>` + INSERT INTO public.runs ( + run_type, + queue_name, + action_input, + idempotency_key, + max_attempts, + attempts, + status, + run_at, + priority, + retry_delay_seconds + ) VALUES ( + 'chat_message', + 'messages', + ${sql.json(payload)}, + ${idempotencyKey}, + 1, + 0, + 'pending', + now(), + 0, + NULL + ) + ON CONFLICT (idempotency_key) + WHERE idempotency_key IS NOT NULL + AND status IN ('pending', 'claimed', 'running') + DO NOTHING + RETURNING id + `; + + let runId: number | null = null; + if (result.length > 0 && result[0]) { + runId = Number(result[0].id); + } else { + // ON CONFLICT swallowed the insert — surface the live run id so + // the smoke job can still poll for its outcome. + const existing = await sql<{ id: number | string }>` + SELECT id FROM public.runs + WHERE idempotency_key = ${idempotencyKey} + AND status IN ('pending', 'claimed', 'running') + ORDER BY id DESC + LIMIT 1 + `; + if (existing.length > 0 && existing[0]) { + runId = Number(existing[0].id); + } + } + + if (runId === null) { + return c.json({ error: "Failed to enqueue smoke run" }, 500); + } + + // Fire pg_notify so the MessageConsumer wakes immediately rather + // than waiting for the next poll tick. Matches the wakeup that + // RunsQueue.send issues post-commit. + try { + await sql`SELECT pg_notify('runs_lobu:messages', 'chat_message')`; + } catch (err) { + logger.warn( + `pg_notify after smoke dispatch failed (non-fatal): ${err instanceof Error ? err.message : String(err)}` + ); + } + + logger.info( + `Smoke dispatch: runId=${runId} agentId=${agentId} org=${organizationId} conv=${conversationId}` + ); + return c.json({ runId, idempotencyKey }); + } catch (err) { + logger.error( + `Smoke dispatch INSERT failed: ${err instanceof Error ? err.message : String(err)}` + ); + return c.json({ error: "Internal error" }, 500); + } + }); + + return app; +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 9c72653e5..8d0ddacc4 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -583,6 +583,13 @@ app.get('/legal', (c) => { // Health check and worker endpoints must be before mcpAuth middleware app.get('/api/health', restHealth); +// Internal smoke-test dispatch. Authentication is a shared bearer +// (`SMOKE_TEST_TOKEN`) loaded into the pod via the deployment Secret — +// not exposed to public ingress consumers. Mounted before mcpAuth so the +// route handles its own auth without falling into the OAuth-bearer path. +import { createSmokeRoutes } from './gateway/routes/internal/smoke'; +app.route('/api/internal/smoke', createSmokeRoutes()); + import { completeActionRun, completeAuthRun,