Skip to content

Native Postgres connector: memory feeds, live pushdown, connection-backed derived entities#1182

Open
buremba wants to merge 14 commits into
mainfrom
feat/postgres-connector
Open

Native Postgres connector: memory feeds, live pushdown, connection-backed derived entities#1182
buremba wants to merge 14 commits into
mainfrom
feat/postgres-connector

Conversation

@buremba
Copy link
Copy Markdown
Member

@buremba buremba commented Jun 4, 2026

Adds native PostgreSQL connectivity through the connector SDK. You can bring a Postgres database in as memory (a scheduled read-only query feed that turns rows into events), run SQL live against it without copying (pushdown via query_sql({ connection })), and back a derived entity with a connection so its view reads live from the external DB. It's the foundation for the enterprise database tier, with a cloud gate already in place so the connector is self-hosted-only until the egress hardening lands.

The whole thing lives in the connector SDK — no new gateway DB pools, no separate executor. The connector owns the socket and runs the compute; Lobu is the aggregator. A memory feed syncs rows into the append-only events table (then embeddings/pgvector, the existing path); a live read forks the connector in a new query run-mode and returns rows with no persistence.

How it's built:

  • packages/connectors/src/postgres.ts — the connector. env_keys auth with a single DATABASE_URL secret. One query memory feed that validates a bare SELECT, wraps it with a keyset compound-cursor predicate (correct across equal-cursor ties), and emits one event per row. A query() method for live reads that wraps + paginates + returns a real count(*) total. SQL validation uses node-sql-parser as a best-effort gate with a token-level fallback when it can't parse valid Postgres (FTS @@, jsonpath, GROUPING SETS, array slices, IS NOT DISTINCT FROM, range casts) — the real write seal is the read-only transaction, not the parser.
  • query run-mode threaded through connector-worker (ExecutorJob/child-runner) and reached from the server via connector-pushdown.ts. query_sql gains an optional connection arg that routes to it.
  • Derived entities get an optional backing.connection; stored as entity_types.backing_source, read via get_type → query_sql({ sql: backing_sql, connection: backing_source }).

Security:

  • Cloud gate (connector-cloud-gate.ts): the postgres connector is restricted under LOBU_CLOUD_MODE — hidden from the catalog, blocked at connection-create, and refused at every execution path (queue creation, the production worker poll, the dev-CLI sync, and live pushdown). Self-hosted is unaffected.
  • Egress guard (db-egress-guard.ts): a pre-connect host check on both sync and query. allow-private (self-hosted default) allows loopback/RFC1918/CGNAT/ULA but still blocks metadata/link-local; block-private (cloud) blocks every non-public address, resolving hostnames and rejecting if any address — or any host of a multi-host failover URL — is internal. IPv4-mapped/NAT64/::a.b.c.d are normalized, malformed literals fail closed. Pinning the resolved IP across the pool and forcing TLS are the remaining items before the cloud gate can be lifted.
  • origin_id is namespaced by feed instance (feeds.id), so two query feeds on one connection can't supersede each other's events.

Testing: 26 connector integration tests (sync keyset + incremental, pushdown read/paginate/total, the cloud gate at the queue and worker-poll layers, the egress guard, and a dogfood end-to-end that runs a signup join through sync → events → dedup and reads a connection-backed funnel entity live), plus 95 unit tests for the egress classifier. Two adversarial review passes ran against this branch and the confirmed findings (a multi-host bypass/regression, a classifier hole, a cross-feed origin_id collision, policy-injection ordering) are all fixed. make review is green: 0 bugs, 0 blockers, 0 slop, tests adequate.

Not in this PR: the Owletto UI hook that threads backing_source into the derived-entity view goes through its own submodule PR (the code is ready). Connecting Lobu's real prod Postgres, and the egress pin/force-TLS work that lets us lift the cloud gate, are follow-ups. Virtual feeds + federated search() fan-out are the next slice.


View with Codesmith Autofix with Codesmith
Need help on this PR? Tag /codesmith with what you need. Autofix is disabled.

Summary by CodeRabbit

  • New Features

    • Added PostgreSQL connector supporting live SQL queries and incremental data sync
    • Derived entity types can now be backed by external database queries via SQL pushdown
    • Live query execution with pagination, sorting, and result counting support
    • Database access security controls for cloud and self-hosted environments
  • Documentation

    • Added database connector architecture and usage guide

buremba added 13 commits June 1, 2026 04:04
… entities

Bring an external Postgres database in as memory (scheduled query feed → events)
and as live derived-entity metrics (no copy, read at query time).

- connectors/src/postgres.ts: bundled Postgres connector. env_keys DATABASE_URL;
  a 'query' feed runs a user SELECT wrapped with a keyset compound-cursor
  predicate (never substitutes :cursor), emits one event per row, incremental.
- utils/execute-external-source.ts: single-DB external executor — per-pod capped
  pool registry (idle-evict, LRU, credential-rotation aware), polyglot read-only
  gate, read-only tx + statement_timeout + outer LIMIT cap, credential isolation.
  SqlDialectAdapter seam (Postgres only in V1).
- query_entity_type tool + entity_types.backing_source column: a derived entity
  type can bind to a connection and run live against its DB; the tool authorizes
  by slug and dispatches internal (query_sql) vs external. Client never passes sql.
- backing.connection config API threaded through lobu apply (map/diff/client).
- Cloud gate: postgres hidden from catalog + blocked at connection-create under
  LOBU_CLOUD_MODE until egress hardening lands (connector-cloud-gate.ts).
- Dogfood: lobu-crm wires Lobu's prod DB as memory feeds + a live funnel entity.
- Tests: external read-only gate (unit) + query_entity_type internal/external
  dispatch (integration, external connection points back at the test DB).
Review-driven hardening of the external-DB read path:
- executeExternalSource refuses to run under LOBU_CLOUD_MODE — the airtight SSRF
  gate (a derived view could bind to ANY env connection's DATABASE_URL, not just
  the postgres connector). Self-hosted unaffected.
- assertExternalReadQuery rejects data-modifying CTEs at parse time (the
  read-only tx is the hard seal; this is defense-in-depth).
- redact() also strips bare user:pass@host:port from error messages.
- query_entity_type added to all-tools-e2e TOOL_PLAN (registry coverage gate).
- New tests: connector keyset incremental (real DB), cloud-mode gate, DML-CTE
  rejection, connector-cloud-gate unit, CLI backing.connection diff idempotency.
…te-external-source)

Simplification pass: query_sql and execute-external-source had identical
PG_OID_TYPE_MAP + oidToTypeName + COLUMN_NAME_RE. Hoist to utils/pg-oid.ts so the
OID→name map and the column-identifier guard have one source of truth.
…blocker)

pi caught a read-only-contract violation in the connector that the server-side
guard didn't cover:
- validateBaseQuery now walks the node-sql-parser AST and rejects data-modifying
  CTEs (WITH x AS (INSERT ... RETURNING) SELECT ...) — the parser reports these
  as a top-level 'select', so the stmt.type check alone missed them.
- sync() runs the LIMIT 0 type-probe INSIDE the read-only transaction (was
  outside) so a crafted query can never write while introspecting.
- execute-external-source: strip a trailing ';' before the subquery wrap
  ('(SELECT 1;) AS _t' was a syntax error); drop the unused connectorKey field
  (resolveExternalDatabaseUrl returns the url directly).
- Tests: connector DML-CTE rejection (asserts no write landed) + trailing-';'
  external read.
Re-architect external-DB execution around a connector-SDK pushdown primitive
instead of a gateway-side pool. The connector owns the DB connection for both
sync (indexing) and live reads; Lobu aggregates. Net -307 lines.

- SDK: ConnectorRuntime.query(ctx) -> { rows, columns }, a virtual feed flag,
  and defineConnector sugar.
- connector-worker: a 'query' run-mode (returns rows, persists nothing).
- Postgres connector: query() reusing the read-only/DML-CTE guards; shared
  read-only gate factored from the memory feed.
- server: runConnectorQuery (reuses the inline-run path); query_sql gains an
  optional 'connection' -> pushdown (internal scoping skipped). The DB socket
  lives in the connector subprocess, never the gateway.
- Derived-entity external read is now get_type -> query_sql({sql, connection}).
- DELETE execute-external-source.ts (+test), query_entity_type.ts (+test +
  registry + e2e), pg-oid.ts (query_sql back to inline OID map).
- Keep backing.connection + backing_source migration + apply threading +
  manage_entity_schema + connector-cloud-gate.
- Test: query_sql pushdown end-to-end (compile + fork + query() vs a real DB).

Slice 2 (next): virtual feeds + connector search() + federated fan-out.
…hdown (pi)

Two security blockers pi found on the lean pushdown:
- runConnectorQuery resolved connections by org+slug only — a member could read
  another user's PRIVATE connection by slug. Now requires status='active' and
  enforces the same visibility as manage_connections (member sees org-visible or
  own connections only); query_sql threads ctx.userId + callerIsAdmin.
- The cloud gate only blocked connection CREATION; an existing raw-DB connection
  could still run pushdown / scheduled sync under LOBU_CLOUD_MODE. Now gated at
  EXECUTION time too: assertConnectorAllowedInCloud in runConnectorQuery and in
  feed-sync before running the connector.
- Tests: member-vs-private-connection visibility + cloud-mode execution refusal.
…g (pi nits)

- query_sql connection branch validates non-finite limit/offset before clamping,
  mirroring the internal path.
- Remove FeedDefinition.virtual + its defineConnector pass-through — it has no
  slice-1 reader (dead code); re-added in slice 2 with the virtual-feed read path.
…te prod path

Adversarial bug hunt surfaced 16 real issues; this fixes them.

connectors/postgres.ts:
- validateReadOnlySelect falls back to a token-level read-only check when
  node-sql-parser can't parse valid Postgres (FTS @@, jsonpath @?/@@, GROUPING
  SETS, array slices, IS NOT DISTINCT FROM, range casts). The read-only tx is
  the real write seal; the AST is best-effort. Fixes 6 false rejections.
- reject a top-level LIMIT/OFFSET in query() too (depth-0 scanner), not just the
  sync feed: an inner LIMIT capped the universe the outer OFFSET paged over, so
  later pages returned zero rows.
- query() returns a real count(*) total, so query_sql total_count/has_more are
  accurate (were the page size / off-by-one at exact-multiple boundaries).
- reject duplicate output column names (the row object silently drops a value).
- one combined OID map: display names (bigint/boolean) for query() columns, cast
  names for the checkpoint re-cast.
- share POOL_OPTS + setReadOnly between sync() and query().

server:
- worker-api pollWorkerJob + queue-helpers createSyncRun enforce the cloud gate
  on the production sync path (feed-sync.ts only covered the dev CLI), so a
  postgres feed cannot sync under LOBU_CLOUD_MODE.
- query_sql rejects search_columns without search_term; dedup page-bound coercion.
- manage_entity_schema guides the agent to pass backing_source as query_sql's
  connection so connection-backed derived entities read live.
- migration comment describes the runConnectorQuery/query_sql pushdown path, not
  the deleted query_entity_type/execute-external-source.

Tests: parser-fallback acceptance, total_count/has_more across pages, top-level
LIMIT + duplicate-column rejection, cloud-gate sync (blocked under cloud mode,
queued self-hosted).
…pi nits)

- Only query() ships today; the virtual feed flag was dropped (re-add in slice 2).
- Cloud gate is enforced at every run path (queue creation, worker poll, dev sync,
  pushdown), not just catalog-hide.
…e test

Follow-ups making the postgres slice verifiable + hardened.

SSRF/egress guard (db-egress-guard.ts):
- Ports the gateway's IP classifier (IPv4-mapped/NAT64/zone-id normalization,
  fail-closed on malformed literals) into the bundled connector. Two policies:
  allow-private (self-hosted/default — allows loopback/RFC1918/CGNAT/ULA, still
  blocks metadata/link-local/multicast/unspecified) and block-private (cloud —
  blocks every non-public address). assertHostAllowed resolves a hostname and
  rejects if ANY address is blocked (multi-record rebind). 81 unit tests.
- postgres.ts guards both sync() and query() before opening the socket; policy
  read from ctx.config.LOBU_DB_EGRESS_POLICY (default allow-private, so loopback
  test/self-hosted paths are unchanged). Cloud mode injects block-private via
  connector-pushdown (query) and connector-worker env (sync). The cloud gate
  stays ON — flipping it (plus DNS-rebind pin + force-TLS) is the go-live step.

Tests:
- pollWorkerJob execution-gate: a pending postgres run is FAILED under
  LOBU_CLOUD_MODE when a worker claims it (and claimed as running when off).
- dogfood E2E (test DB): signup-join sync → mapped events + keyset checkpoint;
  connection-backed derived entity create → get_type returns backing_source →
  read LIVE via query_sql({ connection }) → real funnel counts.
- connector egress: block-private rejects the loopback host on sync() and query().
The IP classifier + allow-private/block-private policies + assertHostAllowed are
now in db-egress-guard.ts and enforced on sync()/query(). Pin/force-TLS/allowlist
remain before flipping the cloud gate.
Adversarial review (12 confirmed) + pi found real bugs in the new egress guard:

- HIGH: a multi-host failover URL (host1,host2) was DNS-resolved as one literal —
  rejecting legitimate self-hosted failover URLs, and letting public,169.254.x
  bypass block-private. extractDbHosts now parses the authority like postgres.js
  (multi-host, IPv6 brackets, password-with-@) and validates EVERY host;
  allow-private skips a hostname's DNS resolve (no new failure mode for a
  self-hosted hostname DB), block-private resolves + rejects each.
- IPv4-compatible IPv6 (::7f00:1 = 127.0.0.1) wasn't unwrapped → block-private
  bypass. normalizeIpLiteral now unwraps ::a.b.c.d (leaving ::/::1).
- env.ts treated LOBU_CLOUD_MODE="0" as cloud (truthy string). Now matches
  isCloudMode (1/true/yes).
- The egress policy was injected BEFORE caller config in the pushdown path (could
  be overridden) and not injected at all on the dev-CLI sync path. Now injected
  LAST (authoritative) on both pushdown and feed-sync.
- manage_entity_schema: reject an empty backing.connection (symmetry with sql).

Host parsing + per-host validation moved into db-egress-guard
(assertConnectionStringAllowed / extractDbHosts) — fully unit-tested (95 cases),
postgres.ts just reads the policy and delegates.

Tests: dogfood now drives sync→worker-stream→events with origin_id dedup
(current_event_records stays 3), closing the ingestion coverage gap.
…oss-feed supersede)

pi blocker: the connector emitted origin_id `${feedKey}:${pk}`, but every postgres
feed shares feedKey 'query'. Two query feeds on one connection with overlapping
primary keys (exactly the dogfood: signups + activity) collided — ingestion
dedupes by (org, connection_id, origin_id), so one feed superseded the other's
events. Data loss.

Thread feedId (the feeds.id, unique per feed instance) through SyncContext →
ExecutorJob → child-runner, populated by both the dev path (feed-sync) and the
prod worker (daemon/executor, from the poll response which already carries
feed_id). The connector namespaces origin_id by feedId when present, falling back
to feedKey only for direct/programmatic sync calls. postgres is brand-new so
there's no existing synced data to migrate.

Test: two syncs with the same primary key but distinct feedId get distinct
origin_ids (101:1 vs 202:1).
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jun 4, 2026

Review Change Stack

Warning

Review limit reached

@buremba, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 12 minutes and 12 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 36fa78c3-dade-4519-b809-b7290a0a7d32

📥 Commits

Reviewing files that changed from the base of the PR and between 1b52a21 and db46e94.

📒 Files selected for processing (1)
  • packages/owletto
📝 Walkthrough

Walkthrough

This PR introduces PostgreSQL as a live data source with incremental sync and real-time query capabilities, protected by SSRF defenses and cloud-mode gating. The implementation spans SDK types, worker protocol extensions, a production-ready connector with strict SQL validation, server-side execution, and schema support for external-backed derived entities.

Changes

PostgreSQL Connector Implementation

Layer / File(s) Summary
Connector SDK query contract
packages/connector-sdk/src/{connector-runtime,connector-types,define-connector,index,types}.ts
New QueryContext and QueryResult types define a contract for live, non-persisting reads. SyncContext gains optional feedId for namespacing origin_ids across concurrent feed instances. ConnectorRuntime.query() method added with default error throw; defineConnector supports optional query handler on ConnectorSpec.
Connector worker protocol for queries
packages/connector-worker/src/executor/{interface,child-runner}.ts, daemon/executor.ts
ExecutorJob and ExecutorResult discriminated unions extended: sync jobs gain optional feedId; new query job mode accepts query, config, credentials, sessionState, limit/offset/sort. Worker runner delegates to instance.query() and returns rows/columns/total. Sync payload updated to use feedId from poll response.
Database SSRF/egress guard
packages/connectors/src/{db-egress-guard.ts,__tests__/db-egress-guard.test.ts}
Validates DB connector hosts and resolved DNS addresses against two policies: block-private (cloud) restricts internal IPs/hostnames; allow-private (self-hosted) allows RFC1918. Includes IPv6 zone-id stripping, IPv4-mapped unwrapping, NAT64 conversion, fail-closed IP-literal parsing, and DNS resolution with injectable resolvers. 224-line test suite covers IP categories, policy outcomes, connection-string parsing, and multi-host failover.
PostgreSQL connector implementation
packages/connectors/src/{postgres.ts,index.ts}, packages/connectors/package.json
New PostgresConnector class with sync() and query() methods. Validates SQL to be read-only SELECT (or WITH) with no embedded semicolons/bind params/top-level LIMIT/OFFSET via AST parsing (node-sql-parser) plus token-based fallback. Both modes execute in SET TRANSACTION READ ONLY with statement timeout. sync() implements keyset pagination on (cursor_column, primary_key), probes column types, casts checkpoints, and namespaces origin_id by feedId or feedKey. query() applies limit/offset bounds, optional ORDER BY, wraps SQL as subquery for pagination and total-count computation, returns rows + OID-mapped column types + total. Egress guard validates DATABASE_URL before connecting.
Server-side connector pushdown
packages/server/src/lib/connector-pushdown.ts
New runConnectorQuery function executes read-only queries against connections via worker runtime. Resolves connection by org+slug with admin/non-admin visibility rules, blocks execution in cloud mode, loads compiled connector code, resolves credentials/session state, calls executor with query mode, and injects egress policy (block-private in cloud; allow-private otherwise).

Cloud Mode Gating

Layer / File(s) Summary
Cloud mode detection and enforcement
packages/connector-worker/src/env.ts, packages/server/src/utils/{connector-cloud-gate.ts,__tests__/connector-cloud-gate.test.ts}
New cloudModeOn() helper parses LOBU_CLOUD_MODE env (only 1/true/yes after trim). Worker env builder sets LOBU_DB_EGRESS_POLICY to block-private in cloud mode, allow-private otherwise. Cloud gate module defines CLOUD_RESTRICTED_CONNECTOR_KEYS set containing postgres and exports assertConnectorAllowedInCloud(connectorKey) throwing error when restricted connector used in cloud mode.
Cloud gates at connector lifecycle points
packages/server/src/{tools/admin/manage_connections.ts,utils/queue-helpers.ts,lib/feed-sync.ts,worker-api.ts}, packages/server/src/utils/connector-catalog.ts
Connection creation blocks postgres setup in cloud mode. Sync run creation returns null without queueing when connector is restricted in cloud. Worker polling claims pending run but marks it failed (not running) with "Lobu Cloud" error message. Feed sync executor enforces cloud gate. Catalog listing hides cloud-restricted connector definitions when in cloud mode.

Derived Entity External Backing

Layer / File(s) Summary
CLI config and diff support
packages/cli/src/{config/define.ts,commands/_lib/apply/{client,desired-state,map-config}.ts,__tests__/diff-idempotency.test.ts}
EntityBacking type gains optional connection?: string field. CLI client hoists backing_source from server schema into backing.connection. Upsert forwards backing.connection in payload. Map-config conditionally includes connection in backing. Diff idempotency tests verify noop when connection matches and update when connection is added.
Server schema persistence and management
db/migrations/20260601120000_entity_types_backing_source.sql, packages/server/src/tools/admin/manage_entity_schema.ts
Migration adds optional backing_source text column to entity_types (idempotent up/down). Entity schema management tool extends backing input to accept connection slug, validates non-whitespace, and persists/reads both backing_sql and backing_source with conditional CASE logic on update. Result rows include backing_source?: string | null.
Query SQL tool connection pushdown
packages/server/src/tools/admin/query_sql.ts
QuerySqlSchema adds optional connection parameter. When provided, tool bypasses internal SQL execution and calls runConnectorQuery instead. Rejects search_term in external mode. Coerces/clamps pagination bounds. Maps sort parameters. Converts connector errors into result errors. Centralizes page-bounds validation into coercePageBounds helper.

Documentation and Examples

Layer / File(s) Summary
Database connectors overview
docs/database-connectors.md
Documents Postgres connector architecture: connector-owned DB connection for indexing/live reads, query_sql({ connection }) entrypoint, derived entity backing options, and SSRF/egress trust model (self-hosted vs multi-tenant cloud, egress-guard policy, explicit "remaining items" before cloud enablement). Includes entitlement boundary (plan-tier gating) and forward-compat notes for Snowflake/BigQuery.
Lobu CRM dogfood configuration
examples/lobu-crm/lobu.config.ts
Adds lobu_dbAuth postgres auth profile (env-based LOBU_PROD_READONLY_URL), lobu_dbConn connection to production DB, two query feeds (New Signups, Org Activity), and funnel_by_org derived entity backed by live SQL computing per-org signup counts directly from production tables. Updated default config includes new auth profile, connection, and entity.
Integration tests
packages/server/src/__tests__/integration/connectors/{postgres-connector,postgres-dogfood-e2e,postgres-sync-cloud-gate,query-sql-pushdown}.test.ts
Comprehensive test coverage: unit tests validate keyset pagination, feedId namespacing, SQL validation, egress policies, read-only enforcement; dogfood E2E tests verify sync→event-stream ingestion with deduplication and derived-entity query execution; cloud-gate tests confirm queue-time and execution-time gating; pushdown tests validate pagination, sort, authorization, and cloud rejection.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • lobu-ai/lobu#1161: Foundational PR introducing derived SQL-view backing fields and CLI/server schema wiring that this PR extends with external backing.connection support.
  • lobu-ai/lobu#931: Defines the V1 connector-worker executor IPC contract that this PR extends with query-mode jobs and feedId.
  • lobu-ai/lobu#1013: Modifies connector catalog listing logic that this PR further extends with cloud-mode gating.

🐰 Postgres now whispers secrets from afar,
Live reads from afar, no tables to jar,
Cloud keeps the danger locked tight in a gate,
While egress guards vet each host by its fate.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 51.16% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the three main changes: a native Postgres connector, memory feeds, live pushdown, and connection-backed derived entities—matching the PR objectives.
Description check ✅ Passed The PR description is comprehensive and well-structured, covering the summary, test plan, and notes sections. It includes detailed implementation details, security measures, testing coverage, and follow-ups.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/postgres-connector

@codecov-commenter
Copy link
Copy Markdown

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/server/src/lib/feed-sync.ts (1)

121-133: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't pass the gateway process environment into Postgres sync runs.

runConnectorQuery in this PR deliberately uses env: {} so a missing DATABASE_URL fails closed, but runFeed still hands the connector the full process.env. For the new Postgres connector that means an empty/miswired env-key profile can silently fall back to Lobu's own runtime database instead of the connection's database.

Suggested fix
       config: {
         ...mergeExecutionConfig(feed.connection_config, connectionCredentials, feed.config),
         // Authoritative egress policy (injected last): a DB connector rejects
         // internal/metadata hosts under cloud mode, reaches its own private DB
         // self-hosted. process.env is passed below but doesn't carry this key.
         LOBU_DB_EGRESS_POLICY: isCloudMode() ? 'block-private' : 'allow-private',
       },
       checkpoint: feed.checkpoint,
-      env: process.env as Record<string, string | undefined>,
+      env: {},
       sessionState,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/lib/feed-sync.ts` around lines 121 - 133, The connector
is being given the full gateway process.env in the executeCompiledConnector call
which allows a connector (e.g., the new Postgres connector) to accidentally use
the gateway DATABASE_URL; change the env argument passed to
executeCompiledConnector in feed-sync.ts (the call within runFeed that invokes
executeCompiledConnector with job and env) to a restrictive environment (e.g.,
an empty object {}) so the connector does not inherit the gateway process.env
and a missing DATABASE_URL will fail closed; ensure you only pass explicitly
allowed vars if needed rather than process.env.
🧹 Nitpick comments (2)
packages/server/src/__tests__/integration/connectors/postgres-dogfood-e2e.test.ts (1)

172-188: ⚡ Quick win

Use named interfaces instead of inline object-shape type literals.

There are several inline shape assertions for DB rows and getType output. Please extract these to local interfaces and reuse them for consistency and guideline compliance.

Suggested refactor
+interface IdRow {
+  id: number;
+}
+
+interface OriginIdRow {
+  origin_id: string;
+}
+
+interface ConnectorKeyRow {
+  connector_key: string;
+}
+
+interface CountRow {
+  n: number;
+}
+
+interface EntityTypeGetResponse {
+  entity_type?: { backing_sql?: string | null; backing_source?: string | null };
+}
...
-    const connId = Number((connRow as { id: number }).id);
+    const connId = Number((connRow as IdRow).id);
...
-    const feedId = Number((feed as { id: number }).id);
+    const feedId = Number((feed as IdRow).id);
...
-    const runId = Number((run as { id: number }).id);
+    const runId = Number((run as IdRow).id);
...
-    expect(events.map((e) => (e as { origin_id: string }).origin_id)).toEqual([
+    expect(events.map((e) => (e as OriginIdRow).origin_id)).toEqual([
...
-    expect(events.every((e) => (e as { connector_key: string }).connector_key === 'postgres')).toBe(
+    expect(events.every((e) => (e as ConnectorKeyRow).connector_key === 'postgres')).toBe(
       true
     );
...
-    expect(Number((current as { n: number }).n)).toBe(3);
+    expect(Number((current as CountRow).n)).toBe(3);
...
-    const got = (await owner.entity_schema.getType('dog-funnel')) as {
-      entity_type?: { backing_sql?: string | null; backing_source?: string | null };
-    };
+    const got = (await owner.entity_schema.getType('dog-funnel')) as EntityTypeGetResponse;

As per coding guidelines: **/*.{ts,tsx}: Use interface for defining object shapes in TypeScript files.

Also applies to: 200-220, 232-234

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/server/src/__tests__/integration/connectors/postgres-dogfood-e2e.test.ts`
around lines 172 - 188, Replace the inline object-shape type assertions used
when reading DB rows with named interfaces: define local interfaces (e.g.,
ConnectionRow { id: number }, FeedRow { id: number }, RunRow { id: number }) and
use them in the casts for connRow, feed, and run (and similarly for the other
occurrences around lines 200-220 and 232-234) so that connId, feedId, and runId
are derived from typed objects rather than inline type literals; update the db
query results to use these interfaces wherever getType-like or row shape
assertions occur.
packages/connectors/src/postgres.ts (1)

200-252: 💤 Low value

String literal scanning doesn't handle escaped quotes — may cause false positives.

The hasTopLevelLimitOrOffset function doesn't handle doubled quote escapes ('' in strings, "" in identifiers). A string like 'it''s a limit' would cause premature exit from string scanning, potentially flagging a LIMIT keyword inside a string literal.

This is security-safe (false positives, not false negatives) — the read-only transaction is the hard seal — but may reject valid queries containing the word "limit" in string literals with escaped quotes.

♻️ Optional fix for escaped quote handling
     if (c === "'") {
       i++;
-      while (i < n && sql[i] !== "'") i++;
+      while (i < n) {
+        if (sql[i] === "'" && sql[i + 1] === "'") { i += 2; continue; }
+        if (sql[i] === "'") break;
+        i++;
+      }
       i++;
       continue;
     }
     if (c === '"') {
       i++;
-      while (i < n && sql[i] !== '"') i++;
+      while (i < n) {
+        if (sql[i] === '"' && sql[i + 1] === '"') { i += 2; continue; }
+        if (sql[i] === '"') break;
+        i++;
+      }
       i++;
       continue;
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/connectors/src/postgres.ts` around lines 200 - 252, The function
hasTopLevelLimitOrOffset currently stops string/identifier scanning on the first
matching quote and doesn't handle escaped/doubled quotes, so adjust the
quote-handling branches for "'" and '"' to treat doubled quotes as escapes:
inside the loop that advances past the string/identifier, when you see the same
quote character check if the next character is the same quote (e.g., sql[i] ===
quote && sql[i+1] === quote) and if so consume both and continue; only break the
loop when you hit an un-doubled closing quote or the end of input. Ensure the
logic uses the same quote variable for both single and double cases, guards
against out-of-bounds access, and preserves existing depth/position updates in
hasTopLevelLimitOrOffset.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@examples/lobu-crm/lobu.config.ts`:
- Around line 414-421: The feed currently uses primary_key: "id" (u.id) but the
SELECT can emit multiple rows per user; modify the query to emit a row-unique
composite key (e.g., CONCAT or cast-and-concat of u.id and o.slug) and then
point primary_key at that new column name; specifically, update the query to
SELECT u.id AS user_id, o.slug AS org, (u.id::text || '::' || o.slug) AS row_pk
(or use concat(u.id, '::', o.slug) AS row_pk) and change primary_key: "id" to
primary_key: "row_pk" so each emitted row is uniquely identified while leaving
mapping (mapping: { title: "email", occurred_at: "created_at" }) unchanged.

In `@packages/connector-sdk/src/types.ts`:
- Around line 24-27: Change the loose string typing of LOBU_DB_EGRESS_POLICY to
a constrained union of known literals (e.g., 'block-private' | 'allow-private')
and update the connector-side handling so unknown or invalid values do not map
silently to allow-private: modify the exported type for LOBU_DB_EGRESS_POLICY in
types.ts to the literal union and then adjust readEgressPolicy(value: unknown)
to validate against those literals and fail-closed (either throw or return
'block-private' as the safe default) instead of defaulting to 'allow-private'.

In
`@packages/server/src/__tests__/integration/connectors/postgres-sync-cloud-gate.test.ts`:
- Around line 67-69: Tests currently hard-reset process.env.LOBU_CLOUD_MODE in
afterEach, which can leak global state; capture the original value in a
beforeEach (e.g., const originalLobuCloudMode = process.env.LOBU_CLOUD_MODE) and
in each afterEach restore it by setting process.env.LOBU_CLOUD_MODE =
originalLobuCloudMode (or delete process.env.LOBU_CLOUD_MODE when original was
undefined) so the test restores the pre-test environment; apply this change to
the afterEach hooks referenced in the file (and add corresponding beforeEach
where missing).

In
`@packages/server/src/__tests__/integration/connectors/query-sql-pushdown.test.ts`:
- Around line 163-169: The test sets process.env.LOBU_CLOUD_MODE but restores it
incorrectly by assigning undefined (which becomes the string "undefined");
update the test around the querySql call to save the original value (e.g., const
prev = process.env.LOBU_CLOUD_MODE) before setting it, and in the finally block
restore it with process.env.LOBU_CLOUD_MODE = prev ?? undefined ? prev : delete
process.env.LOBU_CLOUD_MODE (i.e., delete the env var if it was originally
unset, otherwise restore the original string). Ensure references to
process.env.LOBU_CLOUD_MODE and the querySql test remain and only the
setup/teardown is changed.

In `@packages/server/src/lib/connector-pushdown.ts`:
- Around line 43-50: The query path currently never loads the stored connection
config/credentials and spreads p.config after connectionCredentials, allowing
callers to override secrets and ignoring non-secret saved config; fix by loading
the connection row's config and credentials (use the fetched connRows entry's
config and connection_credentials), merge them so stored credentials/config take
precedence over incoming p.config (i.e. apply saved connection config/creds
first, then merge non-sensitive caller overrides only for allowed fields), and
ensure the code paths around connRows handling and the logic that constructs the
final connection config/credentials (the variables/objects named
connectionCredentials, connection.config, p.config and the connRows result) are
updated accordingly; apply the same change to the analogous block covering lines
76-104.

In `@packages/server/src/tools/admin/manage_connections.ts`:
- Around line 871-878: The cloud-mode connector restriction is only applied for
action 'create' but not for the 'connect' path; update the handleConnect flow in
manage_connections.ts to run the same
assertConnectorAllowedInCloud(args.connector_key) gate before creating
connections. Specifically, invoke assertConnectorAllowedInCloud with
args.connector_key inside the handleConnect (or the branch that handles action
'connect') and mirror the existing try/catch behavior (returning { error: ... }
on failure) so the 'connect' path cannot bypass the cloud-mode restriction.

In `@packages/server/src/worker-api.ts`:
- Around line 728-744: The catch block that handles
assertConnectorAllowedInCloud failure updates the run to failed but omits feed
bookkeeping; update this branch to perform the same feed completion logic as
completeWorkerJob would (so last_sync_status, last_error, next_run_at, failure
counters, etc.) before returning. Specifically, inside the catch for
assertConnectorAllowedInCloud (where row.run_id and row.connector_key are
available), call the same helper used for feed bookkeeping (or invoke
completeWorkerJob/its internal routine) with the run id/feed id, status
'failed', and the computed error message so the feed's last_sync_status,
last_error and scheduling/failure counters are updated and the feed is not left
perpetually due. Ensure you still log the warning and return the same JSON
response after performing the bookkeeping.

---

Outside diff comments:
In `@packages/server/src/lib/feed-sync.ts`:
- Around line 121-133: The connector is being given the full gateway process.env
in the executeCompiledConnector call which allows a connector (e.g., the new
Postgres connector) to accidentally use the gateway DATABASE_URL; change the env
argument passed to executeCompiledConnector in feed-sync.ts (the call within
runFeed that invokes executeCompiledConnector with job and env) to a restrictive
environment (e.g., an empty object {}) so the connector does not inherit the
gateway process.env and a missing DATABASE_URL will fail closed; ensure you only
pass explicitly allowed vars if needed rather than process.env.

---

Nitpick comments:
In `@packages/connectors/src/postgres.ts`:
- Around line 200-252: The function hasTopLevelLimitOrOffset currently stops
string/identifier scanning on the first matching quote and doesn't handle
escaped/doubled quotes, so adjust the quote-handling branches for "'" and '"' to
treat doubled quotes as escapes: inside the loop that advances past the
string/identifier, when you see the same quote character check if the next
character is the same quote (e.g., sql[i] === quote && sql[i+1] === quote) and
if so consume both and continue; only break the loop when you hit an un-doubled
closing quote or the end of input. Ensure the logic uses the same quote variable
for both single and double cases, guards against out-of-bounds access, and
preserves existing depth/position updates in hasTopLevelLimitOrOffset.

In
`@packages/server/src/__tests__/integration/connectors/postgres-dogfood-e2e.test.ts`:
- Around line 172-188: Replace the inline object-shape type assertions used when
reading DB rows with named interfaces: define local interfaces (e.g.,
ConnectionRow { id: number }, FeedRow { id: number }, RunRow { id: number }) and
use them in the casts for connRow, feed, and run (and similarly for the other
occurrences around lines 200-220 and 232-234) so that connId, feedId, and runId
are derived from typed objects rather than inline type literals; update the db
query results to use these interfaces wherever getType-like or row shape
assertions occur.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 0a2d39e8-0f20-42d3-8042-9f44f1333a07

📥 Commits

Reviewing files that changed from the base of the PR and between 7ca3e90 and 1b52a21.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (36)
  • db/migrations/20260601120000_entity_types_backing_source.sql
  • docs/database-connectors.md
  • examples/lobu-crm/lobu.config.ts
  • packages/cli/src/commands/_lib/apply/__tests__/diff-idempotency.test.ts
  • packages/cli/src/commands/_lib/apply/client.ts
  • packages/cli/src/commands/_lib/apply/desired-state.ts
  • packages/cli/src/commands/_lib/apply/map-config.ts
  • packages/cli/src/config/define.ts
  • packages/connector-sdk/src/connector-runtime.ts
  • packages/connector-sdk/src/connector-types.ts
  • packages/connector-sdk/src/define-connector.ts
  • packages/connector-sdk/src/index.ts
  • packages/connector-sdk/src/types.ts
  • packages/connector-worker/src/daemon/executor.ts
  • packages/connector-worker/src/env.ts
  • packages/connector-worker/src/executor/child-runner.ts
  • packages/connector-worker/src/executor/interface.ts
  • packages/connectors/package.json
  • packages/connectors/src/__tests__/db-egress-guard.test.ts
  • packages/connectors/src/db-egress-guard.ts
  • packages/connectors/src/index.ts
  • packages/connectors/src/postgres.ts
  • packages/server/src/__tests__/integration/connectors/postgres-connector.test.ts
  • packages/server/src/__tests__/integration/connectors/postgres-dogfood-e2e.test.ts
  • packages/server/src/__tests__/integration/connectors/postgres-sync-cloud-gate.test.ts
  • packages/server/src/__tests__/integration/connectors/query-sql-pushdown.test.ts
  • packages/server/src/lib/connector-pushdown.ts
  • packages/server/src/lib/feed-sync.ts
  • packages/server/src/tools/admin/manage_connections.ts
  • packages/server/src/tools/admin/manage_entity_schema.ts
  • packages/server/src/tools/admin/query_sql.ts
  • packages/server/src/utils/__tests__/connector-cloud-gate.test.ts
  • packages/server/src/utils/connector-catalog.ts
  • packages/server/src/utils/connector-cloud-gate.ts
  • packages/server/src/utils/queue-helpers.ts
  • packages/server/src/worker-api.ts

Comment on lines +414 to +421
primary_key: "id",
cursor_column: "created_at",
// Base SELECT only — the connector adds the keyset WHERE / ORDER BY / LIMIT.
query: `SELECT u.id, u.email, u.name, u."createdAt" AS created_at, o.slug AS org
FROM "user" u
JOIN member m ON m."userId" = u.id
JOIN organization o ON o.id = m."organizationId"`,
mapping: { title: "email", occurred_at: "created_at" },
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the signup feed primary key unique per emitted row.

primary_key: "id" points to u.id, but this query can emit multiple rows per user (one per org). That can cause cursor collisions and skipped/overwritten events across pages. Use a row-unique key for this feed (e.g., user_id + org).

Proposed fix
       config: {
-        primary_key: "id",
+        primary_key: "signup_row_id",
         cursor_column: "created_at",
         // Base SELECT only — the connector adds the keyset WHERE / ORDER BY / LIMIT.
-        query: `SELECT u.id, u.email, u.name, u."createdAt" AS created_at, o.slug AS org
+        query: `SELECT concat(u.id::text, ':', o.slug) AS signup_row_id,
+                       u.id AS user_id,
+                       u.email,
+                       u.name,
+                       u."createdAt" AS created_at,
+                       o.slug AS org
                 FROM "user" u
                 JOIN member m ON m."userId" = u.id
                 JOIN organization o ON o.id = m."organizationId"`,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/lobu-crm/lobu.config.ts` around lines 414 - 421, The feed currently
uses primary_key: "id" (u.id) but the SELECT can emit multiple rows per user;
modify the query to emit a row-unique composite key (e.g., CONCAT or
cast-and-concat of u.id and o.slug) and then point primary_key at that new
column name; specifically, update the query to SELECT u.id AS user_id, o.slug AS
org, (u.id::text || '::' || o.slug) AS row_pk (or use concat(u.id, '::', o.slug)
AS row_pk) and change primary_key: "id" to primary_key: "row_pk" so each emitted
row is uniquely identified while leaving mapping (mapping: { title: "email",
occurred_at: "created_at" }) unchanged.

Comment on lines +24 to +27
// DB-connector egress policy delivered into the connector subprocess.
// "block-private" (injected under cloud mode) makes a DB connector reject
// internal/metadata hosts; anything else ⇒ trusted "allow-private".
LOBU_DB_EGRESS_POLICY?: string;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Policy references =="
rg -n -C3 'LOBU_DB_EGRESS_POLICY|block-private|allow-private|db[-_]egress' --type ts

echo
echo "== Egress guard implementation =="
fd -i 'db-egress-guard.ts' -x sh -c 'echo "---- {} ----"; sed -n "1,260p" "{}"'

echo
echo "== Env forwarding into worker/connector jobs =="
rg -n -C3 'LOBU_DB_EGRESS_POLICY|env:' packages/server/src packages/connector-worker/src packages/connectors/src --type ts

Repository: lobu-ai/lobu

Length of output: 50369


Harden LOBU_DB_EGRESS_POLICY parsing to avoid fail-open to allow-private.

LOBU_DB_EGRESS_POLICY is typed as string in packages/connector-sdk/src/types.ts, but the connector’s readEgressPolicy(value: unknown) in packages/connectors/src/db-egress-guard.ts maps anything other than the exact 'block-private' to 'allow-private'. A typo/mis-forwarded value can therefore silently disable private-host blocking in cloud mode. Constrain the type to known literals and make unknown values fail closed (e.g., throw or default to block-private).

🔧 Proposed type hardening
-  LOBU_DB_EGRESS_POLICY?: string;
+  LOBU_DB_EGRESS_POLICY?: 'block-private' | 'allow-private';
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// DB-connector egress policy delivered into the connector subprocess.
// "block-private" (injected under cloud mode) makes a DB connector reject
// internal/metadata hosts; anything else ⇒ trusted "allow-private".
LOBU_DB_EGRESS_POLICY?: string;
// DB-connector egress policy delivered into the connector subprocess.
// "block-private" (injected under cloud mode) makes a DB connector reject
// internal/metadata hosts; anything else ⇒ trusted "allow-private".
LOBU_DB_EGRESS_POLICY?: 'block-private' | 'allow-private';
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/connector-sdk/src/types.ts` around lines 24 - 27, Change the loose
string typing of LOBU_DB_EGRESS_POLICY to a constrained union of known literals
(e.g., 'block-private' | 'allow-private') and update the connector-side handling
so unknown or invalid values do not map silently to allow-private: modify the
exported type for LOBU_DB_EGRESS_POLICY in types.ts to the literal union and
then adjust readEgressPolicy(value: unknown) to validate against those literals
and fail-closed (either throw or return 'block-private' as the safe default)
instead of defaulting to 'allow-private'.

Comment on lines +67 to +69
afterEach(() => {
process.env.LOBU_CLOUD_MODE = undefined;
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Restore LOBU_CLOUD_MODE to its pre-test value instead of hard-resetting it.

These hooks/tests overwrite global env state and can leak into other suites when the process starts with a non-default LOBU_CLOUD_MODE. Capture the prior value and restore it in afterEach.

💡 Suggested patch
 import { afterEach, beforeEach, describe, expect, it } from 'vitest';
@@
 const CONNECTOR_VERSION = '1.0.0';
+let previousCloudMode: string | undefined;
@@
 describe('createSyncRun cloud gate (postgres) — queue-time', () => {
   beforeEach(async () => {
+    previousCloudMode = process.env.LOBU_CLOUD_MODE;
     await cleanupTestDatabase();
   });
   afterEach(() => {
-    process.env.LOBU_CLOUD_MODE = undefined;
+    if (previousCloudMode === undefined) delete process.env.LOBU_CLOUD_MODE;
+    else process.env.LOBU_CLOUD_MODE = previousCloudMode;
   });
@@
-    process.env.LOBU_CLOUD_MODE = undefined;
+    delete process.env.LOBU_CLOUD_MODE;
@@
 describe('pollWorkerJob cloud gate (postgres) — execution-time', () => {
   beforeEach(async () => {
+    previousCloudMode = process.env.LOBU_CLOUD_MODE;
     await cleanupTestDatabase();
   });
   afterEach(() => {
-    process.env.LOBU_CLOUD_MODE = undefined;
+    if (previousCloudMode === undefined) delete process.env.LOBU_CLOUD_MODE;
+    else process.env.LOBU_CLOUD_MODE = previousCloudMode;
   });
@@
     } finally {
-      process.env.LOBU_CLOUD_MODE = undefined;
+      if (previousCloudMode === undefined) delete process.env.LOBU_CLOUD_MODE;
+      else process.env.LOBU_CLOUD_MODE = previousCloudMode;
     }
@@
-    process.env.LOBU_CLOUD_MODE = undefined;
+    delete process.env.LOBU_CLOUD_MODE;

Also applies to: 90-90, 104-105, 128-129, 144-144

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/server/src/__tests__/integration/connectors/postgres-sync-cloud-gate.test.ts`
around lines 67 - 69, Tests currently hard-reset process.env.LOBU_CLOUD_MODE in
afterEach, which can leak global state; capture the original value in a
beforeEach (e.g., const originalLobuCloudMode = process.env.LOBU_CLOUD_MODE) and
in each afterEach restore it by setting process.env.LOBU_CLOUD_MODE =
originalLobuCloudMode (or delete process.env.LOBU_CLOUD_MODE when original was
undefined) so the test restores the pre-test environment; apply this change to
the afterEach hooks referenced in the file (and add corresponding beforeEach
where missing).

Comment on lines +163 to +169
process.env.LOBU_CLOUD_MODE = '1';
try {
const res = await querySql({ sql: 'SELECT 1', connection: 'qsp-ext-db' }, {}, ctx);
expect(res.error).toMatch(/Lobu Cloud/i);
} finally {
process.env.LOBU_CLOUD_MODE = undefined;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Node.js (especially Node 22+), does assigning process.env.MY_VAR = undefineddelete the variable, or set it to the string"undefined"?

💡 Result:

In Node.js, assigning undefined to a property on process.env does not delete the variable; instead, it converts the value to the string "undefined" [1][2][3]. This occurs because Node.js implicitly converts all values assigned to process.env entries into strings [3][4]. Consequently, assigning undefined results in the environment variable being set to the literal string value "undefined" [1][5][6]. To remove an environment variable from process.env, you must use the delete operator [2][3][4]: delete process.env.MY_VAR; Using delete correctly removes the property from the process.env object, which is distinct from setting it to the string "undefined" or even an empty string [5][6]. [1][2][3][4]

Citations:


Fix process.env cleanup to avoid leaking "undefined" into later tests

Node.js coerces process.env assignments to strings; setting process.env.LOBU_CLOUD_MODE = undefined leaves the literal "undefined" value instead of removing the variable. Restore the prior value or delete when it was originally unset.

Suggested fix
   it('refuses pushdown for an existing connection under LOBU_CLOUD_MODE', async () =&gt; {
-    process.env.LOBU_CLOUD_MODE = '1';
+    const prevCloudMode = process.env.LOBU_CLOUD_MODE;
+    process.env.LOBU_CLOUD_MODE = '1';
     try {
       const res = await querySql({ sql: 'SELECT 1', connection: 'qsp-ext-db' }, {}, ctx);
       expect(res.error).toMatch(/Lobu Cloud/i);
     } finally {
-      process.env.LOBU_CLOUD_MODE = undefined;
+      if (prevCloudMode === undefined) {
+        delete process.env.LOBU_CLOUD_MODE;
+      } else {
+        process.env.LOBU_CLOUD_MODE = prevCloudMode;
+      }
     }
   }, 60_000);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
process.env.LOBU_CLOUD_MODE = '1';
try {
const res = await querySql({ sql: 'SELECT 1', connection: 'qsp-ext-db' }, {}, ctx);
expect(res.error).toMatch(/Lobu Cloud/i);
} finally {
process.env.LOBU_CLOUD_MODE = undefined;
}
const prevCloudMode = process.env.LOBU_CLOUD_MODE;
process.env.LOBU_CLOUD_MODE = '1';
try {
const res = await querySql({ sql: 'SELECT 1', connection: 'qsp-ext-db' }, {}, ctx);
expect(res.error).toMatch(/Lobu Cloud/i);
} finally {
if (prevCloudMode === undefined) {
delete process.env.LOBU_CLOUD_MODE;
} else {
process.env.LOBU_CLOUD_MODE = prevCloudMode;
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/server/src/__tests__/integration/connectors/query-sql-pushdown.test.ts`
around lines 163 - 169, The test sets process.env.LOBU_CLOUD_MODE but restores
it incorrectly by assigning undefined (which becomes the string "undefined");
update the test around the querySql call to save the original value (e.g., const
prev = process.env.LOBU_CLOUD_MODE) before setting it, and in the finally block
restore it with process.env.LOBU_CLOUD_MODE = prev ?? undefined ? prev : delete
process.env.LOBU_CLOUD_MODE (i.e., delete the env var if it was originally
unset, otherwise restore the original string). Ensure references to
process.env.LOBU_CLOUD_MODE and the querySql test remain and only the
setup/teardown is changed.

Comment on lines +43 to +50
const connRows = await sql`
SELECT id, connector_key, auth_profile_id, app_auth_profile_id
FROM connections
WHERE organization_id = ${p.organizationId}
AND slug = ${p.connectionSlug}
AND deleted_at IS NULL
AND status = 'active'
AND (${p.isAdmin} OR visibility = 'org' OR created_by = ${p.userId})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bind query-mode execution to the stored connection config and credentials.

This path never loads connections.config, and p.config is spread after connectionCredentials. As written, a caller can override DATABASE_URL/auth keys and run against a different database than connectionSlug authorizes, while any non-secret config saved on the connection row is ignored entirely.

Suggested fix
-    SELECT id, connector_key, auth_profile_id, app_auth_profile_id
+    SELECT id, connector_key, auth_profile_id, app_auth_profile_id,
+           COALESCE(config, '{}'::jsonb) AS connection_config
     FROM connections
@@
   const conn = connRows[0] as {
     id: number;
     connector_key: string;
     auth_profile_id: number | null;
     app_auth_profile_id: number | null;
+    connection_config: Record<string, unknown>;
   };
@@
       config: {
-        ...connectionCredentials,
+        ...(conn.connection_config ?? {}),
         ...(p.config ?? {}),
+        ...connectionCredentials,
         LOBU_DB_EGRESS_POLICY: isCloudMode() ? 'block-private' : 'allow-private',
       },

Also applies to: 76-104

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/lib/connector-pushdown.ts` around lines 43 - 50, The
query path currently never loads the stored connection config/credentials and
spreads p.config after connectionCredentials, allowing callers to override
secrets and ignoring non-secret saved config; fix by loading the connection
row's config and credentials (use the fetched connRows entry's config and
connection_credentials), merge them so stored credentials/config take precedence
over incoming p.config (i.e. apply saved connection config/creds first, then
merge non-sensitive caller overrides only for allowed fields), and ensure the
code paths around connRows handling and the logic that constructs the final
connection config/credentials (the variables/objects named
connectionCredentials, connection.config, p.config and the connRows result) are
updated accordingly; apply the same change to the analogous block covering lines
76-104.

Comment on lines +871 to +878
// Cloud-mode hard gate (plan §G): raw-DB connectors aren't installable by
// untrusted multi-tenant cloud tenants until egress hardening lands. No-op
// self-hosted. (The catalog also hides them; this blocks a direct API call.)
try {
assertConnectorAllowedInCloud(args.connector_key);
} catch (err) {
return { error: err instanceof Error ? err.message : String(err) };
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Apply the same cloud gate to handleConnect.

This only blocks action: 'create'. action: 'connect' is still a connection-creation path and currently skips the new restriction entirely, so a direct manage_connections({ action: 'connect', connector_key: 'postgres', ... }) call can bypass the cloud-mode block.

Suggested fix
 async function handleConnect(
   args: Extract<ConnectionsArgs, { action: 'connect' }>,
   _env: Env,
   ctx: ToolContext
 ): Promise<ManageConnectionsResult> {
   const sql = getDb();
   const { organizationId, userId } = ctx;
+
+  try {
+    assertConnectorAllowedInCloud(args.connector_key);
+  } catch (err) {
+    return { error: err instanceof Error ? err.message : String(err) };
+  }
 
   const baseUrl = getPublicWebUrl(ctx.requestUrl, ctx.baseUrl);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Cloud-mode hard gate (plan §G): raw-DB connectors aren't installable by
// untrusted multi-tenant cloud tenants until egress hardening lands. No-op
// self-hosted. (The catalog also hides them; this blocks a direct API call.)
try {
assertConnectorAllowedInCloud(args.connector_key);
} catch (err) {
return { error: err instanceof Error ? err.message : String(err) };
}
async function handleConnect(
args: Extract<ConnectionsArgs, { action: 'connect' }>,
_env: Env,
ctx: ToolContext
): Promise<ManageConnectionsResult> {
const sql = getDb();
const { organizationId, userId } = ctx;
try {
assertConnectorAllowedInCloud(args.connector_key);
} catch (err) {
return { error: err instanceof Error ? err.message : String(err) };
}
const baseUrl = getPublicWebUrl(ctx.requestUrl, ctx.baseUrl);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/tools/admin/manage_connections.ts` around lines 871 -
878, The cloud-mode connector restriction is only applied for action 'create'
but not for the 'connect' path; update the handleConnect flow in
manage_connections.ts to run the same
assertConnectorAllowedInCloud(args.connector_key) gate before creating
connections. Specifically, invoke assertConnectorAllowedInCloud with
args.connector_key inside the handleConnect (or the branch that handles action
'connect') and mirror the existing try/catch behavior (returning { error: ... }
on failure) so the 'connect' path cannot bypass the cloud-mode restriction.

Comment on lines +728 to +744
if (row.connector_key) {
try {
assertConnectorAllowedInCloud(row.connector_key);
} catch (err) {
const message = errorMessage(err);
await sql`
UPDATE runs
SET status = 'failed',
completed_at = current_timestamp,
error_message = ${message}
WHERE id = ${row.run_id}
`;
logger.warn(
{ run_id: row.run_id, connector_key: row.connector_key },
'Blocked cloud-restricted connector run under LOBU_CLOUD_MODE'
);
return c.json({ next_poll_seconds: 1, skipped_run_id: row.run_id, error: message });
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Finalize the feed too when this cloud gate fails a claimed sync run.

This branch marks the run failed and returns, but it skips the feed bookkeeping that completeWorkerJob does (last_sync_status, last_error, next_run_at, failure counters). A blocked sync feed can stay perpetually due and get rematerialized over and over.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/worker-api.ts` around lines 728 - 744, The catch block
that handles assertConnectorAllowedInCloud failure updates the run to failed but
omits feed bookkeeping; update this branch to perform the same feed completion
logic as completeWorkerJob would (so last_sync_status, last_error, next_run_at,
failure counters, etc.) before returning. Specifically, inside the catch for
assertConnectorAllowedInCloud (where row.run_id and row.connector_key are
available), call the same helper used for feed bookkeeping (or invoke
completeWorkerJob/its internal routine) with the run id/feed id, status
'failed', and the computed error message so the feed's last_sync_status,
last_error and scheduling/failure counters are updated and the feed is not left
perpetually due. Ensure you still log the warning and return the same JSON
response after performing the bookkeeping.

The branch was cut when owletto main was at 62b3ca95; main has since advanced to
25e1a08c (owletto #254, #253). Pin the recorded pointer back to 25e1a08c so this
connector PR carries no owletto submodule change (neither a forward bump nor a
regression). The owletto UI change ships in lobu-ai/owletto#255; the lobu pointer
bump follows as a separate PR after that merges.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants