perf(server): denormalize runs.agent_id+conversation_id + reserve-connection cap#870
Conversation
…nection cap for snapshot prereqs Two perf prerequisites required before flipping LOBU_SESSION_STORE=snapshot to default: 1. isRunOwnedByJwtScope JSONB seq scan The verifier ran 'action_input->>agentId' / '... ->>conversationId' against runs on every snapshot POST. On a multi-million-row runs table that's 10-100ms per worker completion. Add nullable agent_id + conversation_id columns to runs, populate them on insert via RunsQueue.send, backfill historical rows in migration, and add a partial index over (id, organization_id, agent_id, conversation_id) WHERE both non-null. isRunOwnedByJwtScope now reads the columns directly so the planner resolves an index-only seek. 2. acquireConversationLock unbounded pool pressure Every active worker holds a sql.reserve() connection for its entire lifetime (10s-1h+). Multi-pod x multi-worker exhausts the postgres-js pool. Add LOBU_MAX_RESERVED_LOCKS (default 50) cap before reservation, an in-process counter exposed via getReservedLockCount(), and an 80% threshold WARN. Reaching the cap returns null so spawnDeployment re-queues the run. Migration ships chunked-friendly UPDATE for the backfill with lock_timeout relaxation, and a non-CONCURRENTLY CREATE INDEX following the precedent in 20260426130001_db_integrity_cleanup_concurrent.sql (dbmate + pq still present these as in-transaction).
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis pull request denormalizes run-scoped identifiers ( ChangesRuns denormalization and reserved locks cap
Sequence DiagramsequenceDiagram
participant RunsQueue
participant Database
participant Verifier
RunsQueue->>Database: INSERT run (agent_id, conversation_id, action_input, ...)
Verifier->>Database: SELECT ... WHERE COALESCE(agent_id, action_input->>'agentId')=$1 AND COALESCE(conversation_id, action_input->>'conversationId')=$2
Database->>Verifier: row / null (with fallback for pre-backfill rows)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
…back + metric endpoint Addresses 4 findings from codex review of PR #870: 1. Migration was wrapping ALTER + UPDATE + CREATE INDEX in a single transaction → AccessExclusive on runs held until commit, blocking writes for the duration of a million-row backfill. Now transaction:false with chunked backfill in a DO block (5000 rows / batch, tunable via lobu.backfill_chunk GUC), each step committing independently. 2. Deploy-order race: an old gateway still inserting rows post-migration would write only action_input, no scalar columns. isRunOwnedByJwtScope now COALESCEs the scalar columns onto the JSONB extraction so the crossover window is safe. Tested. 3. Index shape: dropped the over-engineered cover index leading with id (PK already serves single-row lookups). New index is (agent_id, conversation_id, id DESC) WHERE both non-null — useful for queries that look up runs by the (agent, conversation) pair, which is the genuine value of denormalization. 4. Metric exposed: new /health/orchestrator route returns reserved_conversation_locks + cap + near_cap so operators can scrape without code changes.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
packages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.ts (1)
289-304: ⚡ Quick winTest intent and assertion diverge in the “below cap” branch.
The comment says this path should stop hitting cap rejection, but the code sets
count=1andcap=1and still asserts rejection. Either adjust the narrative or split this into a true “below cap” case so the branch claim is actually validated.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.ts` around lines 289 - 304, The test's comment says it should verify that dropping the reserved count "below the cap" stops cap rejections, but the code sets setReservedLockCountForTests(1) and process.env.LOBU_MAX_RESERVED_LOCKS = "1" then expects a rejection; to fix, make the assertion and setup consistent: either change the setup to truly go below cap (e.g., setReservedLockCountForTests(0) and keep LOBU_MAX_RESERVED_LOCKS="1" then expect acquireConversationLock("org-a","agent-a","conv-b") not to be null and assert getReservedLockCount() reflects the new value), or keep the current numeric setup but update the comment to explain this is a re-bump/check that still rejects; update the test around acquireConversationLock, setReservedLockCountForTests, process.env.LOBU_MAX_RESERVED_LOCKS, and getReservedLockCount accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@db/migrations/20260518050000_runs_denormalize_agent_conversation.sql`:
- Around line 58-59: The WHERE clause for the backfill currently only checks
action_input ? 'agentId' and agent_id IS NULL, allowing conversation_id to
remain NULL; update the predicate in the backfill (the SQL that references
agent_id, conversation_id and action_input) to require both JSON keys
(action_input ? 'agentId' AND action_input ? 'conversationId') and both target
columns be NULL (agent_id IS NULL AND conversation_id IS NULL) so rows are only
updated when both denormalized values will be populated.
In `@packages/server/src/gateway/orchestration/impl/embedded-deployment.ts`:
- Around line 292-300: The cleanup path may skip calling decrementOnce() if
reserved.release() throws, leaking reservedLockCount; change the two early-exit
blocks that call reserved.release() then decrementOnce() (the branches around
rows[0]?.acquired check and the catch block) to mirror the pattern used later:
call reserved.release() inside its own try/catch and always call decrementOnce()
in a finally (or ensure decrementOnce() runs after a guarded release) so that
decrementOnce() cannot be skipped even if reserved.release() throws; update the
code paths referencing reserved.release() and decrementOnce() accordingly.
---
Nitpick comments:
In
`@packages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.ts`:
- Around line 289-304: The test's comment says it should verify that dropping
the reserved count "below the cap" stops cap rejections, but the code sets
setReservedLockCountForTests(1) and process.env.LOBU_MAX_RESERVED_LOCKS = "1"
then expects a rejection; to fix, make the assertion and setup consistent:
either change the setup to truly go below cap (e.g.,
setReservedLockCountForTests(0) and keep LOBU_MAX_RESERVED_LOCKS="1" then expect
acquireConversationLock("org-a","agent-a","conv-b") not to be null and assert
getReservedLockCount() reflects the new value), or keep the current numeric
setup but update the comment to explain this is a re-bump/check that still
rejects; update the test around acquireConversationLock,
setReservedLockCountForTests, process.env.LOBU_MAX_RESERVED_LOCKS, and
getReservedLockCount accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: c0da1fbf-f2c3-49b7-9034-507b7382edbe
📒 Files selected for processing (7)
db/migrations/20260518050000_runs_denormalize_agent_conversation.sqldb/schema.sqlpackages/server/src/gateway/__tests__/agent-transcript-snapshot.test.tspackages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.tspackages/server/src/gateway/gateway/transcript-routes.tspackages/server/src/gateway/infrastructure/queue/runs-queue.tspackages/server/src/gateway/orchestration/impl/embedded-deployment.ts
| if (!rows[0]?.acquired) { | ||
| reserved.release(); | ||
| decrementOnce(); | ||
| return null; | ||
| } | ||
| } catch (err) { | ||
| reserved.release(); | ||
| decrementOnce(); | ||
| throw err; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify all reserved release paths around lock acquisition are guarded so decrement always happens.
rg -n -C3 'reserved\.release\(\)|decrementOnce\(' packages/server/src/gateway/orchestration/impl/embedded-deployment.tsRepository: lobu-ai/lobu
Length of output: 1164
Ensure counter decrement cannot be skipped when release throws.
On lines 293-294 and 298-299, reserved.release() runs before decrementOnce(). If release() throws, decrementOnce() won't execute, leaking reservedLockCount and eventually causing false cap rejections. The cleanup path (lines 337-345) already implements the correct pattern with try-catch around release and a finally-block decrement.
Proposed fix
+ const releaseReservedAndDecrement = (): void => {
+ try {
+ reserved.release();
+ } catch (releaseErr) {
+ logger.warn(
+ `Failed to release reserved DB connection for ${organizationId}/${agentId}/${conversationId}: ${
+ releaseErr instanceof Error ? releaseErr.message : String(releaseErr)
+ }`
+ );
+ } finally {
+ decrementOnce();
+ }
+ };
+
try {
const rows = (await reserved`SELECT pg_try_advisory_lock(${CONV_LOCK_KEY1}, ${key2}) AS acquired`) as Array<{ acquired: boolean }>;
if (!rows[0]?.acquired) {
- reserved.release();
- decrementOnce();
+ releaseReservedAndDecrement();
return null;
}
} catch (err) {
- reserved.release();
- decrementOnce();
+ releaseReservedAndDecrement();
throw err;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (!rows[0]?.acquired) { | |
| reserved.release(); | |
| decrementOnce(); | |
| return null; | |
| } | |
| } catch (err) { | |
| reserved.release(); | |
| decrementOnce(); | |
| throw err; | |
| const releaseReservedAndDecrement = (): void => { | |
| try { | |
| reserved.release(); | |
| } catch (releaseErr) { | |
| logger.warn( | |
| `Failed to release reserved DB connection for ${organizationId}/${agentId}/${conversationId}: ${ | |
| releaseErr instanceof Error ? releaseErr.message : String(releaseErr) | |
| }` | |
| ); | |
| } finally { | |
| decrementOnce(); | |
| } | |
| }; | |
| try { | |
| const rows = (await reserved`SELECT pg_try_advisory_lock(${CONV_LOCK_KEY1}, ${key2}) AS acquired`) as Array<{ acquired: boolean }>; | |
| if (!rows[0]?.acquired) { | |
| releaseReservedAndDecrement(); | |
| return null; | |
| } | |
| } catch (err) { | |
| releaseReservedAndDecrement(); | |
| throw err; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/server/src/gateway/orchestration/impl/embedded-deployment.ts` around
lines 292 - 300, The cleanup path may skip calling decrementOnce() if
reserved.release() throws, leaking reservedLockCount; change the two early-exit
blocks that call reserved.release() then decrementOnce() (the branches around
rows[0]?.acquired check and the catch block) to mirror the pattern used later:
call reserved.release() inside its own try/catch and always call decrementOnce()
in a finally (or ensure decrementOnce() runs after a guarded release) so that
decrementOnce() cannot be skipped even if reserved.release() throws; update the
code paths referencing reserved.release() and decrementOnce() accordingly.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/server/src/index.ts`:
- Around line 450-454: The parsedCap/cap logic allows 0 but clamps negatives to
50 and also calls Number.parseInt(rawCap, 10) twice; update the code that
computes parsedCap (using rawCap) to parse once into a local value (e.g., const
parsed = Number.parseInt(rawCap, 10)) and then treat parsed <= 0 as the fallback
50 (or explicitly clamp zero to 50) so that both negative and zero map to 50,
finally assign cap from that clamped parsed value (keep variable names parsedCap
and cap for clarity).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 816dac00-e486-4fbd-95d5-ec37bbe545f9
📒 Files selected for processing (5)
db/migrations/20260518050000_runs_denormalize_agent_conversation.sqldb/schema.sqlpackages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.tspackages/server/src/gateway/gateway/transcript-routes.tspackages/server/src/index.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/server/src/gateway/gateway/transcript-routes.ts
- packages/server/src/gateway/tests/runs-denormalize-and-reserve-cap.test.ts
…ool size Round 2 review of PR #870: 1. Migration backfill is now actually chunk-committed. Previous DO block ran as a single implicit transaction so the LOOP didn't commit between iterations. Switched to a temp PROCEDURE (PG11+ feature) which DOES support COMMIT inside the loop body — each 5000-row chunk releases its row locks before the next batch. 2. LOBU_MAX_RESERVED_LOCKS default is now derived from DB_POOL_MAX with a POOL_HEADROOM of 5 (default cap = 15 against a 20-connection pool). The prior fixed default of 50 against a 20-connection pool meant callers 21-50 would block inside sql.reserve() instead of hitting the cap at all — defeating the cap's purpose. /health/orchestrator now reports the derived cap, not a hardcoded one. 3. Comment in runs-queue.ts no longer overclaims that isRunOwnedByJwtScope gets an index-only seek win from the new index — runs_pkey already serves single-row lookups on id. The index earns its keep on (agent_id, conversation_id) lookups that don't specify id.
…igration Round 3 review of PR #870: 1. Backfill predicate now requires action_input->>'agentId' IS NOT NULL instead of '? agentId'. A row with {"agentId": null} would have ? agentId TRUE but ->>'agentId' = NULL → SET agent_id = NULL is a no-op and the chunk row count never falls to 0. One bad row could wedge the migration LOOP forever. 2. Index build is now in its own transaction:false migration (20260518050001) so CREATE INDEX CONCURRENTLY can run without an enclosing transaction block. The hot multi-million-row runs table no longer eats a write-blocking AccessExclusive during the build. Test loader handles the second migration correctly because postgres.js runs a single-statement unsafe() via the simple query protocol — no implicit BEGIN/COMMIT wrap.
… index Round 4 review of PR #870: 1. CREATE INDEX CONCURRENTLY does not work in this repo's dbmate+pq setup even under transaction:false (per the comments in 20260426130001_db_integrity_cleanup_concurrent.sql:145-148 and 20260430151215). Plain CREATE INDEX would block writes for the duration of the build on the multi-million-row runs table. Since runs_pkey already serves the verifier's hot path, the index has no required user — dropped entirely. 2. The chunked backfill scans WHERE agent_id IS NULL with LIMIT, no keyset cursor; on millions of rows this degenerates to O(N²) repeated scans of the already-updated prefix. Dropped the backfill from the migration entirely. The COALESCE fallback in isRunOwnedByJwtScope already handles historical NULL rows correctly. Operators who want the columns populated for diagnostic queries can run a keyset-anchored backfill via a runbook. Migration now does just one thing: ADD COLUMN (metadata-only on PG11+, brief AccessExclusive). All complexity moved into runtime fallback. Updated test renames the backfill test to verify the COALESCE protects legacy NULL-column rows + drops the index-shape assertion.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.ts`:
- Around line 6-7: Update the stale header comment in the test file
runs-denormalize-and-reserve-cap.test.ts to remove the claim that a partial
index exists and that the verifier is index-only; instead state that index
creation was removed by the migration and any queries may require a full scan or
appropriate new index if needed. Locate the comment block containing the phrase
"`... ->> 'conversationId'`. A partial index covers the predicate so the
verifier is index-only" and replace it with wording that reflects the current
migration state (e.g., note that the partial index was removed and performance
assumptions no longer hold).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: f28e8ddc-4b46-47f5-8556-d1c5516f6ff9
📒 Files selected for processing (4)
db/migrations/20260518050000_runs_denormalize_agent_conversation.sqldb/schema.sqlpackages/server/src/gateway/__tests__/runs-denormalize-and-reserve-cap.test.tspackages/server/src/gateway/infrastructure/queue/runs-queue.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/server/src/gateway/infrastructure/queue/runs-queue.ts
…ate output CI's migration check regenerates db/schema.sql via dbmate and compares. The pre-existing schema in main has the constraint as agent_transcript_snapshot_org_agent_conv_run_key (likely renamed by hand or in an older dbmate version) while dbmate now emits the auto-named agent_transcript_snapshot_organization_id_agent_id_conversa_key. Align the snapshot file with dbmate's output to clear the drift check.
… job The 'embedded mode returns no-op sentinel' and 'lock-cross-conv-parallelism (embedded sentinel)' tests assert behavior specific to LOBU_DISABLE_PREPARE=1 (PGlite). On the real-PG integration job they'd hit the cap+reserve path and the embedded-sentinel assertions don't apply. Gated both with skipIf.
…unused) (#873) PR #870 added two columns (`runs.agent_id`, `runs.conversation_id`) on the premise that the snapshot verifier (`isRunOwnedByJwtScope`) was doing a JSONB full-scan on `runs`. That premise was wrong: the verifier query is a PK lookup (`WHERE id = $1`) and the JSONB extraction on the single matched row is microseconds. The columns landed with a COALESCE fallback to JSONB on every read, have zero downstream consumers (claim loop doesn't filter on them, reaper doesn't either), and add INSERT-path overhead in `RunsQueue.send` for no win. Remove the slop. Kept from PR #870: the `acquireConversationLock` reserved-connection cap, the counter helper, and `/health/orchestrator`. Those are genuine multi-replica plumbing and unrelated to the JSONB perf claim. * Add migration `20260518060000_revert_runs_denormalize.sql` to drop both columns (DROP COLUMN IF EXISTS — metadata-only on PG11+). * Update `db/schema.sql` to match; `schema_migrations.version` preserved as `varchar(128)` per the known CI footgun. * Revert `isRunOwnedByJwtScope` to JSONB-only. * Revert `RunsQueue.send` INSERT shape (drop agent_id / conversation_id / organization_id from the column list). * Rename `runs-denormalize-and-reserve-cap.test.ts` → `reserve-cap.test.ts` and remove the denormalize describe block. Reserve-cap tests stay verbatim. * Strip the new-columns from the `insertRun` helper in `agent-transcript-snapshot.test.ts`. Snapshot suite still passes via the JSONB-only verifier (all 858 gateway tests green). Validation: - `make typecheck`: same 18 pre-existing errors on origin/main, no new errors introduced. - `make build-packages`: clean. - `bun test packages/server/src/gateway/__tests__/`: 858 pass / 0 fail.
Summary
Two perf prerequisites for flipping
LOBU_SESSION_STORE=snapshotto default (Phase 5 of the PR #865 snapshot work). Both surfaced by the 6-agent review on #865.Fix 1 —
isRunOwnedByJwtScopeJSONB full scanisRunOwnedByJwtScope(called from every snapshot POST under the snapshot path) was running:No functional/expression index covered the JSONB shape. On the prod runs table it's a 10–100ms seq scan per worker completion.
Changes:
20260518050000_runs_denormalize_agent_conversation.sqladds nullableagent_id+conversation_idcolumns, backfills fromaction_inputfor historical rows, and creates a partial indexruns_agent_conv_idx (id, organization_id, agent_id, conversation_id) WHERE both NOT NULL.RunsQueue.sendextracts the keys from the payload at insert time and populates the new columns alongsideaction_input.isRunOwnedByJwtScopereads the scalar columns directly so the planner resolves an index-only scan.CREATE INDEX is non-CONCURRENT following the precedent in
20260426130001_db_integrity_cleanup_concurrent.sql(dbmate + pq still present CONCURRENTLY statements as in-transaction). Defensivelock_timeout=30sfor the DDL; bumped to 0 around the backfill so row-locks can wait under live traffic.Fix 2 —
acquireConversationLockreserve() pressureEach spawned worker under snapshot mode pinned one
sql.reserve()connection for the full subprocess lifetime (10s–1h+). Multi-pod × multi-conversation pressure exhausts the postgres-js pool (default max 20).Changes:
LOBU_MAX_RESERVED_LOCKSenv (default 50). Acquirer checks the in-process counter before callingsql.reserve(); when the cap is reached, returns null sospawnDeploymentre-queues the run via the existingDEPLOYMENT_CREATE_FAILEDretry path.getReservedLockCount()helper exposes the counter for metrics / health probes.await sql.reserve()to gate concurrent acquirers; decrement is idempotent and runs on every release path (lock-busy, throw, normal release).Cap value 50 is a conservative starting point; real tuning happens by env once we observe the metric in prod.
Reproducer
Fix 1 — index lookup vs seq scan (on local PGlite):
Fix 2 — cap rejection:
# Same file; cap=2 + staged counter=2 returns null, counter unchanged.Validation
make typecheck→ cleanmake build-packages→ cleanbun test packages/server→ 1498 pass / 117 fail / 64 errors. Baseline onorigin/mainfrom same harness: 1490 pass / 118 fail / 65 errors. Delta: +8 passes (new tests), -1 fail, -1 error. All remaining failures are pre-existing and unrelated to this PR (admin auth routes, isolated-vm sandbox, settings-auth cookie tests).schema_migrations.versionconfirmed stillvarchar(128)after schema.sql regeneration.Test plan
agent_id/conversation_idcolumnsisRunOwnedByJwtScopecorrectness: true for matching scope; false for wrong agent / conv / orgaction_inputruns_agent_conv_idxexists with the expected shapeLOBU_DISABLE_PREPARE=1) returns no-op sentinel without touching counterNotes for review
sync/action/watcher/auth) carry NULL in the new columns. The partial index'sWHERE NOT NULLpredicate keeps it small.LOBU_SESSION_STORE=snapshotmode reads throughisRunOwnedByJwtScope, and that flag isn't default until Phase 5.Summary by CodeRabbit
New Features
/health/orchestratorendpoint to monitor reserved conversation lock utilization and capacity metrics.Tests