diff --git a/packages/owletto-backend/src/lobu/__tests__/agent-routes-apply.test.ts b/packages/owletto-backend/src/lobu/__tests__/agent-routes-apply.test.ts index e1d93f96f..ba2700d09 100644 --- a/packages/owletto-backend/src/lobu/__tests__/agent-routes-apply.test.ts +++ b/packages/owletto-backend/src/lobu/__tests__/agent-routes-apply.test.ts @@ -375,3 +375,147 @@ describe('PUT /agents/:agentId/connections/by-stable-id/:stableId', () => { expect(response.status).toBe(404); }); }); + +describe('concurrent-apply race fixes', () => { + beforeEach(async () => { + await resetTestDatabase(); + await seedOrg(ORG_A); + authStash.user = { id: 'u1', name: 'Test', email: 'u1@test', emailVerified: true }; + authStash.organizationId = ORG_A; + }); + + test('POST /agents — two concurrent creates resolve to one 201 + one 200, single row', async () => { + const app = await importAgentRoutes(); + + const payload = JSON.stringify({ + agentId: 'race-agent', + name: 'Race Agent', + description: 'concurrent', + }); + + // Both requests fire before either response — exercises the + // ON CONFLICT (id) DO NOTHING claim path. + const [r1, r2] = await Promise.all([ + app.request('/', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: payload, + }), + app.request('/', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: payload, + }), + ]); + + const statuses = [r1.status, r2.status].sort(); + expect(statuses).toEqual([200, 201]); + + const { getDb } = await import('../../db/client.js'); + const sql = getDb(); + + // Exactly one row. + const rows = await sql`SELECT id FROM agents WHERE id = 'race-agent'`; + expect(rows.length).toBe(1); + + // The auto-injected MCP server is exactly one entry — not double-written + // by both handlers (which would have left the same value but proved both + // ran the saveSettings path). + const settings = await sql` + SELECT mcp_servers FROM agents WHERE id = 'race-agent' + `; + expect(settings[0].mcp_servers).toEqual({ + owletto: { url: expect.stringContaining('/mcp/') }, + }); + }); + + test('POST /agents — concurrent create cannot overwrite operator-set MCP servers', async () => { + const app = await importAgentRoutes(); + const { getDb } = await import('../../db/client.js'); + const sql = getDb(); + + // First, do a normal create so the row + initial MCP server exist. + const initial = await app.request('/', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ agentId: 'preserved-agent', name: 'Preserved' }), + }); + expect(initial.status).toBe(201); + + // Operator overrides mcp_servers (e.g. via a subsequent PATCH /config). + await sql` + UPDATE agents + SET mcp_servers = ${sql.json({ owletto: { url: 'http://operator-set' } })}, + updated_at = NOW() + WHERE id = 'preserved-agent' + `; + + // Two concurrent re-applies must both take the idempotent path; neither + // should re-run the MCP auto-injection. + const [r1, r2] = await Promise.all([ + app.request('/', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ agentId: 'preserved-agent', name: 'Preserved' }), + }), + app.request('/', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ agentId: 'preserved-agent', name: 'Preserved' }), + }), + ]); + expect(r1.status).toBe(200); + expect(r2.status).toBe(200); + + const after = await sql` + SELECT mcp_servers FROM agents WHERE id = 'preserved-agent' + `; + expect(after[0].mcp_servers).toEqual({ + owletto: { url: 'http://operator-set' }, + }); + }); + + test('PUT /connections/by-stable-id — two concurrent identical PUTs converge to one row', async () => { + const app = await importAgentRoutes(); + await seedAgent(ORG_A, 'race-host'); + + const stableId = 'race-host-telegram-prod'; + const config = { chatId: '12345', endpoint: 'https://example.com' }; + + const [r1, r2] = await Promise.all([ + app.request(`/race-host/connections/by-stable-id/${stableId}`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ platform: 'telegram', config }), + }), + app.request(`/race-host/connections/by-stable-id/${stableId}`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ platform: 'telegram', config }), + }), + ]); + + expect([r1.status, r2.status].sort()).toEqual([200, 201]); + + const bodies = [(await r1.json()) as any, (await r2.json()) as any]; + const created = bodies.find((b) => b.connection && !b.noop && !b.updated); + expect(created).toBeTruthy(); + expect(created?.connection?.id).toBe(stableId); + + // The other response must be either noop:true (config unchanged) or + // updated:true (the loser observed the placeholder/empty config first + // and went down the update path). Both are correct per the race-fix + // contract: the row is consistent and we did not double-spawn. + const other = bodies.find((b) => b !== created); + expect(other?.noop === true || other?.updated === true).toBe(true); + + const { getDb } = await import('../../db/client.js'); + const sql = getDb(); + const rows = await sql` + SELECT id, agent_id, platform FROM agent_connections WHERE id = ${stableId} + `; + expect(rows.length).toBe(1); + expect(rows[0].agent_id).toBe('race-host'); + expect(rows[0].platform).toBe('telegram'); + }); +}); diff --git a/packages/owletto-backend/src/lobu/agent-routes.ts b/packages/owletto-backend/src/lobu/agent-routes.ts index d6ed2f088..0abc317f4 100644 --- a/packages/owletto-backend/src/lobu/agent-routes.ts +++ b/packages/owletto-backend/src/lobu/agent-routes.ts @@ -315,39 +315,52 @@ routes.post('/', mcpAuth, async (c) => { } const orgId = c.get('organizationId') as string; - const existingOrgId = await getAgentOrganizationId(agentId); - if (existingOrgId === orgId) { - // Idempotent path: agent already exists in this org. Return the - // existing payload without re-running the Owletto MCP auto-injection - // below — `lobu apply` re-runs this on every converge cycle and - // we don't want to overwrite operator-configured `mcpServers`. - const existing = await configStore.getMetadata(agentId); - if (!existing) { - return c.json({ error: 'Agent metadata missing' }, 500); + + // Atomic create. Two concurrent `lobu apply` runs from the same operator + // can both reach this endpoint with the same agentId; a check-then-write + // (read existing → branch → save) lets both pass the existence check and + // both run the MCP auto-injection below, clobbering operator-configured + // `mcpServers`. ON CONFLICT (id) DO NOTHING serializes on the primary + // key — only one INSERT returns a row, only that handler runs the + // auto-injection. + const sql = getDb(); + const now = new Date(); + const inserted = await sql` + INSERT INTO agents ( + id, organization_id, name, description, owner_platform, owner_user_id, created_at + ) + VALUES ( + ${agentId}, ${orgId}, ${name}, ${description ?? null}, + 'owletto', ${user.id}, ${now} + ) + ON CONFLICT (id) DO NOTHING + RETURNING id + `; + + if (inserted.length === 0) { + // Another writer (or a previous apply cycle) already owns this id. + // If they're in this org → idempotent 200. If another org → 409. + const existingOrgId = await getAgentOrganizationId(agentId); + if (existingOrgId === orgId) { + const existing = await configStore.getMetadata(agentId); + if (!existing) { + return c.json({ error: 'Agent metadata missing' }, 500); + } + return c.json( + { + agentId, + name: existing.name, + description: existing.description, + }, + 200 + ); } - return c.json( - { - agentId, - name: existing.name, - description: existing.description, - }, - 200 - ); - } - if (existingOrgId) { return c.json({ error: 'Agent ID already exists in another organization' }, 409); } - // Create metadata - await configStore.saveMetadata(agentId, { - agentId, - name, - description, - owner: { platform: 'owletto', userId: user.id }, - createdAt: Date.now(), - }); - - // Create default settings with Owletto MCP server auto-injected + // We won the create race — auto-inject the Owletto MCP server. A losing + // concurrent caller hits the idempotent branch above and never runs this, + // so an operator-configured `mcpServers` survives subsequent applies. const orgSlug = c.req.param('orgSlug'); const publicUrl = getConfiguredPublicOrigin() || `http://localhost:${process.env.PORT || '8787'}`; @@ -760,7 +773,7 @@ routes.put('/:agentId/connections/by-stable-id/:stableId', mcpAuth, async (c) => const { platform, config = {}, settings = {} } = body; if (!platform) return c.json({ error: 'platform is required' }, 400); - const existing = await connectionStore.getConnection(stableId); + let existing = await connectionStore.getConnection(stableId); if (existing && existing.templateAgentId && existing.templateAgentId !== agentId) { return c.json( { error: 'Stable ID already used by a different agent' }, @@ -770,106 +783,162 @@ routes.put('/:agentId/connections/by-stable-id/:stableId', mcpAuth, async (c) => const chatManager = getChatInstanceManager(); - if (existing) { - // Compute the merged config the way ChatInstanceManager.updateConnection - // does: skip `***...` placeholders so a sanitized round-trip from the - // GET endpoint doesn't trigger a spurious "changed" classification. - const previousConfig = (existing.config ?? {}) as Record; - const submittedConfig = { platform, ...config } as Record; - const merged: Record = { ...previousConfig }; - for (const [key, value] of Object.entries(submittedConfig)) { - if (typeof value === 'string' && value.startsWith('***')) continue; - merged[key] = value; - } - merged.platform = platform; - - const configChanged = !configsShallowEqual(merged, previousConfig); - // Settings (allowFrom, allowGroups, etc.) are persisted alongside the - // connection config and are part of "did anything change?" — a - // settings-only update must trigger willRestart, not be silently noop'd. - const previousSettings = (existing.settings ?? {}) as Record; - const mergedSettings = { allowGroups: true, ...settings } as Record; - const settingsChanged = !configsShallowEqual(mergedSettings, previousSettings); - - if (!configChanged && !settingsChanged) { - return c.json({ noop: true, connection: existing }, 200); + if (!existing) { + // Atomic claim. Two concurrent applies can both observe `existing===null` + // and both go to the create path; without this guard, both would call + // chatManager.addConnection() (or the fallback persist) and the second + // write would clobber the first via ON CONFLICT DO UPDATE in + // saveConnection, plus spawn a duplicate platform instance. ON CONFLICT + // DO NOTHING serializes on the primary key — only one PUT inserts. + const sql = getDb(); + const claimNow = new Date(); + const claimed = await sql` + INSERT INTO agent_connections ( + id, agent_id, platform, config, settings, metadata, status, created_at, updated_at + ) + VALUES ( + ${stableId}, ${agentId}, ${platform}, + ${sql.json({})}, ${sql.json({})}, ${sql.json({})}, + 'stopped', ${claimNow}, ${claimNow} + ) + ON CONFLICT (id) DO NOTHING + RETURNING id + `; + + if (claimed.length > 0) { + // We won the create race. Run the full create flow; both the manager + // and fallback paths re-write the row via saveConnection (ON CONFLICT + // DO UPDATE), so updating our placeholder is the same path they'd + // take on a freshly-inserted row. + if (chatManager) { + try { + const created = await chatManager.addConnection( + platform, + agentId, + { platform, ...config }, + { allowGroups: true, ...settings }, + {}, + stableId + ); + await persistConnectionSnapshot(created); + return c.json({ connection: created }, 201); + } catch (error: any) { + // Roll back the placeholder so a retry doesn't see a half-baked + // row that fails the `existing.templateAgentId` check inconsistently. + try { + await connectionStore.deleteConnection(stableId); + } catch { + // best-effort + } + return c.json({ error: error.message || 'Failed to create connection' }, 400); + } + } + + // Fallback path mirrors the POST handler's no-manager branch but uses + // the supplied stable ID instead of a synthesized one. Platform is kept + // in config (matching the manager path) so subsequent idempotent PUTs + // see a stable previousConfig. Settings default `allowGroups: true` to + // match the manager-path default — symmetric with the noop comparison + // below so a follow-up PUT with no settings field round-trips as noop. + const fallbackNow = Date.now(); + await connectionStore.saveConnection({ + id: stableId, + platform, + templateAgentId: agentId, + config: { platform, ...config } as Record, + settings: { allowGroups: true, ...settings } as any, + metadata: {}, + status: 'stopped', + createdAt: fallbackNow, + updatedAt: fallbackNow, + }); + return c.json( + { connection: { id: stableId, platform, status: 'stopped' } }, + 201 + ); } - if (chatManager) { - try { - const updated = await chatManager.updateConnection(stableId, { - config: { platform, ...config }, - settings: { allowGroups: true, ...settings }, - }); - await persistConnectionSnapshot(updated); - return c.json( - { updated: true, willRestart: true, connection: updated }, - 200 - ); - } catch (error: any) { - return c.json({ error: error.message || 'Failed to update connection' }, 400); - } + // We lost the race — someone else inserted the row between our read and + // our INSERT. Re-read and fall through to the update path so concurrent + // PUTs with the same config converge on a single noop response. + const reread = await connectionStore.getConnection(stableId); + if (!reread) { + // Unreachable in practice (insert returned 0 rows means a row existed, + // and we don't expose a delete that races with create). Treat as a + // create-failure to surface the inconsistency rather than 200-OK on + // missing data. + return c.json({ error: 'Connection vanished after conflict' }, 500); + } + if (reread.templateAgentId && reread.templateAgentId !== agentId) { + return c.json( + { error: 'Stable ID already used by a different agent' }, + 409 + ); } + existing = reread; + } - // Fallback when ChatInstanceManager is not available (e.g. boot races, - // tests). Persist the merged config directly. - await connectionStore.saveConnection({ - id: stableId, - platform, - templateAgentId: agentId, - config: merged as Record, - settings: { ...(existing.settings ?? {}), ...settings } as any, - metadata: existing.metadata ?? {}, - status: existing.status ?? 'stopped', - createdAt: existing.createdAt ?? Date.now(), - updatedAt: Date.now(), - }); - const refreshed = await connectionStore.getConnection(stableId); - return c.json( - { updated: true, willRestart: true, connection: refreshed }, - 200 - ); + // Update path. `existing` is guaranteed non-null at this point — either we + // saw it on the first read, or we re-read after losing the create race. + const current = existing; + + // Compute the merged config the way ChatInstanceManager.updateConnection + // does: skip `***...` placeholders so a sanitized round-trip from the + // GET endpoint doesn't trigger a spurious "changed" classification. + const previousConfig = (current.config ?? {}) as Record; + const submittedConfig = { platform, ...config } as Record; + const merged: Record = { ...previousConfig }; + for (const [key, value] of Object.entries(submittedConfig)) { + if (typeof value === 'string' && value.startsWith('***')) continue; + merged[key] = value; + } + merged.platform = platform; + + const configChanged = !configsShallowEqual(merged, previousConfig); + // Settings (allowFrom, allowGroups, etc.) are persisted alongside the + // connection config and are part of "did anything change?" — a + // settings-only update must trigger willRestart, not be silently noop'd. + const previousSettings = (current.settings ?? {}) as Record; + const mergedSettings = { allowGroups: true, ...settings } as Record; + const settingsChanged = !configsShallowEqual(mergedSettings, previousSettings); + + if (!configChanged && !settingsChanged) { + return c.json({ noop: true, connection: current }, 200); } - // No existing row — create with the caller-supplied stable ID. if (chatManager) { try { - const created = await chatManager.addConnection( - platform, - agentId, - { platform, ...config }, - { allowGroups: true, ...settings }, - {}, - stableId + const updated = await chatManager.updateConnection(stableId, { + config: { platform, ...config }, + settings: { allowGroups: true, ...settings }, + }); + await persistConnectionSnapshot(updated); + return c.json( + { updated: true, willRestart: true, connection: updated }, + 200 ); - await persistConnectionSnapshot(created); - return c.json({ connection: created }, 201); } catch (error: any) { - return c.json({ error: error.message || 'Failed to create connection' }, 400); + return c.json({ error: error.message || 'Failed to update connection' }, 400); } } - // Fallback path mirrors the POST handler's no-manager branch but uses - // the supplied stable ID instead of a synthesized one. Platform is kept - // in config (matching the manager path) so subsequent idempotent PUTs - // see a stable previousConfig. Settings default `allowGroups: true` to - // match the manager-path default — symmetric with the noop comparison - // above so a follow-up PUT with no settings field round-trips as noop. - const now = Date.now(); + // Fallback when ChatInstanceManager is not available (e.g. boot races, + // tests). Persist the merged config directly. await connectionStore.saveConnection({ id: stableId, platform, templateAgentId: agentId, - config: { platform, ...config } as Record, - settings: { allowGroups: true, ...settings } as any, - metadata: {}, - status: 'stopped', - createdAt: now, - updatedAt: now, + config: merged as Record, + settings: { ...(current.settings ?? {}), ...settings } as any, + metadata: current.metadata ?? {}, + status: current.status ?? 'stopped', + createdAt: current.createdAt ?? Date.now(), + updatedAt: Date.now(), }); + const refreshed = await connectionStore.getConnection(stableId); return c.json( - { connection: { id: stableId, platform, status: 'stopped' } }, - 201 + { updated: true, willRestart: true, connection: refreshed }, + 200 ); }); });