Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions db/migrations/20260601120000_entity_types_backing_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- migrate:up

-- External-backed derived entity types. A derived entity type (backing_sql IS NOT
-- NULL) normally runs its view over Lobu's own org-scoped tables. When
-- backing_source references a connection, the view instead executes LIVE against
-- that connection's single external database (read-only, no copy): the read goes
-- get_type → query_sql({ sql: backing_sql, connection: backing_source }) →
-- runConnectorQuery (connector-pushdown.ts), which runs the SQL in the
-- connection's connector. NULL ⇒ internal (today's behavior). backing_source is
-- only meaningful on a derived type (backing_sql IS NOT NULL).
--
-- Deliberately NO foreign key to connections: if the source connection is deleted
-- the read must FAIL ("source connection no longer exists") rather than silently
-- fall back to internal scoping (ON DELETE SET NULL — which would run external SQL
-- against internal tables) or block connection deletion (ON DELETE RESTRICT).
-- runConnectorQuery validates the connection exists, is in-org, and is visible to
-- the caller at read time.
--
-- Stored as the connection SLUG (text), not an id: the slug is what the config
-- diff compares (no churn), it survives a connection delete+recreate, and
-- runConnectorQuery resolves slug → connection → DATABASE_URL fresh at read time.
--
-- Idempotent: no-op on databases that already have the column.
ALTER TABLE public.entity_types ADD COLUMN IF NOT EXISTS backing_source text;

-- migrate:down

ALTER TABLE public.entity_types DROP COLUMN IF EXISTS backing_source;
94 changes: 94 additions & 0 deletions docs/database-connectors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Database connectors (Postgres) — design + gating

Bring an external database in as memory, and read it live (no copy) for derived
entities. V1 ships **Postgres**; Snowflake/BigQuery are additive (see end).

## The model: connectors push compute down; Lobu aggregates

The connector owns the DB connection — for *both* indexing and live reads. The
gateway never opens an external pool.

- **Memory feed (indexed)** — a `postgres` connection + a `query` feed runs a
read-only `SELECT` on a schedule, keyset-incremental, and emits one event per
row → embedded, searchable memory. (`packages/connectors/src/postgres.ts`)
- **Live read (no copy)** — the connector's `query()` runs SQL live against the
source and returns rows, persisting nothing. The platform reaches it through one
primitive: `runConnectorQuery` (`packages/server/src/lib/connector-pushdown.ts`),
which invokes the connector in the worker `query` run-mode (the same inline-run
path as `operations.execute`).
- **`query_sql({ connection })`** is the single door: with a `connection` slug it
pushes the SQL down via `runConnectorQuery` (internal org-scoping skipped — it's
the org's own DB); without, it runs the internal org-scoped path. There is no
separate `query_entity_type` tool.
- **Derived entity** — `defineEntityType({ backing: { sql, connection? } })`. With
`connection`, the read is `get_type → query_sql({ sql: backing_sql, connection })`
→ pushdown. Without, it's the shipped internal view over `events`/`entities`.

Single-database only: every query targets one database; no cross-source joins
(that's a later DuckDB-class engine).

Slice 2 (next): **virtual feeds** (a `virtual` feed flag → live reads, no events)
and **federated search** (a connector `search()` the platform fans out to and
merges with the vector index). Only the `query()` live-read primitive is in place
today; the `virtual` feed flag, `search()`, and the fan-out are the remaining work.

## SSRF / egress trust model

The DB socket lives in the **connector subprocess**, behind the worker egress
controls — not the gateway. The dogfood reaches Lobu's own private PG, so the HTTP
scrapers' block-all-private-IPs rule can't be reused.

- **Self-hosted / first-party:** `DATABASE_URL` is an operator-set secret — same
trust boundary as any other env secret. Private IPs allowed. Ships now.
- **Untrusted multi-tenant cloud:** a tenant-supplied `DATABASE_URL` (metadata
IPs, internal CIDRs, another tenant's DB) is an exfil/scan vector. **Not allowed
yet.** Under `LOBU_CLOUD_MODE=1` the postgres connector is hidden from the
catalog (`connector-catalog.ts`) and connection-create is hard-blocked
(`manage_connections.ts` via `connector-cloud-gate.ts`). Execution is gated
independently at every run path, not just by catalog-hide: scheduled-sync run
creation (`queue-helpers.ts`), the production worker poll (`worker-api.ts`), the
dev-CLI sync (`feed-sync.ts`), and the live pushdown (`connector-pushdown.ts`)
each refuse a cloud-restricted connector under `LOBU_CLOUD_MODE`.

**Egress guard (`packages/connectors/src/db-egress-guard.ts`).** The connector
runs a pre-connect host check on both `sync()` and `query()`. Policy comes from
`ctx.config.LOBU_DB_EGRESS_POLICY`, injected by the server from cloud mode:

- `allow-private` (self-hosted, the default) — allows loopback / RFC1918 / CGNAT
/ ULA, but still blocks link-local + cloud metadata (`169.254/16`), multicast,
and the unspecified address (no DB lives there).
- `block-private` (cloud) — blocks **every** non-public address. A hostname is
resolved and rejected if ANY returned address is blocked (multi-record rebind),
with IPv4-mapped / NAT64 / zone-id normalization and fail-closed on malformed
literals.

**Remaining before enabling on cloud** (then remove the key from
`CLOUD_RESTRICTED_CONNECTOR_KEYS`): pin the resolved IP into the socket to close
the DNS-rebind TOCTOU across the pool, force TLS when the URL omits it, and a
per-org allowlist. The classifier + reject is in place and tested; the gate is
what currently keeps untrusted tenants out.

## Entitlement boundary (design-only — not yet built)

Gate advanced database connectivity behind a paid tier. Seam: `organization.plan`
(`free` | `pro` | `enterprise`) + a check in the `multi-tenant.ts` auth resolver.

| Capability | Tier |
| --- | --- |
| Postgres connector + memory feeds | free / pro |
| Internal derived entities | free / pro |
| External-backed (live) derived entities — `backing.connection` set | pro / enterprise |
| Warehouse connectors (Snowflake, BigQuery), virtual feeds + federated search | enterprise |

Enforcement points when built: connector install, connection count, and presence
of `backing.connection`.

## Snowflake / BigQuery forward-compat

No redesign needed: each is a new bundled connector implementing `sync()` +
`query()` (+ later `search()`), with `env_keys` carrying its credentials
(Snowflake account/user/keypair/warehouse/role; BigQuery service-account JSON).
The pushdown plumbing (`runConnectorQuery`, the `query` run-mode, `query_sql`'s
`connection`) is dialect-agnostic — only the connector's own `query()` differs.
Metered warehouses make "live, every read" costly → those lean on the indexed
(memory-feed) path or materialization.
79 changes: 77 additions & 2 deletions examples/lobu-crm/lobu.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,19 @@ const x_accountAuth = defineAuthProfile({
name: "X — @lobu",
});

// Dogfood: Lobu's own production Postgres as a read-only memory + live-metrics
// source for the CRM. Use a least-privilege READ-ONLY role. Self-hosted only —
// the postgres connector is gated off multi-tenant cloud (docs/database-connectors.md).
const lobu_dbAuth = defineAuthProfile({
slug: "lobu-db",
connector: "postgres",
authKind: "env",
name: "Lobu Production DB (read-only)",
credentials: {
DATABASE_URL: secret("LOBU_PROD_READONLY_URL"),
},
});

const competitor_changelogsConn = defineConnection({
slug: "competitor-changelogs",
connector: "website",
Expand Down Expand Up @@ -387,6 +400,62 @@ const npm_downloadsConn = defineConnection({
],
});

const lobu_dbConn = defineConnection({
slug: "lobu-prod-db",
connector: "postgres",
name: "Lobu Production DB",
authProfile: lobu_dbAuth,
feeds: [
{
feed: "query",
name: "New signups",
schedule: "*/15 * * * *",
config: {
primary_key: "id",
cursor_column: "created_at",
// Base SELECT only — the connector adds the keyset WHERE / ORDER BY / LIMIT.
query: `SELECT u.id, u.email, u.name, u."createdAt" AS created_at, o.slug AS org
FROM "user" u
JOIN member m ON m."userId" = u.id
JOIN organization o ON o.id = m."organizationId"`,
mapping: { title: "email", occurred_at: "created_at" },
Comment on lines +414 to +421
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the signup feed primary key unique per emitted row.

primary_key: "id" points to u.id, but this query can emit multiple rows per user (one per org). That can cause cursor collisions and skipped/overwritten events across pages. Use a row-unique key for this feed (e.g., user_id + org).

Proposed fix
       config: {
-        primary_key: "id",
+        primary_key: "signup_row_id",
         cursor_column: "created_at",
         // Base SELECT only — the connector adds the keyset WHERE / ORDER BY / LIMIT.
-        query: `SELECT u.id, u.email, u.name, u."createdAt" AS created_at, o.slug AS org
+        query: `SELECT concat(u.id::text, ':', o.slug) AS signup_row_id,
+                       u.id AS user_id,
+                       u.email,
+                       u.name,
+                       u."createdAt" AS created_at,
+                       o.slug AS org
                 FROM "user" u
                 JOIN member m ON m."userId" = u.id
                 JOIN organization o ON o.id = m."organizationId"`,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/lobu-crm/lobu.config.ts` around lines 414 - 421, The feed currently
uses primary_key: "id" (u.id) but the SELECT can emit multiple rows per user;
modify the query to emit a row-unique composite key (e.g., CONCAT or
cast-and-concat of u.id and o.slug) and then point primary_key at that new
column name; specifically, update the query to SELECT u.id AS user_id, o.slug AS
org, (u.id::text || '::' || o.slug) AS row_pk (or use concat(u.id, '::', o.slug)
AS row_pk) and change primary_key: "id" to primary_key: "row_pk" so each emitted
row is uniquely identified while leaving mapping (mapping: { title: "email",
occurred_at: "created_at" }) unchanged.

},
},
{
feed: "query",
name: "Org activity (daily)",
schedule: "0 6 * * *",
config: {
primary_key: "id",
cursor_column: "created_at",
query: `SELECT id, organization_id AS org, connector_key, semantic_type, created_at
FROM events`,
mapping: { title: "semantic_type", occurred_at: "created_at" },
},
},
],
});

// Live, no-copy: funnel counts computed at read time straight from the prod DB
// (an external-backed derived entity — backing.connection pushes the SQL down to
// the connector, read live via query_sql({ connection })).
const funnel_by_org = defineEntityType({
key: "funnel_by_org",
name: "Funnel by org",
description:
"Signups + last activity per signed-up org, read live from the Lobu prod DB.",
backing: {
connection: "lobu-prod-db",
sql: `SELECT o.slug AS org,
count(DISTINCT u.id) AS signups,
max(u."createdAt") AS last_signup
FROM "user" u
JOIN member m ON m."userId" = u.id
JOIN organization o ON o.id = m."organizationId"
GROUP BY o.slug`,
},
});

export default defineConfig({
connectors: [
connectorFromFile<typeof NpmDownloadsConnector>(
Expand All @@ -398,15 +467,21 @@ export default defineConfig({
orgDescription:
"Funnel CRM for Lobu — leads, pilots, conversations, launch signals",
agents: [crm],
entities: [lead, pilot],
entities: [lead, pilot, funnel_by_org],
relationships: [converted_to],
connections: [
competitor_changelogsConn,
github_lobuConn,
hn_lobuConn,
npm_downloadsConn,
x_mentionsConn,
lobu_dbConn,
],
authProfiles: [
github_accountAuth,
github_appAuth,
x_accountAuth,
lobu_dbAuth,
],
authProfiles: [github_accountAuth, github_appAuth, x_accountAuth],
watchers: [funnel_digestWatcher, inbound_triageWatcher],
});
Loading
Loading