Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 28 additions & 47 deletions gitnexus/src/cli/analyze.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import { execFileSync } from 'child_process';
import v8 from 'v8';
import cliProgress from 'cli-progress';
import { runPipelineFromRepo } from '../core/ingestion/pipeline.js';
import { initLbug, loadGraphToLbug, getLbugStats, executeQuery, executeWithReusedStatement, closeLbug, createFTSIndex, loadCachedEmbeddings } from '../core/lbug/lbug-adapter.js';
import { initLbug, loadGraphToLbug, getLbugStats, executeQuery, executeWithReusedStatement, closeLbug, createFTSIndex } from '../core/lbug/lbug-adapter.js';
// Embedding imports are lazy (dynamic import) so onnxruntime-node is never
// loaded when embeddings are not requested. This avoids crashes on Node
// versions whose ABI is not yet supported by the native binary (#89).
// disposeEmbedder intentionally not called — ONNX Runtime segfaults on cleanup (see #38)
import { getStoragePaths, saveMeta, loadMeta, addToGitignore, registerRepo, getGlobalRegistryPath, cleanupOldKuzuFiles } from '../storage/repo-manager.js';
import { loadParseCache, saveParseCache, pruneCache } from '../storage/parse-cache.js';
import { loadEmbeddingCache, loadEmbeddingCacheMeta, validateEmbeddingCacheMeta, saveEmbeddingCache, createEmptyEmbeddingCache } from '../storage/embedding-cache.js';
import { getCurrentCommit, getGitRoot, hasGitDir } from '../storage/git.js';
import { generateAIContextFiles } from './ai-context.js';
import { generateSkillFiles, type GeneratedSkillInfo } from './skill-gen.js';
Expand Down Expand Up @@ -199,29 +201,16 @@ export const analyzeCommand = async (

const t0Global = Date.now();

// ── Cache embeddings from existing index before rebuild ────────────
let cachedEmbeddingNodeIds = new Set<string>();
let cachedEmbeddings: Array<{ nodeId: string; embedding: number[] }> = [];

if (options?.embeddings && existingMeta && !options?.force) {
try {
updateBar(0, 'Caching embeddings...');
await initLbug(lbugPath);
const cached = await loadCachedEmbeddings();
cachedEmbeddingNodeIds = cached.embeddingNodeIds;
cachedEmbeddings = cached.embeddings;
await closeLbug();
} catch {
try { await closeLbug(); } catch {}
}
}
// ── Caches ──────────────────────────────────────────────────────────
// Both caches are content-addressed and always safe to reuse (even with --force).
const parseCache = await loadParseCache(storagePath);

// ── Phase 1: Full Pipeline (0–60%) ─────────────────────────────────
const pipelineResult = await runPipelineFromRepo(repoPath, (progress) => {
const phaseLabel = PHASE_LABELS[progress.phase] || progress.phase;
const scaled = Math.round(progress.percent * 0.6);
updateBar(scaled, phaseLabel);
});
}, { parseCache });

// ── Phase 2: LadybugDB (60–85%) ──────────────────────────────────────
updateBar(60, 'Loading into LadybugDB...');
Expand Down Expand Up @@ -258,31 +247,8 @@ export const analyzeCommand = async (
}
const ftsTime = ((Date.now() - t0Fts) / 1000).toFixed(1);

// ── Phase 3.5: Re-insert cached embeddings ────────────────────────
if (cachedEmbeddings.length > 0) {
// Check if cached embedding dimensions match current schema
const cachedDims = cachedEmbeddings[0].embedding.length;
const { EMBEDDING_DIMS } = await import('../core/lbug/schema.js');
if (cachedDims !== EMBEDDING_DIMS) {
// Dimensions changed (e.g. switched embedding model) — discard cache and re-embed all
console.error(`⚠️ Embedding dimensions changed (${cachedDims}d → ${EMBEDDING_DIMS}d), discarding cache`);
cachedEmbeddings = [];
cachedEmbeddingNodeIds = new Set();
} else {
updateBar(88, `Restoring ${cachedEmbeddings.length} cached embeddings...`);
const EMBED_BATCH = 200;
for (let i = 0; i < cachedEmbeddings.length; i += EMBED_BATCH) {
const batch = cachedEmbeddings.slice(i, i + EMBED_BATCH);
const paramsList = batch.map(e => ({ nodeId: e.nodeId, embedding: e.embedding }));
try {
await executeWithReusedStatement(
`CREATE (e:CodeEmbedding {nodeId: $nodeId, embedding: $embedding})`,
paramsList,
);
} catch { /* some may fail if node was removed, that's fine */ }
}
}
}
// Old LadybugDB-based embedding cache removed — replaced by file-based
// content-addressed cache in embedding-cache.json (passed into runEmbeddingPipeline).

// ── Phase 4: Embeddings (90–98%) ──────────────────────────────────
const stats = await getLbugStats();
Expand All @@ -300,9 +266,21 @@ export const analyzeCommand = async (

if (!embeddingSkipped) {
const { isHttpMode } = await import('../core/embeddings/http-client.js');
const { DEFAULT_EMBEDDING_CONFIG } = await import('../core/embeddings/types.js');
const httpMode = isHttpMode();
updateBar(90, httpMode ? 'Connecting to embedding endpoint...' : 'Loading embedding model...');
const t0Emb = Date.now();

// Check metadata first (tiny file) — only deserialize full cache if valid
const embMeta = await loadEmbeddingCacheMeta(storagePath);
let embeddingCache: import('../storage/embedding-cache.js').EmbeddingCache;
if (embMeta && validateEmbeddingCacheMeta(embMeta, DEFAULT_EMBEDDING_CONFIG.dimensions, DEFAULT_EMBEDDING_CONFIG.modelId)) {
const fullCache = await loadEmbeddingCache(storagePath);
embeddingCache = fullCache ?? createEmptyEmbeddingCache(DEFAULT_EMBEDDING_CONFIG.dimensions, DEFAULT_EMBEDDING_CONFIG.modelId);
} else {
embeddingCache = createEmptyEmbeddingCache(DEFAULT_EMBEDDING_CONFIG.dimensions, DEFAULT_EMBEDDING_CONFIG.modelId);
}

const { runEmbeddingPipeline } = await import('../core/embeddings/embedding-pipeline.js');
await runEmbeddingPipeline(
executeQuery,
Expand All @@ -315,8 +293,9 @@ export const analyzeCommand = async (
updateBar(scaled, label);
},
{},
cachedEmbeddingNodeIds.size > 0 ? cachedEmbeddingNodeIds : undefined,
embeddingCache,
);
await saveEmbeddingCache(storagePath, embeddingCache);
embeddingTime = ((Date.now() - t0Emb) / 1000).toFixed(1);
}

Expand Down Expand Up @@ -344,6 +323,7 @@ export const analyzeCommand = async (
},
};
await saveMeta(storagePath, meta);
await saveParseCache(storagePath, parseCache);
await registerRepo(repoPath, meta);
// Only attempt to update .gitignore when a .git directory is present.
// Use hasGitDir (filesystem check) rather than git CLI subprocess
Expand Down Expand Up @@ -397,10 +377,11 @@ export const analyzeCommand = async (
bar.stop();

// ── Summary ───────────────────────────────────────────────────────
const embeddingsCached = cachedEmbeddings.length > 0;
console.log(`\n Repository indexed successfully (${totalTime}s)${embeddingsCached ? ` [${cachedEmbeddings.length} embeddings cached]` : ''}\n`);
console.log(`\n Repository indexed successfully (${totalTime}s)\n`);
console.log(` ${stats.nodes.toLocaleString()} nodes | ${stats.edges.toLocaleString()} edges | ${pipelineResult.communityResult?.stats.totalCommunities || 0} clusters | ${pipelineResult.processResult?.stats.totalProcesses || 0} flows`);
console.log(` LadybugDB ${lbugTime}s | FTS ${ftsTime}s | Embeddings ${embeddingSkipped ? embeddingSkipReason : embeddingTime + 's'}`);
const cs = pipelineResult.cacheStats;
const cacheInfo = cs ? `Parse cache: ${cs.hits} cached, ${cs.misses} parsed | ` : '';
console.log(` ${cacheInfo}LadybugDB ${lbugTime}s | FTS ${ftsTime}s | Embeddings ${embeddingSkipped ? embeddingSkipReason : embeddingTime + 's'}`);
console.log(` ${repoPath}`);

if (aiContext.files.length > 0) {
Expand Down
153 changes: 101 additions & 52 deletions gitnexus/src/core/embeddings/embedding-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import { initEmbedder, embedBatch, embedText, embeddingToArray, isEmbedderReady } from './embedder.js';
import { generateBatchEmbeddingTexts, generateEmbeddingText } from './text-generator.js';
import { embeddingTextHash, type EmbeddingCache } from '../../storage/embedding-cache.js';
import {
type EmbeddingProgress,
type EmbeddingConfig,
Expand Down Expand Up @@ -142,58 +143,25 @@ const createVectorIndex = async (
* @param executeWithReusedStatement - Function to execute with reused prepared statement
* @param onProgress - Callback for progress updates
* @param config - Optional configuration override
* @param skipNodeIds - Optional set of node IDs that already have embeddings (incremental mode)
* @param embeddingCache - Optional content-addressed embedding cache (survives --force)
*/
export const runEmbeddingPipeline = async (
executeQuery: (cypher: string) => Promise<any[]>,
executeWithReusedStatement: (cypher: string, paramsList: Array<Record<string, any>>) => Promise<void>,
onProgress: EmbeddingProgressCallback,
config: Partial<EmbeddingConfig> = {},
skipNodeIds?: Set<string>,
embeddingCache?: EmbeddingCache,
): Promise<void> => {
const finalConfig = { ...DEFAULT_EMBEDDING_CONFIG, ...config };

try {
// Phase 1: Load embedding model
onProgress({
phase: 'loading-model',
percent: 0,
modelDownloadPercent: 0,
});

if (!isEmbedderReady()) {
await initEmbedder((modelProgress: ModelProgress) => {
const downloadPercent = modelProgress.progress ?? 0;
onProgress({
phase: 'loading-model',
percent: Math.round(downloadPercent * 0.2),
modelDownloadPercent: downloadPercent,
});
}, finalConfig);
}

onProgress({
phase: 'loading-model',
percent: 20,
modelDownloadPercent: 100,
});

if (isDev) {
console.log('🔍 Querying embeddable nodes...');
}

// Phase 2: Query embeddable nodes
// Phase 1: Query embeddable nodes (before loading model — allows early exit if all cached)
let nodes = await queryEmbeddableNodes(executeQuery);

// Incremental mode: filter out nodes that already have embeddings
if (skipNodeIds && skipNodeIds.size > 0) {
const beforeCount = nodes.length;
nodes = nodes.filter(n => !skipNodeIds.has(n.id));
if (isDev) {
console.log(`📦 Incremental embeddings: ${beforeCount} total, ${skipNodeIds.size} cached, ${nodes.length} to embed`);
}
}

const totalNodes = nodes.length;

if (isDev) {
Expand All @@ -210,40 +178,107 @@ export const runEmbeddingPipeline = async (
return;
}

// Phase 3: Batch embed nodes
// Single-pass: hash all texts, split into cached/uncached, derive allCached
const allTexts = generateBatchEmbeddingTexts(nodes, finalConfig);
const batchSize = finalConfig.batchSize;
const totalBatches = Math.ceil(totalNodes / batchSize);
let processedNodes = 0;
let cacheHits = 0;

const usedHashes = new Set<string>();
const uncachedIndices: number[] = [];
const cachedUpdates: Array<{ id: string; embedding: number[] }> = [];

for (let i = 0; i < nodes.length; i++) {
const hash = embeddingTextHash(allTexts[i]);
usedHashes.add(hash);
const cached = embeddingCache?.entries[hash];
if (cached) {
cachedUpdates.push({ id: nodes[i].id, embedding: cached.embedding });
cacheHits++;
} else {
uncachedIndices.push(i);
}
}

const allCached = uncachedIndices.length === 0;

// Phase 2: Load embedding model (only if we have uncached nodes)
if (!allCached) {
onProgress({
phase: 'loading-model',
percent: 0,
modelDownloadPercent: 0,
});

if (!isEmbedderReady()) {
await initEmbedder((modelProgress: ModelProgress) => {
const downloadPercent = modelProgress.progress ?? 0;
onProgress({
phase: 'loading-model',
percent: Math.round(downloadPercent * 0.2),
modelDownloadPercent: downloadPercent,
});
}, finalConfig);
}

onProgress({
phase: 'loading-model',
percent: 20,
modelDownloadPercent: 100,
});
}

// Insert cached embeddings in bulk
if (cachedUpdates.length > 0) {
const BULK_BATCH = 200;
for (let i = 0; i < cachedUpdates.length; i += BULK_BATCH) {
const slice = cachedUpdates.slice(i, i + BULK_BATCH);
await batchInsertEmbeddings(executeWithReusedStatement, slice);
}
processedNodes += cachedUpdates.length;
}

if (isDev && cacheHits > 0) {
console.log(`📦 Embedding cache: ${cacheHits} cached, ${uncachedIndices.length} to embed`);
}

// Embed only uncached nodes
const uncachedNodes = uncachedIndices.map(i => nodes[i]);
const uncachedTexts = uncachedIndices.map(i => allTexts[i]);
const totalBatches = Math.ceil(uncachedNodes.length / batchSize);

onProgress({
phase: 'embedding',
percent: 20,
nodesProcessed: 0,
nodesProcessed: processedNodes,
totalNodes,
currentBatch: 0,
totalBatches,
});

for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
const start = batchIndex * batchSize;
const end = Math.min(start + batchSize, totalNodes);
const batch = nodes.slice(start, end);

// Generate texts for this batch
const texts = generateBatchEmbeddingTexts(batch, finalConfig);
const end = Math.min(start + batchSize, uncachedNodes.length);
const batchNodes = uncachedNodes.slice(start, end);
const batchTexts = uncachedTexts.slice(start, end);

// Embed the batch
const embeddings = await embedBatch(texts);

// Update LadybugDB with embeddings
const updates = batch.map((node, i) => ({
id: node.id,
embedding: embeddingToArray(embeddings[i]),
}));
const embeddings = await embedBatch(batchTexts);

// Build updates and store in cache
const updates = batchNodes.map((node, i) => {
const vec = embeddingToArray(embeddings[i]);
// Store in file-based cache for next run
if (embeddingCache) {
const hash = embeddingTextHash(batchTexts[i]);
embeddingCache.entries[hash] = { embedding: Array.from(vec) };
}
return { id: node.id, embedding: vec };
});

await batchInsertEmbeddings(executeWithReusedStatement, updates);

processedNodes += batch.length;
processedNodes += batchNodes.length;

// Report progress (20-90% for embedding phase)
const embeddingProgress = 20 + ((processedNodes / totalNodes) * 70);
Expand All @@ -257,6 +292,20 @@ export const runEmbeddingPipeline = async (
});
}

// Prune stale entries from embedding cache (symbols that no longer exist)
if (embeddingCache) {
let pruned = 0;
for (const hash of Object.keys(embeddingCache.entries)) {
if (!usedHashes.has(hash)) {
delete embeddingCache.entries[hash];
pruned++;
}
}
if (isDev && pruned > 0) {
console.log(`🧹 Pruned ${pruned} stale embedding cache entries`);
}
}

// Phase 4: Create vector index
onProgress({
phase: 'indexing',
Expand Down
Loading
Loading