diff --git a/charts/lobu/templates/embeddings-deployment.yaml b/charts/lobu/templates/embeddings-deployment.yaml index 8ff640a9b..2b9e8a667 100644 --- a/charts/lobu/templates/embeddings-deployment.yaml +++ b/charts/lobu/templates/embeddings-deployment.yaml @@ -1,4 +1,7 @@ {{- if .Values.embeddings.enabled }} +{{- if and (gt (int .Values.embeddings.replicaCount) 1) .Values.embeddings.cache.enabled }} +{{- fail "embeddings.replicaCount > 1 requires embeddings.cache.enabled=false: the embeddings cache PVC is ReadWriteOnce and cannot be mounted by multiple pods at once (a second replica would hit a Multi-Attach error). Disable the cache or keep a single embeddings replica." }} +{{- end }} apiVersion: apps/v1 kind: Deployment metadata: diff --git a/charts/lobu/templates/worker-deployment.yaml b/charts/lobu/templates/worker-deployment.yaml index 0d6eacb6b..1d00bb2fb 100644 --- a/charts/lobu/templates/worker-deployment.yaml +++ b/charts/lobu/templates/worker-deployment.yaml @@ -1,4 +1,7 @@ {{- if .Values.worker.enabled }} +{{- if and (gt (int .Values.worker.replicaCount) 1) .Values.worker.cache.enabled }} +{{- fail "worker.replicaCount > 1 requires worker.cache.enabled=false: the worker cache PVC is ReadWriteOnce and cannot be mounted by multiple pods at once (a second replica would hit a Multi-Attach error). Disable the cache or keep a single worker replica." }} +{{- end }} apiVersion: apps/v1 kind: Deployment metadata: @@ -121,6 +124,22 @@ spec: - name: worker-cache mountPath: /app/.cache {{- end }} + # The worker daemon is a poll loop with no HTTP server, so there's no + # endpoint to httpGet. Liveness is an exec probe that confirms the + # daemon process (PID 1, `bun src/bin.ts daemon` per docker/worker) + # is still its own command line — if the loop crashes hard the + # container exits and PID 1 is gone, so kubelet restarts it. Reuses + # the chart's healthCheck.livenessProbe timing. + livenessProbe: + exec: + command: + - sh + - -c + - grep -qa daemon /proc/1/cmdline + initialDelaySeconds: {{ .Values.healthCheck.livenessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.healthCheck.livenessProbe.periodSeconds }} + timeoutSeconds: {{ .Values.healthCheck.livenessProbe.timeoutSeconds }} + failureThreshold: {{ .Values.healthCheck.livenessProbe.failureThreshold }} resources: {{- toYaml .Values.worker.resources | nindent 12 }} {{- if .Values.worker.cache.enabled }} diff --git a/packages/cli/src/commands/_lib/apply/__tests__/apply-cmd.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/apply-cmd.test.ts index ae5e3a4ed..fa5991760 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/apply-cmd.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/apply-cmd.test.ts @@ -1,12 +1,16 @@ -import { describe, expect, test } from "bun:test"; +import { describe, expect, mock, test } from "bun:test"; import { + executePlan, locallyDeclaredConnectorKeys, + pushProviderApiKeys, readBoundedBody, validateConnectorState, } from "../apply-cmd.js"; -import type { RemoteConnectorDefinition } from "../client.js"; +import type { ApplyClient, RemoteConnectorDefinition } from "../client.js"; +import type { DiffPlan, RemoteSnapshot } from "../diff.js"; import { validateConnectionAgainstConnector } from "../desired-state.js"; import type { + DesiredAgent, DesiredConnection, DesiredState, ResolvedConnectorSchemas, @@ -111,6 +115,138 @@ describe("readBoundedBody (#3 — bounded source_url fetch)", () => { }); }); +describe("pushProviderApiKeys (#11 — provider keys pushed on a noop-only apply)", () => { + function agentWithKeys( + agentId: string, + providerKeys: { providerId: string; value: string }[] + ): DesiredAgent { + return { + metadata: { agentId, name: agentId }, + settings: {}, + platforms: [], + providerKeys, + }; + } + + test("pushes setProviderApiKey for every declared key (otherwise-noop agents)", async () => { + const setProviderApiKey = mock(async () => { + /* resolve void */ + }); + const client = { setProviderApiKey } as unknown as ApplyClient; + const agents = [ + agentWithKeys("a1", [ + { providerId: "anthropic", value: "k-anthropic" }, + { providerId: "openai", value: "k-openai" }, + ]), + agentWithKeys("a2", [{ providerId: "zai", value: "k-zai" }]), + ]; + + await pushProviderApiKeys(client, agents); + + expect(setProviderApiKey).toHaveBeenCalledTimes(3); + expect(setProviderApiKey).toHaveBeenCalledWith( + "a1", + "anthropic", + "k-anthropic" + ); + expect(setProviderApiKey).toHaveBeenCalledWith("a1", "openai", "k-openai"); + expect(setProviderApiKey).toHaveBeenCalledWith("a2", "zai", "k-zai"); + }); + + test("no-op when no agent declares a provider key", async () => { + const setProviderApiKey = mock(async () => { + /* resolve void */ + }); + const client = { setProviderApiKey } as unknown as ApplyClient; + + await pushProviderApiKeys(client, [agentWithKeys("a1", [])]); + + expect(setProviderApiKey).not.toHaveBeenCalled(); + }); + + // Regression: provider keys target `/agents//providers/...`, so on a + // FIRST apply the agent must be created (executePlan) BEFORE the key push, or + // the server 404s ("Agent not found"). This models that constraint and proves + // the helpers compose in the correct order. + describe("ordering with a first-apply create plan", () => { + function recordingClient(): { + client: ApplyClient; + order: string[]; + } { + const createdAgents = new Set(); + const order: string[] = []; + const client = { + async upsertAgent(meta: { agentId: string }) { + createdAgents.add(meta.agentId); + order.push(`upsertAgent:${meta.agentId}`); + }, + async setProviderApiKey(agentId: string, providerId: string) { + if (!createdAgents.has(agentId)) { + // Mirror the server: the agent must exist first. + throw new Error(`Agent not found: ${agentId}`); + } + order.push(`setProviderApiKey:${agentId}/${providerId}`); + }, + } as unknown as ApplyClient; + return { client, order }; + } + + const desiredAgent = agentWithKeys("new-agent", [ + { providerId: "anthropic", value: "k-anthropic" }, + ]); + const state: DesiredState = { + agents: [desiredAgent], + prune: false, + memorySchema: { entityTypes: [], relationshipTypes: [] }, + watchers: [], + connectors: { definitions: [], authProfiles: [], connections: [] }, + requiredSecrets: [], + }; + const plan: DiffPlan = { + rows: [ + { + kind: "agent", + verb: "create", + id: "new-agent", + desired: desiredAgent.metadata, + }, + ], + counts: { create: 1, update: 0, noop: 0, drift: 0, delete: 0 }, + notes: [], + }; + const remote = { + agents: [], + agentSettings: new Map(), + platformsByAgent: new Map(), + entityTypes: [], + relationshipTypes: [], + watchers: [], + connectorDefinitions: [], + authProfiles: [], + connections: [], + feedsByConnectionId: new Map(), + } as unknown as RemoteSnapshot; + + test("executePlan THEN pushProviderApiKeys succeeds (agent exists first)", async () => { + const { client, order } = recordingClient(); + await executePlan({ client, state, plan, remote }, []); + await pushProviderApiKeys(client, state.agents); + expect(order).toEqual([ + "upsertAgent:new-agent", + "setProviderApiKey:new-agent/anthropic", + ]); + }); + + test("the reverse order (keys before create) reproduces the 404", async () => { + const { client } = recordingClient(); + // Negative control: pushing keys before executePlan is the bug pi caught. + await expect(pushProviderApiKeys(client, state.agents)).rejects.toThrow( + /Agent not found/ + ); + }); + }); +}); + describe("validateConnectorState — skip stale schema for locally-declared keys (#2)", () => { const localDef = { key: "myconn", diff --git a/packages/cli/src/commands/_lib/apply/apply-cmd.ts b/packages/cli/src/commands/_lib/apply/apply-cmd.ts index 953474790..18f27a82a 100644 --- a/packages/cli/src/commands/_lib/apply/apply-cmd.ts +++ b/packages/cli/src/commands/_lib/apply/apply-cmd.ts @@ -556,7 +556,37 @@ interface ApplyContext { remote: RemoteSnapshot; } -async function executePlan( +/** + * Push provider API keys as org-shared `agent_secrets` rows so the worker can + * inject them at runtime without a per-user auth profile. Idempotent (PUT): + * same value → 200, different value → rotation. Walks all desired agents (not + * just those with a settings diff) — the secret value isn't part of the + * settings JSON, so a row can need a key even when every resource is noop (e.g. + * first apply after the gateway picked up support, or a key-only `.env` + * change/rotation). Callers invoke this AFTER executePlan (so a just-created + * agent exists before its `/agents//providers/...` key push), and also in + * the all-noop / key-only branch (no agent creates there). Kept outside + * `executePlan` so both paths can call it without double-pushing. + */ +export async function pushProviderApiKeys( + client: ApplyClient, + agents: DesiredState["agents"] +): Promise { + for (const desired of agents) { + for (const { providerId, value } of desired.providerKeys) { + await client.setProviderApiKey( + desired.metadata.agentId, + providerId, + value + ); + printText( + chalk.dim(` ↻ provider-key ${desired.metadata.agentId}/${providerId}`) + ); + } + } +} + +export async function executePlan( ctx: ApplyContext, pendingAuth: PendingAuthEntry[] ): Promise { @@ -617,25 +647,6 @@ async function executePlan( ); } - // 2b) Provider API keys — pushed as org-shared `agent_secrets` rows so the - // worker can inject them at runtime without a per-user auth profile. Idempotent - // (PUT); same value → 200, different value → rotation. Walk all desired agents - // (not just those with a settings diff) — the secret value isn't part of the - // settings JSON, so a row can need a key even when settings are noop (e.g. - // first apply after the gateway picked up support, or a key rotation). - for (const desired of ctx.state.agents) { - for (const { providerId, value } of desired.providerKeys) { - await ctx.client.setProviderApiKey( - desired.metadata.agentId, - providerId, - value - ); - printText( - chalk.dim(` ↻ provider-key ${desired.metadata.agentId}/${providerId}`) - ); - } - } - // 3) Platforms — upsert only the platforms the diff flagged (create / config // change / key removal). The diff treats an opaque remote secret (`***` / // `secret://`) as unchanged while the key is still declared (see @@ -1231,13 +1242,27 @@ export async function applyCommand(opts: ApplyOptions = {}): Promise { (r) => r.kind === "auth-profile" && "needsAuth" in r && r.needsAuth ); + // Provider API keys live outside the resource diff (the secret value isn't + // serialized into any plan row), so a key-only `.env` change produces an + // all-noop plan. Detect declared keys so the apply isn't short-circuited and + // the keys aren't silently dropped. + const hasProviderKeys = state.agents.some((a) => a.providerKeys.length > 0); + if ( plan.counts.create === 0 && plan.counts.update === 0 && plan.counts.delete === 0 && !hasPendingAuth ) { - printText(chalk.green("\nNothing to apply.")); + // Nothing in the resource diff — but a declared provider key still needs to + // be pushed (idempotent PUT). This is the key-only `.env` change path. + if (hasProviderKeys) { + printText(chalk.bold("\nApplying provider keys:")); + await pushProviderApiKeys(client, state.agents); + printText(chalk.green("\nProvider keys applied; nothing else to apply.")); + } else { + printText(chalk.green("\nNothing to apply.")); + } return; } @@ -1263,24 +1288,38 @@ export async function applyCommand(opts: ApplyOptions = {}): Promise { } } + const hasResourceWork = + plan.counts.create > 0 || plan.counts.update > 0 || plan.counts.delete > 0; + const pendingAuth: PendingAuthEntry[] = []; let applyErr: unknown; - if ( - plan.counts.create > 0 || - plan.counts.update > 0 || - plan.counts.delete > 0 - ) { - printText(chalk.bold("\nApplying:")); - try { + try { + // Resources FIRST: executePlan does `upsertAgent` for created agents, and + // `setProviderApiKey` targets `/agents//providers/...` — pushing keys + // before the agent exists 404s on a first apply. So run the plan, then push + // keys. (The all-noop / key-only short-circuit above pushes keys directly: + // there are no agent creates there, so the agents already exist remotely.) + if (hasResourceWork) { + printText(chalk.bold("\nApplying:")); await executePlan({ client, state, plan, remote }, pendingAuth); + } + // Provider keys live outside the resource diff (idempotent PUT), so push + // them on any confirmed apply (including a pending-auth-only plan). Done + // here, not inside executePlan, so the all-noop short-circuit above can push + // them too without double-pushing. + if (hasProviderKeys) { + printText(chalk.bold("\nApplying provider keys:")); + await pushProviderApiKeys(client, state.agents); + } + if (hasResourceWork) { printText(chalk.green("\nApply complete.")); - } catch (err) { - applyErr = err; - printError(`\n${err instanceof Error ? err.message : String(err)}`); - printError( - "Apply halted on first failure. Re-run `lobu apply` once the underlying issue is resolved — every endpoint is idempotent." - ); } + } catch (err) { + applyErr = err; + printError(`\n${err instanceof Error ? err.message : String(err)}`); + printError( + "Apply halted on first failure. Re-run `lobu apply` once the underlying issue is resolved — every endpoint is idempotent." + ); } // Always render the punch-list — even on partial failure, so the operator diff --git a/packages/core/src/__tests__/utils-lock.test.ts b/packages/core/src/__tests__/utils-lock.test.ts deleted file mode 100644 index ebc2ba02a..000000000 --- a/packages/core/src/__tests__/utils-lock.test.ts +++ /dev/null @@ -1,111 +0,0 @@ -import { describe, expect, test } from "bun:test"; -import { AsyncLock } from "../utils/lock"; - -const wait = (ms: number) => new Promise((r) => setTimeout(r, ms)); - -describe("AsyncLock", () => { - test("returns the value from the wrapped function", async () => { - const lock = new AsyncLock("test"); - const result = await lock.acquire(async () => 42); - expect(result).toBe(42); - }); - - test("serializes concurrent operations (FIFO ordering)", async () => { - const lock = new AsyncLock("serial"); - const events: string[] = []; - - const op = (label: string, ms: number) => - lock.acquire(async () => { - events.push(`start:${label}`); - await wait(ms); - events.push(`end:${label}`); - return label; - }); - - const [a, b, c] = await Promise.all([op("a", 30), op("b", 10), op("c", 5)]); - - expect([a, b, c]).toEqual(["a", "b", "c"]); - // Each operation must end before the next one starts - expect(events).toEqual([ - "start:a", - "end:a", - "start:b", - "end:b", - "start:c", - "end:c", - ]); - }); - - test("releases lock even when wrapped function throws", async () => { - const lock = new AsyncLock("error-path"); - - await expect( - lock.acquire(async () => { - throw new Error("boom"); - }) - ).rejects.toThrow("boom"); - - // Subsequent acquire should succeed (lock was released) - const result = await lock.acquire(async () => "after-error"); - expect(result).toBe("after-error"); - }); - - test("times out when previous holder runs longer than timeoutMs", async () => { - const lock = new AsyncLock("timeout-ctx"); - - // First operation holds the lock for 200ms - const slow = lock.acquire(async () => { - await wait(200); - return "slow"; - }); - - // Second operation should time out after 20ms waiting - await expect(lock.acquire(async () => "fast", 20)).rejects.toThrow( - /Lock acquisition timeout after 20ms.*timeout-ctx/ - ); - - // Wait for the slow op to finish so the test cleans up - await slow; - }); - - test("default context appears in timeout message", async () => { - const lock = new AsyncLock(); - - const slow = lock.acquire(async () => { - await wait(100); - }); - - await expect(lock.acquire(async () => "x", 10)).rejects.toThrow( - /possible deadlock in unknown/ - ); - - await slow; - }); - - test("timeout error from previous slow holder doesn't poison subsequent acquires", async () => { - const lock = new AsyncLock("recovery"); - - const slow = lock.acquire(async () => { - await wait(80); - return "slow-done"; - }); - - await expect(lock.acquire(async () => "skipped", 5)).rejects.toThrow(); - - // After slow finishes, lock should be usable again - expect(await slow).toBe("slow-done"); - expect(await lock.acquire(async () => "ok")).toBe("ok"); - }); - - test("propagates non-Error throws", async () => { - const lock = new AsyncLock(); - await expect( - lock.acquire(async () => { - throw "string-error"; - }) - ).rejects.toBe("string-error"); - - // Lock still works after non-Error throw - expect(await lock.acquire(async () => "recovered")).toBe("recovered"); - }); -}); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ea00f67c3..6b542c90a 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -142,7 +142,6 @@ export type { BaseMessage } from "./types/message"; export * from "./utils/encryption"; export * from "./utils/env"; export * from "./utils/json"; -export * from "./utils/lock"; export type { McpStatus, McpToolDef } from "./utils/mcp-tool-instructions"; export * from "./utils/network-domains"; export * from "./utils/retry"; diff --git a/packages/core/src/utils/lock.ts b/packages/core/src/utils/lock.ts deleted file mode 100644 index 2d4b1f903..000000000 --- a/packages/core/src/utils/lock.ts +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Async lock for serializing concurrent operations - * Prevents race conditions in async code by ensuring only one operation runs at a time - * - * @example - * ```typescript - * class StreamSession { - * private streamLock = new AsyncLock(); - * - * async appendDelta(delta: string) { - * return this.streamLock.acquire(() => this.appendDeltaUnsafe(delta)); - * } - * - * private async appendDeltaUnsafe(delta: string) { - * // Critical section - only one execution at a time - * } - * } - * ``` - */ -export class AsyncLock { - private lock: Promise = Promise.resolve(); - private lockContext: string; - - constructor(context: string = "unknown") { - this.lockContext = context; - } - - /** - * Acquire lock and execute function exclusively - * - * @param fn - The async function to execute with exclusive access - * @param timeoutMs - Maximum time to wait for lock acquisition (default: 30s) - * @returns The result of the function - * @throws Error if lock acquisition times out - */ - async acquire( - fn: () => Promise, - timeoutMs: number = 30000 - ): Promise { - const currentLock = this.lock; - let releaseLock: (() => void) | undefined; - - // Create new lock that will be released when fn completes - this.lock = new Promise((resolve) => { - releaseLock = resolve; - }); - - let timeoutId: ReturnType | undefined; - - try { - // Wait for previous operation with timeout to prevent deadlock - const lockTimeout = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject( - new Error( - `Lock acquisition timeout after ${timeoutMs}ms - possible deadlock in ${this.lockContext}` - ) - ); - }, timeoutMs); - }); - - await Promise.race([currentLock, lockTimeout]); - - // Execute function with exclusive access - return await fn(); - } finally { - // Clear the timeout to prevent leak - if (timeoutId !== undefined) { - clearTimeout(timeoutId); - } - // Always release lock, even on error - releaseLock?.(); - } - } -} diff --git a/packages/server/src/auth/__tests__/worker-token.test.ts b/packages/server/src/auth/__tests__/worker-token.test.ts new file mode 100644 index 000000000..800e4e67f --- /dev/null +++ b/packages/server/src/auth/__tests__/worker-token.test.ts @@ -0,0 +1,44 @@ +/** + * Worker API trusted-token comparison (#5 — constant-time compare). + * + * The trusted-worker auth path grants full cross-org access; the compare must + * be constant-time (no `===`) and must never throw on a length mismatch. + */ + +import { describe, expect, it } from 'vitest'; +import { compareWorkerToken } from '../worker-token'; + +describe('compareWorkerToken', () => { + const expected = 'lobu_worker_secret_AbCdEf123456'; + + it('accepts the exact configured token', () => { + expect(compareWorkerToken(expected, expected)).toBe(true); + }); + + it('rejects a wrong token of the same length', () => { + const wrong = `${expected.slice(0, -1)}X`; + expect(wrong.length).toBe(expected.length); + expect(compareWorkerToken(wrong, expected)).toBe(false); + }); + + it('rejects a length mismatch without throwing', () => { + expect(() => + compareWorkerToken(`${expected}-extra`, expected) + ).not.toThrow(); + expect(compareWorkerToken(`${expected}-extra`, expected)).toBe(false); + expect(compareWorkerToken(expected.slice(0, 4), expected)).toBe(false); + }); + + it('rejects when the provided token is missing or empty', () => { + expect(compareWorkerToken(undefined, expected)).toBe(false); + expect(compareWorkerToken('', expected)).toBe(false); + }); + + it('rejects when the expected token is unconfigured (env unset)', () => { + // The trusted path is opt-in via WORKER_API_TOKEN — an unset env must never + // grant trusted access, even against an empty provided token. + expect(compareWorkerToken('anything', undefined)).toBe(false); + expect(compareWorkerToken('', undefined)).toBe(false); + expect(compareWorkerToken(undefined, undefined)).toBe(false); + }); +}); diff --git a/packages/server/src/auth/worker-token.ts b/packages/server/src/auth/worker-token.ts new file mode 100644 index 000000000..6239251bb --- /dev/null +++ b/packages/server/src/auth/worker-token.ts @@ -0,0 +1,28 @@ +import { timingSafeEqual } from 'node:crypto'; + +/** + * Constant-time comparison of a provided bearer token against the configured + * `WORKER_API_TOKEN`. The trusted-worker auth path (see `/api/workers/*` in + * index.ts) grants full cross-org access, so a naive `===` would leak the + * secret's length and matching prefix via response timing. + * + * Mirrors the smoke route's `compareTokens` + * (gateway/routes/internal/smoke.ts): a length-equality pre-check first + * (`timingSafeEqual` throws on a length mismatch), then the constant-time + * compare. A missing `expected` (token unconfigured) or `provided` is rejected + * — the trusted path is opt-in via env, never granted by omission. + */ +export function compareWorkerToken( + provided: string | undefined, + expected: string | undefined +): boolean { + if (!provided || !expected) return false; + const a = Buffer.from(provided); + const b = Buffer.from(expected); + if (a.length !== b.length) return false; + try { + return timingSafeEqual(a, b); + } catch { + return false; + } +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 22a5d54fd..9905e0d88 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -18,6 +18,7 @@ import { LOBU_LOGO_PNG_BASE64 } from './assets/logo'; import { createAuth } from './auth'; import { getAuthConfig as getAuthConfigFromEnv } from './auth/config'; import { mcpAuth } from './auth/middleware'; +import { compareWorkerToken } from './auth/worker-token'; import { oauthRoutes } from './auth/oauth/routes'; import { findExistingPersonalOrg } from './auth/personal-org-provisioning'; import { credentialRoutes } from './auth/routes'; @@ -680,7 +681,7 @@ app.use('/api/workers/*', async (c, next) => { const expected = c.env.WORKER_API_TOKEN; const provided = c.req.header('Authorization')?.replace('Bearer ', ''); - if (expected && provided === expected) { + if (compareWorkerToken(provided, expected)) { c.set('workerAuthMode', 'trusted'); c.set('workerUserId', null); c.set('workerOrgIds', null);