Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/lobu/templates/embeddings-deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{- if .Values.embeddings.enabled }}
{{- if and (gt (int .Values.embeddings.replicaCount) 1) .Values.embeddings.cache.enabled }}
{{- fail "embeddings.replicaCount > 1 requires embeddings.cache.enabled=false: the embeddings cache PVC is ReadWriteOnce and cannot be mounted by multiple pods at once (a second replica would hit a Multi-Attach error). Disable the cache or keep a single embeddings replica." }}
{{- end }}
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
19 changes: 19 additions & 0 deletions charts/lobu/templates/worker-deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{- if .Values.worker.enabled }}
{{- if and (gt (int .Values.worker.replicaCount) 1) .Values.worker.cache.enabled }}
{{- fail "worker.replicaCount > 1 requires worker.cache.enabled=false: the worker cache PVC is ReadWriteOnce and cannot be mounted by multiple pods at once (a second replica would hit a Multi-Attach error). Disable the cache or keep a single worker replica." }}
{{- end }}
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -121,6 +124,22 @@ spec:
- name: worker-cache
mountPath: /app/.cache
{{- end }}
# The worker daemon is a poll loop with no HTTP server, so there's no
# endpoint to httpGet. Liveness is instead an exec probe that checks that
# PID 1's command line still contains `daemon` (PID 1 is the daemon
# process, `bun src/bin.ts daemon` per docker/worker). If the loop crashes
# hard the container exits and PID 1 is gone, so kubelet restarts it.
# Reuses the chart's healthCheck.livenessProbe timing.
livenessProbe:
exec:
command:
- sh
- -c
- grep -qa daemon /proc/1/cmdline
initialDelaySeconds: {{ .Values.healthCheck.livenessProbe.initialDelaySeconds }}
periodSeconds: {{ .Values.healthCheck.livenessProbe.periodSeconds }}
timeoutSeconds: {{ .Values.healthCheck.livenessProbe.timeoutSeconds }}
failureThreshold: {{ .Values.healthCheck.livenessProbe.failureThreshold }}
resources:
{{- toYaml .Values.worker.resources | nindent 12 }}
{{- if .Values.worker.cache.enabled }}
Expand Down
140 changes: 138 additions & 2 deletions packages/cli/src/commands/_lib/apply/__tests__/apply-cmd.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { describe, expect, test } from "bun:test";
import { describe, expect, mock, test } from "bun:test";
import {
executePlan,
locallyDeclaredConnectorKeys,
pushProviderApiKeys,
readBoundedBody,
validateConnectorState,
} from "../apply-cmd.js";
import type { RemoteConnectorDefinition } from "../client.js";
import type { ApplyClient, RemoteConnectorDefinition } from "../client.js";
import type { DiffPlan, RemoteSnapshot } from "../diff.js";
import { validateConnectionAgainstConnector } from "../desired-state.js";
import type {
DesiredAgent,
DesiredConnection,
DesiredState,
ResolvedConnectorSchemas,
Expand Down Expand Up @@ -111,6 +115,138 @@ describe("readBoundedBody (#3 — bounded source_url fetch)", () => {
});
});

describe("pushProviderApiKeys (#11 — provider keys pushed on a noop-only apply)", () => {
/**
 * Test fixture: builds a minimal desired agent whose id doubles as its name,
 * with empty settings, no platforms, and the supplied provider keys.
 */
function agentWithKeys(
  agentId: string,
  providerKeys: { providerId: string; value: string }[]
): DesiredAgent {
  const metadata = { agentId, name: agentId };
  const agent: DesiredAgent = {
    metadata,
    settings: {},
    platforms: [],
    providerKeys,
  };
  return agent;
}

test("pushes setProviderApiKey for every declared key (otherwise-noop agents)", async () => {
const setProviderApiKey = mock(async () => {
/* resolve void */
});
const client = { setProviderApiKey } as unknown as ApplyClient;
const agents = [
agentWithKeys("a1", [
{ providerId: "anthropic", value: "k-anthropic" },
{ providerId: "openai", value: "k-openai" },
]),
agentWithKeys("a2", [{ providerId: "zai", value: "k-zai" }]),
];

await pushProviderApiKeys(client, agents);

expect(setProviderApiKey).toHaveBeenCalledTimes(3);
expect(setProviderApiKey).toHaveBeenCalledWith(
"a1",
"anthropic",
"k-anthropic"
);
expect(setProviderApiKey).toHaveBeenCalledWith("a1", "openai", "k-openai");
expect(setProviderApiKey).toHaveBeenCalledWith("a2", "zai", "k-zai");
});

test("no-op when no agent declares a provider key", async () => {
const setProviderApiKey = mock(async () => {
/* resolve void */
});
const client = { setProviderApiKey } as unknown as ApplyClient;

await pushProviderApiKeys(client, [agentWithKeys("a1", [])]);

expect(setProviderApiKey).not.toHaveBeenCalled();
});

// Regression: provider keys target `/agents/<id>/providers/...`, so on a
// FIRST apply the agent must be created (executePlan) BEFORE the key push, or
// the server 404s ("Agent not found"). This models that constraint and proves
// the helpers compose in the correct order.
describe("ordering with a first-apply create plan", () => {
/**
 * Builds a fake client that records the order of its calls and rejects a
 * provider-key push for any agent that has not been upserted first.
 *
 * Returns the fake (cast to `ApplyClient`) together with the `order` log that
 * both fake methods append to.
 */
function recordingClient(): {
  client: ApplyClient;
  order: string[];
} {
  const order: string[] = [];
  const knownAgents = new Set<string>();

  const upsertAgent = async (meta: { agentId: string }): Promise<void> => {
    knownAgents.add(meta.agentId);
    order.push(`upsertAgent:${meta.agentId}`);
  };

  const setProviderApiKey = async (
    agentId: string,
    providerId: string
  ): Promise<void> => {
    // Mirror the server: the agent must exist first.
    if (!knownAgents.has(agentId)) {
      throw new Error(`Agent not found: ${agentId}`);
    }
    order.push(`setProviderApiKey:${agentId}/${providerId}`);
  };

  // Only the two methods exercised by these tests are provided, hence the cast.
  const client = { upsertAgent, setProviderApiKey } as unknown as ApplyClient;
  return { client, order };
}

const desiredAgent = agentWithKeys("new-agent", [
{ providerId: "anthropic", value: "k-anthropic" },
]);
const state: DesiredState = {
agents: [desiredAgent],
prune: false,
memorySchema: { entityTypes: [], relationshipTypes: [] },
watchers: [],
connectors: { definitions: [], authProfiles: [], connections: [] },
requiredSecrets: [],
};
const plan: DiffPlan = {
rows: [
{
kind: "agent",
verb: "create",
id: "new-agent",
desired: desiredAgent.metadata,
},
],
counts: { create: 1, update: 0, noop: 0, drift: 0, delete: 0 },
notes: [],
};
const remote = {
agents: [],
agentSettings: new Map(),
platformsByAgent: new Map(),
entityTypes: [],
relationshipTypes: [],
watchers: [],
connectorDefinitions: [],
authProfiles: [],
connections: [],
feedsByConnectionId: new Map(),
} as unknown as RemoteSnapshot;

test("executePlan THEN pushProviderApiKeys succeeds (agent exists first)", async () => {
const { client, order } = recordingClient();
await executePlan({ client, state, plan, remote }, []);
await pushProviderApiKeys(client, state.agents);
expect(order).toEqual([
"upsertAgent:new-agent",
"setProviderApiKey:new-agent/anthropic",
]);
});

test("the reverse order (keys before create) reproduces the 404", async () => {
const { client } = recordingClient();
// Negative control: pushing keys before executePlan is the bug pi caught.
await expect(pushProviderApiKeys(client, state.agents)).rejects.toThrow(
/Agent not found/
);
});
});
});

describe("validateConnectorState — skip stale schema for locally-declared keys (#2)", () => {
const localDef = {
key: "myconn",
Expand Down
107 changes: 73 additions & 34 deletions packages/cli/src/commands/_lib/apply/apply-cmd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,37 @@ interface ApplyContext {
remote: RemoteSnapshot;
}

async function executePlan(
/**
 * Sends every declared provider API key to the server, one
 * `setProviderApiKey` call per (agent, provider) pair, logging a dim
 * `provider-key` line after each successful push.
 *
 * The keys are stored as org-shared `agent_secrets` rows so the worker can
 * inject them at runtime without a per-user auth profile. The push is an
 * idempotent PUT: an unchanged value returns 200, a changed value rotates the
 * key.
 *
 * Every desired agent is visited, not only those with a settings diff. The
 * secret value is not part of the settings JSON, so a key may need pushing
 * even when every resource is noop (first apply after the gateway gained
 * support, or a key-only `.env` change/rotation).
 *
 * Call order matters. Run this AFTER `executePlan`, so a just-created agent
 * exists before its `/agents/<id>/providers/...` key push. It is also called
 * from the all-noop / key-only branch, where no agents are created. It lives
 * outside `executePlan` so both paths can use it without double-pushing.
 *
 * Calls are awaited one at a time, so the first rejection from
 * `setProviderApiKey` propagates and stops the remaining pushes.
 *
 * @param client - API client used to push each key.
 * @param agents - Desired agents whose `providerKeys` are pushed.
 */
export async function pushProviderApiKeys(
  client: ApplyClient,
  agents: DesiredState["agents"]
): Promise<void> {
  for (const agent of agents) {
    for (const key of agent.providerKeys) {
      await client.setProviderApiKey(
        agent.metadata.agentId,
        key.providerId,
        key.value
      );
      const label = ` ↻ provider-key ${agent.metadata.agentId}/${key.providerId}`;
      printText(chalk.dim(label));
    }
  }
}

export async function executePlan(
ctx: ApplyContext,
pendingAuth: PendingAuthEntry[]
): Promise<void> {
Expand Down Expand Up @@ -617,25 +647,6 @@ async function executePlan(
);
}

// 2b) Provider API keys — pushed as org-shared `agent_secrets` rows so the
// worker can inject them at runtime without a per-user auth profile. Idempotent
// (PUT); same value → 200, different value → rotation. Walk all desired agents
// (not just those with a settings diff) — the secret value isn't part of the
// settings JSON, so a row can need a key even when settings are noop (e.g.
// first apply after the gateway picked up support, or a key rotation).
for (const desired of ctx.state.agents) {
for (const { providerId, value } of desired.providerKeys) {
await ctx.client.setProviderApiKey(
desired.metadata.agentId,
providerId,
value
);
printText(
chalk.dim(` ↻ provider-key ${desired.metadata.agentId}/${providerId}`)
);
}
}

// 3) Platforms — upsert only the platforms the diff flagged (create / config
// change / key removal). The diff treats an opaque remote secret (`***` /
// `secret://`) as unchanged while the key is still declared (see
Expand Down Expand Up @@ -1231,13 +1242,27 @@ export async function applyCommand(opts: ApplyOptions = {}): Promise<void> {
(r) => r.kind === "auth-profile" && "needsAuth" in r && r.needsAuth
);

// Provider API keys live outside the resource diff (the secret value isn't
// serialized into any plan row), so a key-only `.env` change produces an
// all-noop plan. Detect declared keys so the apply isn't short-circuited and
// the keys aren't silently dropped.
const hasProviderKeys = state.agents.some((a) => a.providerKeys.length > 0);

if (
plan.counts.create === 0 &&
plan.counts.update === 0 &&
plan.counts.delete === 0 &&
!hasPendingAuth
) {
printText(chalk.green("\nNothing to apply."));
// Nothing in the resource diff — but a declared provider key still needs to
// be pushed (idempotent PUT). This is the key-only `.env` change path.
if (hasProviderKeys) {
printText(chalk.bold("\nApplying provider keys:"));
await pushProviderApiKeys(client, state.agents);
printText(chalk.green("\nProvider keys applied; nothing else to apply."));
} else {
printText(chalk.green("\nNothing to apply."));
}
return;
}

Expand All @@ -1263,24 +1288,38 @@ export async function applyCommand(opts: ApplyOptions = {}): Promise<void> {
}
}

const hasResourceWork =
plan.counts.create > 0 || plan.counts.update > 0 || plan.counts.delete > 0;

const pendingAuth: PendingAuthEntry[] = [];
let applyErr: unknown;
if (
plan.counts.create > 0 ||
plan.counts.update > 0 ||
plan.counts.delete > 0
) {
printText(chalk.bold("\nApplying:"));
try {
try {
// Resources FIRST: executePlan does `upsertAgent` for created agents, and
// `setProviderApiKey` targets `/agents/<id>/providers/...` — pushing keys
// before the agent exists 404s on a first apply. So run the plan, then push
// keys. (The all-noop / key-only short-circuit above pushes keys directly:
// there are no agent creates there, so the agents already exist remotely.)
if (hasResourceWork) {
printText(chalk.bold("\nApplying:"));
await executePlan({ client, state, plan, remote }, pendingAuth);
}
// Provider keys live outside the resource diff (idempotent PUT), so push
// them on any confirmed apply (including a pending-auth-only plan). Done
// here, not inside executePlan, so the all-noop short-circuit above can push
// them too without double-pushing.
if (hasProviderKeys) {
printText(chalk.bold("\nApplying provider keys:"));
await pushProviderApiKeys(client, state.agents);
}
if (hasResourceWork) {
printText(chalk.green("\nApply complete."));
} catch (err) {
applyErr = err;
printError(`\n${err instanceof Error ? err.message : String(err)}`);
printError(
"Apply halted on first failure. Re-run `lobu apply` once the underlying issue is resolved — every endpoint is idempotent."
);
}
} catch (err) {
applyErr = err;
printError(`\n${err instanceof Error ? err.message : String(err)}`);
printError(
"Apply halted on first failure. Re-run `lobu apply` once the underlying issue is resolved — every endpoint is idempotent."
);
}

// Always render the punch-list — even on partial failure, so the operator
Expand Down
Loading
Loading