Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/owletto
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,30 @@

import { beforeEach, describe, expect, it } from 'vitest';
import type { DbClient } from '../../../db/client';
import type { Env } from '../../../index';
import { generateSecureToken, hashToken } from '../../../auth/oauth/utils';
import { manageWatchers } from '../../../tools/admin/manage_watchers';
import type { ToolContext } from '../../../tools/registry';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
import { createTestAgent, createTestEntity } from '../../setup/test-fixtures';
import { post } from '../../setup/test-helpers';
import { TestWorkspace } from '../../setup/test-mcp-client';

function ownerCtx(workspace: TestWorkspace): ToolContext {
return {
organizationId: workspace.org.id,
userId: workspace.users.owner.id,
memberRole: 'owner',
agentId: null,
isAuthenticated: true,
clientId: null,
scopes: ['mcp:read', 'mcp:write', 'mcp:admin'],
tokenType: 'oauth',
scopedToOrg: true,
allowCrossOrg: false,
};
}

/**
* Mint a PAT bound to a specific device worker_id with `device_worker:run`
* scope — same shape as createWorkerBoundPat in the sibling automation
Expand Down Expand Up @@ -338,4 +356,46 @@ describe('POST /api/workers/me/watchers/:watcher_id/trigger', () => {
expect(job.run_type).toBe('watcher');
expect(job.payload?.watcher?.execution_config).toEqual(execCfg);
});

it('poll payload carries the run-pinned version prompt, not a later edit', async () => {
const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-poll-prompt' });
const { token } = await createWorkerBoundPat(
ctx.workspace.users.owner.id,
ctx.workspace.org.id,
'mac-poll-prompt'
);

// Queue a run: createWatcherRun snapshots the current version (prompt v1)
// into approved_input.version_id.
const trig = await post(`/api/workers/me/watchers/${ctx.watcherId}/trigger`, { token });
expect(trig.status).toBe(200);

// Edit the watcher AFTER the run is queued: a new current version with a
// different prompt. The already-queued run must NOT pick this up.
const v2 = (await manageWatchers(
{
action: 'create_version',
watcher_id: String(ctx.watcherId),
prompt: 'EDITED-AFTER-QUEUE prompt v2.',
change_notes: 'edit after queue',
} as never,
{} as Env,
ownerCtx(ctx.workspace)
)) as { version: number };
expect(v2.version).toBe(2);

// The poll must deliver the prompt of the version snapshotted at queue time
// (v1) — the same version complete_window validates against — not the edit.
const pollRes = await post('/api/workers/poll', {
token,
body: { worker_id: 'mac-poll-prompt', capabilities: {} },
});
expect(pollRes.status).toBe(200);
const job = (await pollRes.json()) as {
run_type?: string;
payload?: { watcher?: { prompt?: string } };
};
expect(job.run_type).toBe('watcher');
expect(job.payload?.watcher?.prompt).toBe('Summarize {{entities}}.');
});
});
17 changes: 16 additions & 1 deletion packages/server/src/worker-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,8 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
w.agent_kind AS watcher_agent_kind,
w.notification_channel AS watcher_notification_channel,
w.notification_priority AS watcher_notification_priority,
w.execution_config AS watcher_execution_config
w.execution_config AS watcher_execution_config,
wv.prompt AS watcher_prompt
FROM runs r
LEFT JOIN feeds f ON f.id = r.feed_id
LEFT JOIN connections conn ON conn.id = r.connection_id
Expand All @@ -540,6 +541,9 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
AND cd.status = 'active'
LEFT JOIN auth_profiles ap ON ap.id = r.auth_profile_id
LEFT JOIN watchers w ON w.id = r.watcher_id
LEFT JOIN watcher_versions wv
ON wv.id = COALESCE((r.approved_input->>'version_id')::bigint, w.current_version_id)
AND wv.watcher_id = w.watcher_group_id
WHERE r.id = ${runId}
LIMIT 1
`;
Expand Down Expand Up @@ -605,6 +609,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
watcher_notification_channel: string | null;
watcher_notification_priority: string | null;
watcher_execution_config: Record<string, unknown> | null;
watcher_prompt: string | null;
// Auth run fields
run_auth_profile_id: number | null;
auth_profile_auth_data: Record<string, unknown> | null;
Expand Down Expand Up @@ -643,6 +648,16 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) {
notification_channel: row.watcher_notification_channel ?? 'canvas',
notification_priority: row.watcher_notification_priority ?? 'normal',
execution_config: row.watcher_execution_config ?? null,
// The prompt of the version this run was pinned to at creation
// (run's snapshotted approved_input.version_id, else the watcher's
// current_version_id) — same source complete_window validates
// against, so a watcher edited after the run was queued doesn't swap
// the prompt mid-flight. Device-local executors had no other channel
// for the watcher's instructions (the payload shipped only
// id/name/slug), so a scheduled watcher's local CLI got a bare
// "process this" and improvised; shipping it lets the device run the
// real prompt. Null only if the watcher has no version row.
prompt: row.watcher_prompt ?? null,
},
event: {
trigger_event_id: null,
Expand Down
Loading