From a0420435fb592eb25f9efdc82bc23df059b62ae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Sat, 25 Apr 2026 02:41:46 +0100 Subject: [PATCH 1/3] feat(agents): add schema-mirror + install flow for template agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New installAgentFromTemplate(templateAgentId, targetOrganizationId, userId) creates an agent row in the target org with template_agent_id set, then mirrors the template org's entity_types and entity_relationship_types into the target org tagged managed_by_template_agent_id + source_template_org_id. Core properties: - Idempotent. Re-running against the same target updates rather than creating duplicates. - Safe. The mirror never overwrites rows the user authored directly (managed_by_template_agent_id IS NULL). Slug collisions with user rows or with a different template agent abort with a descriptive error. - Re-syncable. resyncInstalledAgent() lets template schema evolution propagate to every installed instance. Watchers and classifiers are deferred to a follow-up — watchers need the watcher_versions + reaction_script dance, classifiers are scoped per- entity so they interact with entity mirroring. Paired with an integration test under __tests__/integration/agents/ that covers: create on first install, idempotency, managed_by tagging, re-sync propagation, refusal to install into the template's own org, refusal to overwrite user-authored rows. --- .../integration/agents/install.test.ts | 179 ++++++++ .../owletto-backend/src/agents/install.ts | 383 ++++++++++++++++++ 2 files changed, 562 insertions(+) create mode 100644 packages/owletto-backend/src/__tests__/integration/agents/install.test.ts create mode 100644 packages/owletto-backend/src/agents/install.ts diff --git a/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts new file mode 100644 index 000000000..06cf43e2a --- /dev/null +++ b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts @@ -0,0 +1,179 @@ +import { beforeAll, describe, expect, it } from 'vitest'; +import { installAgentFromTemplate, resyncInstalledAgent } from '../../../agents/install'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; +import { + addUserToOrganization, + createTestAgent, + createTestOrganization, + createTestUser, +} from '../../setup/test-fixtures'; + +describe('installAgentFromTemplate', () => { + let templateOrg: Awaited>; + let templateAgent: Awaited>; + let userOrg: Awaited>; + let user: Awaited>; + + beforeAll(async () => { + await cleanupTestDatabase(); + const sql = getTestDb(); + + templateOrg = await createTestOrganization({ + name: 'Personal Finance Template', + slug: 'personal-finance-tpl', + }); + templateAgent = await createTestAgent({ + organizationId: templateOrg.id, + name: 'Personal Finance', + }); + + user = await createTestUser(); + userOrg = await createTestOrganization({ name: 'User Personal Org' }); + await addUserToOrganization(user.id, userOrg.id, 'owner'); + + // Seed two entity types and one relationship type in the template org. + await sql` + INSERT INTO entity_types (slug, name, description, metadata_schema, organization_id, created_by) + VALUES + ('tax_year', 'Tax Year', 'Fiscal year', '{"type":"object"}'::jsonb, ${templateOrg.id}, ${user.id}), + ('transaction', 'Transaction', 'A debit/credit', '{"type":"object"}'::jsonb, ${templateOrg.id}, ${user.id}) + `; + await sql` + INSERT INTO entity_relationship_types (slug, name, description, metadata_schema, organization_id, created_by, status) + VALUES + ('for_tax_year', 'For Tax Year', NULL, '{"type":"object"}'::jsonb, ${templateOrg.id}, ${user.id}, 'active') + `; + }); + + it('creates a new agent row in the target org with template_agent_id set', async () => { + const result = await installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: userOrg.id, + userId: user.id, + }); + + expect(result.created).toBe(true); + expect(result.mirrored.entity_types).toBe(2); + expect(result.mirrored.entity_relationship_types).toBe(1); + + const sql = getTestDb(); + const rows = await sql` + SELECT id, template_agent_id, organization_id, owner_user_id + FROM agents + WHERE id = ${result.agentId} + `; + expect(rows).toHaveLength(1); + expect(rows[0].template_agent_id).toBe(templateAgent.agentId); + expect(rows[0].organization_id).toBe(userOrg.id); + expect(rows[0].owner_user_id).toBe(user.id); + }); + + it('mirrors entity types with managed_by_template_agent_id set', async () => { + const sql = getTestDb(); + const rows = await sql` + SELECT slug, managed_by_template_agent_id, source_template_org_id + FROM entity_types + WHERE organization_id = ${userOrg.id} + ORDER BY slug + `; + expect(rows.map((r: { slug: string }) => r.slug)).toEqual(['tax_year', 'transaction']); + for (const row of rows) { + expect(row.managed_by_template_agent_id).toBe(templateAgent.agentId); + expect(row.source_template_org_id).toBe(templateOrg.id); + } + }); + + it('mirrors relationship types with managed_by_template_agent_id set', async () => { + const sql = getTestDb(); + const rows = await sql` + SELECT slug, managed_by_template_agent_id, source_template_org_id + FROM entity_relationship_types + WHERE organization_id = ${userOrg.id} + ORDER BY slug + `; + expect(rows.map((r: { slug: string }) => r.slug)).toEqual(['for_tax_year']); + expect(rows[0].managed_by_template_agent_id).toBe(templateAgent.agentId); + }); + + it('is idempotent: re-installing updates rather than creating duplicates', async () => { + const result = await installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: userOrg.id, + userId: user.id, + }); + expect(result.created).toBe(false); + + const sql = getTestDb(); + const agentCount = await sql` + SELECT COUNT(*)::int AS count + FROM agents + WHERE template_agent_id = ${templateAgent.agentId} + AND organization_id = ${userOrg.id} + `; + expect(agentCount[0].count).toBe(1); + + const typeCount = await sql` + SELECT COUNT(*)::int AS count + FROM entity_types + WHERE organization_id = ${userOrg.id} + AND managed_by_template_agent_id = ${templateAgent.agentId} + `; + expect(typeCount[0].count).toBe(2); + }); + + it('re-sync propagates template changes to the mirror', async () => { + const sql = getTestDb(); + // Simulate a template-side description change. + await sql` + UPDATE entity_types + SET description = 'UK fiscal year (6 April to 5 April)' + WHERE organization_id = ${templateOrg.id} + AND slug = 'tax_year' + `; + + const installed = await sql` + SELECT id FROM agents + WHERE template_agent_id = ${templateAgent.agentId} + AND organization_id = ${userOrg.id} + LIMIT 1 + `; + await resyncInstalledAgent({ + installedAgentId: installed[0].id as string, + userId: user.id, + }); + + const mirrored = await sql` + SELECT description FROM entity_types + WHERE organization_id = ${userOrg.id} + AND slug = 'tax_year' + `; + expect(mirrored[0].description).toBe('UK fiscal year (6 April to 5 April)'); + }); + + it('refuses to install into the template org itself', async () => { + await expect( + installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: templateOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/Cannot install template agent into its own org/); + }); + + it('refuses to overwrite a user-authored row of the same slug', async () => { + const sql = getTestDb(); + const otherOrg = await createTestOrganization({ name: 'Other User Org' }); + await addUserToOrganization(user.id, otherOrg.id, 'owner'); + await sql` + INSERT INTO entity_types (slug, name, description, metadata_schema, organization_id, created_by) + VALUES ('transaction', 'User Transaction', 'Manual row', '{"type":"object"}'::jsonb, ${otherOrg.id}, ${user.id}) + `; + await expect( + installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: otherOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/user-authored/); + }); +}); diff --git a/packages/owletto-backend/src/agents/install.ts b/packages/owletto-backend/src/agents/install.ts new file mode 100644 index 000000000..9b47d5dc4 --- /dev/null +++ b/packages/owletto-backend/src/agents/install.ts @@ -0,0 +1,383 @@ +/** + * Install a template agent into a target organization. + * + * A template agent (e.g. the `personal-finance` agent in examples/) lives in a + * template org and owns a canonical set of entity types and relationship types + * that define its data model. When a user installs the agent we: + * + * 1. Create a new agents row in the user's org with template_agent_id set + * (Lobu's existing template-inheritance applies to agents.* settings — + * prompt, tools, mcp_servers, etc. — without any copy step). + * 2. Mirror the template org's entity_types and entity_relationship_types + * into the user's org, tagged managed_by_template_agent_id so we can + * re-sync on template updates and treat them as read-only from the + * user's side. + * + * Classifiers and watchers are NOT mirrored in this module yet — they have + * versioning/reaction-script tables that make them a separate concern. + * The install is idempotent: re-running against the same target simply + * UPDATEs the mirror rows (allowing template schema evolution). + * + * Safety: the mirror never overwrites rows the user authored directly + * (managed_by_template_agent_id IS NULL). Slug collisions of that kind + * abort the install with a descriptive error. + */ + +import { generateSecureToken } from '../auth/oauth/utils'; +import { getDb } from '../db/client'; + +export interface InstallResult { + agentId: string; + organizationId: string; + mirrored: { + entity_types: number; + entity_relationship_types: number; + }; + created: boolean; +} + +export interface InstallAgentParams { + templateAgentId: string; + targetOrganizationId: string; + userId: string; + /** Optional override for the installed agent's display name. */ + name?: string; +} + +type Sql = ReturnType; + +interface TemplateAgentRow { + id: string; + organization_id: string; + name: string; + description: string | null; +} + +async function loadTemplateAgent(sql: Sql, templateAgentId: string): Promise { + const rows = await sql` + SELECT id, organization_id, name, description + FROM agents + WHERE id = ${templateAgentId} + LIMIT 1 + `; + if (rows.length === 0) { + throw new Error(`Template agent ${templateAgentId} not found`); + } + return rows[0] as TemplateAgentRow; +} + +async function findExistingInstall( + sql: Sql, + templateAgentId: string, + targetOrganizationId: string +): Promise { + const rows = await sql` + SELECT id FROM agents + WHERE template_agent_id = ${templateAgentId} + AND organization_id = ${targetOrganizationId} + LIMIT 1 + `; + return rows.length > 0 ? (rows[0].id as string) : null; +} + +async function upsertInstalledAgent( + sql: Sql, + params: { + existingAgentId: string | null; + template: TemplateAgentRow; + targetOrganizationId: string; + userId: string; + name?: string; + } +): Promise<{ agentId: string; created: boolean }> { + if (params.existingAgentId) { + await sql` + UPDATE agents + SET updated_at = NOW(), + name = ${params.name ?? params.template.name}, + description = ${params.template.description} + WHERE id = ${params.existingAgentId} + `; + return { agentId: params.existingAgentId, created: false }; + } + + const agentId = `agent_${generateSecureToken(8).toLowerCase()}`; + await sql` + INSERT INTO agents ( + id, organization_id, name, description, + owner_platform, owner_user_id, + template_agent_id, + is_workspace_agent, + created_at, updated_at + ) VALUES ( + ${agentId}, + ${params.targetOrganizationId}, + ${params.name ?? params.template.name}, + ${params.template.description}, + 'owletto', + ${params.userId}, + ${params.template.id}, + false, + NOW(), NOW() + ) + `; + return { agentId, created: true }; +} + +interface EntityTypeRow { + slug: string; + name: string; + description: string | null; + icon: string | null; + color: string | null; + metadata_schema: Record | null; + event_kinds: Record | null; +} + +async function mirrorEntityTypes( + sql: Sql, + templateOrgId: string, + targetOrgId: string, + templateAgentId: string, + userId: string +): Promise { + const templateRows = (await sql` + SELECT slug, name, description, icon, color, metadata_schema, event_kinds + FROM entity_types + WHERE organization_id = ${templateOrgId} + AND deleted_at IS NULL + `) as EntityTypeRow[]; + + let count = 0; + for (const row of templateRows) { + const existing = await sql` + SELECT id, managed_by_template_agent_id + FROM entity_types + WHERE organization_id = ${targetOrgId} + AND slug = ${row.slug} + AND deleted_at IS NULL + LIMIT 1 + `; + + const metadataSchema = row.metadata_schema ? sql.json(row.metadata_schema) : null; + const eventKinds = row.event_kinds ? sql.json(row.event_kinds) : null; + + if (existing.length === 0) { + await sql` + INSERT INTO entity_types ( + slug, name, description, icon, color, + metadata_schema, event_kinds, + organization_id, created_by, + managed_by_template_agent_id, source_template_org_id, + created_at, updated_at + ) VALUES ( + ${row.slug}, ${row.name}, ${row.description}, + ${row.icon}, ${row.color}, + ${metadataSchema}, ${eventKinds}, + ${targetOrgId}, ${userId}, + ${templateAgentId}, ${templateOrgId}, + NOW(), NOW() + ) + `; + count++; + continue; + } + + const existingOwner = existing[0].managed_by_template_agent_id as string | null; + if (existingOwner === null) { + throw new Error( + `Entity type '${row.slug}' already exists in the target org as a user-authored row. Remove it or rename before installing this agent.` + ); + } + if (existingOwner !== templateAgentId) { + throw new Error( + `Entity type '${row.slug}' is already managed by a different template agent (${existingOwner}).` + ); + } + + await sql` + UPDATE entity_types + SET name = ${row.name}, + description = ${row.description}, + icon = ${row.icon}, + color = ${row.color}, + metadata_schema = ${metadataSchema}, + event_kinds = ${eventKinds}, + updated_at = NOW(), + updated_by = ${userId} + WHERE id = ${existing[0].id} + `; + count++; + } + return count; +} + +interface RelationshipTypeRow { + slug: string; + name: string; + description: string | null; + metadata_schema: Record | null; + is_symmetric: boolean; +} + +async function mirrorRelationshipTypes( + sql: Sql, + templateOrgId: string, + targetOrgId: string, + templateAgentId: string, + userId: string +): Promise { + const templateRows = (await sql` + SELECT slug, name, description, metadata_schema, is_symmetric + FROM entity_relationship_types + WHERE organization_id = ${templateOrgId} + AND status = 'active' + `) as RelationshipTypeRow[]; + + let count = 0; + for (const row of templateRows) { + const existing = await sql` + SELECT id, managed_by_template_agent_id + FROM entity_relationship_types + WHERE organization_id = ${targetOrgId} + AND slug = ${row.slug} + AND status = 'active' + LIMIT 1 + `; + + const metadataSchema = row.metadata_schema ? sql.json(row.metadata_schema) : null; + + if (existing.length === 0) { + await sql` + INSERT INTO entity_relationship_types ( + slug, name, description, metadata_schema, + is_symmetric, organization_id, created_by, + managed_by_template_agent_id, source_template_org_id, + status, created_at, updated_at + ) VALUES ( + ${row.slug}, ${row.name}, ${row.description}, ${metadataSchema}, + ${row.is_symmetric}, ${targetOrgId}, ${userId}, + ${templateAgentId}, ${templateOrgId}, + 'active', NOW(), NOW() + ) + `; + count++; + continue; + } + + const existingOwner = existing[0].managed_by_template_agent_id as string | null; + if (existingOwner === null) { + throw new Error( + `Relationship type '${row.slug}' already exists in the target org as a user-authored row. Remove it or rename before installing this agent.` + ); + } + if (existingOwner !== templateAgentId) { + throw new Error( + `Relationship type '${row.slug}' is already managed by a different template agent (${existingOwner}).` + ); + } + + await sql` + UPDATE entity_relationship_types + SET name = ${row.name}, + description = ${row.description}, + metadata_schema = ${metadataSchema}, + is_symmetric = ${row.is_symmetric}, + updated_at = NOW() + WHERE id = ${existing[0].id} + `; + count++; + } + return count; +} + +export async function installAgentFromTemplate( + params: InstallAgentParams +): Promise { + const sql = getDb(); + const template = await loadTemplateAgent(sql, params.templateAgentId); + + if (template.organization_id === params.targetOrganizationId) { + throw new Error( + `Cannot install template agent into its own org (${template.organization_id}). Pick a different target.` + ); + } + + let result: InstallResult | null = null; + + await sql.begin(async (tx) => { + const existingAgentId = await findExistingInstall( + tx, + params.templateAgentId, + params.targetOrganizationId + ); + const upsert = await upsertInstalledAgent(tx, { + existingAgentId, + template, + targetOrganizationId: params.targetOrganizationId, + userId: params.userId, + name: params.name, + }); + + const entityTypes = await mirrorEntityTypes( + tx, + template.organization_id, + params.targetOrganizationId, + params.templateAgentId, + params.userId + ); + const relationshipTypes = await mirrorRelationshipTypes( + tx, + template.organization_id, + params.targetOrganizationId, + params.templateAgentId, + params.userId + ); + + result = { + agentId: upsert.agentId, + organizationId: params.targetOrganizationId, + mirrored: { + entity_types: entityTypes, + entity_relationship_types: relationshipTypes, + }, + created: upsert.created, + }; + }); + + if (!result) { + throw new Error('Install transaction did not produce a result'); + } + return result; +} + +export async function resyncInstalledAgent(params: { + installedAgentId: string; + userId: string; +}): Promise { + const sql = getDb(); + const rows = await sql` + SELECT id, organization_id, template_agent_id + FROM agents + WHERE id = ${params.installedAgentId} + LIMIT 1 + `; + if (rows.length === 0) { + throw new Error(`Installed agent ${params.installedAgentId} not found`); + } + const row = rows[0] as { + id: string; + organization_id: string; + template_agent_id: string | null; + }; + if (!row.template_agent_id) { + throw new Error( + `Agent ${params.installedAgentId} has no template_agent_id — nothing to re-sync.` + ); + } + return installAgentFromTemplate({ + templateAgentId: row.template_agent_id, + targetOrganizationId: row.organization_id, + userId: params.userId, + }); +} From 887bc5efbf8df9d30dfdc11fc013f562c596ef24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Sat, 25 Apr 2026 03:35:01 +0100 Subject: [PATCH 2/3] fix(agents): harden template installs --- .../integration/agents/install.test.ts | 159 ++++++ .../owletto-backend/src/agents/install.ts | 522 +++++++++++++++++- 2 files changed, 670 insertions(+), 11 deletions(-) diff --git a/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts index 06cf43e2a..6409c9260 100644 --- a/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts +++ b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts @@ -21,6 +21,7 @@ describe('installAgentFromTemplate', () => { templateOrg = await createTestOrganization({ name: 'Personal Finance Template', slug: 'personal-finance-tpl', + visibility: 'public', }); templateAgent = await createTestAgent({ organizationId: templateOrg.id, @@ -43,6 +44,57 @@ describe('installAgentFromTemplate', () => { VALUES ('for_tax_year', 'For Tax Year', NULL, '{"type":"object"}'::jsonb, ${templateOrg.id}, ${user.id}, 'active') `; + const watcherRows = await sql` + INSERT INTO watchers ( + organization_id, slug, name, description, status, created_by, + model_config, sources, schedule, agent_id + ) VALUES ( + ${templateOrg.id}, 'gmail-tx', 'Gmail extractor', 'Extract finance events', 'active', ${user.id}, + '{"model":"test"}'::jsonb, + '[{"name":"gmail_messages","query":"SELECT id FROM events"}]'::jsonb, + '*/30 * * * *', ${templateAgent.agentId} + ) + RETURNING id + `; + const watcherId = watcherRows[0].id as number; + const watcherVersionRows = await sql` + INSERT INTO watcher_versions ( + watcher_id, version, name, description, created_by, prompt, + extraction_schema, required_source_types, recommended_source_types, + reactions_guidance + ) VALUES ( + ${watcherId}, 1, 'Gmail extractor v1', 'Current template', ${user.id}, 'Extract {{sources.gmail_messages}}', + '{"type":"object","properties":{"transactions":{"type":"array"}}}'::jsonb, + '{google.gmail}'::text[], '{document}'::text[], 'Create transaction entities' + ) + RETURNING id + `; + await sql` + UPDATE watchers + SET current_version_id = ${watcherVersionRows[0].id as number} + WHERE id = ${watcherId} + `; + const classifierRows = await sql` + INSERT INTO event_classifiers ( + organization_id, slug, name, description, attribute_key, status, + created_by, watcher_id + ) VALUES ( + ${templateOrg.id}, 'tax-relevance', 'Tax relevance', 'Classify tax relevance', 'tax_relevance', + 'active', ${user.id}, ${watcherId} + ) + RETURNING id + `; + await sql` + INSERT INTO event_classifier_versions ( + classifier_id, version, is_current, attribute_values, min_similarity, + fallback_value, change_notes, created_by, preferred_model, extraction_config + ) VALUES ( + ${classifierRows[0].id as number}, 1, true, + '[{"value":"income","description":"Taxable income"}]'::jsonb, + 0.75, 'none', 'Initial template', ${user.id}, '@cf/meta/llama-3.1-8b-instruct', + '{"mode":"llm"}'::jsonb + ) + `; }); it('creates a new agent row in the target org with template_agent_id set', async () => { @@ -55,6 +107,8 @@ describe('installAgentFromTemplate', () => { expect(result.created).toBe(true); expect(result.mirrored.entity_types).toBe(2); expect(result.mirrored.entity_relationship_types).toBe(1); + expect(result.mirrored.watchers).toBe(1); + expect(result.mirrored.event_classifiers).toBe(1); const sql = getTestDb(); const rows = await sql` @@ -95,6 +149,60 @@ describe('installAgentFromTemplate', () => { expect(rows[0].managed_by_template_agent_id).toBe(templateAgent.agentId); }); + it('mirrors watcher definitions with the installed agent as owner', async () => { + const sql = getTestDb(); + const rows = await sql` + SELECT + w.slug, + w.agent_id, + w.connection_id, + w.entity_ids, + w.managed_by_template_agent_id, + w.source_template_org_id, + v.prompt, + v.reactions_guidance + FROM watchers w + JOIN watcher_versions v ON v.id = w.current_version_id + WHERE w.organization_id = ${userOrg.id} + AND w.slug = 'gmail-tx' + LIMIT 1 + `; + expect(rows).toHaveLength(1); + expect(rows[0].managed_by_template_agent_id).toBe(templateAgent.agentId); + expect(rows[0].source_template_org_id).toBe(templateOrg.id); + expect(rows[0].connection_id).toBeNull(); + expect(rows[0].entity_ids).toBeNull(); + expect(rows[0].agent_id).toBeTruthy(); + expect(rows[0].prompt).toContain('{{sources.gmail_messages}}'); + expect(rows[0].reactions_guidance).toContain('transaction'); + }); + + it('mirrors watcher-scoped classifiers and their current version', async () => { + const sql = getTestDb(); + const rows = await sql` + SELECT + c.slug, + c.watcher_id, + c.managed_by_template_agent_id, + v.version, + v.is_current, + v.fallback_value, + v.extraction_config + FROM event_classifiers c + JOIN event_classifier_versions v ON v.classifier_id = c.id + WHERE c.organization_id = ${userOrg.id} + AND c.slug = 'tax-relevance' + LIMIT 1 + `; + expect(rows).toHaveLength(1); + expect(rows[0].managed_by_template_agent_id).toBe(templateAgent.agentId); + expect(rows[0].watcher_id).toBeTruthy(); + expect(rows[0].version).toBe(1); + expect(rows[0].is_current).toBe(true); + expect(rows[0].fallback_value).toBe('none'); + expect(rows[0].extraction_config).toEqual({ mode: 'llm' }); + }); + it('is idempotent: re-installing updates rather than creating duplicates', async () => { const result = await installAgentFromTemplate({ templateAgentId: templateAgent.agentId, @@ -119,6 +227,22 @@ describe('installAgentFromTemplate', () => { AND managed_by_template_agent_id = ${templateAgent.agentId} `; expect(typeCount[0].count).toBe(2); + + const watcherCount = await sql` + SELECT COUNT(*)::int AS count + FROM watchers + WHERE organization_id = ${userOrg.id} + AND managed_by_template_agent_id = ${templateAgent.agentId} + `; + expect(watcherCount[0].count).toBe(1); + + const classifierCount = await sql` + SELECT COUNT(*)::int AS count + FROM event_classifiers + WHERE organization_id = ${userOrg.id} + AND managed_by_template_agent_id = ${templateAgent.agentId} + `; + expect(classifierCount[0].count).toBe(1); }); it('re-sync propagates template changes to the mirror', async () => { @@ -150,6 +274,41 @@ describe('installAgentFromTemplate', () => { expect(mirrored[0].description).toBe('UK fiscal year (6 April to 5 April)'); }); + it('refuses to install a template from a private org', async () => { + const privateTemplateOrg = await createTestOrganization({ name: 'Private Template' }); + const privateTemplateAgent = await createTestAgent({ + organizationId: privateTemplateOrg.id, + name: 'Private Template Agent', + }); + + await expect( + installAgentFromTemplate({ + templateAgentId: privateTemplateAgent.agentId, + targetOrganizationId: userOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/organization is not public/); + }); + + it('refuses to install an already-installed agent as a source template', async () => { + const sql = getTestDb(); + const installed = await sql` + SELECT id FROM agents + WHERE template_agent_id = ${templateAgent.agentId} + AND organization_id = ${userOrg.id} + LIMIT 1 + `; + const otherOrg = await createTestOrganization({ name: 'Other Install Target' }); + + await expect( + installAgentFromTemplate({ + templateAgentId: installed[0].id as string, + targetOrganizationId: otherOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/cannot be used as a source template/); + }); + it('refuses to install into the template org itself', async () => { await expect( installAgentFromTemplate({ diff --git a/packages/owletto-backend/src/agents/install.ts b/packages/owletto-backend/src/agents/install.ts index 9b47d5dc4..e5b6dfd0e 100644 --- a/packages/owletto-backend/src/agents/install.ts +++ b/packages/owletto-backend/src/agents/install.ts @@ -8,13 +8,13 @@ * 1. Create a new agents row in the user's org with template_agent_id set * (Lobu's existing template-inheritance applies to agents.* settings — * prompt, tools, mcp_servers, etc. — without any copy step). - * 2. Mirror the template org's entity_types and entity_relationship_types - * into the user's org, tagged managed_by_template_agent_id so we can - * re-sync on template updates and treat them as read-only from the - * user's side. + * 2. Mirror the template org's entity_types, entity_relationship_types, + * classifiers and watcher definitions into the user's org, tagged + * managed_by_template_agent_id so we can re-sync on template updates and + * treat them as read-only from the user's side. * - * Classifiers and watchers are NOT mirrored in this module yet — they have - * versioning/reaction-script tables that make them a separate concern. + * Watcher/classifier mirrors copy definitions only — not historical windows, + * reactions or classifications. * The install is idempotent: re-running against the same target simply * UPDATEs the mirror rows (allowing template schema evolution). * @@ -24,7 +24,7 @@ */ import { generateSecureToken } from '../auth/oauth/utils'; -import { getDb } from '../db/client'; +import { type DbClient, getDb, pgTextArray } from '../db/client'; export interface InstallResult { agentId: string; @@ -32,6 +32,8 @@ export interface InstallResult { mirrored: { entity_types: number; entity_relationship_types: number; + event_classifiers: number; + watchers: number; }; created: boolean; } @@ -44,20 +46,29 @@ export interface InstallAgentParams { name?: string; } -type Sql = ReturnType; +type Sql = DbClient; interface TemplateAgentRow { id: string; organization_id: string; name: string; description: string | null; + template_agent_id: string | null; + organization_visibility: string; } async function loadTemplateAgent(sql: Sql, templateAgentId: string): Promise { const rows = await sql` - SELECT id, organization_id, name, description - FROM agents - WHERE id = ${templateAgentId} + SELECT + a.id, + a.organization_id, + a.name, + a.description, + a.template_agent_id, + o.visibility AS organization_visibility + FROM agents a + JOIN "organization" o ON o.id = a.organization_id + WHERE a.id = ${templateAgentId} LIMIT 1 `; if (rows.length === 0) { @@ -291,12 +302,483 @@ async function mirrorRelationshipTypes( return count; } +interface TemplateWatcherRow { + id: number; + model_config: Record | null; + sources: unknown[] | null; + reaction_script: string | null; + reaction_script_compiled: string | null; + name: string | null; + slug: string | null; + description: string | null; + version: number | null; + tags: string[] | null; + registry_type: string | null; + registry_repo: string | null; + registry_ref: string | null; + current_version_id: number | null; + schedule: string | null; +} + +interface TemplateWatcherVersionRow { + id: number; + version: number; + name: string; + description: string | null; + change_notes: string | null; + sources_schema: Record | null; + keying_config: Record | null; + json_template: Record | null; + prompt: string; + extraction_schema: Record; + classifiers: Record | null; + required_source_types: string[] | null; + recommended_source_types: string[] | null; + source_repository: string | null; + source_ref: string | null; + source_commit_sha: string | null; + source_path: string | null; + reactions_guidance: string | null; + condensation_prompt: string | null; + condensation_window_count: number | null; + version_sources: Record | null; +} + +function jsonOrNull(sql: Sql, value: unknown): unknown { + return value == null ? null : sql.json(value); +} + +async function loadCurrentWatcherVersion( + sql: Sql, + versionId: number | null +): Promise { + if (versionId === null) return null; + const rows = (await sql` + SELECT + id, + version, + name, + description, + change_notes, + sources_schema, + keying_config, + json_template, + prompt, + extraction_schema, + classifiers, + required_source_types, + recommended_source_types, + source_repository, + source_ref, + source_commit_sha, + source_path, + reactions_guidance, + condensation_prompt, + condensation_window_count, + version_sources + FROM watcher_versions + WHERE id = ${versionId} + LIMIT 1 + `) as TemplateWatcherVersionRow[]; + return rows[0] ?? null; +} + +async function upsertWatcherVersion( + sql: Sql, + row: TemplateWatcherVersionRow, + targetWatcherId: number, + userId: string, + existingVersionId: number | null +): Promise { + const sourcesSchema = jsonOrNull(sql, row.sources_schema); + const keyingConfig = jsonOrNull(sql, row.keying_config); + const jsonTemplate = jsonOrNull(sql, row.json_template); + const extractionSchema = sql.json(row.extraction_schema); + const classifiers = jsonOrNull(sql, row.classifiers); + const versionSources = jsonOrNull(sql, row.version_sources); + const requiredSourceTypes = pgTextArray(row.required_source_types ?? []); + const recommendedSourceTypes = pgTextArray(row.recommended_source_types ?? []); + + if (existingVersionId !== null) { + await sql` + UPDATE watcher_versions + SET version = ${row.version}, + name = ${row.name}, + description = ${row.description}, + change_notes = ${row.change_notes}, + sources_schema = ${sourcesSchema}, + keying_config = ${keyingConfig}, + json_template = ${jsonTemplate}, + prompt = ${row.prompt}, + extraction_schema = ${extractionSchema}, + classifiers = ${classifiers}, + required_source_types = ${requiredSourceTypes}::text[], + recommended_source_types = ${recommendedSourceTypes}::text[], + source_repository = ${row.source_repository}, + source_ref = ${row.source_ref}, + source_commit_sha = ${row.source_commit_sha}, + source_path = ${row.source_path}, + reactions_guidance = ${row.reactions_guidance}, + condensation_prompt = ${row.condensation_prompt}, + condensation_window_count = ${row.condensation_window_count ?? 4}, + version_sources = ${versionSources} + WHERE id = ${existingVersionId} + `; + return existingVersionId; + } + + const inserted = await sql` + INSERT INTO watcher_versions ( + version, name, description, change_notes, created_by, + sources_schema, keying_config, json_template, prompt, extraction_schema, + classifiers, required_source_types, recommended_source_types, + source_repository, source_ref, source_commit_sha, source_path, + reactions_guidance, condensation_prompt, condensation_window_count, + watcher_id, version_sources + ) VALUES ( + ${row.version}, ${row.name}, ${row.description}, ${row.change_notes}, ${userId}, + ${sourcesSchema}, ${keyingConfig}, ${jsonTemplate}, ${row.prompt}, ${extractionSchema}, + ${classifiers}, ${requiredSourceTypes}::text[], ${recommendedSourceTypes}::text[], + ${row.source_repository}, ${row.source_ref}, ${row.source_commit_sha}, ${row.source_path}, + ${row.reactions_guidance}, ${row.condensation_prompt}, ${row.condensation_window_count ?? 4}, + ${targetWatcherId}, ${versionSources} + ) + RETURNING id + `; + return inserted[0].id as number; +} + +async function mirrorWatchers( + sql: Sql, + templateOrgId: string, + targetOrgId: string, + templateAgentId: string, + installedAgentId: string, + userId: string +): Promise<{ count: number; watcherIdsByTemplateId: Map }> { + const templateRows = (await sql` + SELECT + id, + model_config, + sources, + reaction_script, + reaction_script_compiled, + name, + slug, + description, + version, + tags, + registry_type, + registry_repo, + registry_ref, + current_version_id, + schedule + FROM watchers + WHERE organization_id = ${templateOrgId} + AND status = 'active' + `) as TemplateWatcherRow[]; + + let count = 0; + const watcherIdsByTemplateId = new Map(); + + for (const row of templateRows) { + if (!row.slug) { + throw new Error(`Template watcher ${row.id} has no slug — cannot mirror it safely.`); + } + + const existing = await sql` + SELECT id, managed_by_template_agent_id, current_version_id + FROM watchers + WHERE organization_id = ${targetOrgId} + AND slug = ${row.slug} + AND status = 'active' + LIMIT 1 + `; + + const modelConfig = sql.json(row.model_config ?? {}); + const sources = sql.json(row.sources ?? []); + const tags = pgTextArray(row.tags ?? []); + let targetWatcherId: number; + let existingVersionId: number | null = null; + + if (existing.length === 0) { + const inserted = await sql` + INSERT INTO watchers ( + model_config, status, sources, created_by, entity_ids, + reaction_script, reaction_script_compiled, organization_id, + name, slug, description, version, tags, + registry_type, registry_repo, registry_ref, + schedule, next_run_at, agent_id, connection_id, scheduler_client_id, + managed_by_template_agent_id, source_template_org_id, + created_at, updated_at + ) VALUES ( + ${modelConfig}, 'active', ${sources}, ${userId}, NULL, + ${row.reaction_script}, ${row.reaction_script_compiled}, ${targetOrgId}, + ${row.name}, ${row.slug}, ${row.description}, ${row.version ?? 1}, ${tags}::text[], + ${row.registry_type}, ${row.registry_repo}, ${row.registry_ref}, + ${row.schedule}, NULL, ${installedAgentId}, NULL, NULL, + ${templateAgentId}, ${templateOrgId}, + NOW(), NOW() + ) + RETURNING id + `; + targetWatcherId = inserted[0].id as number; + } else { + const existingOwner = existing[0].managed_by_template_agent_id as string | null; + if (existingOwner === null) { + throw new Error( + `Watcher '${row.slug}' already exists in the target org as a user-authored row. Remove it or rename before installing this agent.` + ); + } + if (existingOwner !== templateAgentId) { + throw new Error( + `Watcher '${row.slug}' is already managed by a different template agent (${existingOwner}).` + ); + } + targetWatcherId = existing[0].id as number; + existingVersionId = (existing[0].current_version_id as number | null) ?? null; + await sql` + UPDATE watchers + SET model_config = ${modelConfig}, + sources = ${sources}, + reaction_script = ${row.reaction_script}, + reaction_script_compiled = ${row.reaction_script_compiled}, + name = ${row.name}, + description = ${row.description}, + version = ${row.version ?? 1}, + tags = ${tags}::text[], + registry_type = ${row.registry_type}, + registry_repo = ${row.registry_repo}, + registry_ref = ${row.registry_ref}, + schedule = ${row.schedule}, + next_run_at = NULL, + agent_id = ${installedAgentId}, + connection_id = NULL, + scheduler_client_id = NULL, + updated_at = NOW() + WHERE id = ${targetWatcherId} + `; + } + + const version = await loadCurrentWatcherVersion(sql, row.current_version_id); + if (version) { + const targetVersionId = await upsertWatcherVersion( + sql, + version, + targetWatcherId, + userId, + existingVersionId + ); + await sql` + UPDATE watchers + SET current_version_id = ${targetVersionId}, updated_at = NOW() + WHERE id = ${targetWatcherId} + `; + } + + watcherIdsByTemplateId.set(row.id, targetWatcherId); + count++; + } + + return { count, watcherIdsByTemplateId }; +} + +interface TemplateClassifierRow { + id: number; + slug: string; + name: string; + description: string | null; + attribute_key: string; + watcher_id: number | null; +} + +interface TemplateClassifierVersionRow { + version: number; + is_current: boolean; + attribute_values: Record; + min_similarity: string | number | null; + fallback_value: string | null; + change_notes: string | null; + preferred_model: string | null; + extraction_config: Record | null; +} + +async function loadCurrentClassifierVersion( + sql: Sql, + classifierId: number +): Promise { + const rows = (await sql` + SELECT + version, + is_current, + attribute_values, + min_similarity, + fallback_value, + change_notes, + preferred_model, + extraction_config + FROM event_classifier_versions + WHERE classifier_id = ${classifierId} + AND is_current = true + ORDER BY version DESC + LIMIT 1 + `) as TemplateClassifierVersionRow[]; + return rows[0] ?? null; +} + +async function upsertClassifierVersion( + sql: Sql, + row: TemplateClassifierVersionRow, + targetClassifierId: number, + userId: string +): Promise { + const attributeValues = sql.json(row.attribute_values); + const extractionConfig = jsonOrNull(sql, row.extraction_config); + + await sql` + UPDATE event_classifier_versions + SET is_current = false + WHERE classifier_id = ${targetClassifierId} + `; + + const existing = await sql` + SELECT id FROM event_classifier_versions + WHERE classifier_id = ${targetClassifierId} + AND version = ${row.version} + LIMIT 1 + `; + + if (existing.length > 0) { + await sql` + UPDATE event_classifier_versions + SET is_current = true, + attribute_values = ${attributeValues}, + min_similarity = ${row.min_similarity}, + fallback_value = ${row.fallback_value}, + change_notes = ${row.change_notes}, + preferred_model = ${row.preferred_model}, + extraction_config = ${extractionConfig} + WHERE id = ${existing[0].id} + `; + return; + } + + await sql` + INSERT INTO event_classifier_versions ( + classifier_id, version, is_current, attribute_values, min_similarity, + fallback_value, change_notes, created_by, preferred_model, extraction_config + ) VALUES ( + ${targetClassifierId}, ${row.version}, true, ${attributeValues}, ${row.min_similarity}, + ${row.fallback_value}, ${row.change_notes}, ${userId}, ${row.preferred_model}, ${extractionConfig} + ) + `; +} + +async function mirrorEventClassifiers( + sql: Sql, + templateOrgId: string, + targetOrgId: string, + templateAgentId: string, + userId: string, + watcherIdsByTemplateId: Map +): Promise { + const templateRows = (await sql` + SELECT id, slug, name, description, attribute_key, watcher_id + FROM event_classifiers + WHERE organization_id = ${templateOrgId} + AND status = 'active' + `) as TemplateClassifierRow[]; + + let count = 0; + for (const row of templateRows) { + const targetWatcherId = row.watcher_id ? watcherIdsByTemplateId.get(row.watcher_id) : null; + if (row.watcher_id && !targetWatcherId) { + continue; + } + + const existing = await sql` + SELECT id, managed_by_template_agent_id + FROM event_classifiers + WHERE organization_id = ${targetOrgId} + AND slug = ${row.slug} + AND status = 'active' + AND ( + (${targetWatcherId ?? null}::int IS NULL AND watcher_id IS NULL) + OR watcher_id = ${targetWatcherId ?? null} + ) + LIMIT 1 + `; + + let targetClassifierId: number; + if (existing.length === 0) { + const inserted = await sql` + INSERT INTO event_classifiers ( + slug, name, description, attribute_key, status, + created_by, entity_id, watcher_id, organization_id, entity_ids, + managed_by_template_agent_id, source_template_org_id, + created_at, updated_at + ) VALUES ( + ${row.slug}, ${row.name}, ${row.description}, ${row.attribute_key}, 'active', + ${userId}, NULL, ${targetWatcherId ?? null}, ${targetOrgId}, NULL, + ${templateAgentId}, ${templateOrgId}, + NOW(), NOW() + ) + RETURNING id + `; + targetClassifierId = inserted[0].id as number; + } else { + const existingOwner = existing[0].managed_by_template_agent_id as string | null; + if (existingOwner === null) { + throw new Error( + `Classifier '${row.slug}' already exists in the target org as a user-authored row. Remove it or rename before installing this agent.` + ); + } + if (existingOwner !== templateAgentId) { + throw new Error( + `Classifier '${row.slug}' is already managed by a different template agent (${existingOwner}).` + ); + } + targetClassifierId = existing[0].id as number; + await sql` + UPDATE event_classifiers + SET name = ${row.name}, + description = ${row.description}, + attribute_key = ${row.attribute_key}, + watcher_id = ${targetWatcherId ?? null}, + entity_id = NULL, + entity_ids = NULL, + updated_at = NOW() + WHERE id = ${targetClassifierId} + `; + } + + const version = await loadCurrentClassifierVersion(sql, row.id); + if (version) { + await upsertClassifierVersion(sql, version, targetClassifierId, userId); + } + count++; + } + return count; +} + export async function installAgentFromTemplate( params: InstallAgentParams ): Promise { const sql = getDb(); const template = await loadTemplateAgent(sql, params.templateAgentId); + if (template.template_agent_id) { + throw new Error( + `Agent ${params.templateAgentId} is itself installed from a template and cannot be used as a source template.` + ); + } + + if (template.organization_visibility !== 'public') { + throw new Error( + `Template agent ${params.templateAgentId} is not installable because its organization is not public.` + ); + } + if (template.organization_id === params.targetOrganizationId) { throw new Error( `Cannot install template agent into its own org (${template.organization_id}). Pick a different target.` @@ -333,6 +815,22 @@ export async function installAgentFromTemplate( params.templateAgentId, params.userId ); + const watcherMirror = await mirrorWatchers( + tx, + template.organization_id, + params.targetOrganizationId, + params.templateAgentId, + upsert.agentId, + params.userId + ); + const classifiers = await mirrorEventClassifiers( + tx, + template.organization_id, + params.targetOrganizationId, + params.templateAgentId, + params.userId, + watcherMirror.watcherIdsByTemplateId + ); result = { agentId: upsert.agentId, @@ -340,6 +838,8 @@ export async function installAgentFromTemplate( mirrored: { entity_types: entityTypes, entity_relationship_types: relationshipTypes, + event_classifiers: classifiers, + watchers: watcherMirror.count, }, created: upsert.created, }; From 32cd208913b676c2309d72e5df97d2ff39894cbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Sat, 25 Apr 2026 15:25:41 +0100 Subject: [PATCH 3/3] fix(agents): require admin/owner membership for template install Defense in depth: previously installAgentFromTemplate trusted the caller to have already authorised the user against the target org. The public install route does enforce that (it derives targetOrganizationId from the authenticated user's personal org), but any future caller could bypass it. Add an explicit membership lookup against the "member" table inside installAgentFromTemplate. Refuses when: - user has no member row in the target org - user is a plain member (only owner/admin can install) Adds two tests for the new error paths. --- .../integration/agents/install.test.ts | 23 +++++++++++++++++++ .../owletto-backend/src/agents/install.ts | 17 ++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts index 6409c9260..4709cebe3 100644 --- a/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts +++ b/packages/owletto-backend/src/__tests__/integration/agents/install.test.ts @@ -319,6 +319,29 @@ describe('installAgentFromTemplate', () => { ).rejects.toThrow(/Cannot install template agent into its own org/); }); + it('refuses to install when the user is not a member of the target org', async () => { + const strangerOrg = await createTestOrganization({ name: 'Stranger Org' }); + await expect( + installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: strangerOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/not a member/); + }); + + it('refuses to install when the user is a member but lacks admin/owner role', async () => { + const memberOrg = await createTestOrganization({ name: 'Member-Only Org' }); + await addUserToOrganization(user.id, memberOrg.id, 'member'); + await expect( + installAgentFromTemplate({ + templateAgentId: templateAgent.agentId, + targetOrganizationId: memberOrg.id, + userId: user.id, + }) + ).rejects.toThrow(/owner or admin role/); + }); + it('refuses to overwrite a user-authored row of the same slug', async () => { const sql = getTestDb(); const otherOrg = await createTestOrganization({ name: 'Other User Org' }); diff --git a/packages/owletto-backend/src/agents/install.ts b/packages/owletto-backend/src/agents/install.ts index e5b6dfd0e..7630127b0 100644 --- a/packages/owletto-backend/src/agents/install.ts +++ b/packages/owletto-backend/src/agents/install.ts @@ -785,6 +785,23 @@ export async function installAgentFromTemplate( ); } + const membership = await sql<{ role: string }[]>` + SELECT role FROM "member" + WHERE "organizationId" = ${params.targetOrganizationId} + AND "userId" = ${params.userId} + LIMIT 1 + `; + if (membership.length === 0) { + throw new Error( + `User ${params.userId} is not a member of organization ${params.targetOrganizationId}.` + ); + } + if (membership[0].role !== 'owner' && membership[0].role !== 'admin') { + throw new Error( + `Installing a template agent requires owner or admin role in organization ${params.targetOrganizationId}.` + ); + } + let result: InstallResult | null = null; await sql.begin(async (tx) => {