Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions db/migrations/20260517040000_archive_orphan_watchers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- migrate:up

-- Archive watchers that are flagged `status = 'active'` but have no `agent_id`.
-- The scheduler (packages/server/src/watchers/automation.ts:469) already
-- filters them out with `WHERE w.agent_id IS NOT NULL`, so these rows are
-- zombies — visible in the API, redirect-bounced on the watcher detail
-- route, never actually executing.
--
-- 2026-05-17 prod audit: 28 active orphans across 11 orgs (most originated
-- from older create paths before `agent_id` was wired in). Archiving them
-- aligns the stored status with their actual execution state and lets the
-- `/$owner/watchers/$watcherId` redirect logic stop silently sending users
-- to /agents.
--
-- The write-time guard (this same PR, manage_watchers.ts) rejects new
-- watcher creates/updates that set a schedule without an agent, so the
-- orphan set can't grow again.

UPDATE public.watchers
SET status = 'archived',
updated_at = now()
WHERE status = 'active'
AND agent_id IS NULL;

-- migrate:down

-- No-op: this is a one-shot data cleanup. The original rows can be restored
-- manually if needed by assigning an agent_id and flipping status back to
-- 'active'; without an agent, status='active' is meaningless to the
-- scheduler.
34 changes: 34 additions & 0 deletions db/migrations/20260517050000_watcher_agent_id_not_null.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- migrate:up

-- Delete every watcher with `agent_id IS NULL` and enforce the column at
-- the schema level. These rows are leftovers from before the watcher
-- scheduler required `agent_id` (see watchers/automation.ts:469) — they
-- couldn't fire and the prior migration 20260517040000 had already
-- archived the active subset. Their dependent rows (windows, reactions,
-- versions, classifiers, field feedback) describe runtime state for
-- watchers that will never execute; ON DELETE CASCADE clears them.
-- `runs.watcher_id` is ON DELETE SET NULL, so the 21k historical run
-- records remain — they just lose the watcher linkage.
--
-- Application-level guards in manage_watchers.ts (handleCreate +
-- handleCreateFromVersion) reject new inserts without `agent_id`; the
-- NOT NULL constraint below is the matching DB-level enforcement so
-- bypass paths can't reintroduce the zombie state.

DELETE FROM public.watchers WHERE agent_id IS NULL;

ALTER TABLE public.watchers
ALTER COLUMN agent_id SET NOT NULL;

-- The existing index is partial (`WHERE agent_id IS NOT NULL`), which is
-- a tautology once the column is NOT NULL. Replace it with an unconditional
-- index so explain plans don't show the dead predicate.
DROP INDEX IF EXISTS public.idx_watchers_agent_id;
CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id);

-- migrate:down

ALTER TABLE public.watchers
ALTER COLUMN agent_id DROP NOT NULL;

-- No restoring deleted rows — this is forward-only data cleanup.
26 changes: 17 additions & 9 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,12 @@ CREATE TABLE public.personal_access_tokens (

COMMENT ON TABLE public.personal_access_tokens IS 'Personal Access Tokens for workers, CLI tools, and MCP clients';

--
-- Name: COLUMN personal_access_tokens.worker_id; Type: COMMENT; Schema: public; Owner: -
--

COMMENT ON COLUMN public.personal_access_tokens.worker_id IS 'Optional binding to a specific device_workers.worker_id. Set by /api/me/devices/mint-child-token. When non-NULL, /api/workers/poll requires the request body''s worker_id to match.';

--
-- Name: personal_access_tokens_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -1991,7 +1997,7 @@ CREATE TABLE public.watchers (
current_version_id integer,
schedule text,
next_run_at timestamp with time zone,
agent_id text,
agent_id text NOT NULL,
connection_id text,
scheduler_client_id text,
source_watcher_id integer,
Expand Down Expand Up @@ -3596,6 +3602,12 @@ CREATE INDEX idx_notification_targets_user_all ON public.notification_targets US

CREATE INDEX idx_notification_targets_user_unread ON public.notification_targets USING btree (user_id, delivered_at DESC) WHERE (read_at IS NULL);

--
-- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_personal_access_tokens_worker_id ON public.personal_access_tokens USING btree (worker_id) WHERE (worker_id IS NOT NULL);

--
-- Name: idx_runs_active_auth_per_profile; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3774,7 +3786,7 @@ CREATE INDEX idx_watcher_windows_watcher ON public.watcher_windows USING btree (
-- Name: idx_watchers_agent_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id) WHERE (agent_id IS NOT NULL);
CREATE INDEX idx_watchers_agent_id ON public.watchers USING btree (agent_id);

--
-- Name: idx_watchers_connection_id; Type: INDEX; Schema: public; Owner: -
Expand Down Expand Up @@ -3999,12 +4011,6 @@ CREATE INDEX personal_access_tokens_active_idx ON public.personal_access_tokens
CREATE INDEX personal_access_tokens_organization_id_idx ON public.personal_access_tokens USING btree (organization_id);


--
-- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_personal_access_tokens_worker_id ON public.personal_access_tokens USING btree (worker_id) WHERE (worker_id IS NOT NULL);

--
-- Name: personal_access_tokens_token_hash_idx; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4996,4 +5002,6 @@ INSERT INTO public.schema_migrations (version) VALUES
('20260516200100'),
('20260517010000'),
('20260517020000'),
('20260517030000');
('20260517030000'),
('20260517040000'),
('20260517050000');
105 changes: 101 additions & 4 deletions packages/server/src/tools/admin/manage_entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

import { type Static, Type } from '@sinclair/typebox';
import { getDb } from '../../db/client';
import { getDb, pgTextArray } from '../../db/client';
import type { Env } from '../../index';
import {
batchLoadRelationships,
Expand Down Expand Up @@ -295,6 +295,12 @@ type ManageEntityResult =
space_name?: string | null;
view_url?: string;
}>;
// Server-side resolution of every `x-link-entity-type` column referenced
// by the page. Keyed by `${entityType}:${lookupField}`, then by the
// lookup value from the row's metadata. Previously each entity-list page
// fanned out one `manage_entity.list` per linked column (4× ~2.5 s on
// the Company page); the FE now reads from this map instead.
linked_entities?: Record<string, Record<string, { slug: string; entity_type: string; name: string }>>;
metadata: {
page_size: number;
has_more: boolean;
Expand Down Expand Up @@ -733,10 +739,14 @@ async function handleList(
const schema = entityTypeRow?.metadata_schema as Record<string, unknown> | null;
const relSpecs = (schema?.['x-table-relationships'] ?? []) as RelationshipColumnSpec[];
const entityIds = entities.map((e) => e.id);
const relMap =

// Batch-load relationships and linked-column lookups in parallel.
const [relMap, linkedEntities] = await Promise.all([
relSpecs.length > 0 && entityIds.length > 0
? await batchLoadRelationships(entityIds, relSpecs, ctx.organizationId)
: new Map();
? batchLoadRelationships(entityIds, relSpecs, ctx.organizationId)
: Promise.resolve(new Map()),
resolveLinkedColumns(entities, schema, ctx.organizationId),
]);

const baseUrl = getPublicWebUrl(ctx.requestUrl, ctx.baseUrl);
const ownerSlug = await getOrganizationSlug(ctx.organizationId);
Expand Down Expand Up @@ -779,6 +789,7 @@ async function handleList(
...(relMap.size > 0 && relMap.has(e.id) ? { relationships: relMap.get(e.id) } : {}),
};
}),
...(Object.keys(linkedEntities).length > 0 ? { linked_entities: linkedEntities } : {}),
metadata: {
page_size: entities.length,
has_more: hasMore,
Expand All @@ -792,6 +803,92 @@ async function handleList(
};
}

/**
* Resolve every `x-link-entity-type` column on the schema to `{slug, name}`
* pairs in one batch per (entityType, lookupField). Replaces the previous
* FE pattern of `useQueries` fanning out one full-table fetch per linked
* column. Returns a map keyed `${entityType}:${lookupField}` → lookup-value
* → ref. Empty object if the schema declares no linked columns or none of
* the visible rows reference linked values.
*/
async function resolveLinkedColumns(
entities: Array<{ metadata?: Record<string, any> | null }>,
schema: Record<string, unknown> | null,
organizationId: string
): Promise<Record<string, Record<string, { slug: string; entity_type: string; name: string }>>> {
if (!schema || entities.length === 0) return {};
const properties = (schema as { properties?: Record<string, any> }).properties;
if (!properties) return {};

// Collect (linkedType, lookupField) → set of referenced values from the rows.
const buckets = new Map<
string,
{ entityType: string; lookupField: string; values: Set<string> }
>();
for (const [columnKey, prop] of Object.entries(properties)) {
const linkedType = (prop as { 'x-link-entity-type'?: unknown })['x-link-entity-type'];
if (typeof linkedType !== 'string' || linkedType === '') continue;
const lookupFieldRaw = (prop as { 'x-link-lookup-field'?: unknown })['x-link-lookup-field'];
const lookupField = typeof lookupFieldRaw === 'string' && lookupFieldRaw ? lookupFieldRaw : 'slug';
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Resolve unannotated FK links by id

When a schema declares x-link-entity-type without x-link-lookup-field, this defaults to slug, but several built-in schemas use integer FK columns without an explicit lookup field (for example examples/atlas/models/schema.yaml:11-16 and examples/market/models/schema.yaml:303-308). For those pages the visible metadata values are entity IDs like company_id, so this new server-side resolver queries e.slug = ANY(...) with numeric IDs and returns no linked_entities, regressing the inline linked-entity display that this path is meant to replace.

Useful? React with 👍 / 👎.

const bucketKey = `${linkedType}:${lookupField}`;
let bucket = buckets.get(bucketKey);
if (!bucket) {
bucket = { entityType: linkedType, lookupField, values: new Set() };
buckets.set(bucketKey, bucket);
}
for (const e of entities) {
const raw = e.metadata?.[columnKey];
const list = Array.isArray(raw) ? raw : [raw];
for (const v of list) {
if (v == null) continue;
const s = String(v).trim();
if (s !== '') bucket.values.add(s);
}
}
}
if (buckets.size === 0) return {};

const sql = getDb();
const out: Record<string, Record<string, { slug: string; entity_type: string; name: string }>> = {};

await Promise.all(
[...buckets.entries()].map(async ([bucketKey, { entityType, lookupField, values }]) => {
if (values.size === 0) return;
const valuesArr = [...values];
const valuesLiteral = pgTextArray(valuesArr);
const rows =
lookupField === 'slug'
? await sql<{ slug: string; entity_type: string; name: string; lookup_value: string }>`
SELECT e.slug, et.slug AS entity_type, e.name, e.slug AS lookup_value
FROM entities e
JOIN entity_types et ON et.id = e.entity_type_id
WHERE e.organization_id = ${organizationId}
AND e.deleted_at IS NULL
AND et.slug = ${entityType}
AND e.slug = ANY(${valuesLiteral}::text[])
`
: await sql<{ slug: string; entity_type: string; name: string; lookup_value: string }>`
SELECT e.slug, et.slug AS entity_type, e.name, (e.metadata->>${lookupField}) AS lookup_value
FROM entities e
JOIN entity_types et ON et.id = e.entity_type_id
WHERE e.organization_id = ${organizationId}
AND e.deleted_at IS NULL
AND et.slug = ${entityType}
AND (e.metadata->>${lookupField}) = ANY(${valuesLiteral}::text[])
`;
if (rows.length === 0) return;
const bucketMap: Record<string, { slug: string; entity_type: string; name: string }> = {};
for (const r of rows) {
if (r.lookup_value == null) continue;
bucketMap[r.lookup_value] = { slug: r.slug, entity_type: r.entity_type, name: r.name };
}
out[bucketKey] = bucketMap;
})
);

return out;
}

async function handleGet(
entityId: number,
env: Env,
Expand Down
72 changes: 52 additions & 20 deletions packages/server/src/tools/admin/manage_feeds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,51 @@ async function handleListFeeds(
const limit = args.limit ?? 50;
const offset = args.offset ?? 0;

let query = sql`
SELECT f.*, c.connector_key, c.display_name AS connection_name,
// Build the filtered "page" of feeds first, then compute event_count in a
// single GROUP BY restricted to the (connection_id, feed_key) tuples on
// that page. The previous shape ran a correlated
// `SELECT COUNT(*) FROM current_event_records` per row — O(N feeds) ×
// an anti-join over the entire events table — ~880ms per feed on a busy
// connection. Batching collapses it to one scan.
let pageQuery = sql`
SELECT f.*
FROM feeds f
JOIN connections c ON c.id = f.connection_id
WHERE f.organization_id = ${organizationId} AND c.deleted_at IS NULL AND f.deleted_at IS NULL
`;

if (args.connection_id) {
pageQuery = sql`${pageQuery} AND f.connection_id = ${args.connection_id}`;
}
if (args.entity_id) {
pageQuery = sql`${pageQuery} AND ${args.entity_id} = ANY(f.entity_ids)`;
}
if (args.status) {
pageQuery = sql`${pageQuery} AND f.status = ${args.status}`;
}

pageQuery = sql`${pageQuery} ORDER BY f.created_at DESC LIMIT ${limit} OFFSET ${offset}`;

const query = sql`
WITH page AS MATERIALIZED (
${pageQuery}
),
event_counts AS (
SELECT e.connection_id, e.feed_key, COUNT(*)::int AS event_count
FROM events e
WHERE e.organization_id = ${organizationId}
-- ANY(ARRAY(...)) on each column lets the planner stay on
-- per-column index scans and intersect, rather than re-scanning
-- the connection_id index per (connection, feed_key) pair the
-- way IN (subquery) on a tuple would. The feed_key ANY narrows
-- the scan to the keys actually on this page; the final LEFT
-- JOIN drops any over-count from the cross-product.
AND e.connection_id = ANY(ARRAY(SELECT DISTINCT connection_id FROM page))
AND e.feed_key = ANY(ARRAY(SELECT DISTINCT feed_key FROM page WHERE feed_key IS NOT NULL))
AND NOT EXISTS (SELECT 1 FROM events n WHERE n.supersedes_event_id = e.id)
GROUP BY e.connection_id, e.feed_key
)
SELECT p.*, c.connector_key, c.display_name AS connection_name,
c.status AS connection_status,
c.device_worker_id,
dw.label AS device_label,
Expand All @@ -171,12 +214,12 @@ async function handleListFeeds(
(
SELECT string_agg(DISTINCT ent.name, ', ' ORDER BY ent.name)
FROM entities ent
WHERE ent.id = ANY(f.entity_ids)
WHERE ent.id = ANY(p.entity_ids)
) AS entity_names,
(SELECT COUNT(*) FROM runs r WHERE r.feed_id = f.id AND r.status = ANY(${runStatusLiteral(ACTIVE_RUN_STATUSES)}::text[]))::int AS active_runs,
(SELECT COUNT(*) FROM current_event_records e WHERE e.connection_id = f.connection_id AND e.feed_key = f.feed_key)::int AS event_count
FROM feeds f
JOIN connections c ON c.id = f.connection_id
(SELECT COUNT(*) FROM runs r WHERE r.feed_id = p.id AND r.status = ANY(${runStatusLiteral(ACTIVE_RUN_STATUSES)}::text[]))::int AS active_runs,
COALESCE(ec.event_count, 0)::int AS event_count
FROM page p
JOIN connections c ON c.id = p.connection_id
LEFT JOIN device_workers dw ON dw.id = c.device_worker_id
LEFT JOIN LATERAL (
SELECT name
Expand All @@ -188,21 +231,10 @@ async function handleListFeeds(
LIMIT 1
) cd ON TRUE
LEFT JOIN auth_profiles ap ON ap.id = c.auth_profile_id
WHERE f.organization_id = ${organizationId} AND c.deleted_at IS NULL AND f.deleted_at IS NULL
LEFT JOIN event_counts ec ON ec.connection_id = p.connection_id AND ec.feed_key = p.feed_key
ORDER BY p.created_at DESC
`;

if (args.connection_id) {
query = sql`${query} AND f.connection_id = ${args.connection_id}`;
}
if (args.entity_id) {
query = sql`${query} AND ${args.entity_id} = ANY(f.entity_ids)`;
}
if (args.status) {
query = sql`${query} AND f.status = ${args.status}`;
}

query = sql`${query} ORDER BY f.created_at DESC LIMIT ${limit} OFFSET ${offset}`;

const rows = await query;
return { action: 'list_feeds', feeds: rows, total: rows.length, limit, offset };
}
Expand Down
Loading