Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gitnexus/src/core/embeddings/embedding-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ const batchInsertEmbeddings = async (
) => Promise<void>,
updates: Array<{ id: string; embedding: number[] }>,
): Promise<void> => {
// INSERT into separate embedding table - much more memory efficient!
const cypher = `CREATE (e:CodeEmbedding {nodeId: $nodeId, embedding: $embedding})`;
// MERGE instead of CREATE — idempotent, handles concurrent analyzes and partial prior runs
const cypher = `MERGE (e:CodeEmbedding {nodeId: $nodeId}) SET e.embedding = $embedding`;
const paramsList = updates.map((u) => ({ nodeId: u.id, embedding: u.embedding }));
await executeWithReusedStatement(cypher, paramsList);
};
Expand Down
10 changes: 7 additions & 3 deletions gitnexus/src/core/lbug/csv-generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,18 @@ export const streamAllCSVsToDisk = async (
CodeElement: codeElemWriter,
};

const seenFileIds = new Set<string>();
// Deduplicate all node types — the pipeline can produce duplicate IDs across
// all symbol types (Class, Method, Function, etc.), not just File nodes.
// A single Set covering every label prevents PK violations on COPY.
const seenNodeIds = new Set<string>();

// --- SINGLE PASS over all nodes ---
for (const node of graph.iterNodes()) {
if (seenNodeIds.has(node.id)) continue;
seenNodeIds.add(node.id);

switch (node.label) {
case 'File': {
if (seenFileIds.has(node.id)) break;
seenFileIds.add(node.id);
const content = await extractContent(node, contentCache);
await fileWriter.addRow(
[
Expand Down
2 changes: 1 addition & 1 deletion gitnexus/src/core/run-analyze.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ export async function runFullAnalysis(
const paramsList = batch.map((e) => ({ nodeId: e.nodeId, embedding: e.embedding }));
try {
await executeWithReusedStatement(
`CREATE (e:CodeEmbedding {nodeId: $nodeId, embedding: $embedding})`,
`MERGE (e:CodeEmbedding {nodeId: $nodeId}) SET e.embedding = $embedding`,
paramsList,
);
} catch {
Expand Down
66 changes: 47 additions & 19 deletions gitnexus/src/server/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1449,25 +1449,53 @@ export const createServer = async (port: number, host: string = '127.0.0.1') =>
await withLbugDb(lbugPath, async () => {
const { runEmbeddingPipeline } =
await import('../core/embeddings/embedding-pipeline.js');
await runEmbeddingPipeline(executeQuery, executeWithReusedStatement, (p) => {
embedJobManager.updateJob(job.id, {
progress: {
phase:
p.phase === 'ready' ? 'complete' : p.phase === 'error' ? 'failed' : p.phase,
percent: p.percent,
message:
p.phase === 'loading-model'
? 'Loading embedding model...'
: p.phase === 'embedding'
? `Embedding nodes (${p.percent}%)...`
: p.phase === 'indexing'
? 'Creating vector index...'
: p.phase === 'ready'
? 'Embeddings complete'
: `${p.phase} (${p.percent}%)`,
},
});
});
// Skip nodes that already have embeddings — Kuzu forbids SET on vector-indexed properties.
let skipNodeIds: Set<string> | undefined;
try {
const rows = await executeQuery('MATCH (e:CodeEmbedding) RETURN e.nodeId AS nodeId');
if (rows && rows.length > 0) {
skipNodeIds = new Set(rows.map((r: any) => r.nodeId ?? r[0]).filter(Boolean));
console.log(
`[embed] ${skipNodeIds.size} nodes already embedded — skipping in incremental run`,
);
}
} catch (err: any) {
// Swallow only "table does not exist" — let real connection errors propagate.
// Log so ops can see this path fire if Kuzu ever changes error wording.
const msg = err?.message ?? '';
if (msg.includes('does not exist') || msg.includes('not found')) {
console.log(
`[embed] CodeEmbedding table not yet present — full embedding run (${msg})`,
);
} else {
throw err;
}
}
await runEmbeddingPipeline(
executeQuery,
executeWithReusedStatement,
(p) => {
embedJobManager.updateJob(job.id, {
progress: {
phase:
p.phase === 'ready' ? 'complete' : p.phase === 'error' ? 'failed' : p.phase,
percent: p.percent,
message:
p.phase === 'loading-model'
? 'Loading embedding model...'
: p.phase === 'embedding'
? `Embedding nodes (${p.percent}%)...`
: p.phase === 'indexing'
? 'Creating vector index...'
: p.phase === 'ready'
? 'Embeddings complete'
: `${p.phase} (${p.percent}%)`,
},
});
},
{}, // config: use defaults (runEmbeddingPipeline signature: executeQuery, executeWithReusedStatement, onProgress, config, skipNodeIds)
skipNodeIds,
);
});

clearTimeout(embedTimeout);
Expand Down
Loading