Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,12 @@ CREATE TABLE public.personal_access_tokens (

COMMENT ON TABLE public.personal_access_tokens IS 'Personal Access Tokens for workers, CLI tools, and MCP clients';

--
-- Name: COLUMN personal_access_tokens.worker_id; Type: COMMENT; Schema: public; Owner: -
--

COMMENT ON COLUMN public.personal_access_tokens.worker_id IS 'Optional binding to a specific device_workers.worker_id. Set by /api/me/devices/mint-child-token. When non-NULL, /api/workers/poll requires the request body''s worker_id to match.';

--
-- Name: personal_access_tokens_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3596,6 +3602,12 @@ CREATE INDEX idx_notification_targets_user_all ON public.notification_targets US

CREATE INDEX idx_notification_targets_user_unread ON public.notification_targets USING btree (user_id, delivered_at DESC) WHERE (read_at IS NULL);

--
-- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_personal_access_tokens_worker_id ON public.personal_access_tokens USING btree (worker_id) WHERE (worker_id IS NOT NULL);

--
-- Name: idx_runs_active_auth_per_profile; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3998,13 +4010,6 @@ CREATE INDEX personal_access_tokens_active_idx ON public.personal_access_tokens

CREATE INDEX personal_access_tokens_organization_id_idx ON public.personal_access_tokens USING btree (organization_id);


--
-- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_personal_access_tokens_worker_id ON public.personal_access_tokens USING btree (worker_id) WHERE (worker_id IS NOT NULL);

--
-- Name: personal_access_tokens_token_hash_idx; Type: INDEX; Schema: public; Owner: -
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { beforeAll, describe, expect, it } from 'vitest';
import {
addUserToOrganization,
createTestAgent,
createTestOrganization,
createTestUser,
} from '../../setup/test-fixtures';
Expand Down Expand Up @@ -36,6 +37,7 @@ describe('classifier CRUD', () => {
})) as { entity: { id: number } };
entityId = entity.entity.id;

const agent = await createTestAgent({ organizationId: org.id, ownerUserId: user.id });
const w = (await owner.watchers.create({
entity_id: entityId,
slug: 'cls-watcher',
Expand All @@ -45,6 +47,7 @@ describe('classifier CRUD', () => {
type: 'object',
properties: { signal: { type: 'string' } },
},
agent_id: agent.agentId,
})) as { watcher_id: string };
watcherId = Number(w.watcher_id);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { beforeAll, describe, expect, it } from 'vitest';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
import { createTestEvent } from '../../setup/test-fixtures';
import { createTestAgent, createTestEvent } from '../../setup/test-fixtures';
import { TestWorkspace } from '../../setup/test-mcp-client';

const stubEmbedding = Array.from({ length: 768 }, () => 0);
Expand Down Expand Up @@ -35,12 +35,17 @@ async function seedClassifier(workspace: TestWorkspace, slug: string): Promise<S
name: `${slug} Target`,
})) as { entity: { id: number } };

const agent = await createTestAgent({
organizationId: workspace.org.id,
ownerUserId: workspace.users.owner.id,
});
const watcher = (await workspace.owner.watchers.create({
entity_id: entity.entity.id,
slug: `${slug}-watcher`,
name: `${slug} Watcher`,
prompt: 'collect signals.',
extraction_schema: { type: 'object', properties: { signal: { type: 'string' } } },
agent_id: agent.agentId,
})) as { watcher_id: string };

const created = (await workspace.owner.classifiers.create({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { beforeAll, beforeEach, describe, expect, it } from 'vitest';
import { manageWatchers } from '../../../tools/admin/manage_watchers';
import type { ToolContext } from '../../../tools/registry';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
import { createTestEntity } from '../../setup/test-fixtures';
import { createTestAgent, createTestEntity } from '../../setup/test-fixtures';
import { TestWorkspace } from '../../setup/test-mcp-client';

function ownerCtx(workspace: TestWorkspace): ToolContext {
Expand All @@ -35,6 +35,10 @@ async function seedWatcher(workspace: TestWorkspace, suffix: string) {
organization_id: workspace.org.id,
created_by: workspace.users.owner.id,
});
const agent = await createTestAgent({
organizationId: workspace.org.id,
ownerUserId: workspace.users.owner.id,
});
const watcher = (await workspace.owner.watchers.create({
entity_id: entity.id,
slug: `feedback-watcher-${suffix}`,
Expand All @@ -52,6 +56,7 @@ async function seedWatcher(workspace: TestWorkspace, suffix: string) {
},
},
},
agent_id: agent.agentId,
})) as { watcher_id: string };

const [window] = await getTestDb()`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type { ToolContext } from '../../../tools/registry';
import { createWatcherRun } from '../../../utils/queue-helpers';
import { parseWatcherRunPayload } from '../../../watchers/automation';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
import { createTestEntity } from '../../setup/test-fixtures';
import { createTestAgent, createTestEntity } from '../../setup/test-fixtures';
import { TestWorkspace } from '../../setup/test-mcp-client';

function ownerCtx(workspace: TestWorkspace): ToolContext {
Expand All @@ -46,6 +46,10 @@ async function seedRootWatcher(workspace: TestWorkspace, suffix: string) {
organization_id: workspace.org.id,
created_by: workspace.users.owner.id,
});
const agent = await createTestAgent({
organizationId: workspace.org.id,
ownerUserId: workspace.users.owner.id,
});
const watcher = (await workspace.owner.watchers.create({
entity_id: entity.id,
slug: `digest-${suffix}`,
Expand All @@ -57,6 +61,7 @@ async function seedRootWatcher(workspace: TestWorkspace, suffix: string) {
required: ['summary'],
},
schedule: '0 9 * * *',
agent_id: agent.agentId,
})) as { watcher_id: string };
return { watcherId: Number(watcher.watcher_id), entityId: entity.id };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import { beforeAll, describe, expect, it } from 'vitest';
import {
addUserToOrganization,
createTestAgent,
createTestOrganization,
createTestUser,
} from '../../setup/test-fixtures';
Expand All @@ -19,6 +20,7 @@ describe('watcher CRUD', () => {
let owner: TestApiClient;
let intruder: TestApiClient;
let entityId: number;
let agentId: string;

beforeAll(async () => {
await cleanupTestDatabase();
Expand All @@ -30,6 +32,8 @@ describe('watcher CRUD', () => {
userId: user.id,
memberRole: 'owner',
});
const agent = await createTestAgent({ organizationId: org.id, ownerUserId: user.id });
agentId = agent.agentId;

const otherOrg = await createTestOrganization({ name: 'Watcher Other Org' });
const otherUser = await createTestUser({ email: 'watcher-other@test.com' });
Expand Down Expand Up @@ -59,6 +63,7 @@ describe('watcher CRUD', () => {
properties: { launches: { type: 'array', items: { type: 'string' } } },
},
schedule: '0 9 * * *',
agent_id: agentId,
})) as { watcher_id: string };
const watcherId = created.watcher_id;
expect(watcherId).toBeDefined();
Expand Down Expand Up @@ -90,6 +95,7 @@ describe('watcher CRUD', () => {
type: 'object',
properties: { signals: { type: 'array', items: { type: 'string' } } },
},
agent_id: agentId,
})) as { watcher_id: string };
expect(created.watcher_id).toBeDefined();

Expand All @@ -109,6 +115,7 @@ describe('watcher CRUD', () => {
name: 'No Org',
prompt: 'should fail',
extraction_schema: { type: 'object', properties: {} },
agent_id: agentId,
})
).rejects.toThrow(/organization|entity_id/i);
});
Expand All @@ -122,6 +129,7 @@ describe('watcher CRUD', () => {
type: 'object',
properties: { signals: { type: 'array', items: { type: 'string' } } },
},
agent_id: agentId,
})) as { watcher_id: string };

await expect(intruder.watchers.get(created.watcher_id)).rejects.toThrow(
Expand Down Expand Up @@ -155,6 +163,7 @@ describe('watcher CRUD', () => {
type: 'object',
properties: { signal: { type: 'string' } },
},
agent_id: agentId,
})) as { watcher_id: string };

const member = owner.withAuth({ memberRole: 'member' });
Expand Down
19 changes: 12 additions & 7 deletions packages/server/src/tools/admin/manage_feeds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ async function handleListFeeds(
const offset = args.offset ?? 0;

// Build the filtered "page" of feeds first, then compute event_count in a
// single GROUP BY over the (connection_id, feed_key) pairs in that page.
// The previous shape ran a correlated `SELECT COUNT(*) FROM current_event_records`
// per row, which is O(N feeds) × an anti-join over the entire events table —
// ~880ms per feed on a busy connection. Batching collapses it to one scan.
// single GROUP BY restricted to the (connection_id, feed_key) tuples on
// that page. The previous shape ran a correlated
// `SELECT COUNT(*) FROM current_event_records` per row — O(N feeds) ×
// an anti-join over the entire events table — ~880ms per feed on a busy
// connection. Batching collapses it to one scan.
let pageQuery = sql`
SELECT f.*
FROM feeds f
Expand Down Expand Up @@ -184,10 +185,14 @@ async function handleListFeeds(
SELECT e.connection_id, e.feed_key, COUNT(*)::int AS event_count
FROM events e
WHERE e.organization_id = ${organizationId}
-- ANY(ARRAY(...)) keeps the planner on a single index scan per
-- distinct connection. IN (subquery) or a join causes Postgres to
-- re-scan the connection_id index per (connection, feed_key) pair.
-- ANY(ARRAY(...)) on each column lets the planner stay on
-- per-column index scans and intersect, rather than re-scanning
-- the connection_id index per (connection, feed_key) pair the
-- way IN (subquery) on a tuple would. The feed_key ANY narrows
-- the scan to the keys actually on this page; the final LEFT
-- JOIN drops any over-count from the cross-product.
AND e.connection_id = ANY(ARRAY(SELECT DISTINCT connection_id FROM page))
AND e.feed_key = ANY(ARRAY(SELECT DISTINCT feed_key FROM page WHERE feed_key IS NOT NULL))
AND NOT EXISTS (SELECT 1 FROM events n WHERE n.supersedes_event_id = e.id)
GROUP BY e.connection_id, e.feed_key
)
Expand Down
2 changes: 1 addition & 1 deletion packages/web
Submodule web updated from 2d2f5b to b01682
Loading