diff --git a/packages/owletto b/packages/owletto index 1f1c027da..09e361dc3 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 1f1c027da939e485b48a47f204de1b3f3fb20972 +Subproject commit 09e361dc3fcbc0d0eb50c675e41593288e94fe53 diff --git a/packages/server/src/__tests__/integration/connections-cross-org-entity.test.ts b/packages/server/src/__tests__/integration/connections-cross-org-entity.test.ts new file mode 100644 index 000000000..35c03fb27 --- /dev/null +++ b/packages/server/src/__tests__/integration/connections-cross-org-entity.test.ts @@ -0,0 +1,204 @@ +/** + * manage_connections update / list — connection↔entity association. + * + * Connections can be directly tagged with entities (connections.entity_ids), + * mirroring feeds. Tagging a connection with an entity owned by a DIFFERENT + * org would surface the connection under a non-existent in-org entity, so the + * update path validates entity ownership and rejects any cross-org entity_id + * (mirrors manage_feeds). The list path resolves a connection's entities as the + * UNION of its own entity_ids and any of its feeds' entity_ids, so direct tags + * and feed-derived links both surface under entity_id / entity_names. + * + * (The create/connect tool paths run the same `assertEntityIdsInOrg` guard + + * `entity_ids::bigint[]` insert, but driving them needs an installed connector + * definition + auth scaffolding; the cross-org guard + tri-state semantics are + * exercised here through the update path, which shares the helper.) + */ + +import { beforeAll, describe, expect, it } from "vitest"; +import { cleanupTestDatabase, getTestDb } from "../setup/test-db"; +import { + createTestConnection, + createTestEntity, + createTestOrganization, + createTestUser, +} from "../setup/test-fixtures"; +import { TestApiClient } from "../setup/test-mcp-client"; + +// postgres.js may surface a bigint[] column either as a JS array or as the raw +// Postgres array literal string (e.g. "{1,2}"). Normalize both to number[]. +function toIds(raw: number[] | string | null | undefined): number[] { + if (Array.isArray(raw)) return raw.map(Number); + if (typeof raw === "string") { + const inner = raw.replace(/^\{|\}$/g, "").trim(); + return inner === "" ? [] : inner.split(",").map(Number); + } + return []; +} + +describe("manage_connections entity association", () => { + let owner: TestApiClient; + let ownerOrgId: string; + let inOrgEntityId: number; + let feedEntityId: number; + let foreignEntityId: number; + let taggableConnectionId: number; + let feedLinkedConnectionId: number; + + beforeAll(async () => { + await cleanupTestDatabase(); + + const org = await createTestOrganization({ name: "Conn Owner Org" }); + ownerOrgId = org.id; + const user = await createTestUser({ email: "conn-owner@test.com" }); + owner = await TestApiClient.for({ + organizationId: org.id, + userId: user.id, + memberRole: "owner", + }); + + inOrgEntityId = Number( + ( + await createTestEntity({ + name: "Direct Entity", + entity_type: "company", + organization_id: ownerOrgId, + created_by: user.id, + }) + ).id, + ); + feedEntityId = Number( + ( + await createTestEntity({ + name: "Feed Entity", + entity_type: "company", + organization_id: ownerOrgId, + created_by: user.id, + }) + ).id, + ); + + // A separate org owns the "foreign" entity. + const foreignOrg = await createTestOrganization({ name: "Foreign Org" }); + foreignEntityId = Number( + ( + await createTestEntity({ + name: "Foreign Entity", + entity_type: "company", + organization_id: foreignOrg.id, + }) + ).id, + ); + + // Connection with no feed — entity links come only from connection.entity_ids. + taggableConnectionId = Number( + ( + await createTestConnection({ + organization_id: ownerOrgId, + connector_key: "github", + created_by: user.id, + createDefaultFeed: false, + }) + ).id, + ); + + // Connection whose default feed is tagged with feedEntityId — its entity + // link is feed-derived (no direct connection.entity_ids). + feedLinkedConnectionId = Number( + ( + await createTestConnection({ + organization_id: ownerOrgId, + connector_key: "github", + created_by: user.id, + entity_ids: [feedEntityId], + }) + ).id, + ); + }); + + it("rejects update when an entity_id belongs to another org", async () => { + const result = (await owner.connections.update({ + connection_id: taggableConnectionId, + entity_ids: [foreignEntityId], + })) as { error?: string }; + + expect(result.error).toBeTruthy(); + expect(result.error).toContain(String(foreignEntityId)); + + const sql = getTestDb(); + const [row] = await sql<{ entity_ids: number[] | string | null }[]>` + SELECT entity_ids FROM connections WHERE id = ${taggableConnectionId} + `; + expect(toIds(row?.entity_ids)).not.toContain(foreignEntityId); + }); + + it("update sets in-org entity_ids; explicit [] clears them", async () => { + await owner.connections.update({ + connection_id: taggableConnectionId, + entity_ids: [inOrgEntityId], + }); + + const sql = getTestDb(); + let [row] = await sql<{ entity_ids: number[] | string | null }[]>` + SELECT entity_ids FROM connections WHERE id = ${taggableConnectionId} + `; + expect(toIds(row?.entity_ids)).toContain(inOrgEntityId); + + // undefined leaves it unchanged... + await owner.connections.update({ + connection_id: taggableConnectionId, + display_name: "renamed", + }); + [row] = await sql<{ entity_ids: number[] | string | null }[]>` + SELECT entity_ids FROM connections WHERE id = ${taggableConnectionId} + `; + expect(toIds(row?.entity_ids)).toContain(inOrgEntityId); + + // ...explicit [] clears. + await owner.connections.update({ + connection_id: taggableConnectionId, + entity_ids: [], + }); + [row] = await sql<{ entity_ids: number[] | string | null }[]>` + SELECT entity_ids FROM connections WHERE id = ${taggableConnectionId} + `; + expect(toIds(row?.entity_ids).length).toBe(0); + }); + + it("lists a connection under its directly-tagged entity (entity_names + filter)", async () => { + await owner.connections.update({ + connection_id: taggableConnectionId, + entity_ids: [inOrgEntityId], + }); + + const result = (await owner.connections.list({ + entity_id: inOrgEntityId, + })) as { connections?: Array<{ id: number; entity_names?: string }> }; + + const match = result.connections?.find( + (c) => Number(c.id) === taggableConnectionId, + ); + expect(match).toBeDefined(); + expect(match?.entity_names ?? "").toContain("Direct Entity"); + }); + + it("lists a connection under a feed-derived entity (union, no direct tag)", async () => { + const result = (await owner.connections.list({ + entity_id: feedEntityId, + })) as { + connections?: Array<{ + id: number; + entity_names?: string; + entity_ids?: number[] | string | null; + }>; + }; + + const match = result.connections?.find( + (c) => Number(c.id) === feedLinkedConnectionId, + ); + expect(match).toBeDefined(); + // The link is feed-derived: the connection itself has no entity_ids. + expect(toIds(match?.entity_ids).length).toBe(0); + expect(match?.entity_names ?? "").toContain("Feed Entity"); + }); +}); diff --git a/packages/server/src/sandbox/namespaces/connections.ts b/packages/server/src/sandbox/namespaces/connections.ts index 8c9a20b95..3644adb23 100644 --- a/packages/server/src/sandbox/namespaces/connections.ts +++ b/packages/server/src/sandbox/namespaces/connections.ts @@ -17,6 +17,7 @@ export interface ConnectionsConnectInput { auth_profile_slug?: string; app_auth_profile_slug?: string; config?: Record; + entity_ids?: number[]; entity_link_overrides?: Record | null; } @@ -31,6 +32,7 @@ export interface ConnectionsUpdateInput { auth_profile_slug?: string | null; app_auth_profile_slug?: string | null; config?: Record; + entity_ids?: number[] | null; } /** diff --git a/packages/server/src/tools/admin/manage_connections.ts b/packages/server/src/tools/admin/manage_connections.ts index d19e8fed5..4265eb9e2 100644 --- a/packages/server/src/tools/admin/manage_connections.ts +++ b/packages/server/src/tools/admin/manage_connections.ts @@ -22,7 +22,7 @@ import { parseJsonObject } from '@lobu/core'; import { type Static, Type } from '@sinclair/typebox'; -import { getDb } from '../../db/client'; +import { getDb, parsePgNumberArray, pgBigintArray } from '../../db/client'; import type { Env } from '../../index'; import { notifyConnectionPermissionRequest } from '../../notifications/triggers'; import { @@ -87,6 +87,7 @@ import { buildConnectorDefinitionList, type ListedConnectorDefinition, } from './helpers/connector-definition-list'; +import { assertEntityIdsInOrg } from './helpers/db-helpers'; import { type FeedDefinition, splitConfigByFeedScope } from './helpers/feed-helpers'; import { PaginationFields } from './schemas/common-fields'; @@ -172,6 +173,11 @@ const CreateAction = Type.Object({ "Run this connection's syncs/actions on a specific device worker (its device_workers.id) instead of the Lobu server (runs serverless). Required for connectors that declare a required_capability; optional otherwise. The device must belong to you or be granted to this org.", }) ), + entity_ids: Type.Optional( + Type.Array(Type.Number(), { + description: 'Entity IDs to tag this connection with (links the connection to entities)', + }) + ), entity_link_overrides: Type.Optional(EntityLinkOverridesSchema), }); @@ -186,6 +192,12 @@ const UpdateAction = Type.Object({ auth_profile_slug: Type.Optional(Type.Union([Type.String(), Type.Null()])), app_auth_profile_slug: Type.Optional(Type.Union([Type.String(), Type.Null()])), config: Type.Optional(Type.Record(Type.String(), Type.Any())), + entity_ids: Type.Optional( + Type.Array(Type.Number(), { + description: + 'Entity IDs to tag this connection with. Pass [] (or null) to clear all links; omit to leave unchanged.', + }) + ), device_worker_id: Type.Optional( Type.Union([Type.String(), Type.Null()], { description: @@ -279,6 +291,11 @@ const ConnectAction = Type.Object({ "Run this connection's syncs/actions on a specific device worker (its device_workers.id) instead of the Lobu server (runs serverless). Required for connectors that declare a required_capability; optional otherwise. The device must belong to you or be granted to this org.", }) ), + entity_ids: Type.Optional( + Type.Array(Type.Number(), { + description: 'Entity IDs to tag this connection with (links the connection to entities)', + }) + ), entity_link_overrides: Type.Optional(EntityLinkOverridesSchema), }); @@ -591,11 +608,18 @@ async function handleList( (SELECT ct.token FROM connect_tokens ct WHERE ct.connection_id = c.id AND ct.status = 'pending' AND ct.expires_at > NOW() ORDER BY ct.created_at DESC LIMIT 1) AS connect_token, + -- entity_names = UNION of the connection's own entity_ids and any of + -- its feeds' entity_ids (a connection counts under an entity if + -- either it is directly tagged OR one of its feeds is). ( SELECT string_agg(DISTINCT ent.name, ', ' ORDER BY ent.name) - FROM feeds f - JOIN entities ent ON ent.id = ANY(f.entity_ids) - WHERE f.connection_id = c.id AND f.deleted_at IS NULL + FROM entities ent + WHERE ent.id = ANY(c.entity_ids) + OR ent.id IN ( + SELECT unnest(f.entity_ids) + FROM feeds f + WHERE f.connection_id = c.id AND f.deleted_at IS NULL + ) ) AS entity_names FROM connections c LEFT JOIN LATERAL ( @@ -620,12 +644,15 @@ async function handleList( query = sql`${query} AND c.status = ${args.status}`; } if (args.entity_id) { - query = sql`${query} AND EXISTS ( - SELECT 1 - FROM feeds f - WHERE f.connection_id = c.id - AND f.deleted_at IS NULL - AND ${args.entity_id} = ANY(f.entity_ids) + query = sql`${query} AND ( + ${args.entity_id} = ANY(c.entity_ids) + OR EXISTS ( + SELECT 1 + FROM feeds f + WHERE f.connection_id = c.id + AND f.deleted_at IS NULL + AND ${args.entity_id} = ANY(f.entity_ids) + ) )`; } if (args.created_by) { @@ -655,6 +682,9 @@ async function handleList( const operationsSummary = summaries.get(String(row.connector_key)) ?? { ...EMPTY_SUMMARY }; return { ...row, + // Postgres returns bigint[] as a literal string ('{2}'); parse to number[] + // so the API contract matches the typed entity_ids the UI picker expects. + entity_ids: parsePgNumberArray(row.entity_ids), operations_summary: operationsSummary, has_operations: operationsSummary.total > 0, }; @@ -1231,6 +1261,16 @@ async function handleCreate( ? 'pending_auth' : 'active'; + // Reject cross-org entity_ids: a connection tagged with another org's entity + // would surface under a non-existent in-org entity (mirrors manage_feeds). + try { + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } + const entityIdsValue = + args.entity_ids && args.entity_ids.length > 0 ? pgBigintArray(args.entity_ids) : null; + const slugResult = await resolveNewConnectionSlug({ organizationId, connectorKey: args.connector_key, @@ -1251,7 +1291,8 @@ async function handleCreate( doInsert: (slug) => sql` INSERT INTO connections ( organization_id, connector_key, slug, display_name, status, - auth_profile_id, app_auth_profile_id, config, created_by, visibility, device_worker_id + auth_profile_id, app_auth_profile_id, config, created_by, visibility, device_worker_id, + entity_ids ) VALUES ( ${organizationId}, ${args.connector_key}, ${slug}, @@ -1262,7 +1303,8 @@ async function handleCreate( ${connectionConfigToInsert ? sql.json(connectionConfigToInsert) : null}, ${effectiveCreatedBy}, ${visibility}, - ${effectiveDeviceWorkerId} + ${effectiveDeviceWorkerId}, + ${entityIdsValue}::bigint[] ) RETURNING * `, @@ -1564,6 +1606,15 @@ async function handleConnect( } : null; + // Reject cross-org entity_ids (mirrors handleCreate / manage_feeds). + try { + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err), setup_url: setupUrl }; + } + const connectEntityIdsValue = + args.entity_ids && args.entity_ids.length > 0 ? pgBigintArray(args.entity_ids) : null; + const connectSlugResult = await resolveNewConnectionSlug({ organizationId, connectorKey: args.connector_key, @@ -1584,7 +1635,8 @@ async function handleConnect( doInsert: (slug) => sql` INSERT INTO connections ( organization_id, connector_key, slug, display_name, status, - auth_profile_id, app_auth_profile_id, config, created_by, visibility, device_worker_id + auth_profile_id, app_auth_profile_id, config, created_by, visibility, device_worker_id, + entity_ids ) VALUES ( ${organizationId}, ${args.connector_key}, ${slug}, @@ -1595,7 +1647,8 @@ async function handleConnect( ${connectionConfigToInsert ? sql.json(connectionConfigToInsert) : null}, ${userId}, ${connectVisibility}, - ${effectiveDeviceWorkerIdConnect} + ${effectiveDeviceWorkerIdConnect}, + ${connectEntityIdsValue}::bigint[] ) RETURNING * `, @@ -2012,6 +2065,23 @@ async function handleUpdate( }; } + // Reject cross-org entity_ids on update too (skip when clearing to []). + if (args.entity_ids !== undefined && args.entity_ids.length > 0) { + try { + await assertEntityIdsInOrg(sql, organizationId, args.entity_ids); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } + } + // Tri-state, mirrors manage_feeds: undefined = leave unchanged (null → COALESCE + // keeps existing), explicit [] = clear ('{}' → COALESCE picks the empty array). + const entityIdsValue = + args.entity_ids !== undefined + ? args.entity_ids.length > 0 + ? pgBigintArray(args.entity_ids) + : '{}' + : null; + // Slug is only ever changed when the caller passes one explicitly — a // display_name change never touches it (that's the whole point of a stable // identity for `lobu apply`). An explicit slug is validated for format and @@ -2043,6 +2113,7 @@ async function handleUpdate( status = COALESCE(${effectiveStatus}, status), auth_profile_id = ${nextAuthProfileId}, app_auth_profile_id = ${nextAppAuthProfileId}, + entity_ids = COALESCE(${entityIdsValue}::bigint[], entity_ids), config = ${ replaceConfig ? sql`${sql.json(connectionConfigForReplace)}::jsonb`