diff --git a/db/migrations/20260526120000_event_embeddings_model_stamp.sql b/db/migrations/20260526120000_event_embeddings_model_stamp.sql new file mode 100644 index 000000000..05d219b18 --- /dev/null +++ b/db/migrations/20260526120000_event_embeddings_model_stamp.sql @@ -0,0 +1,111 @@ +-- migrate:up + +-- Version-stamp every embedding with the model that produced it. Without this, +-- swapping EMBEDDINGS_MODEL to a different model of the SAME dimensionality +-- silently mixes incompatible vector spaces in event_embeddings with no way to +-- detect or segregate the mismatched rows. The stamp lets future similarity +-- queries scope to a single model and makes a model swap auditable. +-- +-- NULL = produced before this column existed (legacy rows, unknown model). +ALTER TABLE public.event_embeddings ADD COLUMN IF NOT EXISTS embedding_model text; + +COMMENT ON COLUMN public.event_embeddings.embedding_model IS 'Model/version stamp of the embedding model that produced this vector (e.g. "Xenova/bge-base-en-v1.5"). NULL = legacy row written before stamping. Vectors from different stamps are NOT comparable even at equal dimensionality.'; + +-- Expose the stamp through current_event_records so similarity queries on the +-- view can scope to a single model. Appended at the end so CREATE OR REPLACE +-- keeps the existing column order; otherwise byte-identical to the baseline. 
+CREATE OR REPLACE VIEW public.current_event_records AS + SELECT e.id, + e.organization_id, + e.entity_ids, + e.origin_id, + e.title, + e.payload_type, + e.payload_text, + e.payload_data, + e.payload_template, + e.attachments, + e.metadata, + e.score, + emb.embedding, + e.author_name, + e.source_url, + e.occurred_at, + e.created_at, + e.origin_parent_id, + COALESCE(length(e.payload_text), 0) AS content_length, + e.search_tsv, + e.origin_type, + e.connector_key, + e.connection_id, + e.feed_key, + e.feed_id, + e.run_id, + e.semantic_type, + e.client_id, + e.created_by, + e.interaction_type, + e.interaction_status, + e.interaction_input_schema, + e.interaction_input, + e.interaction_output, + e.interaction_error, + e.supersedes_event_id, + emb.embedding_model + FROM (public.events e + LEFT JOIN public.event_embeddings emb ON ((emb.event_id = e.id))) + WHERE (NOT (EXISTS ( SELECT 1 + FROM public.events newer + WHERE (newer.supersedes_event_id = e.id)))); + +-- migrate:down + +-- CREATE OR REPLACE VIEW cannot REMOVE a column from an existing view (Postgres +-- only allows appending columns at the end). Drop and recreate +-- current_event_records WITHOUT embedding_model; the recreated view no longer +-- references the column, so the subsequent DROP COLUMN succeeds. 
+DROP VIEW IF EXISTS public.current_event_records; +CREATE VIEW public.current_event_records AS + SELECT e.id, + e.organization_id, + e.entity_ids, + e.origin_id, + e.title, + e.payload_type, + e.payload_text, + e.payload_data, + e.payload_template, + e.attachments, + e.metadata, + e.score, + emb.embedding, + e.author_name, + e.source_url, + e.occurred_at, + e.created_at, + e.origin_parent_id, + COALESCE(length(e.payload_text), 0) AS content_length, + e.search_tsv, + e.origin_type, + e.connector_key, + e.connection_id, + e.feed_key, + e.feed_id, + e.run_id, + e.semantic_type, + e.client_id, + e.created_by, + e.interaction_type, + e.interaction_status, + e.interaction_input_schema, + e.interaction_input, + e.interaction_output, + e.interaction_error, + e.supersedes_event_id + FROM (public.events e + LEFT JOIN public.event_embeddings emb ON ((emb.event_id = e.id))) + WHERE (NOT (EXISTS ( SELECT 1 + FROM public.events newer + WHERE (newer.supersedes_event_id = e.id)))); + +ALTER TABLE public.event_embeddings DROP COLUMN IF EXISTS embedding_model; diff --git a/packages/connector-worker/src/__tests__/embeddings-model-stamp.test.ts b/packages/connector-worker/src/__tests__/embeddings-model-stamp.test.ts new file mode 100644 index 000000000..fbccf7ee5 --- /dev/null +++ b/packages/connector-worker/src/__tests__/embeddings-model-stamp.test.ts @@ -0,0 +1,39 @@ +/** + * Finding #3 reproducer (part a): embeddings must be version-stamped so a + * same-dimension model swap can't silently mix incompatible vector spaces. + * + * `resolveServiceModel` is the guard that `fetchEmbeddingsFromService` applies + * to every service response. Asserted directly (pure function) so the check is + * immune to bun's process-global `mock.module` of '../embeddings.js' in the + * sibling executor tests. 
+ * + * - a service `model` that differs from the worker's expectation throws + * (fail loud) — the exact silent-mixing case, even at equal dimensionality; + * - a matching `model` resolves to that stamp; + * - an omitted `model` falls back to the worker's expectation. + * + * Part (b) — the stamp is mapped onto each streamed event — is asserted in + * executor-batch-embed.test.ts; persistence onto event_embeddings is asserted + * server-side in integration/events/embedding-model-stamp.test.ts. + */ + +import { describe, expect, test } from 'bun:test'; +import { resolveServiceModel } from '../embeddings-model.js'; + +const EXPECTED = 'Xenova/bge-base-en-v1.5'; + +describe('embeddings model version stamp guard (Finding #3)', () => { + test('rejects a same-dimension model mismatch (fail loud)', () => { + expect(() => resolveServiceModel('some-other-model-v2', EXPECTED)).toThrow( + /returned model 'some-other-model-v2' but this worker expects/ + ); + }); + + test('resolves to the service model on a match', () => { + expect(resolveServiceModel(EXPECTED, EXPECTED)).toBe(EXPECTED); + }); + + test('falls back to the expected model when the service omits one', () => { + expect(resolveServiceModel(undefined, EXPECTED)).toBe(EXPECTED); + }); +}); diff --git a/packages/connector-worker/src/__tests__/executor-batch-embed.test.ts b/packages/connector-worker/src/__tests__/executor-batch-embed.test.ts new file mode 100644 index 000000000..230cf5a82 --- /dev/null +++ b/packages/connector-worker/src/__tests__/executor-batch-embed.test.ts @@ -0,0 +1,140 @@ +/** + * Finding #12 reproducer: the sync embedding path must batch a whole event + * chunk into ONE embedding call (one HTTP round-trip / vectorized pass), not + * one call per event, while still mapping each vector back to its source event. 
+ * + * Strategy: mock `executeCompiledConnector` to invoke `onEventChunk` with a + * 3-event chunk, mock `batchGenerateEmbeddings` to return distinguishable + * vectors, and assert: + * - batchGenerateEmbeddings was called exactly ONCE (not 3x), + * - it received every embeddable chunk text (the 2 non-empty ones) in one call, + * - each streamed ContentItem carries the vector for its own text + the model + * stamp, + * - an event with empty text gets no embedding (per-event association held). + */ + +import { afterEach, beforeEach, describe, expect, mock, test } from 'bun:test'; + +// biome-ignore lint/suspicious/noExplicitAny: test seam — module mocks need loose types +type AnyFn = (...args: any[]) => any; + +// Return a vector derived from the text so we can assert per-event mapping. +const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => ({ + embeddings: texts.map((t) => [t.length, 0, 0]), + model: 'stub-model-v1', +})); + +let capturedHooks: { onEventChunk: (events: unknown[]) => Promise } | undefined; + +const executeCompiledConnectorMock = mock(async (args: { hooks: typeof capturedHooks }) => { + capturedHooks = args.hooks; + return { mode: 'sync', checkpoint: null }; +}); + +mock.module('../executor/runtime.js', () => ({ + executeCompiledConnector: executeCompiledConnectorMock, +})); + +mock.module('../embeddings.js', () => ({ + batchGenerateEmbeddings: batchGenerateEmbeddingsMock, + generateEmbedding: async () => [0, 0, 0], +})); + +mock.module('../compile-connector.js', () => ({ + compileConnectorFromFile: async () => 'compiled-code', + findBundledConnectorFile: () => '/fake/path', +})); + +mock.module('../executor/subprocess.js', () => ({ + SubprocessExecutor: class { + // biome-ignore lint/suspicious/noExplicitAny: test stub + constructor(_opts: any) {} + }, +})); + +import type { ContentItem } from '../daemon/client.js'; +import { executeRun } from '../daemon/executor.js'; + +function makeStubClient() { + const streamed: ContentItem[] = []; + const client = { + id: 
'test-worker', + version: 'test', + streamed, + async heartbeat() {}, + async stream(batch: { items: ContentItem[] }) { + streamed.push(...batch.items); + }, + async complete() {}, + async completeAction() {}, + async completeEmbeddings() {}, + async completeAuth() {}, + async emitAuthArtifact() {}, + async pollAuthSignal() { + return { signal: null }; + }, + async fetchEventsForEmbedding() { + return []; + }, + }; + return client; +} + +describe('sync embedding path batches per chunk (Finding #12)', () => { + beforeEach(() => { + capturedHooks = undefined; + executeCompiledConnectorMock.mockClear(); + batchGenerateEmbeddingsMock.mockClear(); + }); + + afterEach(() => { + batchGenerateEmbeddingsMock.mockClear(); + }); + + test('one chunk of N events triggers exactly one batch call with all texts mapped back', async () => { + const client = makeStubClient(); + + executeCompiledConnectorMock.mockImplementationOnce(async (args: { hooks: typeof capturedHooks }) => { + capturedHooks = args.hooks; + // One chunk, three events. The third has empty text (no embeddable + // content) — it must still stream through, just without a vector. + await capturedHooks!.onEventChunk([ + { origin_id: 'a', payload_text: 'aa', occurred_at: new Date(), origin_type: 'post' }, + { origin_id: 'b', payload_text: 'bbbb', occurred_at: new Date(), origin_type: 'post' }, + { origin_id: 'c', payload_text: '', title: '', occurred_at: new Date(), origin_type: 'post' }, + ]); + return { mode: 'sync', checkpoint: null }; + }); + + const job = { + run_id: 500, + run_type: 'sync', + connector_key: 'fake', + feed_key: 'feed', + compiled_code: 'compiled-code', + // biome-ignore lint/suspicious/noExplicitAny: minimal job shape + } as any; + + // batchSize=10 so the chunk does not flush mid-loop; default generateEmbeddings=true. 
+ // biome-ignore lint/suspicious/noExplicitAny: minimal env + const result = await executeRun(client as any, job, {} as any, { batchSize: 10 }); + expect(result.error).toBeUndefined(); + + // (1) exactly ONE batch call for the whole chunk — not one per event. + expect(batchGenerateEmbeddingsMock).toHaveBeenCalledTimes(1); + + // (2) the call received only the embeddable texts ('a'+'b'; 'c' is empty). + const callArgs = batchGenerateEmbeddingsMock.mock.calls[0]![0] as string[]; + expect(callArgs).toEqual(['aa', 'bbbb']); + + // (3) per-event mapping: 'aa' (len 2) and 'bbbb' (len 4) get their own + // vectors + the model stamp; the empty event gets no embedding. + const byId = new Map(client.streamed.map((it) => [it.id, it])); + expect(byId.get('a')!.embedding).toEqual([2, 0, 0]); + expect(byId.get('a')!.embedding_model).toBe('stub-model-v1'); + expect(byId.get('b')!.embedding).toEqual([4, 0, 0]); + expect(byId.get('b')!.embedding_model).toBe('stub-model-v1'); + expect(byId.get('c')!.embedding).toBeUndefined(); + expect(byId.get('c')!.embedding_model).toBeUndefined(); + }); +}); diff --git a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts index 950cf20d3..421cbf9d8 100644 --- a/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts +++ b/packages/connector-worker/src/__tests__/executor-heartbeat.test.ts @@ -24,9 +24,10 @@ const executeCompiledConnectorMock = mock(async () => ({ output: { ok: true }, })); -const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => - texts.map(() => [0.1, 0.2, 0.3]) -); +const batchGenerateEmbeddingsMock = mock(async (texts: string[]) => ({ + embeddings: texts.map(() => [0.1, 0.2, 0.3]), + model: 'test-model', +})); mock.module('../executor/runtime.js', () => ({ executeCompiledConnector: executeCompiledConnectorMock, @@ -163,7 +164,7 @@ describe('executor heartbeats (lobu#860)', () => { 
batchGenerateEmbeddingsMock.mockImplementationOnce(async (texts: string[]) => { await fireIntervalTicks(2); - return texts.map(() => [0.1, 0.2, 0.3]); + return { embeddings: texts.map(() => [0.1, 0.2, 0.3]), model: 'test-model' }; }); const job = { diff --git a/packages/connector-worker/src/daemon/client.ts b/packages/connector-worker/src/daemon/client.ts index 7a0a367c3..3f85655da 100644 --- a/packages/connector-worker/src/daemon/client.ts +++ b/packages/connector-worker/src/daemon/client.ts @@ -139,6 +139,8 @@ export interface ContentItem { metadata?: Record; origin_parent_id?: string; embedding?: number[]; + /** Model/version stamp that produced `embedding`; persisted so vector spaces never mix. */ + embedding_model?: string; origin_type?: string; semantic_type?: string; } @@ -185,7 +187,7 @@ export interface EmbedEvent { export interface CompleteEmbeddingsRequest { run_id: number; worker_id: string; - embeddings: Array<{ event_id: number; embedding: number[] }>; + embeddings: Array<{ event_id: number; embedding: number[]; embedding_model?: string }>; error_message?: string; } diff --git a/packages/connector-worker/src/daemon/executor.ts b/packages/connector-worker/src/daemon/executor.ts index ca476250d..a151d4cc6 100644 --- a/packages/connector-worker/src/daemon/executor.ts +++ b/packages/connector-worker/src/daemon/executor.ts @@ -7,7 +7,7 @@ import type { Env, EventEnvelope } from '@lobu/connector-sdk'; import { compileConnectorFromFile, findBundledConnectorFile } from '../compile-connector.js'; -import { batchGenerateEmbeddings, generateEmbedding } from '../embeddings.js'; +import { batchGenerateEmbeddings } from '../embeddings.js'; import { executeCompiledConnector } from '../executor/runtime.js'; import { SubprocessExecutor } from '../executor/subprocess.js'; import type { ContentItem, ExecutorClient, PollResponse } from './client.js'; @@ -232,8 +232,8 @@ async function executeSyncRun( } }, onEventChunk: async (events) => { - for (const event of events) { 
- const contentItem = await processEvent(event, cfg.generateEmbeddings); + const contentItems = await processEventChunk(events, cfg.generateEmbeddings); + for (const contentItem of contentItems) { batch.push(contentItem); itemsCollectedSoFar++; @@ -637,13 +637,13 @@ async function executeEmbedBackfillRun( })) .filter((p) => p.text.length > 0); - const results: Array<{ event_id: number; embedding: number[] }> = []; + const results: Array<{ event_id: number; embedding: number[]; embedding_model: string }> = []; try { - const embeddings = await batchGenerateEmbeddings(pending.map((p) => p.text)); + const { embeddings, model } = await batchGenerateEmbeddings(pending.map((p) => p.text)); for (let i = 0; i < pending.length; i++) { const embedding = embeddings[i]; if (embedding) { - results.push({ event_id: pending[i]!.event_id, embedding }); + results.push({ event_id: pending[i]!.event_id, embedding, embedding_model: model }); } } } catch (err) { @@ -698,18 +698,15 @@ function mergeEnv( /** * Convert a V1 EventEnvelope (the SDK's standard sync output) into the - * gateway-bound ContentItem shape, optionally generating an embedding. + * gateway-bound ContentItem shape (without an embedding). */ -async function processEvent( - event: EventEnvelope, - generateEmbeddings: boolean -): Promise { +function toContentItem(event: EventEnvelope): ContentItem { const occurredAtIso = event.occurred_at instanceof Date ? event.occurred_at.toISOString() : (event.occurred_at as unknown as string); - const contentItem: ContentItem = { + return { id: event.origin_id, title: event.title, payload_text: event.payload_text, @@ -722,20 +719,56 @@ async function processEvent( origin_type: event.origin_type, semantic_type: event.semantic_type ?? 
event.origin_type, }; +} - if (generateEmbeddings) { - try { - const textForEmbedding = [event.title, event.payload_text] - .filter(Boolean) - .join(' ') - .trim(); - if (textForEmbedding) { - contentItem.embedding = await generateEmbedding(textForEmbedding); +/** + * Convert a chunk of events into ContentItems, generating embeddings for the + * whole chunk in a single batch call (one HTTP round-trip / vectorized local + * pass) instead of one per event. Vectors are mapped back to their source + * event by index; events with empty text get no embedding. A batch failure is + * logged and the items stream through without embeddings (same fail-open + * behaviour as the previous per-event path). + */ +async function processEventChunk( + events: EventEnvelope[], + generateEmbeddings: boolean +): Promise { + const contentItems = events.map(toContentItem); + + if (!generateEmbeddings || contentItems.length === 0) { + return contentItems; + } + + // Collect the embeddable texts and remember which ContentItem each maps to, + // so vectors line up after the batch call even though empty-text items are + // skipped. 
+ const targets: number[] = []; + const texts: string[] = []; + for (let i = 0; i < events.length; i++) { + const text = [events[i]!.title, events[i]!.payload_text].filter(Boolean).join(' ').trim(); + if (text) { + targets.push(i); + texts.push(text); + } + } + + if (texts.length === 0) { + return contentItems; + } + + try { + const { embeddings, model } = await batchGenerateEmbeddings(texts); + for (let j = 0; j < targets.length; j++) { + const embedding = embeddings[j]; + if (embedding) { + const item = contentItems[targets[j]!]!; + item.embedding = embedding; + item.embedding_model = model; } - } catch (err) { - console.error(`[executor] Embedding generation failed for ${event.origin_id}:`, err); } + } catch (err) { + console.error('[executor] Batch embedding generation failed for chunk:', err); } - return contentItem; + return contentItems; } diff --git a/packages/connector-worker/src/embeddings-model.ts b/packages/connector-worker/src/embeddings-model.ts new file mode 100644 index 000000000..f2c7f34df --- /dev/null +++ b/packages/connector-worker/src/embeddings-model.ts @@ -0,0 +1,31 @@ +/** + * Embedding model/version stamp guard. + * + * Kept in its own module (separate from `embeddings.ts`) so it can be unit + * tested in isolation — `embeddings.ts` is module-mocked by the executor tests, + * and bun's `mock.module` is process-global, so the guard logic would otherwise + * be unreachable in those test runs. + */ + +/** + * Resolve the model/version stamp to persist for a service-produced embedding. + * + * Equal dimensionality is NOT enough — vectors from a different model occupy an + * incompatible space, so a same-dimension model swap would silently mix spaces + * in `event_embeddings`. Throws (fail loud) when the service reports a model + * that differs from the worker's expectation; otherwise returns the stamp + * (service-reported when present, else the confirmed expectation). 
+ */ +export function resolveServiceModel( + serviceModel: string | undefined, + expectedModel: string +): string { + if (serviceModel && serviceModel !== expectedModel) { + throw new Error( + `Embeddings service returned model '${serviceModel}' but this worker expects ` + + `'${expectedModel}'. Refusing to mix incompatible vector spaces — align ` + + `EMBEDDINGS_MODEL on the worker and the embeddings service.` + ); + } + return serviceModel || expectedModel; +} diff --git a/packages/connector-worker/src/embeddings.ts b/packages/connector-worker/src/embeddings.ts index 50dfcb972..08ab950f7 100644 --- a/packages/connector-worker/src/embeddings.ts +++ b/packages/connector-worker/src/embeddings.ts @@ -8,24 +8,42 @@ import { DEFAULT_DIMENSIONS, batchGenerateLocalEmbeddings, + getLocalModelName, validateEmbeddingDimensions, } from '@lobu/embeddings'; +import { resolveServiceModel } from './embeddings-model.js'; const DEFAULT_BATCH_SIZE = 32; const DEFAULT_TIMEOUT_MS = 30000; +/** Embeddings plus the model/version stamp that produced them. */ +export interface EmbeddingResult { + embeddings: number[][]; + /** Model identifier persisted on each row so different vector spaces never mix. */ + model: string; +} + function getExpectedDimensions(): number { const raw = process.env.EMBEDDINGS_DIMENSIONS; const parsed = raw ? Number.parseInt(raw, 10) : DEFAULT_DIMENSIONS; return Number.isFinite(parsed) ? parsed : DEFAULT_DIMENSIONS; } +/** + * The model this worker is configured to produce/consume. Both backends read + * the same env var (with the same default), so this is the authoritative + * expectation the service must match — see {@link resolveServiceModel}. + */ +export function getExpectedEmbeddingModel(): string { + return getLocalModelName(); +} + function getTimeoutMs(): number { const parsed = Number.parseInt(process.env.EMBEDDINGS_TIMEOUT_MS || '', 10); return Number.isFinite(parsed) ? 
parsed : DEFAULT_TIMEOUT_MS; } -async function fetchEmbeddingsFromService(texts: string[]): Promise { +async function fetchEmbeddingsFromService(texts: string[]): Promise { const baseUrl = process.env.EMBEDDINGS_SERVICE_URL; if (!baseUrl) { throw new Error('EMBEDDINGS_SERVICE_URL is required for service backend'); @@ -60,6 +78,7 @@ async function fetchEmbeddingsFromService(texts: string[]): Promise const payload = (await response.json()) as { embeddings?: number[][]; dimensions?: number; + model?: string; }; if (!Array.isArray(payload.embeddings)) { @@ -78,27 +97,30 @@ async function fetchEmbeddingsFromService(texts: string[]): Promise ); } + // Fail loud on a model mismatch; otherwise resolve the stamp to persist. + const model = resolveServiceModel(payload.model, getExpectedEmbeddingModel()); + for (const embedding of payload.embeddings) { validateEmbeddingDimensions(embedding, getExpectedDimensions(), 'Embeddings service'); } - return payload.embeddings; + return { embeddings: payload.embeddings, model }; } finally { clearTimeout(timeout); } } export async function generateEmbedding(text: string): Promise { - const [embedding] = await batchGenerateEmbeddings([text]); - return embedding; + const { embeddings } = await batchGenerateEmbeddings([text]); + return embeddings[0]!; } export async function batchGenerateEmbeddings( texts: string[], batchSize: number = DEFAULT_BATCH_SIZE -): Promise { +): Promise { if (texts.length === 0) { - return []; + return { embeddings: [], model: getExpectedEmbeddingModel() }; } if (process.env.EMBEDDINGS_SERVICE_URL) { @@ -109,5 +131,5 @@ export async function batchGenerateEmbeddings( for (const embedding of embeddings) { validateEmbeddingDimensions(embedding, getExpectedDimensions(), 'Local embeddings'); } - return embeddings; + return { embeddings, model: getLocalModelName() }; } diff --git a/packages/server/src/__tests__/integration/events/embedding-model-stamp.test.ts 
b/packages/server/src/__tests__/integration/events/embedding-model-stamp.test.ts new file mode 100644 index 000000000..c0f92e147 --- /dev/null +++ b/packages/server/src/__tests__/integration/events/embedding-model-stamp.test.ts @@ -0,0 +1,117 @@ +/** + * Finding #3 reproducer (part b): the embedding model/version stamp is + * persisted on event_embeddings. + * + * Before the fix, event_embeddings had no model column and insertEvent + * discarded any stamp, so swapping EMBEDDINGS_MODEL to a different + * same-dimension model silently mixed incompatible vector spaces with no + * record of which model produced which row. This drives the production + * insertEvent path with an embeddingModel and asserts it round-trips into + * event_embeddings.embedding_model. + */ + +import { beforeAll, describe, expect, it } from 'vitest'; +import { insertEvent } from '../../../utils/insert-event'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; +import { + addUserToOrganization, + createTestConnection, + createTestConnectorDefinition, + createTestEntity, + createTestOrganization, + createTestUser, + seedSystemEntityTypes, +} from '../../setup/test-fixtures'; + +const EMBEDDING_DIM = 768; + +function unitVec(): number[] { + const v = new Array(EMBEDDING_DIM).fill(0); + v[0] = 1; + return v; +} + +describe('event_embeddings model stamp (Finding #3)', () => { + let orgId: string; + let entityId: number; + let connectionId: number; + + beforeAll(async () => { + await cleanupTestDatabase(); + await seedSystemEntityTypes(); + + const org = await createTestOrganization({ name: 'Embedding Stamp Org' }); + orgId = org.id; + const user = await createTestUser({ email: 'embed-stamp-test@example.com' }); + await addUserToOrganization(user.id, org.id, 'owner'); + const entity = await createTestEntity({ name: 'Stamp Target', organization_id: org.id }); + entityId = entity.id; + + await createTestConnectorDefinition({ + key: 'embed-stamp-connector', + name: 'Embed Stamp', + 
organization_id: org.id, + }); + const connection = await createTestConnection({ + organization_id: org.id, + connector_key: 'embed-stamp-connector', + entity_ids: [entity.id], + }); + connectionId = connection.id; + }); + + it('persists embedding_model alongside the vector', async () => { + const inserted = await insertEvent( + { + entityIds: [entityId], + organizationId: orgId, + originId: `stamp-${Date.now()}`, + title: 'Stamped event', + content: 'content with an embedding', + occurredAt: new Date(), + semanticType: 'content', + originType: 'content', + connectorKey: 'embed-stamp-connector', + connectionId, + embedding: unitVec(), + embeddingModel: 'Xenova/bge-base-en-v1.5', + }, + { onConflictUpdate: true } + ); + + const sql = getTestDb(); + const rows = (await sql` + SELECT embedding_model FROM event_embeddings WHERE event_id = ${inserted.id} + `) as Array<{ embedding_model: string | null }>; + + expect(rows).toHaveLength(1); + expect(rows[0]!.embedding_model).toBe('Xenova/bge-base-en-v1.5'); + }); + + it('leaves embedding_model NULL when no stamp is supplied (legacy path)', async () => { + const inserted = await insertEvent( + { + entityIds: [entityId], + organizationId: orgId, + originId: `nostamp-${Date.now()}`, + title: 'Unstamped event', + content: 'content with an embedding but no model', + occurredAt: new Date(), + semanticType: 'content', + originType: 'content', + connectorKey: 'embed-stamp-connector', + connectionId, + embedding: unitVec(), + }, + { onConflictUpdate: true } + ); + + const sql = getTestDb(); + const rows = (await sql` + SELECT embedding_model FROM event_embeddings WHERE event_id = ${inserted.id} + `) as Array<{ embedding_model: string | null }>; + + expect(rows).toHaveLength(1); + expect(rows[0]!.embedding_model).toBeNull(); + }); +}); diff --git a/packages/server/src/__tests__/integration/events/embedding-model-swap-e2e.test.ts b/packages/server/src/__tests__/integration/events/embedding-model-swap-e2e.test.ts new file mode 100644 
index 000000000..45224e74a --- /dev/null +++ b/packages/server/src/__tests__/integration/events/embedding-model-swap-e2e.test.ts @@ -0,0 +1,301 @@ +/** + * Finding #3 end-to-end reproducer: a same-dimension model swap must NOT mix + * vector spaces. + * + * Scenario: + * 1. Configure model A, ingest an event embedded under model A. + * 2. Under model A, a vector search returns the row (control — scoping does + * not over-filter the current model). + * 3. Switch the configured model to a DIFFERENT same-dimension model B. + * 4. Under model B, the same vector search must NOT return the model-A row + * (its vector lives in an incompatible space). + * 5. Under model B, the backfill "needs embedding" query must flag the + * model-A row as stale so it gets re-embedded. + * + * Before the fix, steps 4 and 5 failed: search compared across models and + * backfill only looked for missing (not stale) embeddings. + */ + +import type { Context } from 'hono'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { searchContentByText } from '../../../utils/content-search'; +import type { Env } from '../../../index'; +import { insertEvent } from '../../../utils/insert-event'; +import { completeEmbeddings } from '../../../worker-api'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; +import { + addUserToOrganization, + createTestConnection, + createTestConnectorDefinition, + createTestEntity, + createTestOrganization, + createTestUser, + seedSystemEntityTypes, +} from '../../setup/test-fixtures'; + +const EMBEDDING_DIM = 768; +const MODEL_A = 'Xenova/bge-base-en-v1.5'; +const MODEL_B = 'Xenova/some-other-768d-model'; + +// Identical 768-d vector for both the stored row and the query, so the ONLY +// thing that can exclude the row is the model-scope predicate (not distance). 
+function unitVec(): number[] { + const v = new Array(EMBEDDING_DIM).fill(0); + v[0] = 1; + return v; +} + +// Minimal Hono Context to drive worker-api handlers directly: completeEmbeddings +// only reads the JSON body and calls c.json() on the success path (no c.env). +function mockEmbeddingsCtx(body: unknown): { + ctx: Context<{ Bindings: Env }>; + result: () => { body: unknown; status: number }; +} { + let captured: { body: unknown; status: number } = { body: undefined, status: 200 }; + const ctx = { + req: { json: async () => body }, + json: (b: unknown, status?: number) => { + captured = { body: b, status: status ?? 200 }; + return captured as unknown as Response; + }, + } as unknown as Context<{ Bindings: Env }>; + return { ctx, result: () => captured }; +} + +describe('embedding model swap E2E (Finding #3)', () => { + let orgId: string; + let entityId: number; + let connectionId: number; + let eventId: number; + let originalModel: string | undefined; + + beforeAll(async () => { + originalModel = process.env.EMBEDDINGS_MODEL; + process.env.EMBEDDINGS_MODEL = MODEL_A; + + await cleanupTestDatabase(); + await seedSystemEntityTypes(); + + const org = await createTestOrganization({ name: 'Model Swap Org' }); + orgId = org.id; + const user = await createTestUser({ email: 'model-swap-test@example.com' }); + await addUserToOrganization(user.id, org.id, 'owner'); + const entity = await createTestEntity({ name: 'Swap Target', organization_id: org.id }); + entityId = entity.id; + + await createTestConnectorDefinition({ + key: 'model-swap-connector', + name: 'Model Swap', + organization_id: org.id, + }); + const connection = await createTestConnection({ + organization_id: org.id, + connector_key: 'model-swap-connector', + entity_ids: [entity.id], + }); + connectionId = connection.id; + + // Event embedded under model A. 
+ const inserted = await insertEvent( + { + entityIds: [entityId], + organizationId: orgId, + originId: `swap-${Date.now()}`, + title: 'Model A event', + content: 'unique model-a content about quarterly revenue', + occurredAt: new Date(), + semanticType: 'content', + originType: 'content', + connectorKey: 'model-swap-connector', + connectionId, + embedding: unitVec(), + embeddingModel: MODEL_A, + }, + { onConflictUpdate: true } + ); + eventId = inserted.id; + }); + + afterAll(() => { + if (originalModel === undefined) delete process.env.EMBEDDINGS_MODEL; + else process.env.EMBEDDINGS_MODEL = originalModel; + }); + + it('control: under model A the row is returned by vector search', async () => { + process.env.EMBEDDINGS_MODEL = MODEL_A; + const res = await searchContentByText('', { + organization_id: orgId, + query_embedding: unitVec(), + min_similarity: 0.5, + limit: 10, + }); + const ids = res.content.map((r) => Number(r.id)); + expect(ids).toContain(eventId); + }); + + it('under model B the model-A row is NOT returned (no cross-model comparison)', async () => { + process.env.EMBEDDINGS_MODEL = MODEL_B; + const res = await searchContentByText('', { + organization_id: orgId, + query_embedding: unitVec(), + min_similarity: 0.5, + limit: 10, + }); + const ids = res.content.map((r) => Number(r.id)); + expect(ids).not.toContain(eventId); + }); + + it('under model B the candidate (recall) path also excludes the model-A row', async () => { + process.env.EMBEDDINGS_MODEL = MODEL_B; + const res = await searchContentByText('', { + organization_id: orgId, + query_embedding: unitVec(), + min_similarity: 0.5, + limit: 10, + approximate_candidate_search: true, + }); + const ids = res.content.map((r) => Number(r.id)); + expect(ids).not.toContain(eventId); + }); + + it('under model B the backfill query flags the model-A row as stale', async () => { + process.env.EMBEDDINGS_MODEL = MODEL_B; + const sql = getTestDb(); + // Mirror trigger-embed-backfill's needs-embedding predicate. 
+ const staleRows = (await sql` + SELECT ev.id + FROM current_event_records ev + LEFT JOIN event_embeddings emb ON emb.event_id = ev.id + WHERE ev.id = ${eventId} + AND (emb.event_id IS NULL OR emb.embedding_model IS DISTINCT FROM ${MODEL_B}) + `) as Array<{ id: number }>; + expect(staleRows.map((r) => Number(r.id))).toContain(eventId); + + // And under model A it is NOT stale (no needless re-embed of current rows). + const freshRows = (await sql` + SELECT ev.id + FROM current_event_records ev + LEFT JOIN event_embeddings emb ON emb.event_id = ev.id + WHERE ev.id = ${eventId} + AND (emb.event_id IS NULL OR emb.embedding_model IS DISTINCT FROM ${MODEL_A}) + `) as Array<{ id: number }>; + expect(freshRows).toHaveLength(0); + }); + + it('completeEmbeddings replaces a stale-model row in place', async () => { + const sql = getTestDb(); + // Re-embed the model-A row under model B via the same upsert the worker uses. + const newVec = new Array(EMBEDDING_DIM).fill(0); + newVec[1] = 1; // distinct vector so we can see the replacement took + const vectorStr = `[${newVec.join(',')}]`; + await sql.unsafe( + `INSERT INTO event_embeddings (event_id, embedding, embedding_model) + VALUES ($1, $2::vector, $3) + ON CONFLICT (event_id) DO UPDATE + SET embedding = EXCLUDED.embedding, + embedding_model = EXCLUDED.embedding_model, + created_at = now() + WHERE event_embeddings.embedding_model IS DISTINCT FROM EXCLUDED.embedding_model`, + [eventId, vectorStr, MODEL_B] + ); + + const rows = (await sql` + SELECT embedding_model FROM event_embeddings WHERE event_id = ${eventId} + `) as Array<{ embedding_model: string | null }>; + expect(rows).toHaveLength(1); + expect(rows[0]!.embedding_model).toBe(MODEL_B); + }); + + it('a NULL-stamp (legacy) row is excluded from vector search and flagged stale', async () => { + process.env.EMBEDDINGS_MODEL = MODEL_A; + const sql = getTestDb(); + + // Legacy row: embedding present but no model stamp (predates stamping). 
+ const legacy = await insertEvent( + { + entityIds: [entityId], + organizationId: orgId, + originId: `legacy-${Date.now()}`, + title: 'Legacy event', + content: 'legacy content with an unstamped embedding', + occurredAt: new Date(), + semanticType: 'content', + originType: 'content', + connectorKey: 'model-swap-connector', + connectionId, + embedding: unitVec(), + // embeddingModel intentionally omitted → NULL stamp + }, + { onConflictUpdate: true } + ); + + // Excluded from vector search under the configured model (unknown true model). + const res = await searchContentByText('', { + organization_id: orgId, + query_embedding: unitVec(), + min_similarity: 0.5, + limit: 10, + }); + expect(res.content.map((r) => Number(r.id))).not.toContain(legacy.id); + + // Flagged stale by the backfill predicate so it gets restamped. + const staleRows = (await sql` + SELECT ev.id + FROM current_event_records ev + LEFT JOIN event_embeddings emb ON emb.event_id = ev.id + WHERE ev.id = ${legacy.id} + AND (emb.event_id IS NULL OR emb.embedding_model IS DISTINCT FROM ${MODEL_A}) + `) as Array<{ id: number }>; + expect(staleRows.map((r) => Number(r.id))).toContain(legacy.id); + }); + + it('completeEmbeddings (real handler) replaces a stale-model row and is idempotent on re-submit', async () => { + const sql = getTestDb(); + + // Fresh event stamped MODEL_A, independent of mutations in earlier tests. 
+ const ev = await insertEvent( + { + entityIds: [entityId], + organizationId: orgId, + originId: `complete-emb-${Date.now()}`, + title: 'Handler upsert event', + content: 'content routed through the real completeEmbeddings handler', + occurredAt: new Date(), + semanticType: 'content', + originType: 'content', + connectorKey: 'model-swap-connector', + connectionId, + embedding: unitVec(), + embeddingModel: MODEL_A, + }, + { onConflictUpdate: true } + ); + + const newVec = new Array(EMBEDDING_DIM).fill(0); + newVec[2] = 1; // distinct from unitVec so the replacement is observable + + // Drive the REAL handler: submit a model-B embedding for the model-A row. + const first = mockEmbeddingsCtx({ + run_id: -1, // no matching run row → the handler's run UPDATE is a harmless no-op + worker_id: 'test-worker', + embeddings: [{ event_id: ev.id, embedding: newVec, embedding_model: MODEL_B }], + }); + await completeEmbeddings(first.ctx); + expect(first.result()).toMatchObject({ body: { success: true, updated: 1 } }); + + const afterReplace = (await sql` + SELECT embedding_model FROM event_embeddings WHERE event_id = ${ev.id} + `) as Array<{ embedding_model: string | null }>; + expect(afterReplace).toHaveLength(1); + expect(afterReplace[0]!.embedding_model).toBe(MODEL_B); // stale model-A row was replaced + + // Re-submit the SAME model → idempotent no-op (the ON CONFLICT WHERE blocks it). 
+ const second = mockEmbeddingsCtx({ + run_id: -1, + worker_id: 'test-worker', + embeddings: [{ event_id: ev.id, embedding: newVec, embedding_model: MODEL_B }], + }); + await completeEmbeddings(second.ctx); + expect(second.result()).toMatchObject({ body: { success: true, updated: 0 } }); + }); +}); diff --git a/packages/server/src/__tests__/setup/test-fixtures.ts b/packages/server/src/__tests__/setup/test-fixtures.ts index 90893e73c..13ac00889 100644 --- a/packages/server/src/__tests__/setup/test-fixtures.ts +++ b/packages/server/src/__tests__/setup/test-fixtures.ts @@ -10,6 +10,7 @@ import { hashClientSecret } from '../../auth/oauth/clients'; import { generateSecureToken, hashToken } from '../../auth/oauth/utils'; import { pgBigintArray, pgTextArray } from '../../db/client'; import { ensureUniqueConnectionSlug } from '../../utils/connections'; +import { getConfiguredEmbeddingModel } from '../../utils/embeddings'; import { generateSlug } from '../../utils/entity-management'; import type { ToolContext } from '../../tools/registry'; import { getTestDb } from './test-db'; @@ -661,6 +662,9 @@ export async function createTestEvent(options: { occurred_at?: Date; origin_id?: string; embedding?: number[]; + /** Model stamp for the embedding. Defaults to the configured model (mirrors + * real ingestion, which always stamps); set null to simulate a legacy row. */ + embedding_model?: string | null; semantic_type?: string; connector_key?: string; entity_ids?: number[]; @@ -707,9 +711,16 @@ export async function createTestEvent(options: { `; if (options.embedding) { + // Stamp the configured model by default so vector-scoped search (which only + // compares same-model rows) sees these fixtures; pass embedding_model: null + // to deliberately simulate a legacy unstamped row. + const stamp = + options.embedding_model === undefined + ? 
getConfiguredEmbeddingModel() + : options.embedding_model; await sql` - INSERT INTO event_embeddings (event_id, embedding) - VALUES (${inserted.id}, ${JSON.stringify(options.embedding)}::vector) + INSERT INTO event_embeddings (event_id, embedding, embedding_model) + VALUES (${inserted.id}, ${JSON.stringify(options.embedding)}::vector, ${stamp}) `; } diff --git a/packages/server/src/__tests__/unit/embeddings-model-guard.test.ts b/packages/server/src/__tests__/unit/embeddings-model-guard.test.ts new file mode 100644 index 000000000..84fdcaa23 --- /dev/null +++ b/packages/server/src/__tests__/unit/embeddings-model-guard.test.ts @@ -0,0 +1,82 @@ +/** + * Finding #3 (server-side guard): the query embedding the search path compares + * against stored rows MUST come from the configured model. If the embeddings + * service reports a different model, generateEmbeddings must FAIL LOUD rather + * than returning vectors that would be compared across incompatible spaces. + * + * Also covers configuredEmbeddingModelSqlLiteral's validation (it is inlined + * into SQL, so an unsafe value must be rejected). 
+ */ + +import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; +import { + configuredEmbeddingModelSqlLiteral, + generateEmbeddings, + getConfiguredEmbeddingModel, +} from '../../utils/embeddings'; + +// biome-ignore lint/suspicious/noExplicitAny: minimal Env stub for the util +const ENV = { EMBEDDINGS_SERVICE_URL: 'http://embeddings.test' } as any; + +let originalFetch: typeof globalThis.fetch; +let originalModel: string | undefined; + +function stubFetch(body: unknown) { + globalThis.fetch = (async () => + new Response(JSON.stringify(body), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + })) as typeof globalThis.fetch; +} + +function vec768(): number[] { + const v = new Array(768).fill(0); + v[0] = 1; + return v; +} + +beforeEach(() => { + originalFetch = globalThis.fetch; + originalModel = process.env.EMBEDDINGS_MODEL; + process.env.EMBEDDINGS_MODEL = 'Xenova/bge-base-en-v1.5'; +}); + +afterEach(() => { + globalThis.fetch = originalFetch; + if (originalModel === undefined) delete process.env.EMBEDDINGS_MODEL; + else process.env.EMBEDDINGS_MODEL = originalModel; +}); + +describe('server generateEmbeddings model guard (Finding #3)', () => { + it('rejects a service model that differs from the configured model', async () => { + stubFetch({ model: 'some-other-model-v2', dimensions: 768, embeddings: [vec768()] }); + await expect(generateEmbeddings(['hi'], ENV)).rejects.toThrow( + /returned model 'some-other-model-v2' but this deployment is configured/ + ); + }); + + it('accepts a matching service model', async () => { + stubFetch({ model: getConfiguredEmbeddingModel(), dimensions: 768, embeddings: [vec768()] }); + const out = await generateEmbeddings(['hi'], ENV); + expect(out).toHaveLength(1); + expect(out[0]).toHaveLength(768); + }); + + it('accepts a response that omits the model (backward compatible)', async () => { + stubFetch({ dimensions: 768, embeddings: [vec768()] }); + const out = await generateEmbeddings(['hi'], ENV); 
+ expect(out).toHaveLength(1); + }); +}); + +describe('configuredEmbeddingModelSqlLiteral', () => { + it('quotes a valid model name', () => { + process.env.EMBEDDINGS_MODEL = 'Xenova/bge-base-en-v1.5'; + expect(configuredEmbeddingModelSqlLiteral()).toBe("'Xenova/bge-base-en-v1.5'"); + }); + + it('rejects an unsafe model identifier (SQL-injection / whitespace)', () => { + process.env.EMBEDDINGS_MODEL = "x'; DROP TABLE event_embeddings; --"; + expect(() => configuredEmbeddingModelSqlLiteral()).toThrow(/not a valid model identifier/); + }); +}); diff --git a/packages/server/src/benchmarks/memory/adapters/lobu-inprocess.ts b/packages/server/src/benchmarks/memory/adapters/lobu-inprocess.ts index 86a673149..a136795f7 100644 --- a/packages/server/src/benchmarks/memory/adapters/lobu-inprocess.ts +++ b/packages/server/src/benchmarks/memory/adapters/lobu-inprocess.ts @@ -15,7 +15,7 @@ import { import { clearMcpSessions } from '../../../__tests__/setup/mcp-session-cache'; import { post } from '../../../__tests__/setup/test-helpers'; import { closeDbSingleton } from '../../../db/client'; -import { generateEmbeddings } from '../../../utils/embeddings'; +import { generateEmbeddings, getConfiguredEmbeddingModel } from '../../../utils/embeddings'; import type { BenchmarkAdapter, BenchmarkSuite, @@ -622,14 +622,17 @@ export class LobuInprocessBenchmarkAdapter implements BenchmarkAdapter { EMBEDDINGS_TIMEOUT_MS: process.env.EMBEDDINGS_TIMEOUT_MS, }); + // Stamp the configured model so these rows are visible to model-scoped + // vector search (which only compares same-model rows). 
+ const embeddingModel = getConfiguredEmbeddingModel(); for (let index = 0; index < rows.length; index += 1) { const row = rows[index] as { id: number }; const embedding = embeddings[index]; if (!embedding) continue; const vectorStr = `[${embedding.join(',')}]`; await sql.unsafe( - 'INSERT INTO event_embeddings (event_id, embedding) VALUES ($1, $2::vector) ON CONFLICT (event_id) DO NOTHING', - [row.id, vectorStr] + 'INSERT INTO event_embeddings (event_id, embedding, embedding_model) VALUES ($1, $2::vector, $3) ON CONFLICT (event_id) DO NOTHING', + [row.id, vectorStr, embeddingModel] ); } } diff --git a/packages/server/src/scheduled/trigger-embed-backfill.ts b/packages/server/src/scheduled/trigger-embed-backfill.ts index f6767571b..2407b1499 100644 --- a/packages/server/src/scheduled/trigger-embed-backfill.ts +++ b/packages/server/src/scheduled/trigger-embed-backfill.ts @@ -7,6 +7,7 @@ */ import { getDb } from '../db/client'; +import { configuredEmbeddingModelSqlLiteral } from '../utils/embeddings'; import type { Env } from '../index'; import logger from '../utils/logger'; import { isUniqueViolation } from '../utils/pg-errors'; @@ -25,16 +26,29 @@ interface OrgBatch { event_count: number; } +// A row needs (re)embedding when it has no embedding at all, OR its stamp is +// not the configured model — including a NULL stamp (legacy row whose true +// model is unknown, written before stamping). Search excludes those NULL/stale +// rows from vector comparison, so the backfill must restamp them to make them +// searchable again. `IS DISTINCT FROM` makes NULL count as different from the +// (non-NULL) configured model. The model is server config, inlined as a +// validated literal. 
+function needsEmbeddingPredicate(): string { + const model = configuredEmbeddingModelSqlLiteral(); + return `(emb.event_id IS NULL OR emb.embedding_model IS DISTINCT FROM ${model})`; +} + export async function triggerEmbedBackfill(_env: Env): Promise { const sql = getDb(); + const needsEmbedding = needsEmbeddingPredicate(); try { - // Find organizations with events missing embeddings, grouped for batch runs + // Find organizations with events missing/stale embeddings, grouped for batch runs const orgBatches = await sql` SELECT ev.organization_id, COUNT(*)::int AS event_count FROM current_event_records ev LEFT JOIN event_embeddings emb ON emb.event_id = ev.id - WHERE emb.event_id IS NULL + WHERE ${sql.unsafe(needsEmbedding)} AND ev.payload_text IS NOT NULL AND ev.payload_text != '' AND ev.organization_id IS NOT NULL @@ -94,14 +108,16 @@ export async function triggerEmbedBackfill(_env: Env): Promise { async function createBackfillRun(organizationId: string): Promise { const sql = getDb(); + const needsEmbedding = needsEmbeddingPredicate(); + try { return await sql.begin(async (tx) => { - // Collect event IDs that need embeddings (up to batch limit) + // Collect event IDs that need (re)embedding (up to batch limit) const events = await tx` SELECT ev.id FROM current_event_records ev LEFT JOIN event_embeddings emb ON emb.event_id = ev.id - WHERE emb.event_id IS NULL + WHERE ${tx.unsafe(needsEmbedding)} AND ev.payload_text IS NOT NULL AND ev.payload_text != '' AND ev.organization_id = ${organizationId} diff --git a/packages/server/src/utils/content-search.ts b/packages/server/src/utils/content-search.ts index 20eb2a331..aff47395c 100644 --- a/packages/server/src/utils/content-search.ts +++ b/packages/server/src/utils/content-search.ts @@ -17,7 +17,7 @@ import { groupClassificationFilters, } from './content-query-filters'; import { parseDateAlias, toEndOfDay } from './date-aliases'; -import { generateEmbeddings } from './embeddings'; +import { 
configuredEmbeddingModelSqlLiteral, generateEmbeddings } from './embeddings'; import { toVectorLiteral } from './entity-management'; import logger from './logger'; import { validateNumericId } from './sql-validation'; @@ -1612,8 +1612,17 @@ async function searchContentBySingleQuery( const textMatchExpr = `(LENGTH($1) > 0 AND (f.payload_text ILIKE '%' || $1 || '%' OR COALESCE(${textDocumentExpr} @@ ${TSQUERY_SQL}, false)))`; const vecParam = vectorParamIdx ? `$${vectorParamIdx}::vector` : 'NULL::vector'; const minSimilarityParam = `$${minSimilarityParamIdx}::numeric`; + // Vector-space integrity: only compare against rows stamped with the EXACT + // model this deployment is configured for. A NULL stamp (legacy row written + // before stamping) is NOT comparable — its true model is unknown, so comparing + // it against the configured query vector could mix incompatible spaces. Such + // rows are excluded from vector ranking until the backfill restamps them (see + // trigger-embed-backfill, which treats NULL as stale). The model is server + // config, inlined as a validated literal (the table alias is substituted per CTE). + const configuredModelLiteral = configuredEmbeddingModelSqlLiteral(); + const modelScopeFor = (alias: string) => `${alias}.embedding_model = ${configuredModelLiteral}`; const matchCondition = hasEmbedding - ? 
`(${textMatchExpr} OR (f.embedding IS NOT NULL AND ${modelScopeFor('f')} AND 1 - (f.embedding <=> ${vecParam}) >= ${minSimilarityParam}))` : textMatchExpr; const searchWhereSQL = `${matchCondition} @@ -1647,8 +1656,13 @@ async function searchContentBySingleQuery( '[content-search] weights' ); } - similarityExpr = `CASE WHEN fi.embedding IS NOT NULL THEN 1 - (fi.embedding <=> ${vecParam}) ELSE NULL END`; - combinedScoreExpr = `COALESCE((${textRankExpr}) * ${textWeight} + (1 - (fi.embedding <=> ${vecParam})) * ${vectorWeight}, ${textRankExpr})`; + // Same model scope as matchCondition: a row whose stamp differs from the + // configured model contributes no vector similarity (NULL → COALESCE falls + // back to the text-only score), so stale-model rows can never rank via an + // incompatible <=> comparison. + const fiVectorComparable = `fi.embedding IS NOT NULL AND ${modelScopeFor('fi')}`; + similarityExpr = `CASE WHEN ${fiVectorComparable} THEN 1 - (fi.embedding <=> ${vecParam}) ELSE NULL END`; + combinedScoreExpr = `COALESCE((${textRankExpr}) * ${textWeight} + (CASE WHEN ${fiVectorComparable} THEN 1 - (fi.embedding <=> ${vecParam}) ELSE NULL END) * ${vectorWeight}, ${textRankExpr})`; searchExtraColumns = 'rs.text_rank, rs.similarity, rs.combined_score, rs.total_count, rs.cursor_fetched_count'; orderByExpr = preferChronologicalOrdering @@ -1727,6 +1741,7 @@ async function searchContentBySingleQuery( JOIN current_event_records f ON f.id = emb.event_id ${candidateFilterJoins} WHERE ${standardFiltersSQL} + AND ${modelScopeFor('emb')} AND (1 - (emb.embedding <=> ${vecParam})) >= ${minSimilarityParam} ORDER BY emb.embedding <=> ${vecParam} LIMIT ${CANDIDATE_VECTOR_LIMIT}`); @@ -1760,7 +1775,7 @@ async function searchContentBySingleQuery( } const nonDateFilteredIdsCteSql = `filtered_ids AS ( - SELECT f.id, f.score, f.occurred_at, f.title, f.payload_text, f.embedding, f.search_tsv + SELECT f.id, f.score, f.occurred_at, f.title, f.payload_text, f.embedding, f.embedding_model, 
f.search_tsv FROM current_event_records f ${useCandidatePath ? 'JOIN search_candidates sc ON sc.id = f.id' : ''} LEFT JOIN connections c ON c.id = f.connection_id @@ -1774,7 +1789,7 @@ async function searchContentBySingleQuery( const querySQL = useDateFeed ? ` WITH RECURSIVE filtered_ids AS ( - SELECT f.id, f.score, f.occurred_at, f.title, f.payload_text, f.embedding, f.search_tsv + SELECT f.id, f.score, f.occurred_at, f.title, f.payload_text, f.embedding, f.embedding_model, f.search_tsv FROM current_event_records f LEFT JOIN connections c ON c.id = f.connection_id LEFT JOIN watcher_window_events iwf diff --git a/packages/server/src/utils/embeddings.ts b/packages/server/src/utils/embeddings.ts index ed740ecbf..6a2e9aa03 100644 --- a/packages/server/src/utils/embeddings.ts +++ b/packages/server/src/utils/embeddings.ts @@ -12,6 +12,40 @@ import type { Env } from '../index'; import logger from '../utils/logger'; const EMBEDDING_DIMENSIONS = 768; +// Must match @lobu/embeddings DEFAULT_MODEL_NAME (the local pipeline + service +// fall back to this when EMBEDDINGS_MODEL is unset). +const DEFAULT_EMBEDDING_MODEL = 'Xenova/bge-base-en-v1.5'; +// Allowlist mirroring the embeddings service's MODEL_NAME_PATTERN. Used to +// reject anything unsafe before inlining the configured model into SQL. +const MODEL_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9._/:-]{0,127}$/; + +/** + * The embedding model this deployment is configured to produce/consume. Every + * freshly-written event_embeddings row is stamped with this, and similarity + * queries scope to it so a same-dimension model swap can't compare vectors + * across incompatible spaces. Reads the same env var (with the same default) + * as the worker and the embeddings service. 
+ */ +export function getConfiguredEmbeddingModel(): string { + return process.env.EMBEDDINGS_MODEL || DEFAULT_EMBEDDING_MODEL; +} + +/** + * SQL-safe string literal for the configured model, for inlining into a query + * builder where threading another bound parameter would be error-prone. The + * value is server config (not user input); we still validate it against the + * service's model-name allowlist and reject otherwise — fail closed rather than + * risk injection. + */ +export function configuredEmbeddingModelSqlLiteral(): string { + const model = getConfiguredEmbeddingModel(); + if (!MODEL_NAME_PATTERN.test(model)) { + throw new Error( + `EMBEDDINGS_MODEL '${model}' is not a valid model identifier (allowed: ${MODEL_NAME_PATTERN}).` + ); + } + return `'${model}'`; +} const DEFAULT_TIMEOUT_MS = 30000; function resolveEmbeddingServiceUrl(env: Env): string { @@ -132,7 +166,19 @@ export async function generateEmbeddings(texts: string[], env: Env): Promise | null; @@ -166,14 +168,22 @@ function isSemanticallyEqual( async function upsertEmbedding( eventId: number, embedding: number[] | null | undefined, + embeddingModel: string | null | undefined, sql: ReturnType = getDb() ): Promise { if (!embedding || embedding.length === 0) return; const vectorLiteral = `[${embedding.join(',')}]`; + // On conflict, REPLACE a stale-model row with the freshly-embedded vector + + // stamp; the WHERE makes a same-model re-submit a no-op (idempotent), so a + // re-ingest of unchanged content under the same model never churns the row. await sql` - INSERT INTO event_embeddings (event_id, embedding) - VALUES (${eventId}, ${vectorLiteral}::vector) - ON CONFLICT (event_id) DO NOTHING + INSERT INTO event_embeddings (event_id, embedding, embedding_model) + VALUES (${eventId}, ${vectorLiteral}::vector, ${embeddingModel ?? 
null}) + ON CONFLICT (event_id) DO UPDATE + SET embedding = EXCLUDED.embedding, + embedding_model = EXCLUDED.embedding_model, + created_at = now() + WHERE event_embeddings.embedding_model IS DISTINCT FROM EXCLUDED.embedding_model `; } @@ -240,7 +250,7 @@ export async function insertEvent( `; const existingRow = existingRows[0] as InsertedEvent | undefined; if (existingRow) { - await upsertEmbedding(existingRow.id, params.embedding, sql); + await upsertEmbedding(existingRow.id, params.embedding, params.embeddingModel, sql); return existingRow; } // Race: the existing row was deleted/tombstoned between the @@ -342,7 +352,7 @@ export async function insertEvent( `Row was not persisted; check server logs for diagnostic context.` ); } - await upsertEmbedding(inserted.id, params.embedding, sql); + await upsertEmbedding(inserted.id, params.embedding, params.embeddingModel, sql); return inserted; } diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index 744afa7e9..0535ff407 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -48,6 +48,7 @@ import { materializeInlineAttachments, triggerAudioTranscriptions, } from './utils/inline-attachments'; +import { configuredEmbeddingModelSqlLiteral } from './utils/embeddings'; import { insertEvent, recordLifecycleEvent } from './utils/insert-event'; import logger from './utils/logger'; import { getWorkspaceRole } from './utils/organization-access'; @@ -819,6 +820,7 @@ export async function streamContent(c: Context<{ Bindings: Env }>) { origin_parent_id?: string; origin_type?: string; embedding?: number[]; + embedding_model?: string; semantic_type?: string; }>; checkpoint?: Record; @@ -933,6 +935,7 @@ export async function streamContent(c: Context<{ Bindings: Env }>) { occurredAt: item.occurred_at, score: item.score, embedding: item.embedding, + embeddingModel: item.embedding_model, metadata: item.metadata as Record | undefined, semanticType: itemSemanticType, 
originType: itemOriginType, @@ -1628,13 +1631,20 @@ export async function fetchEventsForEmbedding(c: Context<{ Bindings: Env }>) { return c.json({ events: [] }); } + // Return events with no embedding OR an embedding whose stamp is not the + // configured model — including NULL (legacy row, unknown model). Search + // excludes those rows from vector comparison, so they must be restamped. + // `IS DISTINCT FROM` makes NULL count as different from the (non-NULL) + // configured model. The model is server config, inlined as a validated + // literal. + const modelLiteral = configuredEmbeddingModelSqlLiteral(); const placeholders = safeIds.map((_, i) => `$${i + 1}`).join(','); const rows = await sql.unsafe( `SELECT e.id, e.payload_text, e.title FROM events e LEFT JOIN event_embeddings emb ON emb.event_id = e.id WHERE e.id IN (${placeholders}) - AND emb.event_id IS NULL`, + AND (emb.event_id IS NULL OR emb.embedding_model IS DISTINCT FROM ${modelLiteral})`, safeIds ); @@ -1661,7 +1671,7 @@ export async function completeEmbeddings(c: Context<{ Bindings: Env }>) { const req = await c.req.json<{ run_id: number; worker_id: string; - embeddings: Array<{ event_id: number; embedding: number[] }>; + embeddings: Array<{ event_id: number; embedding: number[]; embedding_model?: string }>; error_message?: string; }>(); @@ -1693,9 +1703,19 @@ export async function completeEmbeddings(c: Context<{ Bindings: Env }>) { try { // pgvector expects '[0.1,0.2,...]' format const vectorStr = `[${item.embedding.join(',')}]`; + // On conflict, REPLACE a stale-model row (a model swap left its vector in + // an incompatible space) with the freshly-embedded vector + stamp. The + // WHERE makes a same-model re-submit a no-op (idempotent), so we never + // churn rows that are already current. 
const result = await sql.unsafe( - 'INSERT INTO event_embeddings (event_id, embedding) VALUES ($1, $2::vector) ON CONFLICT (event_id) DO NOTHING', - [item.event_id, vectorStr] + `INSERT INTO event_embeddings (event_id, embedding, embedding_model) + VALUES ($1, $2::vector, $3) + ON CONFLICT (event_id) DO UPDATE + SET embedding = EXCLUDED.embedding, + embedding_model = EXCLUDED.embedding_model, + created_at = now() + WHERE event_embeddings.embedding_model IS DISTINCT FROM EXCLUDED.embedding_model`, + [item.event_id, vectorStr, item.embedding_model ?? null] ); if (result.count > 0) updated++; } catch (err) {