diff --git a/packages/server/src/__tests__/integration/feeds-cross-org-entity.test.ts b/packages/server/src/__tests__/integration/feeds-cross-org-entity.test.ts new file mode 100644 index 000000000..98f2f8abe --- /dev/null +++ b/packages/server/src/__tests__/integration/feeds-cross-org-entity.test.ts @@ -0,0 +1,125 @@ +/** + * manage_feeds create_feed / update_feed — cross-org entity_ids validation. + * + * Regression (found in prod lobu-crm): feeds were created with `entity_ids` + * pointing at an entity owned by a DIFFERENT org. The create/update path did + * not validate entity ownership, so synced events linked to a non-existent + * in-org entity — a silent data-correctness bug. The fix rejects any entity_id + * that does not belong to the requesting org. + */ + +import { beforeAll, describe, expect, it } from "vitest"; +import { cleanupTestDatabase, getTestDb } from "../setup/test-db"; +import { + createTestConnection, + createTestEntity, + createTestOrganization, + createTestUser, +} from "../setup/test-fixtures"; +import { TestApiClient } from "../setup/test-mcp-client"; + +describe("manage_feeds cross-org entity_ids", () => { + let owner: TestApiClient; + let ownerOrgId: string; + let connectionId: number; + let inOrgEntityId: number; + let foreignEntityId: number; + + beforeAll(async () => { + await cleanupTestDatabase(); + + const org = await createTestOrganization({ name: "Feed Owner Org" }); + ownerOrgId = org.id; + const user = await createTestUser({ email: "feed-owner@test.com" }); + owner = await TestApiClient.for({ + organizationId: org.id, + userId: user.id, + memberRole: "owner", + }); + + const conn = await createTestConnection({ + organization_id: ownerOrgId, + connector_key: "github", + created_by: user.id, + createDefaultFeed: false, + }); + connectionId = Number(conn.id); + + const inOrg = await createTestEntity({ + name: "In-Org Entity", + entity_type: "company", + organization_id: ownerOrgId, + created_by: user.id, + }); + inOrgEntityId = Number(inOrg.id); + + // A separate org owns the "foreign" entity. + const foreignOrg = await createTestOrganization({ name: "Foreign Org" }); + const foreignEntity = await createTestEntity({ + name: "Foreign Entity", + entity_type: "company", + organization_id: foreignOrg.id, + }); + foreignEntityId = Number(foreignEntity.id); + }); + + it("rejects create_feed when an entity_id belongs to another org", async () => { + const result = (await owner.feeds.create({ + connection_id: connectionId, + feed_key: "default", + entity_ids: [foreignEntityId], + })) as { error?: string; feed?: unknown }; + + expect(result.error).toBeTruthy(); + expect(result.error).toContain(String(foreignEntityId)); + expect(result.feed).toBeUndefined(); + + // No feed row leaked into the DB. + const sql = getTestDb(); + const rows = await sql<{ id: number }[]>` + SELECT id FROM feeds + WHERE organization_id = ${ownerOrgId} AND ${foreignEntityId} = ANY(entity_ids) + `; + expect(rows.length).toBe(0); + }); + + it("accepts create_feed with an in-org entity_id", async () => { + const result = (await owner.feeds.create({ + connection_id: connectionId, + feed_key: "default", + entity_ids: [inOrgEntityId], + })) as { error?: string; feed?: { id: number } }; + + expect(result.error).toBeUndefined(); + expect(result.feed?.id).toBeDefined(); + }); + + it("rejects update_feed that repoints to another org's entity", async () => { + // Create a clean feed with no entity_ids first. + const created = (await owner.feeds.create({ + connection_id: connectionId, + feed_key: "default", + display_name: "update-target", + })) as { feed?: { id: number } }; + const feedId = Number(created.feed?.id); + expect(feedId).toBeGreaterThan(0); + + const result = (await owner.feeds.update({ + feed_id: feedId, + entity_ids: [foreignEntityId], + })) as { error?: string }; + + expect(result.error).toBeTruthy(); + expect(result.error).toContain(String(foreignEntityId)); + + // The feed's entity_ids were NOT changed. + const sql = getTestDb(); + const [row] = await sql<{ entity_ids: number[] | null }[]>` + SELECT entity_ids FROM feeds WHERE id = ${feedId} + `; + const ids = Array.isArray(row?.entity_ids) + ? row.entity_ids.map(Number) + : []; + expect(ids).not.toContain(foreignEntityId); + }); +}); diff --git a/packages/server/src/__tests__/integration/watchers/source-id-and-cross-org.test.ts b/packages/server/src/__tests__/integration/watchers/source-id-and-cross-org.test.ts new file mode 100644 index 000000000..1b355c91f --- /dev/null +++ b/packages/server/src/__tests__/integration/watchers/source-id-and-cross-org.test.ts @@ -0,0 +1,194 @@ +/** + * manage_watchers correctness guards: + * + * BUG A — a source query that omits `id` is rejected at create/create_version + * time. Watcher-mode aggregation keys rows by `id` and the signed window_token + * only carries those ids, so an id-less source produces content_linked: 0 at + * complete_window and SILENTLY skips the reaction. We reject it on save. + * + * BUG B — create_from_version rejects entity_ids that belong to another org. + * A watcher attached to a foreign entity links its content to a non-existent + * in-org entity. + */ + +import { beforeAll, describe, expect, it } from "vitest"; +import { cleanupTestDatabase, getTestDb } from "../../setup/test-db"; +import { + addUserToOrganization, + createTestAgent, + createTestEntity, + createTestOrganization, + createTestUser, +} from "../../setup/test-fixtures"; +import { TestApiClient } from "../../setup/test-mcp-client"; + +describe("manage_watchers source-id + cross-org guards", () => { + let owner: TestApiClient; + let ownerOrgId: string; + let agentId: string; + let inOrgEntityId: number; + let foreignEntityId: number; + + const schema = { + type: "object", + properties: { items: { type: "array", items: { type: "string" } } }, + }; + + beforeAll(async () => { + await cleanupTestDatabase(); + const org = await createTestOrganization({ name: "Watcher Guard Org" }); + ownerOrgId = org.id; + const user = await createTestUser({ email: "watcher-guard@test.com" }); + await addUserToOrganization(user.id, org.id, "owner"); + owner = await TestApiClient.for({ + organizationId: org.id, + userId: user.id, + memberRole: "owner", + }); + const agent = await createTestAgent({ + organizationId: org.id, + ownerUserId: user.id, + }); + agentId = agent.agentId; + + const inOrg = await createTestEntity({ + name: "In-Org Watcher Target", + entity_type: "company", + organization_id: ownerOrgId, + created_by: user.id, + }); + inOrgEntityId = Number(inOrg.id); + + const foreignOrg = await createTestOrganization({ + name: "Watcher Foreign Org", + }); + const foreignEntity = await createTestEntity({ + name: "Foreign Watcher Target", + entity_type: "company", + organization_id: foreignOrg.id, + }); + foreignEntityId = Number(foreignEntity.id); + }); + + // ---- BUG A ---- + + it("rejects create when a source query omits the id column", async () => { + await expect( + owner.watchers.create({ + entity_id: inOrgEntityId, + slug: "no-id-source", + name: "No Id Source", + prompt: "Track stuff.", + extraction_schema: schema, + agent_id: agentId, + sources: [ + { + name: "content", + query: "SELECT origin_id, payload_text FROM events", + }, + ], + }), + ).rejects.toThrow(/id/i); + }); + + it("accepts create when the source query projects id", async () => { + const created = (await owner.watchers.create({ + entity_id: inOrgEntityId, + slug: "with-id-source", + name: "With Id Source", + prompt: "Track stuff.", + extraction_schema: schema, + agent_id: agentId, + sources: [ + { + name: "content", + query: "SELECT id, origin_id, payload_text FROM events", + }, + ], + })) as { watcher_id?: string }; + expect(created.watcher_id).toBeDefined(); + }); + + it("rejects create_version when a source query omits id", async () => { + const created = (await owner.watchers.create({ + entity_id: inOrgEntityId, + slug: "version-id-guard", + name: "Version Id Guard", + prompt: "Track stuff.", + extraction_schema: schema, + agent_id: agentId, + sources: [{ name: "content", query: "SELECT id FROM events" }], + })) as { watcher_id: string }; + + await expect( + owner.watchers.createVersion({ + watcher_id: created.watcher_id, + prompt: "Track stuff v2.", + extraction_schema: schema, + change_notes: "omit id", + sources: [ + { name: "content", query: "SELECT payload_text FROM events" }, + ], + } as never), + ).rejects.toThrow(/id/i); + }); + + // ---- BUG B ---- + + it("create_from_version rejects a cross-org entity_id", async () => { + const base = (await owner.watchers.create({ + entity_id: inOrgEntityId, + slug: "cfv-base", + name: "CFV Base", + prompt: "Track stuff.", + extraction_schema: schema, + agent_id: agentId, + sources: [{ name: "content", query: "SELECT id FROM events" }], + })) as { watcher_id: string }; + + const sql = getTestDb(); + const [row] = await sql<{ current_version_id: number }[]>` + SELECT current_version_id FROM watchers WHERE id = ${base.watcher_id} + `; + const versionId = Number(row?.current_version_id); + expect(versionId).toBeGreaterThan(0); + + await expect( + owner.watchers.createFromVersion({ + version_id: versionId, + entity_ids: [foreignEntityId], + }), + ).rejects.toThrow(new RegExp(String(foreignEntityId))); + + // No watcher leaked pointing at the foreign entity. + const leaked = await sql<{ id: number }[]>` + SELECT id FROM watchers + WHERE organization_id = ${ownerOrgId} AND ${foreignEntityId} = ANY(entity_ids) + `; + expect(leaked.length).toBe(0); + }); + + it("create_from_version accepts an in-org entity_id", async () => { + const base = (await owner.watchers.create({ + entity_id: inOrgEntityId, + slug: "cfv-base-ok", + name: "CFV Base OK", + prompt: "Track stuff.", + extraction_schema: schema, + agent_id: agentId, + sources: [{ name: "content", query: "SELECT id FROM events" }], + })) as { watcher_id: string }; + + const sql = getTestDb(); + const [row] = await sql<{ current_version_id: number }[]>` + SELECT current_version_id FROM watchers WHERE id = ${base.watcher_id} + `; + const versionId = Number(row?.current_version_id); + + const result = (await owner.watchers.createFromVersion({ + version_id: versionId, + entity_ids: [inOrgEntityId], + })) as { created: Array<{ watcher_id: string }> }; + expect(result.created.length).toBe(1); + }); +}); diff --git a/packages/server/src/__tests__/unit/watcher-source-id-projection.test.ts b/packages/server/src/__tests__/unit/watcher-source-id-projection.test.ts new file mode 100644 index 000000000..9151b55fa --- /dev/null +++ b/packages/server/src/__tests__/unit/watcher-source-id-projection.test.ts @@ -0,0 +1,93 @@ +/** + * Unit coverage for queryProjectsIdColumn — the save-time guard that catches + * watcher source queries which omit an `id` column. + * + * Bug being pinned: watcher-mode content aggregation (queryContentData in + * get_content.ts) keys every row by `row.id`, and the signed window_token only + * carries those numeric ids. A source query like + * `SELECT origin_id, payload_text FROM events` produces zero content_ids, so + * complete_window reports `content_linked: 0` and SILENTLY skips the reaction — + * even though the agent received the rows. We reject such queries at save time. + */ + +import { describe, expect, it } from "bun:test"; +import { queryProjectsIdColumn } from "../../utils/execute-data-sources"; + +describe("queryProjectsIdColumn", () => { + it("accepts a query that projects id explicitly", () => { + expect(queryProjectsIdColumn("SELECT id, payload_text FROM events")).toBe( + true, + ); + }); + + it("accepts SELECT *", () => { + expect( + queryProjectsIdColumn("SELECT * FROM events ORDER BY occurred_at DESC"), + ).toBe(true); + }); + + it("accepts a table-qualified star (e.g. ev.*)", () => { + expect(queryProjectsIdColumn("SELECT ev.* FROM events ev")).toBe(true); + }); + + it("accepts a qualified id column (e.g. e.id)", () => { + expect( + queryProjectsIdColumn("SELECT e.id, e.payload_text FROM events e"), + ).toBe(true); + }); + + it("accepts a column aliased AS id", () => { + expect( + queryProjectsIdColumn("SELECT event_id AS id, payload_text FROM events"), + ).toBe(true); + }); + + it("accepts a WITH/CTE query whose final projection includes id", () => { + expect( + queryProjectsIdColumn( + "WITH x AS (SELECT * FROM events) SELECT id, payload_text FROM x", + ), + ).toBe(true); + }); + + // --- the bug: these omit id and would silently drop every row --- + + it("REJECTS a query that omits id (origin_id is not id)", () => { + expect( + queryProjectsIdColumn("SELECT origin_id, payload_text FROM events"), + ).toBe(false); + }); + + it("REJECTS a query selecting only non-id columns", () => { + expect( + queryProjectsIdColumn( + "SELECT payload_text, author_name, occurred_at FROM events", + ), + ).toBe(false); + }); + + it("REJECTS a CTE query whose final projection omits id", () => { + expect( + queryProjectsIdColumn( + "WITH x AS (SELECT id FROM events) SELECT origin_id FROM x", + ), + ).toBe(false); + }); + + it("fails open (returns true) on unparseable SQL so a parser edge case never blocks a save", () => { + expect(queryProjectsIdColumn("this is not sql at all")).toBe(true); + }); + + it("tolerates watcher template placeholders ({{entityId}}, {{query.x}})", () => { + expect( + queryProjectsIdColumn( + "SELECT id FROM events WHERE entity_ids @> ARRAY[{{entityId}}]", + ), + ).toBe(true); + expect( + queryProjectsIdColumn( + "SELECT origin_id FROM events WHERE x = {{query.foo}}", + ), + ).toBe(false); + }); +}); diff --git a/packages/server/src/tools/admin/helpers/db-helpers.ts b/packages/server/src/tools/admin/helpers/db-helpers.ts index b4d2223f7..bb9e53874 100644 --- a/packages/server/src/tools/admin/helpers/db-helpers.ts +++ b/packages/server/src/tools/admin/helpers/db-helpers.ts @@ -5,7 +5,7 @@ * manage_watchers, manage_entity_schema, manage_classifiers, etc. */ -import type { DbClient } from '../../../db/client'; +import { type DbClient, pgBigintArray } from '../../../db/client'; /** * Valid tables for requireExists. Uses a whitelist so we can safely @@ -48,6 +48,42 @@ export async function requireExists( } } +/** + * Validate that every entity id in `entityIds` belongs to `organizationId`. + * + * Feeds and watchers carry an `entity_ids` array used to link synced events to + * in-org entities. A cross-org entity id (e.g. a feed in org A pointing at an + * entity owned by org B) means synced events never link to a valid in-org + * entity — a silent data-correctness bug. We reject it at create/update time. + * + * Returns the deduped list of ids that ARE in the org (empty/undefined input + * returns `[]`). Throws an Error naming the offending ids when any id is + * missing or belongs to another org. `entities` has no soft-delete column, so + * a row simply present + org-scoped is sufficient. + */ +export async function assertEntityIdsInOrg( + sql: DbClient, + organizationId: string, + entityIds: number[] | null | undefined +): Promise { + const requested = [...new Set((entityIds ?? []).map(Number).filter(Number.isFinite))]; + if (requested.length === 0) return []; + + const rows = await sql<{ id: number }>` + SELECT id FROM entities + WHERE organization_id = ${organizationId} + AND id = ANY(${pgBigintArray(requested)}::bigint[]) + `; + const found = new Set(rows.map((r) => Number(r.id))); + const invalid = requested.filter((id) => !found.has(id)); + if (invalid.length > 0) { + throw new Error( + `entity_ids do not belong to this organization (or do not exist): ${invalid.join(', ')}` + ); + } + return requested; +} + /** * Tables whose `id` column is allocated via SELECT MAX(id) + 1. Whitelisted so * the table name can be safely interpolated into sql.unsafe(). diff --git a/packages/server/src/tools/admin/manage_feeds.ts b/packages/server/src/tools/admin/manage_feeds.ts index 5ee313c9a..11476c042 100644 --- a/packages/server/src/tools/admin/manage_feeds.ts +++ b/packages/server/src/tools/admin/manage_feeds.ts @@ -26,6 +26,7 @@ import { ACTIVE_RUN_STATUSES, runStatusLiteral } from '../../utils/run-statuses' import type { ToolContext } from '../registry'; import { routeAction } from './action-router'; import { getDefaultSchedule } from './helpers/connection-helpers'; +import { assertEntityIdsInOrg } from './helpers/db-helpers'; import { resolveFeedDisplayName } from './helpers/feed-helpers'; import { PaginationFields } from './schemas/common-fields'; @@ -338,6 +339,13 @@ async function handleCreateFeed( } // Don't schedule a first run for a feed whose connection is still pending auth. const nextRunAtVal = feedInitialStatus === 'active' ? nextRunAt(schedule) : null; + // Reject cross-org entity_ids: a feed pointing at another org's entity links + // synced events to a non-existent in-org entity (silent data-correctness bug). + try { + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } const entityIdsValue = args.entity_ids && args.entity_ids.length > 0 ? pgBigintArray(args.entity_ids) : null; @@ -394,6 +402,14 @@ async function handleUpdateFeed( return { error: 'Feed not found' }; } + // Reject cross-org entity_ids on update too (skip when clearing to []). + if (args.entity_ids !== undefined && args.entity_ids.length > 0) { + try { + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } + } const entityIdsValue = args.entity_ids !== undefined ? args.entity_ids.length > 0 diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index 2349a1e91..9723a1c94 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -57,6 +57,7 @@ import { } from '../../watchers/classifier-extraction'; import { advanceWatcherSchedule } from '../../watchers/automation'; import { compileReactionScript, executeReaction } from '../../watchers/reaction-executor'; +import { queryProjectsIdColumn } from '../../utils/execute-data-sources'; import { validateTemplate } from '../../watchers/renderer'; import { validateClassifierSourcePaths, validateExtractionSchema } from '../../watchers/validator'; import type { ToolContext } from '../registry'; @@ -66,7 +67,7 @@ import { assertValidExecutionConfig, WatcherExecutionConfigSchema, } from './watcher-execution-config'; -import { getNextNumericId, requireExists } from './helpers/db-helpers'; +import { assertEntityIdsInOrg, getNextNumericId, requireExists } from './helpers/db-helpers'; // Initialize AJV for JSON Schema validation // removeAdditional: true strips fields like 'embedding' that workers add but aren't in the schema @@ -864,6 +865,13 @@ function validateWatcherConfig(input: { if (!trimmed.startsWith('SELECT') && !trimmed.startsWith('WITH')) { return `source "${source.name}": query must be a SELECT statement (read-only)`; } + // Watcher-mode content aggregation keys every row by `id` and the signed + // window_token only carries those ids; a source that omits `id` yields + // content_linked: 0 at complete_window and SILENTLY skips the reaction. + // Reject it at save time so the failure is loud, not invisible. + if (!queryProjectsIdColumn(source.query)) { + return `source "${source.name}": query must project an "id" column (e.g. SELECT id, ... FROM events). Without it the reaction is silently skipped because no content can be linked to the window.`; + } } } @@ -1213,12 +1221,18 @@ async function handleCreateFromVersion( ); } - // Fetch entity names for name pattern substitution + // Reject cross-org entity_ids before cloning: a watcher attached to another + // org's entity links its synced/extracted content to a non-existent in-org + // entity (silent data-correctness bug). Names are fetched org-scoped below. + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + + // Fetch entity names for name pattern substitution (org-scoped) const entityRows = await sql` SELECT e.id, e.name, et.slug AS entity_type, e.slug FROM entities e JOIN entity_types et ON et.id = e.entity_type_id - WHERE e.id = ANY(${`{${args.entity_ids.join(',')}}`}::bigint[]) + WHERE e.organization_id = ${organizationId} + AND e.id = ANY(${`{${args.entity_ids.join(',')}}`}::bigint[]) `; const entityMap = new Map(entityRows.map((e: any) => [Number(e.id), e])); diff --git a/packages/server/src/utils/execute-data-sources.ts b/packages/server/src/utils/execute-data-sources.ts index ec50eb445..79160c14d 100644 --- a/packages/server/src/utils/execute-data-sources.ts +++ b/packages/server/src/utils/execute-data-sources.ts @@ -385,6 +385,58 @@ function buildScopedQuery( // Validation // ============================================ +/** + * Inspect a SELECT/WITH query's top-level projection and report whether it + * surfaces an `id` column. + * + * Watcher-mode content aggregation keys every row by `row.id` (see + * queryContentData in get_content.ts) and the signed window_token only carries + * those numeric ids. A source query that omits `id` (e.g. `SELECT origin_id, + * payload_text FROM events`) therefore produces zero content_ids — which makes + * complete_window silently report `content_linked: 0` and skip the reaction + * even though the agent received the rows. We catch that at save time instead. + * + * A projection "has id" if it contains a `*` star (bare or table-qualified), + * a bare `id` column reference, or any column aliased `AS id`. + * + * Returns true on any parse failure: this is a best-effort guard, not a + * security control, and we never want a parser edge case to block a save. + */ +export function queryProjectsIdColumn(query: string): boolean { + try { + const forParsing = query.trim().replace(/\{\{\w+(?:\.\w+)?\}\}/g, '0'); + const ast = sqlParser.astify(forParsing, { database: 'PostgreSql' }); + const stmt = (Array.isArray(ast) ? ast[0] : ast) as Record | undefined; + const columns = stmt?.columns; + if (!Array.isArray(columns)) { + // `SELECT *` is sometimes represented as a non-array; treat unknown + // shapes as "has id" so we never block a save we can't analyze. + return true; + } + for (const col of columns as Array>) { + const as = col.as; + if (typeof as === 'string' && as.toLowerCase() === 'id') return true; + + const expr = col.expr as Record | undefined; + if (!expr || expr.type !== 'column_ref') continue; + + const column = expr.column; + // Star projection: `*` or `alias.*` + if (column === '*') return true; + // Bare column reference: { expr: { value: 'id' } } + const name = + typeof column === 'string' + ? column + : ((column as Record)?.expr as Record | undefined) + ?.value; + if (typeof name === 'string' && name.toLowerCase() === 'id') return true; + } + return false; + } catch { + return true; + } +} + /** * Validate a data source query. * Checks: SELECT/WITH prefix, forbidden ops, SQL syntax, schema-qualified refs.