Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 191 additions & 5 deletions packages/owletto-backend/src/lobu/__tests__/agent-routes-apply.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ mock.module('../../auth/middleware', () => ({
requireAuth: async (_c: any, next: any) => next(),
}));

// `getChatInstanceManager` returns null in tests — this is the documented
// "no manager" fallback path that persists straight to the connection store.
// We don't need a real manager for these tests; the upsert + idempotent paths
// both have a no-manager branch that exercises the same correctness contract.
// `getChatInstanceManager` returns whatever the active test installs into
// `chatManagerStash.manager` — null by default. The fallback (no-manager) path
// covers most of the route's correctness contract; tests that exercise the
// manager-side serialization (e.g. addConnection-call-count) install a stub
// here.
const chatManagerStash: { manager: unknown } = { manager: null };

mock.module('../gateway', () => ({
getChatInstanceManager: () => null,
getChatInstanceManager: () => chatManagerStash.manager,
getLobuCoreServices: () => null,
initLobuGateway: async () => null,
stopLobuGateway: async () => {},
Expand Down Expand Up @@ -711,3 +714,186 @@ describe('admin-tier auth admission (requireSessionOrAdminPat)', () => {
expect(response.status).toBe(403);
});
});

describe('residual-race fixes (PR-466 follow-up)', () => {
beforeEach(async () => {
await resetTestDatabase();
await seedOrg(ORG_A);
authStash.user = { id: 'u1', name: 'Test', email: 'u1@test', emailVerified: true };
authStash.organizationId = ORG_A;
authStash.authSource = 'session';
authStash.mcpAuthInfo = null;
chatManagerStash.manager = null;
});

test(
'POST /agents — concurrent creates always leave mcpServers populated (20 iterations)',
async () => {
// The pre-fix flow split row creation and the auto-injected `mcpServers`
// into two writes (INSERT, then `saveSettings`). A concurrent loser
// returning 200 in the idempotent branch could observe the row before
// the winner's `saveSettings` landed and clobber `mcpServers` via a
// PATCH that races with the deferred winner write. Folding the column
// into the INSERT makes the loser see fully-initialized state on its
// first read.
const app = await importAgentRoutes();
const { getDb } = await import('../../db/client.js');
const sql = getDb();

for (let i = 0; i < 20; i++) {
const agentId = `race-mcp-${i}`;
const payload = JSON.stringify({
agentId,
name: `Race MCP ${i}`,
});

const [r1, r2] = await Promise.all([
app.request('/', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: payload,
}),
app.request('/', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: payload,
}),
]);

const statuses = [r1.status, r2.status].sort();
expect(statuses).toEqual([200, 201]);

// The agent row exists exactly once and `mcp_servers.owletto.url` is
// populated regardless of which request handler ran the INSERT and
// which observed it via the idempotent branch.
const rows = await sql`
SELECT id, mcp_servers FROM agents WHERE id = ${agentId}
`;
expect(rows.length).toBe(1);
const mcpServers = rows[0].mcp_servers as
| { owletto?: { url?: string } }
| undefined;
expect(mcpServers?.owletto?.url).toBeTruthy();
expect(mcpServers?.owletto?.url).toContain('/mcp/');
}
}
);

test(
'PUT /platforms/by-stable-id — concurrent PUTs invoke addConnection exactly once',
async () => {
// Without the in-process per-stableId chain (and the diagnostic
// pg_advisory_xact_lock backing it), a concurrent loser can re-read
// the just-created row mid-`addConnection` and call
// `updateConnection` against half-initialized state — potentially
// double-spawning the chat instance. The chain serializes the
// create-or-update flow on the stable ID; only the winner reaches
// `addConnection`, every other writer sees committed state and goes
// to the update path.
const app = await importAgentRoutes();
await seedAgent(ORG_A, 'mgr-host');

const calls: { addConnection: number; updateConnection: number } = {
addConnection: 0,
updateConnection: 0,
};

// Mock manager: addConnection takes a few microseconds (small Promise
// tick) so the second PUT has time to enqueue at the lock before the
// first PUT releases it. The lock fix is what guarantees the second
// PUT then sees a committed row.
chatManagerStash.manager = {
async addConnection(
platform: string,
templateAgentId: string,
config: Record<string, unknown>,
settings: Record<string, unknown>,
metadata: Record<string, unknown>,
stableId: string
) {
calls.addConnection++;
// Yield to the event loop a few times so the loser actually runs
// its lock-acquire while we hold ours.
for (let i = 0; i < 5; i++) {
await new Promise((r) => setTimeout(r, 0));
}
return {
id: stableId,
platform,
templateAgentId,
config,
settings,
metadata,
status: 'active' as const,
createdAt: Date.now(),
updatedAt: Date.now(),
};
},
async updateConnection(stableId: string, updates: Record<string, unknown>) {
calls.updateConnection++;
// Read the existing row through the connection store so the test
// mirrors the production manager behavior closely enough that the
// route's persistConnectionSnapshot call sees a sane shape.
const { getDb } = await import('../../db/client.js');
const sql = getDb();
const rows = await sql`
SELECT * FROM agent_connections WHERE id = ${stableId}
`;
const row = rows[0] ?? {};
const updatedConfig =
((updates as { config?: Record<string, unknown> }).config ??
row.config) ?? {};
const updatedSettings =
((updates as { settings?: Record<string, unknown> }).settings ??
row.settings) ?? {};
return {
id: stableId,
platform: row.platform ?? 'telegram',
templateAgentId: row.agent_id ?? 'mgr-host',
config: updatedConfig,
settings: updatedSettings,
metadata: row.metadata ?? {},
status: 'active' as const,
createdAt:
row.created_at instanceof Date
? row.created_at.getTime()
: (row.created_at ?? Date.now()),
updatedAt: Date.now(),
};
},
};

const stableId = 'mgr-host-telegram-prod';
const config = { chatId: '12345', endpoint: 'https://example.com' };

const [r1, r2] = await Promise.all([
app.request(`/mgr-host/platforms/by-stable-id/${stableId}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ platform: 'telegram', config }),
}),
app.request(`/mgr-host/platforms/by-stable-id/${stableId}`, {
method: 'PUT',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ platform: 'telegram', config }),
}),
]);

// Exactly one PUT created (201); the loser is either a noop (200) or
// an update (200), never another create.
expect([r1.status, r2.status].sort()).toEqual([200, 201]);

// The race-fix contract: only ONE addConnection call hit the manager.
// Without the in-process chain, both PUTs could observe `existing===null`
// and both call addConnection.
expect(calls.addConnection).toBe(1);

const { getDb } = await import('../../db/client.js');
const sql = getDb();
const rows = await sql`
SELECT id FROM agent_connections WHERE id = ${stableId}
`;
expect(rows.length).toBe(1);
}
);
});
Loading
Loading