diff --git a/db/migrations/20260517040000_archive_orphan_watchers.sql b/db/migrations/20260517040000_archive_orphan_watchers.sql new file mode 100644 index 000000000..6bddaee33 --- /dev/null +++ b/db/migrations/20260517040000_archive_orphan_watchers.sql @@ -0,0 +1,30 @@ +-- migrate:up + +-- Archive watchers that are flagged `status = 'active'` but have no `agent_id`. +-- The scheduler (packages/server/src/watchers/automation.ts:469) already +-- filters them out with `WHERE w.agent_id IS NOT NULL`, so these rows are +-- zombies — visible in the API, redirect-bounced on the watcher detail +-- route, never actually executing. +-- +-- 2026-05-17 prod audit: 28 active orphans across 11 orgs (most originated +-- from older create paths before `agent_id` was wired in). Archiving them +-- aligns the stored status with their actual execution state and lets the +-- `/$owner/watchers/$watcherId` redirect logic stop silently sending users +-- to /agents. +-- +-- The write-time guard (this same PR, manage_watchers.ts) rejects new +-- watcher creates/updates that set a schedule without an agent, so the +-- orphan set can't grow again. + +UPDATE public.watchers +SET status = 'archived', + updated_at = now() +WHERE status = 'active' + AND agent_id IS NULL; + +-- migrate:down + +-- No-op: this is a one-shot data cleanup. The original rows can be restored +-- manually if needed by assigning an agent_id and flipping status back to +-- 'active'; without an agent, status='active' is meaningless to the +-- scheduler. diff --git a/db/migrations/20260517050000_watcher_agent_id_not_null.sql b/db/migrations/20260517050000_watcher_agent_id_not_null.sql new file mode 100644 index 000000000..abc45880d --- /dev/null +++ b/db/migrations/20260517050000_watcher_agent_id_not_null.sql @@ -0,0 +1,34 @@ +-- migrate:up + +-- Delete every watcher with `agent_id IS NULL` and enforce the column at +-- the schema level. These rows are leftovers from before the watcher +-- scheduler required `agent_id` (see watchers/automation.ts:469) — they +-- couldn't fire and the prior migration 20260517040000 had already +-- archived the active subset. Their dependent rows (windows, reactions, +-- versions, classifiers, field feedback) describe runtime state for +-- watchers that will never execute; ON DELETE CASCADE clears them. +-- `runs.watcher_id` is ON DELETE SET NULL, so the 21k historical run +-- records remain — they just lose the watcher linkage. +-- +-- Application-level guards in manage_watchers.ts (handleCreate + +-- handleCreateFromVersion) reject new inserts without `agent_id`; the +-- NOT NULL constraint below is the matching DB-level enforcement so +-- bypass paths can't reintroduce the zombie state. + +DELETE FROM public.watchers WHERE agent_id IS NULL; + +ALTER TABLE public.watchers + ALTER COLUMN agent_id SET NOT NULL; + +-- The existing index is partial (`WHERE agent_id IS NOT NULL`), which is +-- a tautology once the column is NOT NULL. Replace it with an unconditional +-- index so explain plans don't show the dead predicate. +DROP INDEX IF EXISTS public.idx_watchers_agent_id; +CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id); + +-- migrate:down + +ALTER TABLE public.watchers + ALTER COLUMN agent_id DROP NOT NULL; + +-- No restoring deleted rows — this is forward-only data cleanup. diff --git a/db/schema.sql b/db/schema.sql index 719265f7c..2465dc3d0 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1991,7 +1991,7 @@ CREATE TABLE public.watchers ( current_version_id integer, schedule text, next_run_at timestamp with time zone, - agent_id text, + agent_id text NOT NULL, connection_id text, scheduler_client_id text, source_watcher_id integer, @@ -3774,7 +3774,7 @@ CREATE INDEX idx_watcher_windows_watcher ON public.watcher_windows USING btree ( -- Name: idx_watchers_agent_id; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id) WHERE (agent_id IS NOT NULL); +CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id); -- -- Name: idx_watchers_connection_id; Type: INDEX; Schema: public; Owner: - @@ -4996,4 +4996,6 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260516200100'), ('20260517010000'), ('20260517020000'), - ('20260517030000'); + ('20260517030000'), + ('20260517040000'), + ('20260517050000'); diff --git a/packages/server/src/tools/admin/manage_entity.ts b/packages/server/src/tools/admin/manage_entity.ts index 6a22666c6..2a55c0a87 100644 --- a/packages/server/src/tools/admin/manage_entity.ts +++ b/packages/server/src/tools/admin/manage_entity.ts @@ -17,7 +17,7 @@ */ import { type Static, Type } from '@sinclair/typebox'; -import { getDb } from '../../db/client'; +import { getDb, pgTextArray } from '../../db/client'; import type { Env } from '../../index'; import { batchLoadRelationships, @@ -295,6 +295,12 @@ type ManageEntityResult = space_name?: string | null; view_url?: string; }>; + // Server-side resolution of every `x-link-entity-type` column referenced + // by the page. Keyed by `${entityType}:${lookupField}`, then by the + // lookup value from the row's metadata. Previously each entity-list page + // fanned out one `manage_entity.list` per linked column (4× ~2.5 s on + // the Company page); the FE now reads from this map instead. + linked_entities?: Record>; metadata: { page_size: number; has_more: boolean; @@ -733,10 +739,14 @@ async function handleList( const schema = entityTypeRow?.metadata_schema as Record | null; const relSpecs = (schema?.['x-table-relationships'] ?? []) as RelationshipColumnSpec[]; const entityIds = entities.map((e) => e.id); - const relMap = + + // Batch-load relationships and linked-column lookups in parallel. + const [relMap, linkedEntities] = await Promise.all([ relSpecs.length > 0 && entityIds.length > 0 - ? await batchLoadRelationships(entityIds, relSpecs, ctx.organizationId) - : new Map(); + ? batchLoadRelationships(entityIds, relSpecs, ctx.organizationId) + : Promise.resolve(new Map()), + resolveLinkedColumns(entities, schema, ctx.organizationId), + ]); const baseUrl = getPublicWebUrl(ctx.requestUrl, ctx.baseUrl); const ownerSlug = await getOrganizationSlug(ctx.organizationId); @@ -779,6 +789,7 @@ async function handleList( ...(relMap.size > 0 && relMap.has(e.id) ? { relationships: relMap.get(e.id) } : {}), }; }), + ...(Object.keys(linkedEntities).length > 0 ? { linked_entities: linkedEntities } : {}), metadata: { page_size: entities.length, has_more: hasMore, @@ -792,6 +803,92 @@ async function handleList( }; } +/** + * Resolve every `x-link-entity-type` column on the schema to `{slug, name}` + * pairs in one batch per (entityType, lookupField). Replaces the previous + * FE pattern of `useQueries` fanning out one full-table fetch per linked + * column. Returns a map keyed `${entityType}:${lookupField}` → lookup-value + * → ref. Empty object if the schema declares no linked columns or none of + * the visible rows reference linked values. + */ +async function resolveLinkedColumns( + entities: Array<{ metadata?: Record | null }>, + schema: Record | null, + organizationId: string +): Promise>> { + if (!schema || entities.length === 0) return {}; + const properties = (schema as { properties?: Record }).properties; + if (!properties) return {}; + + // Collect (linkedType, lookupField) → set of referenced values from the rows. + const buckets = new Map< + string, + { entityType: string; lookupField: string; values: Set } + >(); + for (const [columnKey, prop] of Object.entries(properties)) { + const linkedType = (prop as { 'x-link-entity-type'?: unknown })['x-link-entity-type']; + if (typeof linkedType !== 'string' || linkedType === '') continue; + const lookupFieldRaw = (prop as { 'x-link-lookup-field'?: unknown })['x-link-lookup-field']; + const lookupField = typeof lookupFieldRaw === 'string' && lookupFieldRaw ? lookupFieldRaw : 'slug'; + const bucketKey = `${linkedType}:${lookupField}`; + let bucket = buckets.get(bucketKey); + if (!bucket) { + bucket = { entityType: linkedType, lookupField, values: new Set() }; + buckets.set(bucketKey, bucket); + } + for (const e of entities) { + const raw = e.metadata?.[columnKey]; + const list = Array.isArray(raw) ? raw : [raw]; + for (const v of list) { + if (v == null) continue; + const s = String(v).trim(); + if (s !== '') bucket.values.add(s); + } + } + } + if (buckets.size === 0) return {}; + + const sql = getDb(); + const out: Record> = {}; + + await Promise.all( + [...buckets.entries()].map(async ([bucketKey, { entityType, lookupField, values }]) => { + if (values.size === 0) return; + const valuesArr = [...values]; + const valuesLiteral = pgTextArray(valuesArr); + const rows = + lookupField === 'slug' + ? await sql<{ slug: string; entity_type: string; name: string; lookup_value: string }>` + SELECT e.slug, et.slug AS entity_type, e.name, e.slug AS lookup_value + FROM entities e + JOIN entity_types et ON et.id = e.entity_type_id + WHERE e.organization_id = ${organizationId} + AND e.deleted_at IS NULL + AND et.slug = ${entityType} + AND e.slug = ANY(${valuesLiteral}::text[]) + ` + : await sql<{ slug: string; entity_type: string; name: string; lookup_value: string }>` + SELECT e.slug, et.slug AS entity_type, e.name, (e.metadata->>${lookupField}) AS lookup_value + FROM entities e + JOIN entity_types et ON et.id = e.entity_type_id + WHERE e.organization_id = ${organizationId} + AND e.deleted_at IS NULL + AND et.slug = ${entityType} + AND (e.metadata->>${lookupField}) = ANY(${valuesLiteral}::text[]) + `; + if (rows.length === 0) return; + const bucketMap: Record = {}; + for (const r of rows) { + if (r.lookup_value == null) continue; + bucketMap[r.lookup_value] = { slug: r.slug, entity_type: r.entity_type, name: r.name }; + } + out[bucketKey] = bucketMap; + }) + ); + + return out; +} + async function handleGet( entityId: number, env: Env, diff --git a/packages/server/src/tools/admin/manage_feeds.ts b/packages/server/src/tools/admin/manage_feeds.ts index e0b1c1914..8c400df40 100644 --- a/packages/server/src/tools/admin/manage_feeds.ts +++ b/packages/server/src/tools/admin/manage_feeds.ts @@ -152,8 +152,46 @@ async function handleListFeeds( const limit = args.limit ?? 50; const offset = args.offset ?? 0; - let query = sql` - SELECT f.*, c.connector_key, c.display_name AS connection_name, + // Build the filtered "page" of feeds first, then compute event_count in a + // single GROUP BY over the (connection_id, feed_key) pairs in that page. + // The previous shape ran a correlated `SELECT COUNT(*) FROM current_event_records` + // per row, which is O(N feeds) × an anti-join over the entire events table — + // ~880ms per feed on a busy connection. Batching collapses it to one scan. + let pageQuery = sql` + SELECT f.* + FROM feeds f + JOIN connections c ON c.id = f.connection_id + WHERE f.organization_id = ${organizationId} AND c.deleted_at IS NULL AND f.deleted_at IS NULL + `; + + if (args.connection_id) { + pageQuery = sql`${pageQuery} AND f.connection_id = ${args.connection_id}`; + } + if (args.entity_id) { + pageQuery = sql`${pageQuery} AND ${args.entity_id} = ANY(f.entity_ids)`; + } + if (args.status) { + pageQuery = sql`${pageQuery} AND f.status = ${args.status}`; + } + + pageQuery = sql`${pageQuery} ORDER BY f.created_at DESC LIMIT ${limit} OFFSET ${offset}`; + + const query = sql` + WITH page AS MATERIALIZED ( + ${pageQuery} + ), + event_counts AS ( + SELECT e.connection_id, e.feed_key, COUNT(*)::int AS event_count + FROM events e + WHERE e.organization_id = ${organizationId} + -- ANY(ARRAY(...)) keeps the planner on a single index scan per + -- distinct connection. IN (subquery) or a join causes Postgres to + -- re-scan the connection_id index per (connection, feed_key) pair. + AND e.connection_id = ANY(ARRAY(SELECT DISTINCT connection_id FROM page)) + AND NOT EXISTS (SELECT 1 FROM events n WHERE n.supersedes_event_id = e.id) + GROUP BY e.connection_id, e.feed_key + ) + SELECT p.*, c.connector_key, c.display_name AS connection_name, c.status AS connection_status, c.device_worker_id, dw.label AS device_label, @@ -171,12 +209,12 @@ async function handleListFeeds( ( SELECT string_agg(DISTINCT ent.name, ', ' ORDER BY ent.name) FROM entities ent - WHERE ent.id = ANY(f.entity_ids) + WHERE ent.id = ANY(p.entity_ids) ) AS entity_names, - (SELECT COUNT(*) FROM runs r WHERE r.feed_id = f.id AND r.status = ANY(${runStatusLiteral(ACTIVE_RUN_STATUSES)}::text[]))::int AS active_runs, - (SELECT COUNT(*) FROM current_event_records e WHERE e.connection_id = f.connection_id AND e.feed_key = f.feed_key)::int AS event_count - FROM feeds f - JOIN connections c ON c.id = f.connection_id + (SELECT COUNT(*) FROM runs r WHERE r.feed_id = p.id AND r.status = ANY(${runStatusLiteral(ACTIVE_RUN_STATUSES)}::text[]))::int AS active_runs, + COALESCE(ec.event_count, 0)::int AS event_count + FROM page p + JOIN connections c ON c.id = p.connection_id LEFT JOIN device_workers dw ON dw.id = c.device_worker_id LEFT JOIN LATERAL ( SELECT name @@ -188,21 +226,10 @@ async function handleListFeeds( LIMIT 1 ) cd ON TRUE LEFT JOIN auth_profiles ap ON ap.id = c.auth_profile_id - WHERE f.organization_id = ${organizationId} AND c.deleted_at IS NULL AND f.deleted_at IS NULL + LEFT JOIN event_counts ec ON ec.connection_id = p.connection_id AND ec.feed_key = p.feed_key + ORDER BY p.created_at DESC `; - if (args.connection_id) { - query = sql`${query} AND f.connection_id = ${args.connection_id}`; - } - if (args.entity_id) { - query = sql`${query} AND ${args.entity_id} = ANY(f.entity_ids)`; - } - if (args.status) { - query = sql`${query} AND f.status = ${args.status}`; - } - - query = sql`${query} ORDER BY f.created_at DESC LIMIT ${limit} OFFSET ${offset}`; - const rows = await query; return { action: 'list_feeds', feeds: rows, total: rows.length, limit, offset }; } diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index 159193b25..2b1d0ccbd 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -928,6 +928,17 @@ async function handleCreate( sources, }); + if (!args.agent_id) { + // The scheduler joins on `agent_id IS NOT NULL` (see + // packages/server/src/watchers/automation.ts:469), so a watcher without + // an agent has no way to execute. Schema-wise `agent_id` is `Type.Optional` + // because the field is shared across all manage_watchers actions, but + // create enforces it: a watcher with no owning agent is a zombie row. + throw new ToolUserError( + 'agent_id is required to create a watcher (the agent that executes it).' + ); + } + if (args.schedule) { const scheduleError = validateSchedule(args.schedule); if (scheduleError) { @@ -1129,6 +1140,13 @@ async function handleCreateFromVersion( `Access denied: watcher version ${args.version_id} does not belong to your organization` ); } + if (!version.agent_id) { + // Source watcher has no agent — cloning would silently inherit null and + // produce active zombies the scheduler skips. Same invariant as handleCreate. + throw new ToolUserError( + `Source watcher version ${args.version_id} has no agent_id; assign an agent on the source before cloning.` + ); + } // Fetch entity names for name pattern substitution const entityRows = await sql` @@ -1217,6 +1235,27 @@ async function handleUpdate( } } + // Match the invariant from handleCreate: a watcher with no agent_id is + // a zombie the scheduler will never run (automation joins on + // `agent_id IS NOT NULL`). Reject explicit nulling, and reject updates + // that would leave a scheduled watcher orphaned. + if (args.agent_id === null) { + throw new ToolUserError( + 'agent_id cannot be set to null — every watcher must have an owning agent.' + ); + } + if (args.schedule !== null && args.schedule !== undefined && args.agent_id === undefined) { + const currentRows = await sql` + SELECT agent_id FROM watchers WHERE id = ${args.watcher_id} LIMIT 1 + `; + const currentAgentId = (currentRows[0] as { agent_id: string | null } | undefined)?.agent_id; + if (currentAgentId === null) { + throw new ToolUserError( + 'Cannot schedule a watcher with no owning agent. Assign agent_id in the same update.' + ); + } + } + const updatedFields: string[] = []; if (args.model_config !== undefined) updatedFields.push('model_config'); if (args.schedule !== undefined) updatedFields.push('schedule'); diff --git a/packages/web b/packages/web index aa161247c..2d2f5bf25 160000 --- a/packages/web +++ b/packages/web @@ -1 +1 @@ -Subproject commit aa161247c7f1b51b1f53fa1e4d5907aff898bfbe +Subproject commit 2d2f5bf253c130ea859982b8243cbf7a1a1719af