diff --git a/packages/server/src/__tests__/integration/auth/device-cross-org-pins.test.ts b/packages/server/src/__tests__/integration/auth/device-cross-org-pins.test.ts new file mode 100644 index 000000000..59a3f92d0 --- /dev/null +++ b/packages/server/src/__tests__/integration/auth/device-cross-org-pins.test.ts @@ -0,0 +1,108 @@ +/** + * resolveDeviceClaimableOrgs — cross-org device pin scoping. + * + * A user-scoped device worker's base scope is [token's bound org, personal org]. + * This helper widens it to orgs where the device has an active pin AND its owner + * is still a member. Pinning is the owner's consent (see watcher-device-access); + * the membership join revokes scope when the owner leaves the org. + */ +import { beforeEach, describe, expect, it } from 'vitest'; +import { resolveDeviceClaimableOrgs } from '../../../utils/device-claimable-orgs'; +import { getNextNumericId } from '../../../tools/admin/helpers/db-helpers'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; +import { + addUserToOrganization, + createTestAgent, + createTestOrganization, + createTestUser, +} from '../../setup/test-fixtures'; + +const sql = getTestDb(); + +async function insertDevice(userId: string, orgId: string): Promise { + const workerId = `mac-${Math.random().toString(36).slice(2, 10)}`; + const rows = (await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id) + VALUES (${userId}, ${workerId}, 'macos', ${sql.json({})}, 'Test Mac', ${orgId}) + RETURNING id + `) as unknown as Array<{ id: string }>; + return String(rows[0].id); +} + +async function pinWatcher(opts: { + orgId: string; + agentId: string; + deviceWorkerId: string; + createdBy: string; + status?: string; +}): Promise { + const id = await getNextNumericId(sql, 'watchers'); + await sql` + INSERT INTO watchers ( + id, status, created_by, organization_id, agent_id, watcher_group_id, + notification_channel, notification_priority, min_cooldown_seconds, + device_worker_id, slug, created_at, updated_at + ) VALUES ( + ${id}, ${opts.status ?? 'active'}, ${opts.createdBy}, ${opts.orgId}, ${opts.agentId}, ${id}, + 'notification', 'normal', 300, ${opts.deviceWorkerId}, ${`w-${id}`}, NOW(), NOW() + ) + `; +} + +describe('resolveDeviceClaimableOrgs (cross-org device pins)', () => { + beforeEach(async () => { + await cleanupTestDatabase(); + }); + + it('adds an org with an active pin when the owner is a member, and excludes a non-member org', async () => { + const user = await createTestUser(); + const orgA = await createTestOrganization(); // base scope (bound/personal) + const orgB = await createTestOrganization(); // member + pinned -> included + const orgC = await createTestOrganization(); // pinned but NOT a member -> excluded + await addUserToOrganization(user.id, orgA.id, 'owner'); + await addUserToOrganization(user.id, orgB.id, 'owner'); + // intentionally NOT a member of orgC + + const deviceWorkerId = await insertDevice(user.id, orgA.id); + const agentB = await createTestAgent({ organizationId: orgB.id, ownerUserId: user.id }); + const agentC = await createTestAgent({ organizationId: orgC.id, ownerUserId: user.id }); + await pinWatcher({ orgId: orgB.id, agentId: agentB.agentId, deviceWorkerId, createdBy: user.id }); + await pinWatcher({ orgId: orgC.id, agentId: agentC.agentId, deviceWorkerId, createdBy: user.id }); + + const result = await resolveDeviceClaimableOrgs(sql, { + deviceWorkerId, + ownerUserId: user.id, + baseOrgIds: [orgA.id], + }); + + expect(result).toContain(orgA.id); // base scope always present + expect(result).toContain(orgB.id); // pinned + member + expect(result).not.toContain(orgC.id); // pinned but not a member + }); + + it('does not grant scope from an archived watcher pin', async () => { + const user = await createTestUser(); + const orgA = await createTestOrganization(); + const orgB = await createTestOrganization(); + await addUserToOrganization(user.id, orgA.id, 'owner'); + await addUserToOrganization(user.id, orgB.id, 'owner'); + + const deviceWorkerId = await insertDevice(user.id, orgA.id); + const agentB = await createTestAgent({ organizationId: orgB.id, ownerUserId: user.id }); + await pinWatcher({ + orgId: orgB.id, + agentId: agentB.agentId, + deviceWorkerId, + createdBy: user.id, + status: 'archived', + }); + + const result = await resolveDeviceClaimableOrgs(sql, { + deviceWorkerId, + ownerUserId: user.id, + baseOrgIds: [orgA.id], + }); + + expect(result).toEqual([orgA.id]); // archived pin grants nothing + }); +}); diff --git a/packages/server/src/__tests__/unit/run-in-worker-scope.test.ts b/packages/server/src/__tests__/unit/run-in-worker-scope.test.ts new file mode 100644 index 000000000..25b5b8bbb --- /dev/null +++ b/packages/server/src/__tests__/unit/run-in-worker-scope.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it } from 'vitest'; +import { runInWorkerScope } from '../../utils/device-claimable-orgs'; + +const base = { organization_id: 'orgB', device_owner: null, watcher_device_owner: null }; + +describe('runInWorkerScope', () => { + it('in scope when the run org is in the base scope', () => { + expect(runInWorkerScope(base, { workerUserId: 'u1', orgIds: ['orgB'] })).toBe(true); + }); + + it('in scope when the worker owns the run\'s pinned connection device', () => { + expect( + runInWorkerScope( + { ...base, device_owner: 'u1' }, + { workerUserId: 'u1', orgIds: ['orgA'] } + ) + ).toBe(true); + }); + + it('in scope when the worker owns the run\'s pinned watcher device (cross-org)', () => { + expect( + runInWorkerScope( + { ...base, watcher_device_owner: 'u1' }, + { workerUserId: 'u1', orgIds: ['orgA'] } + ) + ).toBe(true); + }); + + it('forbidden when org is out of scope and the worker owns neither pinned device', () => { + expect( + runInWorkerScope( + { ...base, device_owner: 'someone-else', watcher_device_owner: 'someone-else' }, + { workerUserId: 'u1', orgIds: ['orgA'] } + ) + ).toBe(false); + }); + + it('forbidden when there is no worker user and the org is out of scope', () => { + expect( + runInWorkerScope({ ...base, watcher_device_owner: 'u1' }, { workerUserId: null, orgIds: [] }) + ).toBe(false); + }); +}); diff --git a/packages/server/src/utils/device-claimable-orgs.ts b/packages/server/src/utils/device-claimable-orgs.ts new file mode 100644 index 000000000..5fe891f81 --- /dev/null +++ b/packages/server/src/utils/device-claimable-orgs.ts @@ -0,0 +1,67 @@ +/** + * Resolve which orgs a user-scoped device worker may claim runs in. + * + * Base scope (computed in the /api/workers/* auth middleware) is the token's + * bound org plus the user's personal org. On top of that, a device may claim + * runs in any org where it has a pinned watcher/connection AND its owner is + * still a member of that org. + * + * The pin IS the consent: `evaluateDeviceWorkerAccess` only lets a device's + * owner attach it to a resource, so a pin in org B means the owner explicitly + * opted this device into serving org B. The membership join means access is + * revoked automatically if the owner later leaves the org. Within-org claiming + * still follows the pinned/capability rules in the poll, so the device only + * ever runs the resource it was actually pinned to. + * + * Only `active` watchers and non-deleted connections count — an archived + * watcher or deleted connection must not keep an org in scope. + */ +import type { DbClient } from '../db/client'; + +export async function resolveDeviceClaimableOrgs( + sql: DbClient, + params: { deviceWorkerId: string; ownerUserId: string; baseOrgIds: string[] } +): Promise { + const rows = (await sql` + SELECT DISTINCT src.organization_id + FROM ( + SELECT organization_id FROM watchers + WHERE device_worker_id = ${params.deviceWorkerId} AND status = 'active' + UNION + SELECT organization_id FROM connections + WHERE device_worker_id = ${params.deviceWorkerId} AND deleted_at IS NULL + ) src + JOIN "member" m + ON m."organizationId" = src.organization_id AND m."userId" = ${params.ownerUserId} + WHERE src.organization_id IS NOT NULL + `) as unknown as Array<{ organization_id: string }>; + + const pinnedOrgIds = rows + .map((r) => r.organization_id) + .filter((id): id is string => typeof id === 'string' && id.length > 0); + + return Array.from(new Set([...params.baseOrgIds, ...pinnedOrgIds])); +} + +/** + * Whether a user-scoped device worker may act on a run (claim / complete / + * heartbeat). True when the run's org is in the worker's base scope, OR the + * worker's user owns the device the run is pinned to — via either a pinned + * connection (`device_owner`) or a pinned watcher (`watcher_device_owner`). + * Pinning is the owner's consent, so a device may finish a run it was attached + * to in any org, mirroring the claim-side scope. + */ +export function runInWorkerScope( + run: { + organization_id: string; + device_owner: string | null; + watcher_device_owner: string | null; + }, + ctx: { workerUserId: string | null; orgIds: string[] } +): boolean { + if (ctx.orgIds.includes(run.organization_id)) return true; + if (!ctx.workerUserId) return false; + return ( + run.device_owner === ctx.workerUserId || run.watcher_device_owner === ctx.workerUserId + ); +} diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index f481e93c3..8f2ce7627 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -40,6 +40,7 @@ import { getNextNumericId } from './tools/admin/helpers/db-helpers'; import { reconcileDeviceCapabilities } from './worker-api/device-reconcile'; import { findBundledConnectorFile } from './utils/connector-catalog'; import { resolveConnectorCode } from './utils/ensure-connector-installed'; +import { resolveDeviceClaimableOrgs, runInWorkerScope } from './utils/device-claimable-orgs'; import { applyEntityLinks } from './utils/entity-link-upsert'; import { errorMessage } from './utils/errors'; import { validateConnectorEventSemanticType } from './utils/event-kind-validation'; @@ -96,10 +97,14 @@ async function authorizeRunForWorker( const orgIds = c.var.workerOrgIds ?? []; const sql = getDb(); const rows = (await sql` - SELECT r.status, r.claimed_by, r.organization_id, dw.user_id AS device_owner + SELECT r.status, r.claimed_by, r.organization_id, + dw.user_id AS device_owner, + wdw.user_id AS watcher_device_owner FROM runs r LEFT JOIN connections con ON con.id = r.connection_id LEFT JOIN device_workers dw ON dw.id = con.device_worker_id + LEFT JOIN watchers w ON w.id = r.watcher_id + LEFT JOIN device_workers wdw ON wdw.id = w.device_worker_id WHERE r.id = ${runId} LIMIT 1 `) as unknown as Array<{ @@ -107,14 +112,16 @@ async function authorizeRunForWorker( claimed_by: string | null; organization_id: string; device_owner: string | null; + watcher_device_owner: string | null; }>; if (rows.length === 0) { return c.json({ error: 'Run not found' }, 404); } const run = rows[0]; - const inScope = - orgIds.includes(run.organization_id) || - (!!workerUserId && run.device_owner === workerUserId); + // Watcher runs pinned to a device the worker owns are in scope too (the pin + // is the owner's consent), so a device can FINISH a cross-org run it claimed — + // not just claim it. Without this the poll widening would 403 on completion. + const inScope = runInWorkerScope(run, { workerUserId, orgIds }); if (!inScope) { return c.json({ error: 'Forbidden' }, 403); } @@ -385,18 +392,54 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { // middleware. Trusted workers (matched WORKER_API_TOKEN) and anonymous // local-dev requests see all pending runs — preserving the existing // server-side worker fleet behavior. + // + // Cross-org device pins: also let the device claim runs in any org where it + // has a pinned watcher/connection AND its owner is still a member of that + // org. The pin IS the owner's consent — `evaluateDeviceWorkerAccess` only + // lets a device's owner attach it — so this keeps the device anchored to its + // home + personal org while serving watchers it was explicitly attached to in + // other orgs the owner belongs to. The membership join revokes access + // automatically if the owner later leaves the org. Within-org claiming still + // follows the pinned/capability rules below, so the device only ever runs the + // resource it was actually pinned to. + let claimableOrgIds = effectiveWorkerOrgIds; + if (isUserScopedWorker && deviceWorkerId && effectiveWorkerUserId) { + try { + claimableOrgIds = await resolveDeviceClaimableOrgs(sql, { + deviceWorkerId, + ownerUserId: effectiveWorkerUserId, + baseOrgIds: effectiveWorkerOrgIds ?? [], + }); + } catch (err) { + // Non-fatal: fall back to the base [bound, personal] scope. + logger.warn( + { worker_id, err: errorMessage(err) }, + '[pollWorkerJob] cross-org pinned-scope lookup failed' + ); + } + } // Org scope applies to every device (user-scoped) worker — including a - // re-anchored local device, whose org is effectiveWorkerOrgIds. A signed-in + // re-anchored local device, whose org is claimableOrgIds. A signed-in // worker with no org in scope can claim nothing; a re-anchored device with no // org falls through to the empty-array gate (claims only by capability). - if (c.var.workerAuthMode === 'user' && (!effectiveWorkerOrgIds || effectiveWorkerOrgIds.length === 0)) { + if (c.var.workerAuthMode === 'user' && (!claimableOrgIds || claimableOrgIds.length === 0)) { // No org in scope — nothing this worker can ever claim. return c.json({ next_poll_seconds: 30 }); } const orgScopeActive = isUserScopedWorker; // Always pass a non-empty array to ANY() to keep the SQL valid; the gate // below only activates when orgScopeActive is true. - const orgScopeIds = orgScopeActive && effectiveWorkerOrgIds ? effectiveWorkerOrgIds : ['']; + // + // Two scopes: `orgScopeIds` (widened with cross-org pins) gates the + // explicitly-PINNED claim branches — the pin is the owner's consent. + // `baseOrgScopeIds` (token's bound + personal org only) gates the UNPINNED + // capability-matched branch, so a single pin in org B does NOT also let the + // device claim unrelated unpinned device-connector runs in org B. + const orgScopeIds = orgScopeActive && claimableOrgIds ? claimableOrgIds : ['']; + const baseOrgScopeIds = + orgScopeActive && effectiveWorkerOrgIds && effectiveWorkerOrgIds.length > 0 + ? effectiveWorkerOrgIds + : ['']; const claimNextPendingRun = async () => sql.begin(async (tx) => { @@ -440,7 +483,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { AND cd.required_capability IS NOT NULL AND cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[]) AND con.device_worker_id IS NULL - AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) + AND r.organization_id = ANY(${pgTextArray(baseOrgScopeIds)}::text[]) ) -- ... or any connection explicitly pinned to THIS device (this is -- "run the Reddit connector on my Mac"). Still: a device-only @@ -3171,9 +3214,21 @@ export async function triggerWatcherForDevice(c: Context<{ Bindings: Env }>) { return c.json({ error: 'Watcher not found' }, 404); } + // Org scope: the watcher's org must be in the caller's base scope OR be a + // cross-org pin the caller still has access to. The pin to THIS device is + // verified next (the consent); here we just confirm membership of the + // watcher's org for the cross-org case, mirroring the poll's membership gate. const orgIds = c.var.workerOrgIds ?? []; if (!orgIds.includes(watcher.organization_id)) { - return c.json({ error: 'Forbidden' }, 403); + // workerUserId is guaranteed non-null by the guard above. + const memberRows = (await sql` + SELECT 1 FROM "member" + WHERE "organizationId" = ${watcher.organization_id} AND "userId" = ${workerUserId} + LIMIT 1 + `) as unknown as Array; + if (memberRows.length === 0) { + return c.json({ error: 'Forbidden' }, 403); + } } if (!watcher.device_worker_id || watcher.device_worker_id !== resolvedDeviceWorkerId) { return c.json({ error: 'Watcher is not pinned to this device' }, 403);