Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions packages/server/src/utils/insert-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,33 @@ export async function insertEvent(
const existing = await findCurrentEventByOrigin(sql, params);
if (existing) {
if (isSemanticallyEqual(existing, params)) {
await upsertEmbedding(existing.id, params.embedding, sql);
// Reread before touching the embedding. If the row got
// deleted / tombstoned between findCurrentEventByOrigin and
// here, upsertEmbedding would FK-fail instead of letting us
// fall through cleanly to a fresh insert (CodeRabbit catch
// on PR #780).
const existingRows = await sql`
SELECT id, entity_ids, origin_id, title, semantic_type, created_at
FROM events
WHERE id = ${existing.id}
LIMIT 1
`;
return existingRows[0] as InsertedEvent;
const existingRow = existingRows[0] as InsertedEvent | undefined;
if (existingRow) {
await upsertEmbedding(existingRow.id, params.embedding, sql);
return existingRow;
}
// Race: the existing row was deleted/tombstoned between the
// findCurrentEventByOrigin lookup above and the SELECT here. Fall
// through into the INSERT path with `supersedesEventId` unset so we
// create a fresh row instead of crashing on `undefined.id`.
logger.warn(
{ existingId: existing.id, originId: params.originId },
'[insert-event] existing row vanished between find and reread — proceeding with fresh insert'
);
} else {
supersedesEventId = existing.id;
}
supersedesEventId = existing.id;
}
}

Expand Down Expand Up @@ -299,7 +316,33 @@ export async function insertEvent(
result = await insertWithClientId(null);
}

const inserted = result[0] as InsertedEvent;
const inserted = result[0] as InsertedEvent | undefined;
if (!inserted) {
// INSERT ... RETURNING should always yield a row for a successful insert.
// An empty result means either (a) a BEFORE INSERT trigger silently
// RETURNed NULL (none in our schema today), (b) a PGlite quirk around
// GENERATED STORED columns failing without surfacing an error, or
// (c) postgres.js dropping the rows for an obscure reason. None of these
// should drop events on the floor — convert the cryptic `Cannot read
// properties of undefined (reading 'id')` into a real error with
// diagnostic context so we can root-cause when it next happens.
logger.error(
{
originId: params.originId,
connectionId: params.connectionId,
organizationId: params.organizationId,
semanticType: params.semanticType,
connectorKey: params.connectorKey,
feedKey: params.feedKey,
},
'[insert-event] INSERT ... RETURNING returned no rows — event not persisted'
);
throw new Error(
`insertEvent: INSERT RETURNING came back empty for ` +
`origin_id=${params.originId} connection_id=${params.connectionId}. ` +
`Row was not persisted; check server logs for diagnostic context.`
);
}
await upsertEmbedding(inserted.id, params.embedding, sql);
return inserted;
}
Expand Down
Loading