Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* resolveDeviceClaimableOrgs — cross-org device pin scoping.
*
* A user-scoped device worker's base scope is [token's bound org, personal org].
* This helper widens it to orgs where the device has an active pin AND its owner
* is still a member. Pinning is the owner's consent (see watcher-device-access);
* the membership join revokes scope when the owner leaves the org.
*/
import { beforeEach, describe, expect, it } from 'vitest';
import { resolveDeviceClaimableOrgs } from '../../../utils/device-claimable-orgs';
import { getNextNumericId } from '../../../tools/admin/helpers/db-helpers';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
import {
addUserToOrganization,
createTestAgent,
createTestOrganization,
createTestUser,
} from '../../setup/test-fixtures';

const sql = getTestDb();

async function insertDevice(userId: string, orgId: string): Promise<string> {
const workerId = `mac-${Math.random().toString(36).slice(2, 10)}`;
const rows = (await sql`
INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id)
VALUES (${userId}, ${workerId}, 'macos', ${sql.json({})}, 'Test Mac', ${orgId})
RETURNING id
`) as unknown as Array<{ id: string }>;
return String(rows[0].id);
}

async function pinWatcher(opts: {
orgId: string;
agentId: string;
deviceWorkerId: string;
createdBy: string;
status?: string;
}): Promise<void> {
const id = await getNextNumericId(sql, 'watchers');
await sql`
INSERT INTO watchers (
id, status, created_by, organization_id, agent_id, watcher_group_id,
notification_channel, notification_priority, min_cooldown_seconds,
device_worker_id, slug, created_at, updated_at
) VALUES (
${id}, ${opts.status ?? 'active'}, ${opts.createdBy}, ${opts.orgId}, ${opts.agentId}, ${id},
'notification', 'normal', 300, ${opts.deviceWorkerId}, ${`w-${id}`}, NOW(), NOW()
)
`;
}

describe('resolveDeviceClaimableOrgs (cross-org device pins)', () => {
beforeEach(async () => {
await cleanupTestDatabase();
});

it('adds an org with an active pin when the owner is a member, and excludes a non-member org', async () => {
const user = await createTestUser();
const orgA = await createTestOrganization(); // base scope (bound/personal)
const orgB = await createTestOrganization(); // member + pinned -> included
const orgC = await createTestOrganization(); // pinned but NOT a member -> excluded
await addUserToOrganization(user.id, orgA.id, 'owner');
await addUserToOrganization(user.id, orgB.id, 'owner');
// intentionally NOT a member of orgC

const deviceWorkerId = await insertDevice(user.id, orgA.id);
const agentB = await createTestAgent({ organizationId: orgB.id, ownerUserId: user.id });
const agentC = await createTestAgent({ organizationId: orgC.id, ownerUserId: user.id });
await pinWatcher({ orgId: orgB.id, agentId: agentB.agentId, deviceWorkerId, createdBy: user.id });
await pinWatcher({ orgId: orgC.id, agentId: agentC.agentId, deviceWorkerId, createdBy: user.id });

const result = await resolveDeviceClaimableOrgs(sql, {
deviceWorkerId,
ownerUserId: user.id,
baseOrgIds: [orgA.id],
});

expect(result).toContain(orgA.id); // base scope always present
expect(result).toContain(orgB.id); // pinned + member
expect(result).not.toContain(orgC.id); // pinned but not a member
});

it('does not grant scope from an archived watcher pin', async () => {
const user = await createTestUser();
const orgA = await createTestOrganization();
const orgB = await createTestOrganization();
await addUserToOrganization(user.id, orgA.id, 'owner');
await addUserToOrganization(user.id, orgB.id, 'owner');

const deviceWorkerId = await insertDevice(user.id, orgA.id);
const agentB = await createTestAgent({ organizationId: orgB.id, ownerUserId: user.id });
await pinWatcher({
orgId: orgB.id,
agentId: agentB.agentId,
deviceWorkerId,
createdBy: user.id,
status: 'archived',
});

const result = await resolveDeviceClaimableOrgs(sql, {
deviceWorkerId,
ownerUserId: user.id,
baseOrgIds: [orgA.id],
});

expect(result).toEqual([orgA.id]); // archived pin grants nothing
});
});
43 changes: 43 additions & 0 deletions packages/server/src/__tests__/unit/run-in-worker-scope.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { describe, expect, it } from 'vitest';
import { runInWorkerScope } from '../../utils/device-claimable-orgs';

const base = { organization_id: 'orgB', device_owner: null, watcher_device_owner: null };

describe('runInWorkerScope', () => {
it('in scope when the run org is in the base scope', () => {
expect(runInWorkerScope(base, { workerUserId: 'u1', orgIds: ['orgB'] })).toBe(true);
});

it('in scope when the worker owns the run\'s pinned connection device', () => {
expect(
runInWorkerScope(
{ ...base, device_owner: 'u1' },
{ workerUserId: 'u1', orgIds: ['orgA'] }
)
).toBe(true);
});

it('in scope when the worker owns the run\'s pinned watcher device (cross-org)', () => {
expect(
runInWorkerScope(
{ ...base, watcher_device_owner: 'u1' },
{ workerUserId: 'u1', orgIds: ['orgA'] }
)
).toBe(true);
});

it('forbidden when org is out of scope and the worker owns neither pinned device', () => {
expect(
runInWorkerScope(
{ ...base, device_owner: 'someone-else', watcher_device_owner: 'someone-else' },
{ workerUserId: 'u1', orgIds: ['orgA'] }
)
).toBe(false);
});

it('forbidden when there is no worker user and the org is out of scope', () => {
expect(
runInWorkerScope({ ...base, watcher_device_owner: 'u1' }, { workerUserId: null, orgIds: [] })
).toBe(false);
});
});
67 changes: 67 additions & 0 deletions packages/server/src/utils/device-claimable-orgs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Resolve which orgs a user-scoped device worker may claim runs in.
*
* Base scope (computed in the /api/workers/* auth middleware) is the token's
* bound org plus the user's personal org. On top of that, a device may claim
* runs in any org where it has a pinned watcher/connection AND its owner is
* still a member of that org.
*
* The pin IS the consent: `evaluateDeviceWorkerAccess` only lets a device's
* owner attach it to a resource, so a pin in org B means the owner explicitly
* opted this device into serving org B. The membership join means access is
* revoked automatically if the owner later leaves the org. Within-org claiming
* still follows the pinned/capability rules in the poll, so the device only
* ever runs the resource it was actually pinned to.
*
* Only `active` watchers and non-deleted connections count — an archived
* watcher or deleted connection must not keep an org in scope.
*/
import type { DbClient } from '../db/client';

export async function resolveDeviceClaimableOrgs(
sql: DbClient,
params: { deviceWorkerId: string; ownerUserId: string; baseOrgIds: string[] }
): Promise<string[]> {
const rows = (await sql`
SELECT DISTINCT src.organization_id
FROM (
SELECT organization_id FROM watchers
WHERE device_worker_id = ${params.deviceWorkerId} AND status = 'active'
UNION
SELECT organization_id FROM connections
WHERE device_worker_id = ${params.deviceWorkerId} AND deleted_at IS NULL
) src
JOIN "member" m
ON m."organizationId" = src.organization_id AND m."userId" = ${params.ownerUserId}
WHERE src.organization_id IS NOT NULL
`) as unknown as Array<{ organization_id: string }>;

const pinnedOrgIds = rows
.map((r) => r.organization_id)
.filter((id): id is string => typeof id === 'string' && id.length > 0);

return Array.from(new Set([...params.baseOrgIds, ...pinnedOrgIds]));
}

/**
* Whether a user-scoped device worker may act on a run (claim / complete /
* heartbeat). True when the run's org is in the worker's base scope, OR the
* worker's user owns the device the run is pinned to — via either a pinned
* connection (`device_owner`) or a pinned watcher (`watcher_device_owner`).
* Pinning is the owner's consent, so a device may finish a run it was attached
* to in any org, mirroring the claim-side scope.
*/
export function runInWorkerScope(
run: {
organization_id: string;
device_owner: string | null;
watcher_device_owner: string | null;
},
ctx: { workerUserId: string | null; orgIds: string[] }
): boolean {
if (ctx.orgIds.includes(run.organization_id)) return true;
if (!ctx.workerUserId) return false;
return (
run.device_owner === ctx.workerUserId || run.watcher_device_owner === ctx.workerUserId
);
Comment on lines +54 to +66
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Reapply org-membership checks for owned cross-org runs.

This helper now treats device_owner / watcher_device_owner as sufficient on their own, but resolveDeviceClaimableOrgs() only grants cross-org scope while the owner is still a member. Once membership is revoked, authorizeRunForWorker() will still allow heartbeat/completion for already-running cross-org runs because this path ignores the resolved claimable-org set entirely.

[suggested fix: have completion/heartbeat authorization resolve the worker's current claimable orgs and only allow the ownership fallback when run.organization_id is still in that set.]

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/utils/device-claimable-orgs.ts` around lines 54 - 66, The
runInWorkerScope helper currently authorizes based solely on
device_owner/watcher_device_owner, letting authorizeRunForWorker accept
cross-org runs even after membership is revoked; update runInWorkerScope to
consult the worker's current claimable orgs via resolveDeviceClaimableOrgs() (or
accept a ctx.claimableOrgIds array) and only allow the ownership fallback when
run.organization_id is still contained in that resolved set—i.e., first check
ctx.orgIds/includes(run.organization_id), then if ctx.workerUserId is present
resolve the worker's claimable org IDs and permit the
device_owner/watcher_device_owner match only when run.organization_id is in that
resolved claimable set.

}
73 changes: 64 additions & 9 deletions packages/server/src/worker-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { getNextNumericId } from './tools/admin/helpers/db-helpers';
import { reconcileDeviceCapabilities } from './worker-api/device-reconcile';
import { findBundledConnectorFile } from './utils/connector-catalog';
import { resolveConnectorCode } from './utils/ensure-connector-installed';
import { resolveDeviceClaimableOrgs, runInWorkerScope } from './utils/device-claimable-orgs';
import { applyEntityLinks } from './utils/entity-link-upsert';
import { errorMessage } from './utils/errors';
import { validateConnectorEventSemanticType } from './utils/event-kind-validation';
Expand Down Expand Up @@ -96,25 +97,31 @@ async function authorizeRunForWorker(
const orgIds = c.var.workerOrgIds ?? [];
const sql = getDb();
const rows = (await sql`
SELECT r.status, r.claimed_by, r.organization_id, dw.user_id AS device_owner
SELECT r.status, r.claimed_by, r.organization_id,
dw.user_id AS device_owner,
wdw.user_id AS watcher_device_owner
FROM runs r
LEFT JOIN connections con ON con.id = r.connection_id
LEFT JOIN device_workers dw ON dw.id = con.device_worker_id
LEFT JOIN watchers w ON w.id = r.watcher_id
LEFT JOIN device_workers wdw ON wdw.id = w.device_worker_id
WHERE r.id = ${runId}
LIMIT 1
`) as unknown as Array<{
status: string;
claimed_by: string | null;
organization_id: string;
device_owner: string | null;
watcher_device_owner: string | null;
}>;
if (rows.length === 0) {
return c.json({ error: 'Run not found' }, 404);
}
const run = rows[0];
const inScope =
orgIds.includes(run.organization_id) ||
(!!workerUserId && run.device_owner === workerUserId);
// Watcher runs pinned to a device the worker owns are in scope too (the pin
// is the owner's consent), so a device can FINISH a cross-org run it claimed —
// not just claim it. Without this the poll widening would 403 on completion.
const inScope = runInWorkerScope(run, { workerUserId, orgIds });
if (!inScope) {
return c.json({ error: 'Forbidden' }, 403);
}
Expand Down Expand Up @@ -385,18 +392,54 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
// middleware. Trusted workers (matched WORKER_API_TOKEN) and anonymous
// local-dev requests see all pending runs — preserving the existing
// server-side worker fleet behavior.
//
// Cross-org device pins: also let the device claim runs in any org where it
// has a pinned watcher/connection AND its owner is still a member of that
// org. The pin IS the owner's consent — `evaluateDeviceWorkerAccess` only
// lets a device's owner attach it — so this keeps the device anchored to its
// home + personal org while serving watchers it was explicitly attached to in
// other orgs the owner belongs to. The membership join revokes access
// automatically if the owner later leaves the org. Within-org claiming still
// follows the pinned/capability rules below, so the device only ever runs the
// resource it was actually pinned to.
let claimableOrgIds = effectiveWorkerOrgIds;
if (isUserScopedWorker && deviceWorkerId && effectiveWorkerUserId) {
try {
claimableOrgIds = await resolveDeviceClaimableOrgs(sql, {
deviceWorkerId,
ownerUserId: effectiveWorkerUserId,
baseOrgIds: effectiveWorkerOrgIds ?? [],
});
} catch (err) {
// Non-fatal: fall back to the base [bound, personal] scope.
logger.warn(
{ worker_id, err: errorMessage(err) },
'[pollWorkerJob] cross-org pinned-scope lookup failed'
);
}
}
// Org scope applies to every device (user-scoped) worker — including a
// re-anchored local device, whose org is effectiveWorkerOrgIds. A signed-in
// re-anchored local device, whose org is claimableOrgIds. A signed-in
// worker with no org in scope can claim nothing; a re-anchored device with no
// org falls through to the empty-array gate (claims only by capability).
if (c.var.workerAuthMode === 'user' && (!effectiveWorkerOrgIds || effectiveWorkerOrgIds.length === 0)) {
if (c.var.workerAuthMode === 'user' && (!claimableOrgIds || claimableOrgIds.length === 0)) {
// No org in scope — nothing this worker can ever claim.
return c.json({ next_poll_seconds: 30 });
}
const orgScopeActive = isUserScopedWorker;
// Always pass a non-empty array to ANY() to keep the SQL valid; the gate
// below only activates when orgScopeActive is true.
const orgScopeIds = orgScopeActive && effectiveWorkerOrgIds ? effectiveWorkerOrgIds : [''];
//
// Two scopes: `orgScopeIds` (widened with cross-org pins) gates the
// explicitly-PINNED claim branches — the pin is the owner's consent.
// `baseOrgScopeIds` (token's bound + personal org only) gates the UNPINNED
// capability-matched branch, so a single pin in org B does NOT also let the
// device claim unrelated unpinned device-connector runs in org B.
const orgScopeIds = orgScopeActive && claimableOrgIds ? claimableOrgIds : [''];
const baseOrgScopeIds =
orgScopeActive && effectiveWorkerOrgIds && effectiveWorkerOrgIds.length > 0
? effectiveWorkerOrgIds
: [''];

const claimNextPendingRun = async () =>
sql.begin(async (tx) => {
Expand Down Expand Up @@ -440,7 +483,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
AND cd.required_capability IS NOT NULL
AND cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[])
AND con.device_worker_id IS NULL
AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[])
AND r.organization_id = ANY(${pgTextArray(baseOrgScopeIds)}::text[])
)
-- ... or any connection explicitly pinned to THIS device (this is
-- "run the Reddit connector on my Mac"). Still: a device-only
Expand Down Expand Up @@ -3171,9 +3214,21 @@ export async function triggerWatcherForDevice(c: Context<{ Bindings: Env }>) {
return c.json({ error: 'Watcher not found' }, 404);
}

// Org scope: the watcher's org must be in the caller's base scope OR be a
// cross-org pin the caller still has access to. The pin to THIS device is
// verified next (the consent); here we just confirm membership of the
// watcher's org for the cross-org case, mirroring the poll's membership gate.
const orgIds = c.var.workerOrgIds ?? [];
if (!orgIds.includes(watcher.organization_id)) {
return c.json({ error: 'Forbidden' }, 403);
// workerUserId is guaranteed non-null by the guard above.
const memberRows = (await sql`
SELECT 1 FROM "member"
WHERE "organizationId" = ${watcher.organization_id} AND "userId" = ${workerUserId}
LIMIT 1
`) as unknown as Array<unknown>;
if (memberRows.length === 0) {
return c.json({ error: 'Forbidden' }, 403);
}
}
if (!watcher.device_worker_id || watcher.device_worker_id !== resolvedDeviceWorkerId) {
return c.json({ error: 'Watcher is not pinned to this device' }, 403);
Expand Down
Loading