From 7648b383059c6a6f17a3d0858b2fa85f3df48266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 14:47:18 +0100 Subject: [PATCH 1/2] feat(server,chart): flip snapshot mode default + drop workspaces PVC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 5 of the multi-replica rollout. Snapshot mode is now the default session-store path; LOBU_SESSION_STORE=file is the explicit opt-out for legacy single-replica / local-dev paths. The chart's workspaces PVC defaults to disabled — workers fall back to pod-local ephemeral {WORKSPACE_DIR}/{input,output,temp} (cleared at run start) and PG-backed session.jsonl, with the per-conversation advisory lock pinning a conversation's runs to one pod for the run lifetime. - agent-worker: invert hydrate + cleanup gates to `!== "file"` - server: invert acquireConversationLock + /agent-history fallback gates; only forward `LOBU_SESSION_STORE=file` (the opt-out) to workers - chart: `app.workspaces.enabled` defaults to false; comments updated - test: `default-off` test now exercises the LOBU_SESSION_STORE=file opt-out path helm template charts/lobu (defaults) no longer renders app-pvc.yaml; helm template charts/lobu --set app.workspaces.enabled=true still renders the PVC + volume mount for back-compat. --- charts/lobu/templates/deployment.yaml | 30 ++++++++------ charts/lobu/values.yaml | 39 +++++++++++-------- .../src/openclaw/transcript-snapshot.ts | 6 +-- packages/agent-worker/src/openclaw/worker.ts | 33 +++++++++------- .../agent-transcript-snapshot.test.ts | 19 +++++---- packages/server/src/gateway/gateway/index.ts | 7 ++-- .../orchestration/impl/embedded-deployment.ts | 16 +++++--- .../gateway/routes/public/agent-history.ts | 24 ++++++------ 8 files changed, 102 insertions(+), 72 deletions(-) diff --git a/charts/lobu/templates/deployment.yaml b/charts/lobu/templates/deployment.yaml index 876af4163..9fddaf5a4 100644 --- a/charts/lobu/templates/deployment.yaml +++ b/charts/lobu/templates/deployment.yaml @@ -15,9 +15,14 @@ spec: This is the operator opt-in path for true blue/green deploys. 3. Else → Recreate (the safe default). - Why `allowMultiReplica` is an explicit flag, not auto-detected from - RWX: even with RWX storage, several in-memory components break with - >1 gateway replicas OR during the brief RollingUpdate overlap: + Phase 5 (workspaces PVC is OFF by default + LOBU_SESSION_STORE + defaults to snapshot mode) means most deploys naturally land on the + rolling-update branch as soon as `allowMultiReplica: true` is set. + The RWX check still applies for self-hosters who re-enable the PVC. + + Why `allowMultiReplica` is an explicit flag, not auto-detected: + several in-memory components break with >1 gateway replicas OR + during the brief RollingUpdate overlap: * `SseManager` (gateway/services/sse-manager.ts) — SSE streams are pod-local; a job claimed by pod B broadcasts to no-one if the client is on pod A. @@ -28,9 +33,10 @@ spec: * Telegram polling mode (gateway/connections/chat-instance-manager .ts:610-613) — every replica long-polls the same bot, causing conflicts. - RWX is necessary but not sufficient. The flag forces operators to - acknowledge "I have only webhook-mode Chat connections AND no - AskUser/SSE flows in flight" before opting in. + Snapshot-mode session state and the per-conversation advisory lock + are necessary but not sufficient. The flag forces operators to + acknowledge "I have only webhook-mode Chat connections AND accept + the SSE / AskUser handoff caveats" before opting in. */}} {{- $rwxConfigured := has "ReadWriteMany" (.Values.app.workspaces.accessModes | default (list)) }} {{- $rollSafe := and .Values.app.allowMultiReplica (or (not .Values.app.workspaces.enabled) $rwxConfigured) }} @@ -139,12 +145,12 @@ spec: already serving, so deregistering the old pod via Service endpoint removal + giving downstream caches time to notice it shrinks the "old pod kept getting traffic during drain" window). - Under `Recreate` (the workspaces-RWO default), the new pod - doesn't start until the old one fully terminates — adding a - preStop sleep would EXTEND the no-available-server window by - its duration. We only emit the hook when preStopDelaySeconds - is explicitly > 0; ops repos using RollingUpdate set it, - Recreate-mode deploys leave it at the default 0. + Under `Recreate`, the new pod doesn't start until the old one + fully terminates — adding a preStop sleep would EXTEND the + no-available-server window by its duration. We only emit the + hook when preStopDelaySeconds is explicitly > 0; ops repos + using RollingUpdate set it, Recreate-mode deploys leave it at + the default 0. */ -}} {{- if gt (int (.Values.app.preStopDelaySeconds | default 0)) 0 }} lifecycle: diff --git a/charts/lobu/values.yaml b/charts/lobu/values.yaml index 2177f1bc3..35f771947 100644 --- a/charts/lobu/values.yaml +++ b/charts/lobu/values.yaml @@ -40,12 +40,10 @@ app: # is going away, so in-flight requests during the deregistration lag # still hit a live process. # - # Default 0 (preStop hook NOT emitted). Set to ~15 only when running - # with `app.strategy.type: RollingUpdate` and replicaCount > 1. - # Under the default Recreate strategy (workspaces PVC is RWO), the - # new pod doesn't start until the old one terminates, so adding a - # preStop sleep would EXTEND the no-available-server window — the - # opposite of what we want. + # Default 0 (preStop hook NOT emitted). Set to ~15 when running with + # `app.strategy.type: RollingUpdate` and replicaCount > 1. The + # workspaces PVC is OFF by default in Phase 5 so the chart now picks + # RollingUpdate automatically when `allowMultiReplica: true`. preStopDelaySeconds: 0 # Total time k8s waits for the pod to stop before SIGKILL. Must be # > preStopDelaySeconds + actual shutdown time. The gateway's graceful @@ -67,17 +65,15 @@ app: # Opt-in to multi-replica / rolling deploys. DEFAULT FALSE — leaving # this off keeps the safe `strategy: Recreate` behavior. Setting it # true makes the chart pick `RollingUpdate` (maxSurge: 1, - # maxUnavailable: 0) when workspaces is RWX or disabled. + # maxUnavailable: 0) as long as `app.workspaces.enabled` is false (the + # Phase 5 default) or workspaces is RWX-configured. # # Prerequisites BEFORE setting true — chart cannot detect these: - # 1. Workspaces volume must be RWX-capable: - # `app.workspaces.accessModes: [ReadWriteMany]` + a storage class - # that backs it (NFS, EFS, CephFS, Longhorn-RWX, …). - # 2. NO active Chat connections in `mode: "polling"` (Telegram). + # 1. NO active Chat connections in `mode: "polling"` (Telegram). # Multiple replicas long-polling the same bot conflict. Use # webhook mode only — see # gateway/connections/chat-instance-manager.ts:610. - # 3. Acknowledge that the gateway has in-memory state for SSE + # 2. Acknowledge that the gateway has in-memory state for SSE # streams (gateway/services/sse-manager.ts) and AskUser # questions (gateway/connections/interaction-bridge.ts:193). # A request whose SSE stream / AskUser click lands on a @@ -87,17 +83,28 @@ app: # then, occasional dropped streams / button clicks are the cost # of zero-downtime deploys on this configuration. # + # If you re-enable the workspaces PVC (legacy path), you must either + # provision RWX-capable storage (NFS / EFS / CephFS / Longhorn-RWX) + # or leave `allowMultiReplica: false` — RWO + multi-replica is + # rejected by the chart's strategy helper and falls back to Recreate. + # # Single-replica + RollingUpdate (replicaCount: 1, allowMultiReplica: true) # still creates a brief overlap window where both pods are running. # The same in-memory caveats apply during that window, just for a # shorter span (~5-15s). allowMultiReplica: false - # Persistent workspaces volume. Embedded agent workers store session and - # workspace state below /app/workspaces (watcher run scratch, agent panel - # sessions, etc.). + # Persistent workspaces volume. Embedded agent workers used to store + # session and workspace state below /app/workspaces (session.jsonl, + # input/output/temp scratch). Phase 5 moved session.jsonl to Postgres + # (`agent_transcript_snapshot`) and made input/output/temp pod-local + # ephemeral storage — a per-conversation advisory lock pins all turns + # of one conversation to a single pod for its run lifetime, so no PVC + # is required for correctness. Set `enabled: true` only if you have a + # specific reason to persist scratch files across pod restarts; the + # PVC blocks rolling deploys on the default RWO storage class. workspaces: - enabled: true + enabled: false size: 20Gi storageClass: "" accessModes: diff --git a/packages/agent-worker/src/openclaw/transcript-snapshot.ts b/packages/agent-worker/src/openclaw/transcript-snapshot.ts index 331554f7e..cacfc2d10 100644 --- a/packages/agent-worker/src/openclaw/transcript-snapshot.ts +++ b/packages/agent-worker/src/openclaw/transcript-snapshot.ts @@ -23,9 +23,9 @@ * hydrate, `POST /worker/transcript/snapshot` for write. (org, agent, * conv) are pulled from the worker JWT on the gateway side, so the * worker can't impersonate another conversation. - * - Selection is opt-in via `LOBU_SESSION_STORE=snapshot`. Default unset - * preserves today's file-only behavior. Phase 5 flips the default, - * Phase 6 drops the env var. + * - Phase 5: snapshot mode is the default. `LOBU_SESSION_STORE=file` + * opts out for legacy/local-dev single-replica deploys. Phase 6 + * drops the env var entirely. * * Trade-off accepted: a mid-run crash loses the partial transcript for that * run. The next attempt re-runs from the previous user message. Tools must diff --git a/packages/agent-worker/src/openclaw/worker.ts b/packages/agent-worker/src/openclaw/worker.ts index 45f7dffc8..fc19b222b 100644 --- a/packages/agent-worker/src/openclaw/worker.ts +++ b/packages/agent-worker/src/openclaw/worker.ts @@ -344,15 +344,19 @@ export class OpenClawWorker implements WorkerExecutor { // turns; throwing here surfaces it on the first turn and the runs // queue's retry path handles re-delivery. Codex round 2 quality // win D on PR #865. - if (process.env.LOBU_SESSION_STORE === "snapshot") { + // + // Phase 5: snapshot is the default; setting LOBU_SESSION_STORE=file + // opts out (legacy / local-dev path that keeps reading session.jsonl + // straight off disk without writing to Postgres). + if (process.env.LOBU_SESSION_STORE !== "file") { if (typeof this.config.runId !== "number") { throw new Error( - "LOBU_SESSION_STORE=snapshot but WorkerConfig.runId is missing — runs-queue dispatch did not stamp runId on the job payload" + "Snapshot mode (LOBU_SESSION_STORE != 'file') but WorkerConfig.runId is missing — runs-queue dispatch did not stamp runId on the job payload" ); } if (!this.config.runJobToken) { throw new Error( - "LOBU_SESSION_STORE=snapshot but WorkerConfig.runJobToken is missing — MessageConsumer did not mint a per-run worker token" + "Snapshot mode (LOBU_SESSION_STORE != 'file') but WorkerConfig.runJobToken is missing — MessageConsumer did not mint a per-run worker token" ); } } @@ -607,8 +611,8 @@ export class OpenClawWorker implements WorkerExecutor { // (possibly on a different pod) can hydrate from it. Hydrate filters // `terminal_status='completed'`, so we ONLY POST on the success path // — writing `failed`/`timeout`/`cancelled` rows is pure network - // waste (codex round 2 quality win C on PR #865). Opt-in via - // LOBU_SESSION_STORE=snapshot. + // waste (codex round 2 quality win C on PR #865). Default-on in + // Phase 5; LOBU_SESSION_STORE=file opts out for legacy/local-dev. // // The runs queue has already moved this run to a terminal state by // the time cleanup() fires (sse-client.ts:865 finally block runs @@ -617,7 +621,7 @@ export class OpenClawWorker implements WorkerExecutor { // released when the subprocess exits, so by the next claim's boot // this snapshot is the visible "latest" row. if ( - process.env.LOBU_SESSION_STORE === "snapshot" && + process.env.LOBU_SESSION_STORE !== "file" && this.sessionFilePath && this.terminalStatus === "completed" ) { @@ -957,9 +961,9 @@ export class OpenClawWorker implements WorkerExecutor { const sessionFile = path.join(workspaceDir, ".openclaw", "session.jsonl"); // Capture for cleanup() — it reads the file back to write the snapshot - // at terminal time. Set unconditionally so non-snapshot deployments - // still get a defined value (snapshot writer no-ops when LOBU_SESSION_STORE - // is unset). + // at terminal time. Set unconditionally so file-mode opt-outs + // still get a defined value (snapshot writer no-ops when + // LOBU_SESSION_STORE=file). this.sessionFilePath = sessionFile; const providerStateFile = path.join( workspaceDir, @@ -968,9 +972,10 @@ export class OpenClawWorker implements WorkerExecutor { ); // Hydrate from the latest completed Postgres snapshot BEFORE the - // provider-state check or SessionManager.open(). Opt-in via - // LOBU_SESSION_STORE=snapshot — default unset preserves today's - // file-only behavior. Phase 5 flips the default. + // provider-state check or SessionManager.open(). Phase 5: snapshot + // mode is the default; LOBU_SESSION_STORE=file opts out and keeps + // the legacy file-only behaviour for local-dev / single-replica + // self-hosters. // // Order matters: hydrate → provider check (may unlink) → // SessionManager.open(). The provider-change unlink at line ~925 still @@ -980,7 +985,7 @@ export class OpenClawWorker implements WorkerExecutor { // PG rows remain readable without poisoning the new conversation // (hydrate would only resurrect them if a subsequent run completes // successfully and overwrites the latest pointer). - if (process.env.LOBU_SESSION_STORE === "snapshot") { + if (process.env.LOBU_SESSION_STORE !== "file") { const gatewayUrl = process.env.DISPATCHER_URL; const workerToken = process.env.WORKER_TOKEN; if (gatewayUrl && workerToken) { @@ -1000,7 +1005,7 @@ export class OpenClawWorker implements WorkerExecutor { } } else { logger.warn( - "LOBU_SESSION_STORE=snapshot set but DISPATCHER_URL or WORKER_TOKEN missing; snapshot disabled" + "Snapshot mode active (LOBU_SESSION_STORE != 'file') but DISPATCHER_URL or WORKER_TOKEN missing; snapshot disabled" ); } } diff --git a/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts b/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts index 571d4d801..fe17f4678 100644 --- a/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts +++ b/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts @@ -254,14 +254,15 @@ describe("agent_transcript_snapshot — snapshot route", () => { expect(out).toBe(big); }); - test("default-off: no snapshot rows ever created when LOBU_SESSION_STORE is unset", async () => { - // Asserts the env gate at the resolver level. The route layer doesn't + test("opt-out: no snapshot rows ever created when LOBU_SESSION_STORE=file", async () => { + // Phase 5 flipped the default: snapshot mode is on unless explicitly + // opted out via LOBU_SESSION_STORE=file. The route layer doesn't // check the env (writes are always honoured if the JWT is valid), but - // /agent-history's readLatestSnapshotJsonl is the consumer of that env - // gate. With no snapshot row, the resolver returns null and the - // existing disk-read fallback path runs. + // /agent-history's readLatestSnapshotJsonl is the consumer of that + // env gate via its callers. With no snapshot row in either mode, the + // resolver returns null and the existing disk-read fallback runs. const previous = process.env.LOBU_SESSION_STORE; - delete process.env.LOBU_SESSION_STORE; + process.env.LOBU_SESSION_STORE = "file"; try { const orgId = await seedAgentRow("agent-off", { organizationId: "org_off", @@ -276,7 +277,11 @@ describe("agent_transcript_snapshot — snapshot route", () => { const out = await readLatestSnapshotJsonl("agent-off", orgId); expect(out).toBeNull(); } finally { - if (previous !== undefined) process.env.LOBU_SESSION_STORE = previous; + if (previous === undefined) { + delete process.env.LOBU_SESSION_STORE; + } else { + process.env.LOBU_SESSION_STORE = previous; + } } }); diff --git a/packages/server/src/gateway/gateway/index.ts b/packages/server/src/gateway/gateway/index.ts index a7281df87..6299ded04 100644 --- a/packages/server/src/gateway/gateway/index.ts +++ b/packages/server/src/gateway/gateway/index.ts @@ -107,9 +107,10 @@ export class WorkerGateway { // Per-run transcript snapshots — backs the multi-replica unblock. // Workers hydrate from the latest completed snapshot on boot and POST - // a new snapshot on every terminal state. Opt-in via - // `LOBU_SESSION_STORE=snapshot` on the worker side; the routes - // themselves are always mounted (gated by the JWT scope check inside). + // a new snapshot on every terminal state. Phase 5: snapshot is the + // default; LOBU_SESSION_STORE=file opts out on the worker side. The + // routes themselves are always mounted (gated by the JWT scope check + // inside). this.app.route("/transcript", createTranscriptRoutes()); logger.debug("Worker gateway routes registered"); diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 9e891f977..1a4e1e62f 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -129,7 +129,8 @@ interface EmbeddedWorkerEntry { * Release the cross-pod advisory lock held for this conversation while the * worker is alive. Called from the `exit` handler so the lock survives the * entire subprocess lifetime, not just the spawn transaction. Undefined - * when `LOBU_SESSION_STORE=snapshot` is unset (no PG lock taken). + * when `LOBU_SESSION_STORE=file` opts out of snapshot mode (no PG lock + * taken on the legacy single-replica / RWO-PVC path). */ releaseConvLock?: () => Promise; } @@ -597,9 +598,10 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // Only enforced when the gateway is in snapshot mode (the env flag is // read from the gateway process, not from the worker env that's still // being assembled below). PVC-based legacy behaviour keeps single- - // writer at the kernel level via the RWO mount. + // writer at the kernel level via the RWO mount. Phase 5: snapshot + // mode is the default; LOBU_SESSION_STORE=file opts out. const snapshotModeEnabled = - process.env.LOBU_SESSION_STORE === "snapshot"; + process.env.LOBU_SESSION_STORE !== "file"; const conversationId = typeof messageData?.conversationId === "string" ? messageData.conversationId @@ -661,9 +663,11 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // Forward the snapshot-mode flag so workers know to hydrate from // Postgres and write back on cleanup. Mirrors gateway-side // process.env so the lock acquisition above and the worker's - // session-store selection stay in lockstep. - if (snapshotModeEnabled) { - commonEnvVars.LOBU_SESSION_STORE = "snapshot"; + // session-store selection stay in lockstep. Phase 5: snapshot is + // the default; only forward LOBU_SESSION_STORE=file (the opt-out) + // so the worker sees the same mode the gateway picked. + if (!snapshotModeEnabled) { + commonEnvVars.LOBU_SESSION_STORE = "file"; } const embeddedPath = buildEmbeddedWorkerPath( this.config.worker.binPathEntries, diff --git a/packages/server/src/gateway/routes/public/agent-history.ts b/packages/server/src/gateway/routes/public/agent-history.ts index a9279aee8..bdf2019fe 100644 --- a/packages/server/src/gateway/routes/public/agent-history.ts +++ b/packages/server/src/gateway/routes/public/agent-history.ts @@ -34,8 +34,9 @@ import { verifySettingsSession } from "./settings-auth.js"; * - `organizationId` is empty / undefined (no scope to query under) * - no completed snapshot exists for `(org, agent)` * - * Only fires when LOBU_SESSION_STORE=snapshot — file-mode keeps reading - * workspaces/* untouched, so existing deploys see no behaviour change. + * Only fires when snapshot mode is active (the default in Phase 5+). + * LOBU_SESSION_STORE=file opts out and keeps reading workspaces/* + * untouched for legacy/local-dev single-replica deploys. */ export async function readLatestSnapshotJsonl( agentId: string, @@ -212,12 +213,13 @@ async function readSessionMessages( limit: number, organizationId: string | undefined ) { - // In snapshot mode, the disk file may be empty (a fresh pod has no - // workspaces/ tree on a multi-replica gateway). Try the PG snapshot - // first; fall through to the disk read if the snapshot is missing so - // local-dev workspaces/* trees keep working without DB migrations. + // In snapshot mode (the Phase 5 default), the disk file may be empty + // (a fresh pod has no workspaces/ tree on a multi-replica gateway). + // Try the PG snapshot first; fall through to the disk read if the + // snapshot is missing so local-dev workspaces/* trees keep working + // without DB migrations. LOBU_SESSION_STORE=file opts back to disk-only. let content: string | null = null; - if (process.env.LOBU_SESSION_STORE === "snapshot") { + if (process.env.LOBU_SESSION_STORE !== "file") { content = await readLatestSnapshotJsonl(agentId, organizationId); } if (content === null) { @@ -262,10 +264,10 @@ async function readSessionStats( agentId: string, organizationId: string | undefined ) { - // Same fallback shape as readSessionMessages — DB first in snapshot mode, - // disk read if absent. + // Same fallback shape as readSessionMessages — DB first in snapshot mode + // (Phase 5 default), disk read if absent. LOBU_SESSION_STORE=file opts out. let content: string | null = null; - if (process.env.LOBU_SESSION_STORE === "snapshot") { + if (process.env.LOBU_SESSION_STORE !== "file") { content = await readLatestSnapshotJsonl(agentId, organizationId); } if (content === null) { @@ -414,7 +416,7 @@ export function createAgentHistoryRoutes(deps: { // second. Avoids reporting `connected: false` when the worker is dead // but a PG snapshot is recoverable. let hasSessionFile = false; - if (process.env.LOBU_SESSION_STORE === "snapshot") { + if (process.env.LOBU_SESSION_STORE !== "file") { hasSessionFile = (await readLatestSnapshotJsonl( resolvedAgentId, From 02a08e1c172b441ff09a2ad4fedb781c0e70eaa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 14:57:16 +0100 Subject: [PATCH 2/2] fix(server,worker): purge agent_transcript_snapshot rows on session reset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 on PR #871. With snapshot mode default-on, the `/new` reset path was only unlinking the local session.jsonl — the next worker boot would rehydrate from Postgres and the user-visible "Starting fresh" would silently resurrect the conversation we just flushed. Two new write paths covering both legs of the reset: - Worker → gateway: new `DELETE /worker/transcript/snapshot` route scoped to the JWT's (org, agent, conv); worker calls `clearSnapshots` from `transcript-snapshot.ts` after the local unlink in the session-reset branch. - Gateway bridge: `chat-response-bridge` runs a `DELETE … USING agents` to scope to the right org without re-deriving it from the payload (the bridge doesn't carry org). Belt-and-suspenders for the case where the worker exits before its own purge call lands. Both paths gate on `LOBU_SESSION_STORE !== "file"` so file-mode opt-outs see no behaviour change. Adds an integration test that seeds three snapshots in the reset conversation + one in a sibling conversation, calls DELETE, and verifies (a) all three reset rows go, (b) the sibling row survives, (c) a second DELETE is idempotent. --- .../src/openclaw/transcript-snapshot.ts | 37 +++++++ packages/agent-worker/src/openclaw/worker.ts | 16 ++++ .../agent-transcript-snapshot.test.ts | 96 +++++++++++++++++++ .../connections/chat-response-bridge.ts | 34 +++++++ .../src/gateway/gateway/transcript-routes.ts | 42 ++++++++ 5 files changed, 225 insertions(+) diff --git a/packages/agent-worker/src/openclaw/transcript-snapshot.ts b/packages/agent-worker/src/openclaw/transcript-snapshot.ts index cacfc2d10..38b43c5cd 100644 --- a/packages/agent-worker/src/openclaw/transcript-snapshot.ts +++ b/packages/agent-worker/src/openclaw/transcript-snapshot.ts @@ -197,5 +197,42 @@ export async function writeSnapshot( logger.error( `Snapshot POST threw: ${err instanceof Error ? err.message : String(err)}` ); + return; + } +} + +/** + * Purge all snapshot rows for this worker's (org, agent, conv). Called + * by the session-reset path so the next boot doesn't rehydrate the + * conversation from Postgres after a `/new`. Idempotent — a 404 / empty + * result is treated as success. + * + * Failures are logged but not thrown — reset is best-effort; if the + * purge HTTP call fails the worst case is the next boot hydrates from + * the previous transcript (the legacy file-mode behaviour). The local + * session.jsonl unlink is the primary signal; this is the multi-replica + * complement to it. + */ +export async function clearSnapshots( + opts: Pick +): Promise { + const url = `${opts.gatewayUrl}/worker/transcript/snapshot`; + try { + const res = await fetch(url, { + method: "DELETE", + headers: { Authorization: `Bearer ${opts.workerToken}` }, + signal: AbortSignal.timeout(30_000), + }); + if (!res.ok) { + logger.warn( + `Snapshot DELETE failed: ${res.status} ${res.statusText} — next boot may rehydrate stale history` + ); + return; + } + logger.info("Purged conversation snapshots for session reset"); + } catch (err) { + logger.warn( + `Snapshot DELETE threw: ${err instanceof Error ? err.message : String(err)} — next boot may rehydrate stale history` + ); } } diff --git a/packages/agent-worker/src/openclaw/worker.ts b/packages/agent-worker/src/openclaw/worker.ts index fc19b222b..ba8d171f4 100644 --- a/packages/agent-worker/src/openclaw/worker.ts +++ b/packages/agent-worker/src/openclaw/worker.ts @@ -55,6 +55,7 @@ import { } from "./model-resolver"; import { checkSandboxLeak } from "./sandbox-leak"; import { + clearSnapshots, hydrateFromSnapshot, type TerminalStatus, writeSnapshot, @@ -1550,6 +1551,21 @@ Use it when the user references past discussions or you need context.`); // File may not exist } + // Also purge the Postgres snapshots for this (org, agent, conv) + // — in snapshot mode (the Phase 5 default) the next worker boot + // would otherwise rehydrate from the now-flushed conversation + // and the user-visible "Starting fresh" would be a lie. Best- + // effort: a failure here is logged but doesn't block the reset + // since the local unlink already happened and the snapshot + // helper is a no-op in file mode. + if (process.env.LOBU_SESSION_STORE !== "file") { + const gatewayUrl = process.env.DISPATCHER_URL; + const workerToken = process.env.WORKER_TOKEN; + if (gatewayUrl && workerToken) { + await clearSnapshots({ gatewayUrl, workerToken }); + } + } + // Send visible confirmation to user await onProgress({ type: "output", diff --git a/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts b/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts index fe17f4678..deddb7bbb 100644 --- a/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts +++ b/packages/server/src/gateway/__tests__/agent-transcript-snapshot.test.ts @@ -1198,6 +1198,102 @@ describe("agent_transcript_snapshot — codex P1/P2 regressions", () => { expect(rows).toHaveLength(1); expect(rows[0]!.run_id).toBe(claimedByA); }); + + test("session-reset purges all snapshot rows for this conversation only", async () => { + // Phase 5: workers (and the gateway-side bridge) must purge PG + // snapshots on /new, otherwise the next pod boot rehydrates the + // flushed conversation and the user-visible "Starting fresh" is a + // lie. This test exercises the worker-side DELETE endpoint. + const orgId = await seedAgentRow("agent-reset", { + organizationId: "org_reset", + }); + const agentId = "agent-reset"; + const conversationId = "conv-reset"; + + // Seed 3 completed snapshots for this (org, agent, conv). + for (let i = 0; i < 3; i++) { + const runId = await insertRun({ + organizationId: orgId, + agentId, + conversationId, + }); + const token = mintWorkerToken({ + organizationId: orgId, + agentId, + conversationId, + runId, + }); + const jsonl = `{"type":"message","id":"m${i}"}\n`; + const res = await callRoute("POST", "/snapshot", token, { + terminalStatus: "completed", + snapshotJsonl: jsonl, + runId, + }); + expect(res.status).toBe(200); + } + + // Seed an unrelated row in a SIBLING conversation that must NOT be + // touched by the reset. + const siblingConv = "conv-sibling"; + const siblingRun = await insertRun({ + organizationId: orgId, + agentId, + conversationId: siblingConv, + }); + const siblingToken = mintWorkerToken({ + organizationId: orgId, + agentId, + conversationId: siblingConv, + runId: siblingRun, + }); + let res = await callRoute("POST", "/snapshot", siblingToken, { + terminalStatus: "completed", + snapshotJsonl: `{"type":"sibling"}\n`, + runId: siblingRun, + }); + expect(res.status).toBe(200); + + // DELETE under a per-run token for the reset conversation. + const resetRunId = await insertRun({ + organizationId: orgId, + agentId, + conversationId, + }); + const resetToken = mintWorkerToken({ + organizationId: orgId, + agentId, + conversationId, + runId: resetRunId, + }); + res = await callRoute("DELETE", "/snapshot", resetToken); + expect(res.status).toBe(200); + const body = (await res.json()) as { deleted: number }; + expect(body.deleted).toBe(3); + + // Sibling conversation row survives; reset conversation has zero rows. + const sql = getDb(); + const resetRows = (await sql` + SELECT id FROM public.agent_transcript_snapshot + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND conversation_id = ${conversationId} + `) as Array<{ id: number }>; + expect(resetRows).toHaveLength(0); + + const siblingRows = (await sql` + SELECT id FROM public.agent_transcript_snapshot + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND conversation_id = ${siblingConv} + `) as Array<{ id: number }>; + expect(siblingRows).toHaveLength(1); + + // Second DELETE is idempotent — returns 200 with deleted=0. + res = await callRoute("DELETE", "/snapshot", resetToken); + expect(res.status).toBe(200); + const body2 = (await res.json()) as { deleted: number }; + expect(body2.deleted).toBe(0); + }); }); // Drop the auth-provider stub between tests so other suites that share the diff --git a/packages/server/src/gateway/connections/chat-response-bridge.ts b/packages/server/src/gateway/connections/chat-response-bridge.ts index 1f31797ad..e5cbba0a1 100644 --- a/packages/server/src/gateway/connections/chat-response-bridge.ts +++ b/packages/server/src/gateway/connections/chat-response-bridge.ts @@ -14,6 +14,7 @@ import { unlink } from "node:fs/promises"; import { resolve } from "node:path"; import { createLogger } from "@lobu/core"; +import { getDb } from "../../db/client.js"; import type { ThreadResponsePayload } from "../infrastructure/queue/index.js"; import { extractSettingsLinkButtons } from "../platform/link-buttons.js"; import type { ResponseRenderer } from "../platform/response-renderer.js"; @@ -246,6 +247,39 @@ export class ChatResponseBridge implements ResponseRenderer { "No session file to delete on reset" ); } + + // Phase 5: in snapshot mode the next worker boot would rehydrate + // from Postgres unless we also purge the snapshot rows for this + // conversation. The worker's reset path does the same purge via + // the `/worker/transcript/snapshot` DELETE endpoint, but we + // belt-and-suspenders here in case the worker exited before it + // got to that step. Resolve the org via `agents.organization_id` + // — the bridge doesn't carry org on the response payload. + if (process.env.LOBU_SESSION_STORE !== "file") { + try { + const sql = getDb(); + const deleted = await sql<{ id: number }>` + DELETE FROM public.agent_transcript_snapshot s + USING public.agents a + WHERE s.agent_id = ${agentId} + AND s.conversation_id = ${payload.conversationId} + AND a.id = s.agent_id + AND a.organization_id = s.organization_id + RETURNING s.id + `; + if (deleted.length > 0) { + logger.info( + { agentId, conversationId: payload.conversationId, count: deleted.length }, + "Purged agent_transcript_snapshot rows for session reset" + ); + } + } catch (error) { + logger.warn( + { agentId, conversationId: payload.conversationId, error: String(error) }, + "Failed to purge transcript snapshots on session reset (next boot may rehydrate stale history)" + ); + } + } } } diff --git a/packages/server/src/gateway/gateway/transcript-routes.ts b/packages/server/src/gateway/gateway/transcript-routes.ts index d5d0016bc..d62ac7083 100644 --- a/packages/server/src/gateway/gateway/transcript-routes.ts +++ b/packages/server/src/gateway/gateway/transcript-routes.ts @@ -254,5 +254,47 @@ export function createTranscriptRoutes(): Hono { } }); + /** + * DELETE — purge all snapshot rows for this worker's (org, agent, conv). + * + * Called by the worker's session-reset path so the next boot doesn't + * rehydrate the now-flushed conversation from Postgres. The local + * session.jsonl is still unlinked separately by the reset handler; + * this endpoint covers the second leg now that snapshot mode is the + * default. + * + * Scope comes from the JWT; no body. Idempotent — deleting zero rows + * is success. + */ + app.delete("/snapshot", async (c) => { + const token = authenticate(c); + if (!token) return c.json({ error: "Invalid token" }, 401); + + const { organizationId, agentId, conversationId } = token; + if (!organizationId || !agentId || !conversationId) { + return c.json({ error: "Token missing required scope" }, 400); + } + + const sql = getDb(); + try { + const deleted = await sql<{ id: number }>` + DELETE FROM public.agent_transcript_snapshot + WHERE organization_id = ${organizationId} + AND agent_id = ${agentId} + AND conversation_id = ${conversationId} + RETURNING id + `; + logger.info( + `Purged ${deleted.length} snapshot row(s) for (${organizationId}, ${agentId}, ${conversationId}) on session reset` + ); + return c.json({ deleted: deleted.length }); + } catch (err) { + logger.error( + `Snapshot DELETE failed: ${err instanceof Error ? err.message : String(err)}` + ); + return c.json({ error: "Internal error" }, 500); + } + }); + return app; }