Skip to content

refactor(server): move pending interactions and SSE fan-out to Postgres#834

Merged
buremba merged 4 commits into
mainfrom
refactor/ha-pending-interactions-and-sse
May 18, 2026
Merged

refactor(server): move pending interactions and SSE fan-out to Postgres#834
buremba merged 4 commits into
mainfrom
refactor/ha-pending-interactions-and-sse

Conversation

@buremba
Copy link
Copy Markdown
Member

@buremba buremba commented May 18, 2026

Summary

Removes the in-process Map<questionId, PendingQuestionEntry> in interaction-bridge.ts that pinned the chat interaction bridge to a single replica. Pending questions now live in public.pending_interactions; claimPendingQuestion is an atomic UPDATE … SET claimed_at = now() WHERE id = $1 AND claimed_at IS NULL RETURNING entry_payload so a button click landing on pod B can claim a question registered on pod A. Slack webhook retries and racing clicks serialize on the row — the winner gets the payload, losers see null and no-op.

The non-serializable platform SentMessage handle (used to edit the original card after a click) stays in a small per-pod cache. Losing it cross-pod only degrades card-edit UX; the answer still routes correctly through the worker.

24h stale-row sweep piggybacks on the existing scheduled sweep-ephemeral-tables task (registered in core-services.ts).

Scoped down from the original ask

  • P1 — pending_interactions → Postgres (REQUIRED): done.
  • P2 — SSE fan-out → Postgres LISTEN/NOTIFY (REQUIRED if time): deferred to a follow-up PR. A correct implementation needs payload-size handling (Postgres NOTIFY caps at 8 KB, agent output chunks can exceed that), which adds either chunking logic or a spill table — neither was a one-line change, so per the "we'd rather have two small PRs than one big one" rule I'm shipping P1 alone. The single-pod SSE behavior is unchanged and the bridge fix is the bigger HA wedge.
  • P3 — rate limiter → Postgres: not touched.

Migration

Additive: one new table pending_interactions with (id text PK, organization_id text FK → organization(id) ON DELETE CASCADE, entry_payload jsonb, created_at, claimed_at), plus a partial index on the unclaimed predicate and one on created_at for the sweep. db/schema.sql updated by hand to match.

Test plan

  • make typecheck passes (strict — matches the Dockerfile)
  • make build-packages passes
  • New pending-interaction-store.test.ts covers atomic claim (second claim returns null), restash, and stale-row sweep against PGlite
  • Existing interaction-bridge-action-handlers.test.ts and interaction-bridge-slack-webhook.test.ts still pass
  • (Manual, post-deploy) A question:created posted on pod A, clicked on pod B, routes back through the worker

Summary by CodeRabbit

  • New Features

    • Persistent storage for pending chat interactions and a local cache for sent-message handles to improve cross-instance handling and UI cleanup.
  • Bug Fixes

    • Button clicks now correctly ignore stale or mismatched interactions and return ownership to persistence to prevent duplicate or unauthorized claims.
  • Tests

    • Added tests validating claim/scoping behavior and stale-interaction sweep.
  • Chores

    • Database schema updated with pending-interactions table, indexes, and migration; sweep integrated into periodic cleanup.

Review Change Stack

Replaces the in-process `Map<questionId, PendingQuestionEntry>` in
`interaction-bridge.ts` with a `public.pending_interactions` table so a
button click landing on pod B can claim a question registered on pod A.

`claimPendingQuestion` is an atomic `UPDATE … SET claimed_at = now()
WHERE id = $1 AND claimed_at IS NULL RETURNING entry_payload` — Slack
webhook retries and racing clicks serialize on the row, the winner gets
the payload, losers see null and no-op.

The non-serializable platform `SentMessage` handle (used to edit the
original card after a click) stays in a small per-pod cache. Losing it
cross-pod only degrades card-edit UX; the answer still routes correctly.

A 24h stale-row sweep piggybacks on the existing scheduled
`sweep-ephemeral-tables` task via `sweepEphemeralTables` in
`core-services.ts`.

Migration is additive (one new table, two indexes). Single-pod behavior
is unchanged.

SSE fan-out (sse-manager.ts) is still per-pod and left for a follow-up
PR.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 5673e714-4219-4900-8607-93fc23ebf57b

📥 Commits

Reviewing files that changed from the base of the PR and between 096b377 and 3c1f16f.

📒 Files selected for processing (3)
  • db/schema.sql
  • packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts
  • packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/cli/src/commands/_lib/apply/tests/diff.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • db/schema.sql

📝 Walkthrough

Walkthrough

Adds Postgres persistence for pending chat interactions: a new pending_interactions table, a store API to upsert/claim/sweep pending questions with scoped ownership, bridge wiring to persist and claim interactions across pods, tests for scoping and sweep, and scheduler integration to delete stale entries.

Changes

Persistent Pending Interactions

Layer / File(s) Summary
Database schema and migration
db/migrations/20260518000000_pending_interactions.sql, db/schema.sql
Create pending_interactions with id PK, organization_id FK (ON DELETE CASCADE), connection_id, expected_user_id, entry_payload JSONB, created_at default now, nullable claimed_at. Add partial index idx_pending_interactions_unclaimed for unclaimed lookups and idx_pending_interactions_created_at for cleanup; migration down drops indexes and table.
Persistence API implementation
packages/server/src/gateway/connections/pending-interaction-store.ts
Export StoredPendingQuestion and three functions: storePendingQuestion(questionId, organizationId, connectionId, expectedUserId, entry) (upsert, reset claimed_at), claimPendingQuestion(questionId, organizationId, connectionId, expectedUserId) (atomic claim returning payload or null), and sweepStalePendingInteractions(maxAgeMs?) (delete stale rows, return deleted ids). restashPendingQuestion is not present.
Bridge integration
packages/server/src/gateway/connections/interaction-bridge.ts
Persist pending question before posting cards, locally cache SentMessage handles for stripping, await claimPendingQuestion on clicks to enforce scoped ownership across pods, remove prior in-memory-only restash/timers, and ensure persisted rows are removed if card posting fails.
Persistence tests
packages/server/src/gateway/__tests__/pending-interaction-store.test.ts, packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts
Add tests that seed organizations, assert single-winner claim behavior, verify wrong-organization/connection/user claims do not consume rows, and validate sweepStalePendingInteractions deletes stale rows and leaves fresh rows claimable. Also set organizationId on connection fixture to satisfy bridge expectations.
Ephemeral table cleanup scheduler
packages/server/src/gateway/services/core-services.ts
Add sweepStalePendingInteractions to CoreServices.sweepEphemeralTables() and include pending-interactions deletion count in logging/threshold checks.

Test formatting-only changes

Layer / File(s) Summary
apply diff tests reformat
packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts
Large whitespace/formatting-only reformat across helper builders and many apply diff test suites; assertions and logic unchanged.

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Possibly related PRs:
    • lobu-ai/lobu#750: Related org-scoping plumbing; changes intersect with per-tenant pending_interactions writes.

🐇 I cached the card and dug the hole,
Stored the question to keep it whole.
Pods may click, but only one may claim,
Stale ones swept—no lingering name.
HOP — persistence now plays the game!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 26.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title directly reflects the main objective: migrating pending interactions from in-process memory to Postgres for cross-replica claiming.
Description check ✅ Passed Description includes comprehensive summary, detailed migration notes, clear scope boundaries, and test plan with completed items; aligns well with the provided template structure.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch refactor/ha-pending-interactions-and-sse

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov-commenter
Copy link
Copy Markdown

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/server/src/gateway/connections/interaction-bridge.ts (1)

291-297: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Persist the pending row before making the card clickable.

thread.post() exposes the buttons before storePendingQuestion() completes. If that insert is slow or fails, the first click sees “no pending entry” and gets dropped even though the card was visible. Store first, then post, or clean up the row if posting fails.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/connections/interaction-bridge.ts` around lines
291 - 297, The pending-question row must be persisted before making the card
clickable: call the function that inserts the pending entry
(storePendingQuestion or the code path that creates the pending row) before
invoking postWithFallback(thread, { card, fallbackText }, connectionId,
"question interaction"), and pass the pending ID/metadata through to
trackQuestion so the saved row is associated with the post; if postWithFallback
throws or returns a failure, delete/rollback the pending row to avoid orphaned
rows visible to users. Ensure thread.post()/postWithFallback is only called
after the insert completes successfully and add error handling to clean up the
pending entry on posting failure.
🧹 Nitpick comments (1)
packages/server/src/gateway/__tests__/pending-interaction-store.test.ts (1)

48-57: ⚡ Quick win

Add a truly concurrent claim test for race coverage.

This case is sequential and would still pass with a non-atomic select-then-update implementation. Running two claims concurrently would better lock in the intended single-winner guarantee.

🧪 Suggested test shape
   test("claim returns the stored payload exactly once", async () => {
     const q = buildQuestion("q-1");
     await storePendingQuestion(q.id, undefined, { question: q });

-    const first = await claimPendingQuestion(q.id);
-    expect(first?.question.id).toBe("q-1");
-
-    const second = await claimPendingQuestion(q.id);
-    expect(second).toBeNull();
+    const [a, b] = await Promise.all([
+      claimPendingQuestion(q.id),
+      claimPendingQuestion(q.id),
+    ]);
+    const winners = [a, b].filter((x) => x !== null);
+    expect(winners).toHaveLength(1);
+    expect(winners[0]?.question.id).toBe("q-1");
   });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/__tests__/pending-interaction-store.test.ts`
around lines 48 - 57, Add a concurrent test that verifies only one caller wins
when two claims run at the same time: use buildQuestion to create a question,
storePendingQuestion to insert it, then start two simultaneous calls to
claimPendingQuestion (e.g., via Promise.all or Promise.allSettled) and assert
that exactly one result is non-null and has question.id "q-1" while the other is
null; reference the existing test name and functions claimPendingQuestion,
storePendingQuestion, and buildQuestion to locate where to add this new test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/server/src/gateway/connections/interaction-bridge.ts`:
- Around line 203-206: The pendingSentMessages map currently never evicts
entries; modify trackQuestion and restashQuestion to call a shared helper
trackPendingSentMessage(questionId, SentMessage) that adds the entry to
pendingSentMessages and starts a TTL timer (match DB sweep horizon, e.g. 24
hours) to remove it when expired; ensure trackPendingSentMessage replaces any
existing timer for the same questionId. Also update the unregister/teardown path
to clear any outstanding timers and remove entries from pendingSentMessages so
timers don’t leak on connection teardown.
- Around line 463-468: The code currently calls claimQuestion(questionId) before
checking manager.getInstance(connectionId), which can leave a question
permanently claimed if this pod can't service it; move the live-instance check
(manager.getInstance(connectionId)) before calling claimQuestion(questionId), or
if you prefer to keep the current order, ensure you call the unclaim/restash
path (the function that releases the DB claim) when manager.getInstance(...)
returns a miss so the row is released; update the handler in the async
(questionId, value, thread, author) => block (and the similar block at lines
~477-484) to either perform manager.getInstance first or call the
release/unclaim helper on the missed path before returning.

In `@packages/server/src/gateway/connections/pending-interaction-store.ts`:
- Around line 82-85: The restashPendingQuestion SQL clears claimed_at but
doesn't update created_at, so a row restashed near the 24h cutoff can be
immediately swept; modify the UPSERT in restashPendingQuestion to also refresh
created_at (e.g., set created_at = NOW() or created_at = EXCLUDED.created_at) in
the ON CONFLICT ... DO UPDATE clause so restashed rows get a fresh timestamp and
won't be immediately deleted by the sweeper.

---

Outside diff comments:
In `@packages/server/src/gateway/connections/interaction-bridge.ts`:
- Around line 291-297: The pending-question row must be persisted before making
the card clickable: call the function that inserts the pending entry
(storePendingQuestion or the code path that creates the pending row) before
invoking postWithFallback(thread, { card, fallbackText }, connectionId,
"question interaction"), and pass the pending ID/metadata through to
trackQuestion so the saved row is associated with the post; if postWithFallback
throws or returns a failure, delete/rollback the pending row to avoid orphaned
rows visible to users. Ensure thread.post()/postWithFallback is only called
after the insert completes successfully and add error handling to clean up the
pending entry on posting failure.

---

Nitpick comments:
In `@packages/server/src/gateway/__tests__/pending-interaction-store.test.ts`:
- Around line 48-57: Add a concurrent test that verifies only one caller wins
when two claims run at the same time: use buildQuestion to create a question,
storePendingQuestion to insert it, then start two simultaneous calls to
claimPendingQuestion (e.g., via Promise.all or Promise.allSettled) and assert
that exactly one result is non-null and has question.id "q-1" while the other is
null; reference the existing test name and functions claimPendingQuestion,
storePendingQuestion, and buildQuestion to locate where to add this new test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: cb9e7dea-504f-44e0-a486-ab641681094b

📥 Commits

Reviewing files that changed from the base of the PR and between f597e76 and 0f59916.

📒 Files selected for processing (6)
  • db/migrations/20260518000000_pending_interactions.sql
  • db/schema.sql
  • packages/server/src/gateway/__tests__/pending-interaction-store.test.ts
  • packages/server/src/gateway/connections/interaction-bridge.ts
  • packages/server/src/gateway/connections/pending-interaction-store.ts
  • packages/server/src/gateway/services/core-services.ts

Comment thread packages/server/src/gateway/connections/interaction-bridge.ts Outdated
Comment thread packages/server/src/gateway/connections/interaction-bridge.ts Outdated
Comment thread packages/server/src/gateway/connections/pending-interaction-store.ts Outdated
…entMessage TTL, persist-then-post

Addresses the four pi-review findings on PR #834.

[HIGH #1] Cross-tenant claim hole — keying by `id` alone let a leaked or
forged question id be consumed by any connection in any org. The claim
SQL now scopes by `(id, organization_id, connection_id, expected_user_id)`
and the migration adds those columns as NOT NULL. Cross-tenant and
cross-connection clicks return null without consuming the row.

[MED #2] Unbounded pendingSentMessages — the old 5-minute timer was
removed by mistake. A per-bridge `setInterval` now sweeps SentMessage
handles older than 24h (matching the DB row TTL) and also drops handles
for any row the scheduled DB sweep deleted. The sweep is unref'd and
cleared on bridge unregister.

[MED #3] Claim-then-auth race — folded into the SQL claim. The
`expected_user_id = $4` predicate means wrong-user clicks never set
`claimed_at`, so process death after the claim can no longer leave the
row in a half-consumed state. The old "claim → check author → async
restash" three-step (and `restashPendingQuestion` itself) is gone.

[LOW #4] Post-then-persist ordering — `onQuestionCreated` now persists
the row first, then posts the card. If the persist fails the card is
never shown; if the post fails after a successful persist the row is
dropped immediately rather than waiting for the 24h sweep.

Red→green coverage: the new test file asserts the scoped claim rejects
cross-tenant, cross-connection, and wrong-user clicks AND that the row
remains claimable by the rightful owner. These same assertions fail on
the pre-fix commit (verified locally against `claimPendingQuestion(id)`).
@buremba
Copy link
Copy Markdown
Member Author

buremba commented May 18, 2026

Pushed fix-up commit 096b377b addressing all four findings.

Per finding

[HIGH #1] Cross-tenant claim hole — fixed.
Migration now requires organization_id, connection_id, expected_user_id as NOT NULL on pending_interactions. The claim is a single scoped SQL:

UPDATE pending_interactions
   SET claimed_at = now()
 WHERE id               = $1
   AND organization_id  = $2
   AND connection_id    = $3
   AND expected_user_id = $4
   AND claimed_at IS NULL
RETURNING entry_payload

The partial unclaimed-predicate index was widened to (id, organization_id, connection_id, expected_user_id) so the lookup stays index-only.

[MED #2] Unbounded pendingSentMessages — fixed.
Each entry now carries a registeredAt timestamp. A per-bridge setInterval (1h cadence, unref'd, cleared on unregister) evicts entries older than 24h AND deletes local entries for any DB row the scheduled sweeper just dropped. sweepStalePendingInteractions now returns the deleted ids so the bridge and DB stay in sync. core-services.ts was adjusted to read .length from the returned array.

[MED #3] Claim-then-auth race — fixed by going with option (a). expected_user_id is part of the SQL predicate, so a wrong-user click never sets claimed_at. The post-claim author check + async restashQuestion + restashPendingQuestion are deleted entirely — no restash needed when the claim never happened.

[LOW #4] Post-then-persist — fixed.
onQuestionCreated now: (1) validates organizationId + event.userId, (2) persists the pending row, (3) posts the card, (4) caches the SentMessage handle. If persist fails the card is never shown. If post fails after a successful persist the row is dropped immediately (claim-with-correct-scope) rather than waiting for the 24h sweep. Connections without an organizationId short-circuit before any write.

Red → green

Wrote a temporary pending-interaction-store.red-proof.test.ts against the pre-fix API (claimPendingQuestion(id)) and ran it at 0f599162:

(fail) [#1] cross-tenant claim is rejected
  expect(received).toBeNull()
  Received: { question: { id: "q-cross-tenant", userId: "U_A", ... } }
(fail) [#3] wrong-user claim is rejected without consuming the row
  expect(received).toBeNull()
  Received: { question: { id: "q-wrong-user", userId: "U_A", ... } }

 0 pass
 2 fail
 2 expect() calls
Ran 2 tests across 1 file. [2.08s]

Then deleted the red-proof file (it only made sense against the old API), restored the fix, and ran the post-fix tests with the new scoped signatures:

 5 pass
 0 fail
 11 expect() calls
Ran 5 tests across 1 file. [2.19s]

The fix-branch pending-interaction-store.test.ts now contains explicit [#1] and [#3] cases that assert: (a) the mismatched-scope claim returns null, and (b) the row is still claimable by the rightful org/connection/user. The existing interaction-bridge-action-handlers.test.ts and interaction-bridge-slack-webhook.test.ts still pass (25 / 25 across the three test files).

Validation

  • make typecheck — clean
  • make build-packages — clean
  • bridge + store bun:test suites — 25 / 25 green

Pushed back on

Nothing. The author-userId check in the click handler is gone (it was redundant once the SQL claim does the work), but I did NOT touch the existing shouldHandle connection/platform filter — claim-scoping is layered on top, not in place of, that pre-existing guard.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/server/src/gateway/connections/interaction-bridge.ts (1)

351-377: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't keep a pending row when the card fell back to plain text.

postWithFallback() returns a SentMessage for both the interactive card and the plain-text fallback, so this branch only cleans up on total post failure. If card rendering fails but the fallback text posts, the DB row stays claimable even though there are no action buttons to produce a question:* click, leaving an unanswerable interaction behind.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/connections/interaction-bridge.ts` around lines
351 - 377, postWithFallback can return a SentMessage even when it only posted
the plain-text fallback, leaving a pending DB row that can never receive a
question:* click; update the logic after the postWithFallback call to detect
when the returned SentMessage represents a fallback (e.g. check a property like
sent.isFallback / sent.type / absence of renderedCard or actions) and, in that
case, call claimPendingQuestion(...) to drop the pending row (same cleanup as
the total-post-failure path) instead of calling rememberSentMessage(event.id,
sent); keep the existing try/catch pattern and logging used in the failure
branch when performing this cleanup.
🧹 Nitpick comments (1)
packages/server/src/gateway/__tests__/pending-interaction-store.test.ts (1)

70-79: ⚡ Quick win

Exercise the actual race here, not just sequential re-reads.

This test proves "claim once, then null", but it doesn't cover the concurrent single-winner guarantee the SQL is supposed to provide. Racing two claimPendingQuestion(...) calls with Promise.all would catch regressions in the atomic-claim path.

Example update
-    const first = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A);
-    expect(first?.question.id).toBe("q-1");
-
-    const second = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A);
-    expect(second).toBeNull();
+    const [first, second] = await Promise.all([
+      claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A),
+      claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A),
+    ]);
+
+    const winners = [first, second].filter(
+      (result): result is NonNullable<typeof result> => result !== null
+    );
+
+    expect(winners).toHaveLength(1);
+    expect(winners[0].question.id).toBe("q-1");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/__tests__/pending-interaction-store.test.ts`
around lines 70 - 79, The test currently calls claimPendingQuestion
sequentially; change it to exercise a real race by invoking two concurrent
claimPendingQuestion(...) calls (e.g., via Promise.all) after storing the
pending question from storePendingQuestion(...) for the built question
(buildQuestion("q-1")), then assert that exactly one of the results is non-null
with question.id "q-1" and the other is null to validate the SQL atomic
single-winner behavior of claimPendingQuestion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/server/src/gateway/connections/interaction-bridge.ts`:
- Around line 216-246: sweepPendingSent() running per-connection is calling the
global sweepStalePendingInteractions() which issues DELETEs across the whole
pending_interactions table; remove that DB-wide cleanup from the per-connection
path and only perform the local cache eviction there. Specifically, in the
function sweepPendingSent (invoked by pendingSentSweepTimer, interval
PENDING_SENT_SWEEP_INTERVAL_MS) remove the try/catch and await
sweepStalePendingInteractions() call and the deletedIds handling so the function
only prunes pendingSentMessages by registeredAt/ttlCutoff; if you need DB->cache
synchronization instead, have the single scheduled global sweeper publish
deleted IDs and wire that into each bridge to delete those ids from
pendingSentMessages rather than calling sweepStalePendingInteractions() from
sweepPendingSent().

---

Outside diff comments:
In `@packages/server/src/gateway/connections/interaction-bridge.ts`:
- Around line 351-377: postWithFallback can return a SentMessage even when it
only posted the plain-text fallback, leaving a pending DB row that can never
receive a question:* click; update the logic after the postWithFallback call to
detect when the returned SentMessage represents a fallback (e.g. check a
property like sent.isFallback / sent.type / absence of renderedCard or actions)
and, in that case, call claimPendingQuestion(...) to drop the pending row (same
cleanup as the total-post-failure path) instead of calling
rememberSentMessage(event.id, sent); keep the existing try/catch pattern and
logging used in the failure branch when performing this cleanup.

---

Nitpick comments:
In `@packages/server/src/gateway/__tests__/pending-interaction-store.test.ts`:
- Around line 70-79: The test currently calls claimPendingQuestion sequentially;
change it to exercise a real race by invoking two concurrent
claimPendingQuestion(...) calls (e.g., via Promise.all) after storing the
pending question from storePendingQuestion(...) for the built question
(buildQuestion("q-1")), then assert that exactly one of the results is non-null
with question.id "q-1" and the other is null to validate the SQL atomic
single-winner behavior of claimPendingQuestion.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 2319e1c1-368e-4ca6-9507-9b1e577ada37

📥 Commits

Reviewing files that changed from the base of the PR and between 0f59916 and 096b377.

📒 Files selected for processing (6)
  • db/migrations/20260518000000_pending_interactions.sql
  • db/schema.sql
  • packages/server/src/gateway/__tests__/pending-interaction-store.test.ts
  • packages/server/src/gateway/connections/interaction-bridge.ts
  • packages/server/src/gateway/connections/pending-interaction-store.ts
  • packages/server/src/gateway/services/core-services.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • db/migrations/20260518000000_pending_interactions.sql
  • packages/server/src/gateway/services/core-services.ts

Comment on lines +216 to 246
const pendingSentSweepTimer = setInterval(() => {
sweepPendingSent().catch((error) => {
logger.warn(
{ connectionId, error: String(error) },
"pendingSentMessages sweep failed"
);
});
}, PENDING_SENT_SWEEP_INTERVAL_MS);
pendingSentSweepTimer.unref?.();
async function sweepPendingSent(): Promise<void> {
const ttlCutoff = Date.now() - PENDING_SENT_TTL_MS;
for (const [id, entry] of pendingSentMessages) {
if (entry.registeredAt <= ttlCutoff) {
pendingSentMessages.delete(id);
}
}
// Also drop local handles for any DB rows the scheduled sweeper just
// deleted — keeps the local cache from outliving its DB row.
let deletedIds: string[] = [];
try {
deletedIds = await sweepStalePendingInteractions();
} catch (error) {
// The store logs its own DB errors; treat as best-effort here.
logger.debug(
{ connectionId, error: String(error) },
"sweepStalePendingInteractions failed during local sweep"
);
}
for (const id of deletedIds) {
pendingSentMessages.delete(id);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep DB stale-row cleanup out of the per-connection cache sweep.

This interval is created once per registered bridge, but sweepPendingSent() calls sweepStalePendingInteractions(), which deletes across the whole pending_interactions table. With many live connections, that turns into many identical global DELETE sweeps every hour on top of the scheduled ephemeral-table sweep from this PR.

Suggested direction
   async function sweepPendingSent(): Promise<void> {
     const ttlCutoff = Date.now() - PENDING_SENT_TTL_MS;
     for (const [id, entry] of pendingSentMessages) {
       if (entry.registeredAt <= ttlCutoff) {
         pendingSentMessages.delete(id);
       }
     }
-    // Also drop local handles for any DB rows the scheduled sweeper just
-    // deleted — keeps the local cache from outliving its DB row.
-    let deletedIds: string[] = [];
-    try {
-      deletedIds = await sweepStalePendingInteractions();
-    } catch (error) {
-      // The store logs its own DB errors; treat as best-effort here.
-      logger.debug(
-        { connectionId, error: String(error) },
-        "sweepStalePendingInteractions failed during local sweep"
-      );
-    }
-    for (const id of deletedIds) {
-      pendingSentMessages.delete(id);
-    }
   }

If you still want exact cache/DB synchronization, wire deleted ids in from the single scheduled sweep instead of having every bridge issue its own global delete.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/connections/interaction-bridge.ts` around lines
216 - 246, sweepPendingSent() running per-connection is calling the global
sweepStalePendingInteractions() which issues DELETEs across the whole
pending_interactions table; remove that DB-wide cleanup from the per-connection
path and only perform the local cache eviction there. Specifically, in the
function sweepPendingSent (invoked by pendingSentSweepTimer, interval
PENDING_SENT_SWEEP_INTERVAL_MS) remove the try/catch and await
sweepStalePendingInteractions() call and the deletedIds handling so the function
only prunes pendingSentMessages by registeredAt/ttlCutoff; if you need DB->cache
synchronization instead, have the single scheduled global sweeper publish
deleted IDs and wire that into each bridge to delete those ids from
pendingSentMessages rather than calling sweepStalePendingInteractions() from
sweepPendingSent().

…orm-isolation

The bridge's new per-tenant guard returns before resolveThread when
connection.organizationId is missing — three tests asserting
instanceChat.channel() got called fell to 0 calls. Fixture now sets
organizationId: "test-org" so the path reaches resolveThread; the
subsequent persist still fails on FK in CI (no org row), but is
caught by the bridge's persist-then-post catch block — which is what
the bridge does in production when persistence drops.
buremba added a commit that referenced this pull request May 18, 2026
…ckiness (#848)

* feat(chart): expose service.sessionAffinity so multi-replica deploys can pin SSE clients

When app.replicaCount > 1, kube-proxy round-robins traffic across pods.
The SSE manager keeps connections + a 100-event/2-min backlog in memory
per pod (packages/server/src/gateway/services/sse-manager.ts), so a
client whose stream lands on pod A while the worker run is claimed by
pod B silently misses events. PR #834 fixed the AskUser click path
(now DB-backed) but SSE chunks still need to flow through the pod that
holds the connection.

ClientIP affinity at the Service level pins each client IP to a single
pod for the affinity window. No ingress-controller config required —
the chart stays controller-agnostic. Disabled by default; helmrelease
opts in alongside replicaCount: 2.

Verified with helm template:
  - default (None): no sessionAffinity stanza emitted
  - --set service.sessionAffinity=ClientIP: full stanza with 10800s timeout

* feat(chart): NOTES warning when replicaCount > 1 but sticky is off

The chart change only exposes the knob; an operator can still scale
replicaCount past 1 without enabling sessionAffinity and silently
degrade SSE delivery. Make the foot-gun loud at install/upgrade time
so it's caught before the symptom shows up in production.

Verified with helm install --dry-run:
  - default (replicaCount=1, sessionAffinity unset): no warning
  - replicaCount=2 + sessionAffinity=None: WARNING emitted
  - replicaCount=2 + sessionAffinity=ClientIP: no warning
@buremba buremba merged commit de81ebe into main May 18, 2026
20 of 23 checks passed
@buremba buremba deleted the refactor/ha-pending-interactions-and-sse branch May 18, 2026 02:20
buremba added a commit that referenced this pull request May 18, 2026
#855)

Both caches were "lazy refresh on read" — they update an entry's
expiresAt when the same agentId is looked up again, but never delete
entries for agentIds that are never re-queried. Net: cache size
grows monotonically with distinct agentIds the gateway has ever
seen, bounded only by the pod's lifetime.

In practice the growth rate is small (~200 bytes per distinct
agentId, hundreds-to-thousands of agents per day) and almost
certainly NOT the cause of the 1 Gi OOM that prompted #782 — but
it's still a genuine bound-less Map that ideally cleans up after
itself.

Adds a tiny cacheSet helper with a 1024-entry cap that evicts the
oldest insertion (Maps iterate in insertion order, so size-1
peek-and-delete is O(1)). Test exercises 2048 distinct lookups and
asserts both caches stay <= 1024.

Refs #782 — hardening, not root-cause fix. SSE keepalive teardown
(#833) and in-memory pending-interactions removal (#834) remain
the most likely actual OOM fixes.
buremba added a commit that referenced this pull request May 18, 2026
buremba added a commit that referenced this pull request May 18, 2026
* fix(runs): add heartbeat-lost reaper for connector lanes

When a connector worker crashes, OOM-kills, or scales down mid-run the
`runs` row sits in `claimed`/`running` forever and the feed never gets a
retry. The legacy `checkStalledExecutions` cron caught some of this on a
5-minute cadence but only covered `sync` + `embed_backfill`, did a full
table scan, and had no cross-pod coordination.

`reapStaleRuns()` (packages/server/src/scheduled/check-stalled-executions.ts)
covers all four connector lanes (`sync`, `action`, `embed_backfill`,
`auth`), wraps the sweep in `pg_try_advisory_lock` for multi-pod safety,
and reads the threshold from `RUNS_REAPER_STALE_AFTER_SECONDS` (default
120s). It writes the failure as `error_message='worker_heartbeat_lost'`
and re-queues stalled `sync` runs so the feed self-heals.

Wired on a 30s `setInterval` from the gateway boot path (server.ts and
start-local.ts) with explicit teardown on SIGTERM. The legacy 5-minute
TaskScheduler cron stays as a backstop and now delegates to the same
function; the advisory lock keeps the two cadences from double-failing
rows. Watcher lane stays out of scope — it already has its own 2h sweep
in watchers/automation.ts.

Adds the missing partial index `idx_runs_heartbeat_inflight` on
`runs(last_heartbeat_at) WHERE status IN ('claimed','running') AND
run_type IN ('sync','action','embed_backfill','auth')` so the sweeper
query is index-only. Also sets `last_heartbeat_at = current_timestamp`
on the worker poll claim path so a freshly-claimed row has a sane
initial timestamp instead of relying on the first 30s worker heartbeat
to land.

Slice extracted from #615 — placement-tagging, organization-default-device,
and scale-from-zero remain out of scope and the bigger design is
unchanged by this PR.

* fix(schema): regenerate db/schema.sql after heartbeat index migration

* fix(migration): rename to 20260518010000 to avoid collision with #834 pending_interactions

* fix(schema): add 20260518010000 to schema_migrations after rebase
buremba added a commit that referenced this pull request May 18, 2026
…interactions (#867)

* fix(server): post-review cleanup of multi-tenant isolation + pending interactions

Closes 6 issues surfaced by a 6-agent review of #834, #836, #848, and the
#836 followup:

1. secret-proxy: fail-closed on agentOrgResolver DB error (was warn +
   fall-through with undefined expectedOrganizationId — a DB hiccup
   window let downstream org checks silently downgrade).
2. secret-proxy: legacy-mapping bypass closed. Pre-fix `lookupPlaceholder
   Mapping` skipped the org check when `mapping.organizationId` was unset
   (legacy rows pre-org-pivot); a worker from org B could resolve a
   legacy unscoped mapping owned by org A. Now: if the caller supplies
   `expectedOrganizationId`, the mapping must match it. Emit a WARN on
   every legacy unscoped access to plan the deprecation.
3. pending-interaction-store: drop `created_at = now()` from ON CONFLICT.
   Webhook retries no longer reset the 24h TTL clock, so a misbehaving
   retry loop cannot keep a row alive indefinitely. `claimed_at = NULL`
   reset is preserved so legitimate retries are still claimable.
4. egress-judge VerdictCache key already includes orgId
   (`cache.ts:30-43`) — no code change needed, documented for the
   reviewers.
5. interaction-bridge: drop the per-bridge `sweepStalePendingInteractions`
   setInterval. The global `coreServices.sweepEphemeralTables`
   (scheduled in `src/scheduled/jobs.ts`) already covers it; the
   per-bridge call was N-times-per-pod wasted DB work. The local sweep
   timer is retained but now only evicts the in-memory `pendingSent
   Messages` cache by TTL.
6. pending-interaction-store: cap `sweepStalePendingInteractions` at 1000
   rows/call (configurable). An unbounded DELETE under a stale-row
   backlog could lock the table; remaining rows drain across subsequent
   5-minute cycles.
7. pending-interaction-store: add `deletePendingQuestion(id, org, conn,
   user)` and use it in the post-failure drop path. Pre-fix the bridge
   called `claimPendingQuestion` (UPDATE setting claimed_at) to "drop"
   the row, leaving a phantom row sitting around until the 24h sweep.
   `deletePendingQuestion` carries the same four-field scoping as the
   claim path, so safety invariants are identical.

Tests:

- `multi-tenant-isolation-reproducers.test.ts` gets 3 new cases under
  `[finding 1]`: legacy-mapping bypass rejected, legacy-mapping warn
  path with no expected org, and SecretProxy.forward returns 503 when
  the resolver throws.
- New `pending-interaction-cleanup.test.ts` covers retry-preserves-
  `created_at`, sweep LIMIT honoured + remainder drains, post-failure
  drop is a DELETE (not just claim), and `deletePendingQuestion`
  scoping invariant.
- `secret-proxy.test.ts` updated to reflect the closed legacy bypass —
  the old "falls through (legacy)" case now expects a null return when
  the caller supplies an expected org; a new "no expected org" case
  documents the WARN path for un-org-scoped callers.

Red→green proof captured per-fix in the PR body.

* fix(server): address codex review on PR #867

Two issues flagged by codex review:

1. Provider credential lookup had an unscoped fail-open DB-error path.
   The PR's new 503 in `secret-proxy.ts` only covered the resolver-throws
   branch — when the worker token already carried `organizationId`, the
   resolver was skipped and downstream `authProfilesManager.getBestProfile`
   ran without org context. Inside `AuthProfilesManager.resolveAgentOrgId`,
   a DB error logged a warning and returned undefined, falling through to
   unscoped credential reads (`auth-profiles-manager.ts:251-275`). Now we
   wrap the credential lookup in `orgContext.run({organizationId:
   expectedOrganizationId}, ...)` so `AuthProfilesManager.listProfiles`
   short-circuits via `tryGetOrgId()` and never invokes its own resolver
   when we already know the org. A DB hiccup in the upstream resolver
   cannot downgrade scoping for these requests.

2. The post-failure cleanup test exercised `deletePendingQuestion()`
   directly but didn't drive `registerInteractionBridge`. A regression
   that swapped `deletePendingQuestion` back to `claimPendingQuestion`
   in the bridge would not have failed the test. Added an integration
   test that emits a `question:created` event, stubs the thread post to
   throw, and asserts the row is GONE from `pending_interactions` (count
   0). Verified red→green by reverting the bridge to claim — the test
   times out at 5s because the row is never deleted.

* chore(submodule): bump owletto to dcb2172 to clear drift check
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants