From c84c9030f78e9da5d07225c6a19ce1d599773ad2 Mon Sep 17 00:00:00 2001 From: wangxc Date: Thu, 21 May 2026 19:31:55 +0800 Subject: [PATCH 1/3] fix(analyze): prevent cache-hit native workers from aborting Delay parse worker startup until a cache miss requires it, fall back to sequential parsing when initial worker readiness fails, and preserve analyzer diagnostics/progress when heap respawn captures child output. Constraint: Node 25 and tree-sitter/N-API worker initialization can abort before ready, while warm-cache analysis should not start workers at all. Rejected: Treating status-134/SIGABRT as heap OOM unconditionally | native worker aborts require distinct recovery guidance and stderr/stdout evidence. Rejected: cli-progress noTTYOutput for respawn progress | it appends newline frames instead of preserving one-line redraw UX. Confidence: high Scope-risk: moderate Directive: Keep parse-worker creation behind confirmed cache misses and preserve TTY-style progress when respawn pipes stderr for crash classification. Tested: GitNexus impact analysis for ensureHeap, runChunkedParseAndResolve, createWorkerPool, WorkerPool, walkRepositoryPaths; GitNexus detect_changes scoped to staged worktree; targeted vitest for analyze respawn, parse lazy cache, filesystem walker, worker pool; npx tsc --noEmit; npm run build; NODE_OPTIONS='--max-old-space-size=8192' npm test. Not-tested: Windows terminal rendering and published npm package install path. --- gitnexus/src/cli/analyze.ts | 270 +++++++++++++++--- .../src/core/ingestion/filesystem-walker.ts | 27 +- .../ingestion/pipeline-phases/parse-impl.ts | 136 ++++++--- .../src/core/ingestion/workers/worker-pool.ts | 26 +- .../integration/filesystem-walker.test.ts | 22 ++ gitnexus/test/integration/worker-pool.test.ts | 2 +- .../test/unit/analyze-heap-respawn.test.ts | 172 +++++++---- .../analyze-respawn-progress-terminal.test.ts | 124 ++++++++ .../unit/parse-impl-worker-lazy-cache.test.ts | 244 ++++++++++++++++ 9 files changed, 881 insertions(+), 142 deletions(-) create mode 100644 gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts create mode 100644 gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts diff --git a/gitnexus/src/cli/analyze.ts b/gitnexus/src/cli/analyze.ts index 6ea04df05d..f4aac990db 100644 --- a/gitnexus/src/cli/analyze.ts +++ b/gitnexus/src/cli/analyze.ts @@ -9,7 +9,7 @@ */ import path from 'path'; -import { execFileSync } from 'child_process'; +import { spawn } from 'child_process'; import v8 from 'v8'; import cliProgress from 'cli-progress'; import { closeLbug } from '../core/lbug/lbug-adapter.js'; @@ -37,6 +37,7 @@ import { isHfDownloadFailure } from '../core/embeddings/hf-env.js'; // previous behaviour silently swallowed stack traces and made #1169 // indistinguishable from a no-op success on Windows. const realStderrWrite = process.stderr.write.bind(process.stderr); +const realStdoutWrite = process.stdout.write.bind(process.stdout); const writeFatalToStderr = (label: string, err: unknown): void => { const isErr = err instanceof Error; @@ -78,15 +79,178 @@ const HEAP_FLAG = `--max-old-space-size=${RESPAWN_HEAP_MB}`; /** Increase default stack size (KB) to prevent stack overflow on deep class hierarchies. */ const STACK_KB = 4096; const STACK_FLAG = `--stack-size=${STACK_KB}`; +const RESPAWN_OUTPUT_TAIL_CHARS = 1024 * 1024; +const RESPAWN_PROGRESS_ENV = 'GITNEXUS_RESPAWN_PROGRESS_TTY'; + +interface CliProgressTerminal { + cursorSave(): void; + cursorRestore(): void; + cursor(enabled: boolean): void; + lineWrapping(enabled: boolean): void; + cursorTo(x?: number | null, y?: number | null): void; + cursorRelative(dx?: number | null, dy?: number | null): void; + cursorRelativeReset(): void; + clearRight(): void; + clearLine(): void; + clearBottom(): void; + newline(): void; + write(s: string, rawWrite?: boolean): void; + isTTY(): boolean; + getWidth(): number; +} + +const terminalColumns = (): number => { + const parsed = Number(process.env.COLUMNS); + return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 80; +}; + +const createAnsiPipeTerminal = (stream: NodeJS.WriteStream): CliProgressTerminal => { + let linewrap = true; + let dy = 0; + const write = (s: string): void => { + stream.write(s); + }; + const moveVertical = (delta: number): void => { + if (delta > 0) write(`\x1B[${delta}B`); + else if (delta < 0) write(`\x1B[${Math.abs(delta)}A`); + }; + + return { + cursorSave: () => write('\x1B7'), + cursorRestore: () => write('\x1B8'), + cursor: (enabled) => write(enabled ? '\x1B[?25h' : '\x1B[?25l'), + lineWrapping: (enabled) => { + linewrap = enabled; + write(enabled ? '\x1B[?7h' : '\x1B[?7l'); + }, + cursorTo: (x = null, y = null) => { + if (typeof y === 'number' && typeof x === 'number') { + write(`\x1B[${y + 1};${x + 1}H`); + return; + } + if (typeof x === 'number') { + write(x === 0 ? '\r' : `\x1B[${x + 1}G`); + } + }, + cursorRelative: (dx = null, nextDy = null) => { + if (typeof dx === 'number' && dx !== 0) { + write(dx > 0 ? `\x1B[${dx}C` : `\x1B[${Math.abs(dx)}D`); + } + if (typeof nextDy === 'number' && nextDy !== 0) { + dy += nextDy; + moveVertical(nextDy); + } + }, + cursorRelativeReset: () => { + moveVertical(-dy); + write('\r'); + dy = 0; + }, + clearRight: () => write('\x1B[0K'), + clearLine: () => write('\x1B[2K'), + clearBottom: () => write('\x1B[0J'), + newline: () => { + write('\n'); + dy++; + }, + write: (s, rawWrite = false) => { + const width = terminalColumns(); + write(linewrap && rawWrite === false ? s.slice(0, width) : s); + }, + isTTY: () => true, + getWidth: terminalColumns, + }; +}; + +const shouldBridgeRespawnProgressTty = (): boolean => + process.stderr.isTTY === true || process.stdout.isTTY === true; + +interface RespawnExit { + status?: number | null; + signal?: NodeJS.Signals | null; + stdout?: string; + stderr?: string; + message?: string; +} + +const appendOutputTail = (tail: string, chunk: unknown): string => { + const text = Buffer.isBuffer(chunk) + ? chunk.toString('utf8') + : typeof chunk === 'string' + ? chunk + : String(chunk ?? ''); + if (!text) return tail; + const next = tail + text; + return next.length > RESPAWN_OUTPUT_TAIL_CHARS ? next.slice(-RESPAWN_OUTPUT_TAIL_CHARS) : next; +}; + +/** + * Run the respawned analyzer while teeing child output through to the parent + * and keeping a bounded tail for crash classification. + * + * `execFileSync(..., { stdio: 'inherit' })` preserved live progress but hid + * stderr/stdout from the parent on abnormal exits. That made every + * SIGABRT/status-134 child look like an output-less V8 heap OOM, even when the + * terminal had already shown a native crash such as + * `libc++abi: ... Napi::Error`. Piped streams plus an explicit tee keeps the UX + * and gives `childProcessLikelyOom` the evidence it needs. + */ +const runRespawnedAnalyze = ( + args: readonly string[], + env: NodeJS.ProcessEnv, +): Promise => + new Promise((resolve) => { + let stdout = ''; + let stderr = ''; + let settled = false; + const finish = (exit: RespawnExit): void => { + if (settled) return; + settled = true; + resolve(exit); + }; + + const child = spawn(process.execPath, [...args], { + stdio: ['inherit', 'pipe', 'pipe'], + env, + }); + + child.stdout?.on('data', (chunk) => { + stdout = appendOutputTail(stdout, chunk); + realStdoutWrite(chunk); + }); + child.stderr?.on('data', (chunk) => { + stderr = appendOutputTail(stderr, chunk); + realStderrWrite(chunk); + }); + child.on('error', (err) => { + finish({ + status: 1, + signal: null, + stdout, + stderr, + message: err instanceof Error ? err.message : String(err), + }); + }); + child.on('close', (status, signal) => { + finish({ + status, + signal, + stdout, + stderr, + message: `Command failed: ${process.execPath} ${args.join(' ')}`, + }); + }); + }); /** * Heuristic for "child re-exec likely died from V8 OOM". * - * Platform-independent detection is best-effort: V8/Node usually emit - * stable heap-exhaustion phrases in stderr/message across Linux/macOS/Windows - * (for example "JavaScript heap out of memory" or "Reached heap limit"), - * while some environments only expose status/signal (e.g. 134/SIGABRT). - * We combine both text signatures and process-exit signatures. + * Platform-independent detection is best-effort: V8/Node usually emit stable + * heap-exhaustion phrases in stderr/message across Linux/macOS/Windows (for + * example "JavaScript heap out of memory" or "Reached heap limit"). When the + * child produced no output at all, we still treat status 134/SIGABRT as likely + * heap OOM. If stderr/stdout contains a native crash diagnostic, the output + * evidence wins and we do not print heap guidance. */ const childProcessLikelyOom = (err: unknown): boolean => { if (!err || typeof err !== 'object') return false; @@ -122,6 +286,31 @@ const childProcessLikelyOom = (err: unknown): boolean => { return e.status === 134 || e.signal === 'SIGABRT'; }; +const childProcessLikelyNativeAbort = (err: unknown): boolean => { + if (!err || typeof err !== 'object') return false; + const e = err as { + stderr?: unknown; + stdout?: unknown; + message?: unknown; + }; + const hasNativeAbortSignature = (v: unknown): boolean => { + const text = ( + Buffer.isBuffer(v) ? v.toString('utf8') : typeof v === 'string' ? v : '' + ).toLowerCase(); + if (!text) return false; + return ( + text.includes('napi::error') || + text.includes('libc++abi: terminating') || + text.includes('abort trap') || + text.includes('native stack') || + text.includes('native worker') || + text.includes('native binding') + ); + }; + + return [e.message, e.stderr, e.stdout].some((v) => hasNativeAbortSignature(v)); +}; + const forceHeapOOMForTestIfEnabled = (): void => { if (process.env.GITNEXUS_TEST_FORCE_HEAP_OOM !== '1') return; // Allocate JS strings (not Buffers) so pressure lands on V8 heap itself. @@ -131,7 +320,7 @@ const forceHeapOOMForTestIfEnabled = (): void => { }; /** Re-exec the process with a 16GB heap and larger stack if we're currently below that. */ -function ensureHeap(): boolean { +async function ensureHeap(): Promise { const nodeOpts = process.env.NODE_OPTIONS || ''; if (nodeOpts.includes('--max-old-space-size')) return false; @@ -143,13 +332,15 @@ function ensureHeap(): boolean { const cliFlags = [HEAP_FLAG]; if (!nodeOpts.includes('--stack-size')) cliFlags.push(STACK_FLAG); - try { - execFileSync(process.execPath, [...cliFlags, ...process.argv.slice(1)], { - stdio: 'inherit', - env: { ...process.env, NODE_OPTIONS: `${nodeOpts} ${HEAP_FLAG}`.trim() }, - }); - } catch (e: unknown) { - if (childProcessLikelyOom(e)) { + const childArgs = [...cliFlags, ...process.argv.slice(1)]; + const childEnv = { + ...process.env, + NODE_OPTIONS: `${nodeOpts} ${HEAP_FLAG}`.trim(), + }; + if (shouldBridgeRespawnProgressTty()) childEnv[RESPAWN_PROGRESS_ENV] = '1'; + const childExit = await runRespawnedAnalyze(childArgs, childEnv); + if (childExit.status !== 0 || childExit.signal) { + if (childProcessLikelyOom(childExit)) { cliError( ` Analysis likely ran out of memory.\n` + ` Retry with a larger heap if your machine allows it:\n` + @@ -158,11 +349,18 @@ function ensureHeap(): boolean { ` If this persists, it may be a native crash unrelated to heap size.\n`, { recoveryHint: 'heap-oom-respawn' }, ); + } else if (childProcessLikelyNativeAbort(childExit)) { + cliError( + ` Analysis aborted in a native worker or native binding path.\n` + + ` Try one of these recovery paths:\n` + + ` gitnexus analyze --workers 0\n` + + ` npm uninstall -g gitnexus && npm install -g gitnexus@latest\n` + + ` Use Node 22 LTS if you are on a newer non-LTS runtime.\n`, + { recoveryHint: 'native-worker-abort' }, + ); } const status = - typeof e === 'object' && e !== null && 'status' in e && typeof e.status === 'number' - ? e.status - : 1; + typeof childExit.status === 'number' && childExit.status !== 0 ? childExit.status : 1; process.exitCode = status; } return true; @@ -185,6 +383,7 @@ const ANALYZE_CLI_ENV_KEYS = [ 'GITNEXUS_EMBEDDING_BATCH_SIZE', 'GITNEXUS_EMBEDDING_SUB_BATCH_SIZE', 'GITNEXUS_EMBEDDING_DEVICE', + 'GITNEXUS_ANALYZE_PROGRESS_ACTIVE', ] as const; type AnalyzeEnvSnapshot = Record<(typeof ANALYZE_CLI_ENV_KEYS)[number], string | undefined>; @@ -292,7 +491,7 @@ export const shouldGenerateCommunitySkillFiles = ( ): boolean => Boolean(options?.skills && pipelineResult && !options?.indexOnly); export const analyzeCommand = async (inputPath?: string, options?: AnalyzeOptions) => { - if (ensureHeap()) return; + if (await ensureHeap()) return; forceHeapOOMForTestIfEnabled(); // Install fatal handlers immediately after re-exec resolution so any @@ -515,19 +714,25 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): } // ── CLI progress bar setup ───────────────────────────────────────── - const bar = new cliProgress.SingleBar( - { - format: ' {bar} {percentage}% | {phase}', - barCompleteChar: '\u2588', - barIncompleteChar: '\u2591', - hideCursor: true, - barGlue: '', - autopadding: true, - clearOnComplete: false, - stopOnComplete: false, - }, - cliProgress.Presets.shades_grey, - ); + const barOptions: cliProgress.Options & { terminal?: CliProgressTerminal } = { + format: ' {bar} {percentage}% | {phase}', + barCompleteChar: '\u2588', + barIncompleteChar: '\u2591', + hideCursor: true, + barGlue: '', + autopadding: true, + clearOnComplete: false, + stopOnComplete: false, + }; + if (process.env[RESPAWN_PROGRESS_ENV] === '1' && process.stderr.isTTY !== true) { + // Heap respawn pipes stderr so the parent can classify native/OOM crashes. + // The parent was a real TTY when it opted into this env var, so forward + // ANSI cursor controls through the pipe instead of cli-progress' non-TTY + // newline mode. That keeps one-line redraw UX while retaining stderr tail + // capture for diagnostics. + barOptions.terminal = createAnsiPipeTerminal(process.stderr); + } + const bar = new cliProgress.SingleBar(barOptions, cliProgress.Presets.shades_grey); bar.start(100, 0, { phase: 'Initializing...' }); @@ -561,7 +766,7 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): // eslint-disable-next-line no-console -- intentional console-routing for progress bar UX const origError = console.error.bind(console); let barCurrentValue = 0; - const barLog = (...args: any[]) => { + const barLog = (...args: unknown[]) => { process.stdout.write('\x1b[2K\r'); origLog(args.map((a) => (typeof a === 'string' ? a : String(a))).join(' ')); bar.update(barCurrentValue); @@ -571,6 +776,7 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): console.warn = barLog; // eslint-disable-next-line no-console -- intentional console-routing for progress bar UX console.error = barLog; + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = '1'; // Track elapsed time per phase let lastPhaseLabel = 'Initializing...'; diff --git a/gitnexus/src/core/ingestion/filesystem-walker.ts b/gitnexus/src/core/ingestion/filesystem-walker.ts index c30ea43216..9ba959ea64 100644 --- a/gitnexus/src/core/ingestion/filesystem-walker.ts +++ b/gitnexus/src/core/ingestion/filesystem-walker.ts @@ -23,6 +23,21 @@ export interface FilePath { } const READ_CONCURRENCY = 32; +const ANALYZE_PROGRESS_ACTIVE_ENV = 'GITNEXUS_ANALYZE_PROGRESS_ACTIVE'; + +const warnLargeFileSkip = (message: string): void => { + if (process.env[ANALYZE_PROGRESS_ACTIVE_ENV] === '1') { + // analyze.ts routes console.warn through the progress bar logger while + // the bar is active. Emitting the operator-facing large-file notice there + // avoids raw pino NDJSON corrupting the one-line progress display in the + // heap-respawn child, whose stderr is intentionally piped for crash + // classification. + // eslint-disable-next-line no-console -- intentionally routed by analyze progress UI + console.warn(message); + return; + } + logger.warn(message); +}; /** * Phase 1: Scan repository — stat files to get paths + sizes, no content loaded. @@ -76,7 +91,9 @@ export const walkRepositoryPaths = async ( const isDefault = maxFileSizeBytes === DEFAULT_MAX_FILE_SIZE_BYTES; const isOverrideUnset = !process.env.GITNEXUS_MAX_FILE_SIZE; const suffix = isDefault ? ', likely generated/vendored' : ''; - logger.warn(` Skipped ${skippedLarge} large files (>${maxFileSizeBytes / 1024}KB${suffix})`); + warnLargeFileSkip( + ` Skipped ${skippedLarge} large files (>${maxFileSizeBytes / 1024}KB${suffix})`, + ); // Always show at least the first few paths so users can diagnose why // edges are missing from a specific file (issue #1659). The full list is @@ -88,17 +105,19 @@ export const walkRepositoryPaths = async ( const showAll = isVerboseIngestionEnabled() || skippedLargePaths.length <= SKIPPED_PREVIEW_CAP; const preview = showAll ? skippedLargePaths : skippedLargePaths.slice(0, SKIPPED_PREVIEW_CAP); for (const p of preview) { - logger.warn(` - ${p}`); + warnLargeFileSkip(` - ${p}`); } if (!showAll) { const remaining = skippedLargePaths.length - SKIPPED_PREVIEW_CAP; - logger.warn(` ...and ${remaining} more (set GITNEXUS_VERBOSE=1 to list them all)`); + warnLargeFileSkip(` ...and ${remaining} more (set GITNEXUS_VERBOSE=1 to list them all)`); } // Only hint about the env var when the user has not set it at all. An // explicit GITNEXUS_MAX_FILE_SIZE=512 happens to resolve to the same // bytes as the default but the operator clearly already knows the knob. if (isDefault && isOverrideUnset) { - logger.warn(` Set GITNEXUS_MAX_FILE_SIZE= to include files above the default cap.`); + warnLargeFileSkip( + ` Set GITNEXUS_MAX_FILE_SIZE= to include files above the default cap.`, + ); } } diff --git a/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts b/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts index 46945d74c9..10e4557d24 100644 --- a/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts +++ b/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts @@ -48,7 +48,7 @@ import { ASTCache, createASTCache } from '../ast-cache.js'; import { type PipelineProgress, getLanguageFromFilename } from 'gitnexus-shared'; import { readFileContents } from '../filesystem-walker.js'; import { isLanguageAvailable } from '../../tree-sitter/parser-loader.js'; -import { createWorkerPool } from '../workers/worker-pool.js'; +import { createWorkerPool, WorkerPoolInitializationError } from '../workers/worker-pool.js'; import type { WorkerPool } from '../workers/worker-pool.js'; import type { ExtractedAssignment, @@ -252,20 +252,23 @@ export async function runChunkedParseAndResolve( const MIN_BYTES_FOR_WORKERS = options?.workerThresholdsForTest?.minBytes ?? 512 * 1024; const totalBytes = parseableScanned.reduce((s, f) => s + f.size, 0); - // Create worker pool once, reuse across chunks. + // Create worker pool lazily, reuse across cache-miss chunks. // // `workerPoolSize === 0` is a programmatic equivalent of `skipWorkers: // true` per the `PipelineOptions.workerPoolSize` contract. Short- - // circuiting here avoids constructing a useless pool that rejects - // every dispatch (with a `Worker pool parsing stopped` warn log per - // chunk) just to fall back to the sequential path via the error - // catch — the gate honors the docstring directly. - let workerPool: WorkerPool | undefined; - if ( + // circuiting here avoids constructing a useless pool. The pool is + // intentionally NOT created before parse-cache lookup: a warm-cache + // all-hit run should replay cached worker output without loading + // parse-worker.js or any tree-sitter/N-API native bindings. + const shouldUseWorkers = !options?.skipWorkers && options?.workerPoolSize !== 0 && - (totalParseable >= MIN_FILES_FOR_WORKERS || totalBytes >= MIN_BYTES_FOR_WORKERS) - ) { + (totalParseable >= MIN_FILES_FOR_WORKERS || totalBytes >= MIN_BYTES_FOR_WORKERS); + let workerPool: WorkerPool | undefined; + let workerPoolDisabled = false; + const getOrCreateWorkerPool = (): WorkerPool | undefined => { + if (!shouldUseWorkers || workerPoolDisabled) return undefined; + if (workerPool) return workerPool; try { // U20.U3 test-only injection: integration tests pass a custom // worker script URL via `workerUrlForTest` (mirrors the @@ -296,13 +299,16 @@ export async function runChunkedParseAndResolve( } } workerPool = createWorkerPool(workerUrl, options?.workerPoolSize); + return workerPool; } catch (err) { + workerPoolDisabled = true; logger.warn( { err: (err as Error).message }, 'Worker pool creation failed, using sequential fallback:', ); + return undefined; } - } + }; let filesParsedSoFar = 0; @@ -418,12 +424,18 @@ export async function runChunkedParseAndResolve( // never saw the log (M3 from PR #1693 review). const chunkStartMs: number | null = verboseThroughputLog ? Date.now() : null; - const chunkContents = await chunkContentPromises[chunkIdx]!; + const chunkContentPromise = chunkContentPromises[chunkIdx]; + if (!chunkContentPromise) { + throw new Error(`Missing prefetched parse chunk ${chunkIdx + 1}/${numChunks}`); + } + const chunkContents = await chunkContentPromise; chunkContentPromises[chunkIdx] = undefined; // release the in-memory copy startChunkPrefetch(chunkIdx + parseChunkConcurrency); - const chunkFiles = chunkPaths - .filter((p) => chunkContents.has(p)) - .map((p) => ({ path: p, content: chunkContents.get(p)! })); + const chunkFiles: Array<{ path: string; content: string }> = []; + for (const p of chunkPaths) { + const content = chunkContents.get(p); + if (content !== undefined) chunkFiles.push({ path: p, content }); + } // Compute the chunk's content-hash signature (if cache available). let chunkHash: string | null = null; @@ -436,7 +448,7 @@ export async function runChunkedParseAndResolve( } let chunkWorkerData: WorkerExtractedData | null; - const cachedRaw = chunkHash ? parseCache!.entries.get(chunkHash) : undefined; + const cachedRaw = chunkHash && parseCache ? parseCache.entries.get(chunkHash) : undefined; // Track every chunk hash we touched so the orchestrator can // prune stale entries (chunks whose composition no longer @@ -450,7 +462,7 @@ export async function runChunkedParseAndResolve( chunkWorkerData = mergeChunkResults(graph, symbolTable, cachedRaw); if (isDev) { logger.info( - `📦 parse-cache HIT: chunk ${chunkIdx + 1}/${numChunks} (${chunkFiles.length} files, ${chunkHash!.slice(0, 8)})`, + `📦 parse-cache HIT: chunk ${chunkIdx + 1}/${numChunks} (${chunkFiles.length} files, ${chunkHash?.slice(0, 8) ?? 'unknown'})`, ); } // Progress update so UI advances even on a cache hit. @@ -474,33 +486,61 @@ export async function runChunkedParseAndResolve( // them under the chunk hash for the next run. chunkCacheMisses++; const rawResults: ParseWorkerResult[] = []; - chunkWorkerData = await processParsing( - graph, - chunkFiles, - symbolTable, - astCache, - scopeTreeCache, - (current, _total, filePath) => { - const globalCurrent = filesParsedSoFar + current; - // Parse phase covers 20-70 (M2). Deferred extraction handles 70-95. - const parsingProgress = 20 + (globalCurrent / totalParseable) * 50; - onProgress({ - phase: 'parsing', - percent: Math.round(parsingProgress), - message: `Parsing chunk ${chunkIdx + 1}/${numChunks}...`, - detail: filePath, - stats: { - filesProcessed: globalCurrent, - totalFiles: totalParseable, - nodesCreated: graph.nodeCount, - }, - }); - }, - workerPool, - // Capture raw results only when we have a cache to write to — - // otherwise we'd retain extra arrays for nothing. - parseCache && chunkHash ? rawResults : undefined, - ); + const progressForChunk = (current: number, _total: number, filePath: string) => { + const globalCurrent = filesParsedSoFar + current; + // Parse phase covers 20-70 (M2). Deferred extraction handles 70-95. + const parsingProgress = 20 + (globalCurrent / totalParseable) * 50; + onProgress({ + phase: 'parsing', + percent: Math.round(parsingProgress), + message: `Parsing chunk ${chunkIdx + 1}/${numChunks}...`, + detail: filePath, + stats: { + filesProcessed: globalCurrent, + totalFiles: totalParseable, + nodesCreated: graph.nodeCount, + }, + }); + }; + const activeWorkerPool = getOrCreateWorkerPool(); + try { + chunkWorkerData = await processParsing( + graph, + chunkFiles, + symbolTable, + astCache, + scopeTreeCache, + progressForChunk, + activeWorkerPool, + // Capture raw results only when we have a cache to write to — + // otherwise we'd retain extra arrays for nothing. + parseCache && chunkHash && activeWorkerPool ? rawResults : undefined, + ); + } catch (err) { + if (!(err instanceof WorkerPoolInitializationError)) throw err; + logger.warn( + { + err: err.message, + readinessFailures: err.readinessFailures, + }, + 'Worker pool initialization failed, using sequential fallback:', + ); + rawResults.length = 0; + workerPoolDisabled = true; + const failedPool = workerPool; + workerPool = undefined; + await failedPool?.terminate().catch(() => undefined); + chunkWorkerData = await processParsing( + graph, + chunkFiles, + symbolTable, + astCache, + scopeTreeCache, + progressForChunk, + undefined, + undefined, + ); + } // Persist the raw results for this chunk hash. Sequential path // doesn't populate rawResults (it writes directly to graph), so // small repos without worker pool simply don't cache. That's fine. @@ -843,9 +883,11 @@ export async function runChunkedParseAndResolve( const cachedSequentialChunkFiles: Array> = []; for (const chunkPaths of sequentialChunkPaths) { const chunkContents = await readFileContents(repoPath, chunkPaths); - const chunkFiles = chunkPaths - .filter((p) => chunkContents.has(p)) - .map((p) => ({ path: p, content: chunkContents.get(p)! })); + const chunkFiles: Array<{ path: string; content: string }> = []; + for (const p of chunkPaths) { + const content = chunkContents.get(p); + if (content !== undefined) chunkFiles.push({ path: p, content }); + } cachedSequentialChunkFiles.push(chunkFiles); astCache = createASTCache(chunkFiles.length); const sequentialHeritage = await extractExtractedHeritageFromFiles(chunkFiles, astCache); diff --git a/gitnexus/src/core/ingestion/workers/worker-pool.ts b/gitnexus/src/core/ingestion/workers/worker-pool.ts index fc032bd565..d29b352ab4 100644 --- a/gitnexus/src/core/ingestion/workers/worker-pool.ts +++ b/gitnexus/src/core/ingestion/workers/worker-pool.ts @@ -235,6 +235,20 @@ export class WorkerPoolDispatchError extends Error { } } +export class WorkerPoolInitializationError extends WorkerPoolDispatchError { + readonly readinessFailures: readonly string[]; + + constructor( + message: string, + quarantinedPaths: readonly string[] = [], + readinessFailures: readonly string[] = [], + ) { + super(message, quarantinedPaths); + this.name = 'WorkerPoolInitializationError'; + this.readinessFailures = readinessFailures; + } +} + /** Message shapes sent back by worker threads. */ type WorkerOutgoingMessage = | { type: 'progress'; filesProcessed: number } @@ -592,6 +606,7 @@ export const createWorkerPool = ( // 1100+ LOC of pool plumbing. Public worker-pool API is unchanged — // `getQuarantinedPaths()` still returns the same defensive copy. const quarantine = createQuarantine(); + const initialReadinessFailures: string[] = []; // Per-slot consecutive-failure counter (F6): replaces the prior pool-wide // scalar so a chronically-failing slot trips the breaker on its own // failure streak instead of being masked by another slot's successes. @@ -636,6 +651,7 @@ export const createWorkerPool = ( try { await waitForWorkerReady(w); } catch (err) { + initialReadinessFailures.push(err instanceof Error ? err.message : String(err)); logger.warn( { workerIndex: i, @@ -673,7 +689,15 @@ export const createWorkerPool = ( } if (items.length === 0) return []; if (activeSlots.size === 0) { - throw new WorkerPoolDispatchError('Worker pool has no active workers', []); + const detail = + initialReadinessFailures.length > 0 + ? ` after initial ready handshake: ${initialReadinessFailures.join('; ')}` + : ''; + throw new WorkerPoolInitializationError( + `Worker pool has no active workers${detail}`, + [], + initialReadinessFailures, + ); } // Layer 3: filter out quarantined paths so a known-bad file never reaches diff --git a/gitnexus/test/integration/filesystem-walker.test.ts b/gitnexus/test/integration/filesystem-walker.test.ts index 1d743a8989..8f934573c7 100644 --- a/gitnexus/test/integration/filesystem-walker.test.ts +++ b/gitnexus/test/integration/filesystem-walker.test.ts @@ -438,6 +438,28 @@ describe('filesystem-walker', () => { .filter((r) => String(r.msg ?? '').includes('GITNEXUS_MAX_FILE_SIZE=')); expect(hint.length).toBe(0); }); + + it('routes large-file notices through console.warn while analyze progress is active', async () => { + const originalProgressActive = process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE; + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + try { + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = '1'; + await walkRepositoryPaths(sizeDir); + const messages = warnSpy.mock.calls.map(([msg]) => String(msg)); + expect(messages.some((m) => m.includes('Skipped 1 large files'))).toBe(true); + expect(messages.some((m) => m.includes(BIG_FILE))).toBe(true); + expect(cap.records().filter((r) => String(r.msg ?? '').includes('Skipped '))).toHaveLength( + 0, + ); + } finally { + warnSpy.mockRestore(); + if (originalProgressActive === undefined) { + delete process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE; + } else { + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = originalProgressActive; + } + } + }); }); describe('large file skip preview cap (#1659)', () => { diff --git a/gitnexus/test/integration/worker-pool.test.ts b/gitnexus/test/integration/worker-pool.test.ts index 5c5f11944e..b8740957ba 100644 --- a/gitnexus/test/integration/worker-pool.test.ts +++ b/gitnexus/test/integration/worker-pool.test.ts @@ -394,7 +394,7 @@ describe('worker pool integration', () => { const { parentPort } = require('node:worker_threads'); const markerPath = ${JSON.stringify(markerPath)}; if (fs.existsSync(markerPath)) { - throw new Error('simulated startup crash'); + process.exit(1); } parentPort.on('message', (msg) => { if (msg && msg.type === 'sub-batch') { diff --git a/gitnexus/test/unit/analyze-heap-respawn.test.ts b/gitnexus/test/unit/analyze-heap-respawn.test.ts index 2f094ddbfd..9a2db16594 100644 --- a/gitnexus/test/unit/analyze-heap-respawn.test.ts +++ b/gitnexus/test/unit/analyze-heap-respawn.test.ts @@ -1,11 +1,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { EventEmitter } from 'node:events'; -const execFileSyncMock = vi.fn(); +const spawnMock = vi.fn(); const getHeapStatisticsMock = vi.fn(); vi.mock('child_process', async () => { const actual = await vi.importActual('child_process'); - return { ...actual, execFileSync: execFileSyncMock }; + return { ...actual, spawn: spawnMock }; }); vi.mock('v8', () => ({ @@ -18,33 +19,99 @@ vi.mock('../../src/core/lbug/lbug-adapter.js', () => ({ closeLbug: vi.fn(async () => undefined), })); +const mockSpawnExit = ({ + status = 0, + signal = null, + stdout = '', + stderr = '', +}: { + status?: number | null; + signal?: NodeJS.Signals | null; + stdout?: string | Buffer; + stderr?: string | Buffer; +} = {}) => { + spawnMock.mockImplementationOnce(() => { + const child = new EventEmitter() as EventEmitter & { + stdout: EventEmitter; + stderr: EventEmitter; + }; + child.stdout = new EventEmitter(); + child.stderr = new EventEmitter(); + queueMicrotask(() => { + if (stdout) child.stdout.emit('data', stdout); + if (stderr) child.stderr.emit('data', stderr); + child.emit('close', status, signal); + }); + return child; + }); +}; + +const setStreamIsTTY = (stream: NodeJS.WriteStream, value: boolean): (() => void) => { + const descriptor = Object.getOwnPropertyDescriptor(stream, 'isTTY'); + Object.defineProperty(stream, 'isTTY', { configurable: true, value }); + return () => { + if (descriptor) Object.defineProperty(stream, 'isTTY', descriptor); + else delete (stream as NodeJS.WriteStream & { isTTY?: boolean }).isTTY; + }; +}; + describe('analyzeCommand heap respawn', () => { let initialNodeOptions: string | undefined; + let stdoutWriteSpy: ReturnType; + let stderrWriteSpy: ReturnType; + let restoreStdoutIsTTY: (() => void) | undefined; + let restoreStderrIsTTY: (() => void) | undefined; beforeEach(() => { initialNodeOptions = process.env.NODE_OPTIONS; vi.resetModules(); - execFileSyncMock.mockReset(); + spawnMock.mockReset(); getHeapStatisticsMock.mockReset(); process.exitCode = undefined; + stdoutWriteSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + stderrWriteSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); }); afterEach(() => { + restoreStdoutIsTTY?.(); + restoreStderrIsTTY?.(); + restoreStdoutIsTTY = undefined; + restoreStderrIsTTY = undefined; + stdoutWriteSpy.mockRestore(); + stderrWriteSpy.mockRestore(); if (initialNodeOptions === undefined) delete process.env.NODE_OPTIONS; else process.env.NODE_OPTIONS = initialNodeOptions; }); - it('re-execs analyze with 16GB heap when no max-old-space-size is present', async () => { + it('re-execs analyze with 16GB heap and bridges progress redraw when parent is a TTY', async () => { delete process.env.NODE_OPTIONS; + restoreStderrIsTTY = setStreamIsTTY(process.stderr, true); getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit(); const { analyzeCommand } = await import('../../src/cli/analyze.js'); await analyzeCommand(undefined, {}); - expect(execFileSyncMock).toHaveBeenCalledTimes(1); - const [, args, opts] = execFileSyncMock.mock.calls[0]; + expect(spawnMock).toHaveBeenCalledTimes(1); + const [, args, opts] = spawnMock.mock.calls[0]; expect(args).toContain('--max-old-space-size=16384'); expect(opts.env.NODE_OPTIONS).toContain('--max-old-space-size=16384'); + expect(opts.env.GITNEXUS_RESPAWN_PROGRESS_TTY).toBe('1'); + }); + + it('does not force ANSI progress when the parent output is not a TTY', async () => { + delete process.env.NODE_OPTIONS; + restoreStdoutIsTTY = setStreamIsTTY(process.stdout, false); + restoreStderrIsTTY = setStreamIsTTY(process.stderr, false); + getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit(); + + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + await analyzeCommand(undefined, {}); + + expect(spawnMock).toHaveBeenCalledTimes(1); + const [, , opts] = spawnMock.mock.calls[0]; + expect(opts.env.GITNEXUS_RESPAWN_PROGRESS_TTY).toBeUndefined(); }); it('does not re-exec when NODE_OPTIONS already defines max-old-space-size', async () => { @@ -54,18 +121,13 @@ describe('analyzeCommand heap respawn', () => { const { analyzeCommand } = await import('../../src/cli/analyze.js'); await analyzeCommand('/__gitnexus_nonexistent__', {}); - expect(execFileSyncMock).not.toHaveBeenCalled(); + expect(spawnMock).not.toHaveBeenCalled(); }); it('prints heap guidance when respawned analyze exits with likely OOM', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('child failed') as Error & { status?: number; signal?: string }; - err.status = undefined; - err.signal = 'SIGABRT'; - throw err; - }); + mockSpawnExit({ status: null, signal: 'SIGABRT' }); const { _captureLogger } = await import('../../src/core/logger.js'); const cap = _captureLogger(); @@ -89,18 +151,12 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child stderr contains heap OOM signature', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: Buffer; - }; - err.status = 1; - err.signal = undefined; - err.stderr = Buffer.from( + mockSpawnExit({ + status: 1, + signal: null, + stderr: Buffer.from( 'FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory', - ); - throw err; + ), }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -118,16 +174,10 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child stdout contains heap OOM signature', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stdout?: string; - }; - err.status = 1; - err.signal = undefined; - err.stdout = 'FATAL ERROR: JavaScript heap out of memory'; - throw err; + mockSpawnExit({ + status: 1, + signal: null, + stdout: 'FATAL ERROR: JavaScript heap out of memory', }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -145,19 +195,7 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child exits 134 without output', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: string; - stdout?: string; - }; - err.status = 134; - err.signal = undefined; - err.stderr = ''; - err.stdout = ''; - throw err; - }); + mockSpawnExit({ status: 134, signal: null, stderr: '', stdout: '' }); const { _captureLogger } = await import('../../src/core/logger.js'); const cap = _captureLogger(); @@ -174,16 +212,10 @@ describe('analyzeCommand heap respawn', () => { it('does not print heap guidance for non-OOM child failures with output', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: Buffer; - }; - err.status = 2; - err.signal = undefined; - err.stderr = Buffer.from('parser failed: invalid token'); - throw err; + mockSpawnExit({ + status: 2, + signal: null, + stderr: Buffer.from('parser failed: invalid token'), }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -197,4 +229,30 @@ describe('analyzeCommand heap respawn', () => { ); cap.restore(); }); + + it('does not print heap guidance when a SIGABRT child emitted a native N-API crash', async () => { + delete process.env.NODE_OPTIONS; + getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit({ + status: 134, + signal: null, + stderr: Buffer.from('libc++abi: terminating due to uncaught exception of type Napi::Error'), + }); + + const { _captureLogger } = await import('../../src/core/logger.js'); + const cap = _captureLogger(); + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + await analyzeCommand(undefined, {}); + + expect(process.exitCode).toBe(134); + expect(cap.records().some((r) => r.msg.includes('Analysis likely ran out of memory.'))).toBe( + false, + ); + expect(cap.records().some((r) => r.msg.includes('Analysis aborted in a native worker'))).toBe( + true, + ); + expect(cap.records().some((r) => r.recoveryHint === 'native-worker-abort')).toBe(true); + expect(stderrWriteSpy).toHaveBeenCalled(); + cap.restore(); + }); }); diff --git a/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts new file mode 100644 index 0000000000..2daff60fea --- /dev/null +++ b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts @@ -0,0 +1,124 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +interface CapturedTerminal { + cursorTo(x?: number | null, y?: number | null): void; + clearRight(): void; + newline(): void; + isTTY(): boolean; +} + +interface CapturedBarOptions { + noTTYOutput?: boolean; + notTTYSchedule?: number; + terminal?: CapturedTerminal; +} + +const mocks = vi.hoisted(() => ({ + runFullAnalysisMock: vi.fn(), + capturedBarOptions: [] as CapturedBarOptions[], +})); + +vi.mock('cli-progress', () => ({ + default: { + SingleBar: vi.fn(function (options: CapturedBarOptions) { + mocks.capturedBarOptions.push(options); + return { + start: vi.fn(), + update: vi.fn(), + stop: vi.fn(), + }; + }), + Presets: { shades_grey: {} }, + }, +})); + +vi.mock('../../src/core/run-analyze.js', () => ({ + runFullAnalysis: mocks.runFullAnalysisMock, +})); + +vi.mock('../../src/core/lbug/lbug-adapter.js', () => ({ + closeLbug: vi.fn(async () => undefined), +})); + +vi.mock('../../src/storage/repo-manager.js', () => ({ + getStoragePaths: vi.fn(() => ({ storagePath: '.gitnexus', lbugPath: '.gitnexus/lbug' })), + getGlobalRegistryPath: vi.fn(() => 'registry.json'), + RegistryNameCollisionError: class RegistryNameCollisionError extends Error {}, + AnalysisNotFinalizedError: class AnalysisNotFinalizedError extends Error {}, + assertAnalysisFinalized: vi.fn(async () => undefined), +})); + +vi.mock('../../src/storage/git.js', () => ({ + getGitRoot: vi.fn(() => '/repo'), + hasGitDir: vi.fn(() => true), +})); + +vi.mock('../../src/core/ingestion/utils/max-file-size.js', () => ({ + getMaxFileSizeBannerMessage: vi.fn(() => null), +})); + +const setStreamIsTTY = (stream: NodeJS.WriteStream, value: boolean): (() => void) => { + const descriptor = Object.getOwnPropertyDescriptor(stream, 'isTTY'); + Object.defineProperty(stream, 'isTTY', { configurable: true, value }); + return () => { + if (descriptor) Object.defineProperty(stream, 'isTTY', descriptor); + else delete (stream as NodeJS.WriteStream & { isTTY?: boolean }).isTTY; + }; +}; + +describe('analyzeCommand respawn progress terminal bridge', () => { + const ORIGINAL_NODE_OPTIONS = process.env.NODE_OPTIONS; + const ORIGINAL_RESPAWN_PROGRESS = process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; + let restoreStderrIsTTY: (() => void) | undefined; + let stdoutWriteSpy: ReturnType; + let stderrWriteSpy: ReturnType; + + beforeEach(() => { + vi.resetModules(); + mocks.runFullAnalysisMock.mockReset(); + mocks.capturedBarOptions.length = 0; + mocks.runFullAnalysisMock.mockResolvedValue({ + repoName: 'repo', + repoPath: '/repo', + stats: {}, + alreadyUpToDate: true, + }); + process.exitCode = undefined; + process.env.NODE_OPTIONS = '--max-old-space-size=8192'; + process.env.GITNEXUS_RESPAWN_PROGRESS_TTY = '1'; + restoreStderrIsTTY = setStreamIsTTY(process.stderr, false); + stdoutWriteSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + stderrWriteSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); + }); + + afterEach(() => { + stdoutWriteSpy.mockRestore(); + stderrWriteSpy.mockRestore(); + restoreStderrIsTTY?.(); + restoreStderrIsTTY = undefined; + if (ORIGINAL_NODE_OPTIONS === undefined) delete process.env.NODE_OPTIONS; + else process.env.NODE_OPTIONS = ORIGINAL_NODE_OPTIONS; + if (ORIGINAL_RESPAWN_PROGRESS === undefined) delete process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; + else process.env.GITNEXUS_RESPAWN_PROGRESS_TTY = ORIGINAL_RESPAWN_PROGRESS; + }); + + it('uses an ANSI terminal shim instead of cli-progress non-TTY newline mode', async () => { + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + + await analyzeCommand(undefined, {}); + + expect(mocks.capturedBarOptions).toHaveLength(1); + const options = mocks.capturedBarOptions[0]; + expect(options.noTTYOutput).toBeUndefined(); + expect(options.notTTYSchedule).toBeUndefined(); + expect(options.terminal).toBeDefined(); + expect(options.terminal.isTTY()).toBe(true); + + options.terminal.cursorTo(0, null); + options.terminal.clearRight(); + options.terminal.newline(); + expect(stderrWriteSpy).toHaveBeenCalledWith('\r'); + expect(stderrWriteSpy).toHaveBeenCalledWith('\x1B[0K'); + expect(stderrWriteSpy).toHaveBeenCalledWith('\n'); + }); +}); diff --git a/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts b/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts new file mode 100644 index 0000000000..44242b02d0 --- /dev/null +++ b/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts @@ -0,0 +1,244 @@ +/** + * Regression coverage for native-worker startup on warm parse-cache runs. + * + * A cache-hit chunk must replay cached worker output without spawning the + * parse-worker. Spawning workers on a warm cache hit still loads tree-sitter + * native bindings at top level, which was the root trigger for intermittent + * `libc++abi ... Napi::Error` crashes in linked local builds. + */ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { pathToFileURL } from 'node:url'; + +import { createKnowledgeGraph } from '../../src/core/graph/graph.js'; +import { runChunkedParseAndResolve } from '../../src/core/ingestion/pipeline-phases/parse-impl.js'; +import { computeChunkHash, fileContentHash } from '../../src/storage/parse-cache.js'; +import type { ParseWorkerResult } from '../../src/core/ingestion/workers/parse-worker.js'; + +const emptyWorkerResult = (filePath: string, name: string): ParseWorkerResult => ({ + nodes: [ + { + id: `Function:${filePath}:${name}`, + label: 'Function', + properties: { + name, + filePath, + startLine: 1, + endLine: 1, + language: 'typescript', + }, + }, + ], + relationships: [], + symbols: [], + imports: [], + calls: [], + assignments: [], + heritage: [], + routes: [], + fetchCalls: [], + decoratorRoutes: [], + toolDefs: [], + ormQueries: [], + constructorBindings: [], + fileScopeBindings: [], + parsedFiles: [], + skippedLanguages: {}, + fileCount: 1, +}); + +const writeReadyWorker = (workerPath: string, markerPath: string): void => { + fs.writeFileSync( + workerPath, + ` +const fs = require('node:fs'); +const { parentPort } = require('node:worker_threads'); +fs.writeFileSync(${JSON.stringify(markerPath)}, 'spawned'); +parentPort.postMessage({ type: 'ready' }); +parentPort.on('message', () => {}); +`, + ); +}; + +const writeResultWorker = (workerPath: string, markerPath: string): void => { + fs.writeFileSync( + workerPath, + ` +const fs = require('node:fs'); +const { parentPort } = require('node:worker_threads'); +const decoder = new TextDecoder('utf-8'); +fs.writeFileSync(${JSON.stringify(markerPath)}, 'spawned'); +parentPort.postMessage({ type: 'ready' }); +const accumulated = { + nodes: [], relationships: [], symbols: [], imports: [], calls: [], assignments: [], heritage: [], + routes: [], fetchCalls: [], decoratorRoutes: [], toolDefs: [], ormQueries: [], constructorBindings: [], + fileScopeBindings: [], parsedFiles: [], skippedLanguages: {}, fileCount: 0, +}; +parentPort.on('message', (msg) => { + if (msg && msg.type === 'sub-batch') { + for (const file of msg.files) { + const filePath = file.path; + const name = filePath.split('/').pop().replace(/\\.ts$/, ''); + accumulated.nodes.push({ + id: 'Function:' + filePath + ':' + name, + label: 'Function', + properties: { name, filePath, startLine: 1, endLine: 1, language: 'typescript' }, + }); + accumulated.fileCount++; + // Decode to exercise the same transfer-list shape as production. + if (file.content && typeof file.content !== 'string') decoder.decode(file.content); + } + parentPort.postMessage({ type: 'progress', filesProcessed: accumulated.fileCount }); + parentPort.postMessage({ type: 'sub-batch-done' }); + return; + } + if (msg && msg.type === 'flush') parentPort.postMessage({ type: 'result', data: accumulated }); +}); +`, + ); +}; + +const writeExitBeforeReadyWorker = (workerPath: string): void => { + fs.writeFileSync(workerPath, `process.exit(1);\n`); +}; + +describe('parse-impl worker pool lazy startup', () => { + let tempDir = ''; + let repoDir = ''; + + beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'parse-impl-worker-lazy-cache-')); + repoDir = path.join(tempDir, 'repo'); + fs.mkdirSync(repoDir, { recursive: true }); + }); + + afterEach(() => { + if (tempDir) fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('does not spawn a parse worker when every chunk is served from parse cache', async () => { + const rel = 'src/cached.ts'; + const content = 'export function cached() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + const parseCache = { + version: 'test', + entries: new Map([ + [chunkHash, [emptyWorkerResult(rel, 'cached')]], + ]), + usedKeys: new Set(), + }; + + const markerPath = path.join(tempDir, 'worker-spawned.marker'); + const workerPath = path.join(tempDir, 'ready-worker.js'); + writeReadyWorker(workerPath, markerPath); + + const graph = createKnowledgeGraph(); + await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(fs.existsSync(markerPath)).toBe(false); + expect(parseCache.usedKeys.has(chunkHash)).toBe(true); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'cached')).toBe(true); + }); + + it('spawns the parse worker lazily on the first cache miss and stores raw results', async () => { + const rel = 'src/miss.ts'; + const content = 'export function miss() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const markerPath = path.join(tempDir, 'worker-spawned.marker'); + const workerPath = path.join(tempDir, 'result-worker.js'); + writeResultWorker(workerPath, markerPath); + + const parseCache = { + version: 'test', + entries: new Map(), + usedKeys: new Set(), + }; + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + + const graph = createKnowledgeGraph(); + await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(fs.existsSync(markerPath)).toBe(true); + expect(parseCache.entries.has(chunkHash)).toBe(true); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'miss')).toBe(true); + }); + + it('falls back to sequential parsing when initial workers exit before ready', async () => { + const rel = 'src/fallback.ts'; + const content = 'export function fallback() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const workerPath = path.join(tempDir, 'exit-before-ready-worker.js'); + writeExitBeforeReadyWorker(workerPath); + + const parseCache = { + version: 'test', + entries: new Map(), + usedKeys: new Set(), + }; + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + + const graph = createKnowledgeGraph(); + const result = await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(result.usedWorkerPool).toBe(false); + expect(parseCache.usedKeys.has(chunkHash)).toBe(true); + expect(parseCache.entries.has(chunkHash)).toBe(false); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'fallback')).toBe( + true, + ); + }); +}); From acc4c1b4b2f0228a7e3aa6b331f52a5daea73ded Mon Sep 17 00:00:00 2001 From: wangxc Date: Thu, 21 May 2026 20:16:58 +0800 Subject: [PATCH 2/3] ci(docker): tolerate slower arm64 TypeScript builds Docker PR builds run gitnexus prepare under QEMU for linux/arm64, where the fixed 120s TypeScript timeout can kill otherwise healthy builds. Increase the default timeout and allow GITNEXUS_BUILD_TIMEOUT_MS to tune slower environments without changing the build steps. Constraint: PR #1751 Docker Build & Push gitnexus failed with spawnSync /bin/sh ETIMEDOUT while running node_modules/.bin/tsc in scripts/build.js.\nRejected: Rerunning CI only | the failure was the build script's deterministic timeout boundary under arm64 emulation, not a code assertion.\nConfidence: high\nScope-risk: narrow\nDirective: Keep build timeout changes in scripts/build.js configurable; do not hide real compiler failures, only allow slower successful compiles to finish.\nTested: GitNexus impact for gitnexus/scripts/build.js reported LOW; gitnexus detect_changes reported 1 changed file, 0 affected processes, low risk; git diff --check; gitnexus npm run build.\nNot-tested: GitHub Docker arm64 build rerun before pushing; local Docker multi-platform build under QEMU. --- gitnexus/scripts/build.js | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/gitnexus/scripts/build.js b/gitnexus/scripts/build.js index ec7f67cf4c..e651708e37 100644 --- a/gitnexus/scripts/build.js +++ b/gitnexus/scripts/build.js @@ -18,6 +18,22 @@ const ROOT = path.resolve(__dirname, '..'); const SHARED_ROOT = path.resolve(ROOT, '..', 'gitnexus-shared'); const DIST = path.join(ROOT, 'dist'); const SHARED_DEST = path.join(DIST, '_shared'); +const DEFAULT_BUILD_TIMEOUT_MS = 300_000; + +function getBuildTimeoutMs() { + const raw = process.env.GITNEXUS_BUILD_TIMEOUT_MS; + if (raw === undefined || raw.trim() === '') return DEFAULT_BUILD_TIMEOUT_MS; + + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed) && parsed > 0) return parsed; + + console.warn( + `[build] ignoring invalid GITNEXUS_BUILD_TIMEOUT_MS=${JSON.stringify(raw)}; using ${DEFAULT_BUILD_TIMEOUT_MS}ms`, + ); + return DEFAULT_BUILD_TIMEOUT_MS; +} + +const BUILD_TIMEOUT_MS = getBuildTimeoutMs(); // ── 1. Build gitnexus-shared ─────────────────────────────────────── console.log('[build] compiling gitnexus-shared…'); @@ -25,11 +41,11 @@ const tscCmd = process.platform === 'win32' ? path.join('node_modules', '.bin', 'tsc.cmd') : path.join('node_modules', '.bin', 'tsc'); -execSync(tscCmd, { cwd: SHARED_ROOT, stdio: 'inherit', timeout: 120_000 }); +execSync(tscCmd, { cwd: SHARED_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // ── 2. Build gitnexus ────────────────────────────────────────────── console.log('[build] compiling gitnexus…'); -execSync(tscCmd, { cwd: ROOT, stdio: 'inherit', timeout: 120_000 }); +execSync(tscCmd, { cwd: ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // ── 3. Copy shared dist ──────────────────────────────────────────── console.log('[build] copying shared module into dist/_shared…'); @@ -82,9 +98,9 @@ if (fs.existsSync(path.join(WEB_ROOT, 'package.json'))) { console.log('[build] building gitnexus-web…'); if (!fs.existsSync(path.join(WEB_ROOT, 'node_modules'))) { console.log('[build] installing gitnexus-web dependencies…'); - execSync('npm ci', { cwd: WEB_ROOT, stdio: 'inherit', timeout: 120_000 }); + execSync('npm ci', { cwd: WEB_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); } - execSync('npm run build', { cwd: WEB_ROOT, stdio: 'inherit', timeout: 120_000 }); + execSync('npm run build', { cwd: WEB_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // Copy dist → gitnexus/web/ (shipped in the npm package) fs.rmSync(WEB_DEST, { recursive: true, force: true }); From 6fba199c214d11c59376e280ad788087f007b51d Mon Sep 17 00:00:00 2001 From: wangxc Date: Thu, 21 May 2026 21:03:57 +0800 Subject: [PATCH 3/3] fix(analyze): truncate respawn progress safely Preserve complete ANSI escape sequences and grapheme boundaries when the respawn progress terminal shim truncates wrapped output, so the shim does not emit dangling escape bytes or split surrogate pairs while keeping raw writes untouched. Constraint: Claude review on PR #1751 flagged `s.slice(0, width)` in createAnsiPipeTerminal.write() as a latent terminal-corruption risk. Rejected: Adding a display-width dependency | a local helper is sufficient for this narrow respawn terminal shim and avoids new dependency churn. Rejected: Changing silent status-134 classification | current tests already document the output-less 134 fallback as heap guidance. Confidence: high Scope-risk: narrow Directive: Keep respawn terminal writes ANSI-aware and preserve rawWrite bypass semantics for callers that intentionally write control sequences. Tested: GitNexus impact for createAnsiPipeTerminal reported LOW; GitNexus detect_changes reported 2 changed files, 3 affected processes, medium risk; targeted vitest for analyze respawn progress and heap respawn; gitnexus npx tsc --noEmit; prettier check for changed files; eslint for changed files. Not-tested: Full npm test suite; manual terminal rendering on Windows. --- gitnexus/src/cli/analyze.ts | 98 ++++++++++++++++++- .../analyze-respawn-progress-terminal.test.ts | 23 +++++ 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/gitnexus/src/cli/analyze.ts b/gitnexus/src/cli/analyze.ts index f4aac990db..29253f969c 100644 --- a/gitnexus/src/cli/analyze.ts +++ b/gitnexus/src/cli/analyze.ts @@ -104,6 +104,102 @@ const terminalColumns = (): number => { return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 80; }; +const ANSI_ESCAPE_PATTERN = + /\x1B(?:\[[0-?]*[ -/]*[@-~]|\][^\x07]*(?:\x07|\x1B\\)|[PX^_][\s\S]*?\x1B\\|[78]|[@-Z\\-_])/y; + +interface IntlSegmenterLike { + segment(input: string): Iterable<{ segment: string }>; +} + +type IntlWithOptionalSegmenter = typeof Intl & { + Segmenter?: new ( + locales?: string | string[], + options?: { granularity?: 'grapheme' }, + ) => IntlSegmenterLike; +}; + +const splitGraphemes = (text: string): string[] => { + const Segmenter = (Intl as IntlWithOptionalSegmenter).Segmenter; + if (Segmenter) { + return Array.from( + new Segmenter(undefined, { granularity: 'grapheme' }).segment(text), + (s) => s.segment, + ); + } + return Array.from(text); +}; + +const isZeroWidthCodePoint = (codePoint: number): boolean => + codePoint === 0x200d || + (codePoint >= 0x0300 && codePoint <= 0x036f) || + (codePoint >= 0x1ab0 && codePoint <= 0x1aff) || + (codePoint >= 0x1dc0 && codePoint <= 0x1dff) || + (codePoint >= 0x20d0 && codePoint <= 0x20ff) || + (codePoint >= 0xfe00 && codePoint <= 0xfe0f) || + (codePoint >= 0xfe20 && codePoint <= 0xfe2f); + +const isWideCodePoint = (codePoint: number): boolean => + codePoint >= 0x1100 && + (codePoint <= 0x115f || + codePoint === 0x2329 || + codePoint === 0x232a || + (codePoint >= 0x2e80 && codePoint <= 0xa4cf && codePoint !== 0x303f) || + (codePoint >= 0xac00 && codePoint <= 0xd7a3) || + (codePoint >= 0xf900 && codePoint <= 0xfaff) || + (codePoint >= 0xfe10 && codePoint <= 0xfe19) || + (codePoint >= 0xfe30 && codePoint <= 0xfe6f) || + (codePoint >= 0xff00 && codePoint <= 0xff60) || + (codePoint >= 0xffe0 && codePoint <= 0xffe6) || + (codePoint >= 0x1f300 && codePoint <= 0x1faff) || + (codePoint >= 0x20000 && codePoint <= 0x3fffd)); + +const visibleColumns = (text: string): number => { + let columns = 0; + for (const char of Array.from(text)) { + const codePoint = char.codePointAt(0); + if (codePoint === undefined || isZeroWidthCodePoint(codePoint)) continue; + columns += isWideCodePoint(codePoint) ? 2 : 1; + } + return columns; +}; + +const readAnsiEscapeAt = (text: string, index: number): string | undefined => { + ANSI_ESCAPE_PATTERN.lastIndex = index; + return ANSI_ESCAPE_PATTERN.exec(text)?.[0]; +}; + +const truncateAnsiToColumns = (text: string, maxColumns: number): string => { + if (!Number.isFinite(maxColumns) || maxColumns <= 0) return ''; + + let output = ''; + let columns = 0; + let index = 0; + + while (index < text.length) { + const escape = readAnsiEscapeAt(text, index); + if (escape) { + output += escape; + index += escape.length; + continue; + } + + const nextEscapeIndex = text.indexOf('\x1B', index); + const plainEnd = nextEscapeIndex === -1 ? text.length : nextEscapeIndex; + const plainText = text.slice(index, plainEnd); + + for (const segment of splitGraphemes(plainText)) { + const width = visibleColumns(segment); + if (width > 0 && columns + width > maxColumns) return output; + output += segment; + columns += width; + } + + index = plainEnd; + } + + return output; +}; + const createAnsiPipeTerminal = (stream: NodeJS.WriteStream): CliProgressTerminal => { let linewrap = true; let dy = 0; @@ -155,7 +251,7 @@ const createAnsiPipeTerminal = (stream: NodeJS.WriteStream): CliProgressTerminal }, write: (s, rawWrite = false) => { const width = terminalColumns(); - write(linewrap && rawWrite === false ? s.slice(0, width) : s); + write(linewrap && rawWrite === false ? truncateAnsiToColumns(s, width) : s); }, isTTY: () => true, getWidth: terminalColumns, diff --git a/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts index 2daff60fea..de90ae4679 100644 --- a/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts +++ b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts @@ -2,8 +2,10 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; interface CapturedTerminal { cursorTo(x?: number | null, y?: number | null): void; + lineWrapping(enabled: boolean): void; clearRight(): void; newline(): void; + write(s: string, rawWrite?: boolean): void; isTTY(): boolean; } @@ -69,6 +71,7 @@ const setStreamIsTTY = (stream: NodeJS.WriteStream, value: boolean): (() => void describe('analyzeCommand respawn progress terminal bridge', () => { const ORIGINAL_NODE_OPTIONS = process.env.NODE_OPTIONS; const ORIGINAL_RESPAWN_PROGRESS = process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; + const ORIGINAL_COLUMNS = process.env.COLUMNS; let restoreStderrIsTTY: (() => void) | undefined; let stdoutWriteSpy: ReturnType; let stderrWriteSpy: ReturnType; @@ -100,6 +103,8 @@ describe('analyzeCommand respawn progress terminal bridge', () => { else process.env.NODE_OPTIONS = ORIGINAL_NODE_OPTIONS; if (ORIGINAL_RESPAWN_PROGRESS === undefined) delete process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; else process.env.GITNEXUS_RESPAWN_PROGRESS_TTY = ORIGINAL_RESPAWN_PROGRESS; + if (ORIGINAL_COLUMNS === undefined) delete process.env.COLUMNS; + else process.env.COLUMNS = ORIGINAL_COLUMNS; }); it('uses an ANSI terminal shim instead of cli-progress non-TTY newline mode', async () => { @@ -121,4 +126,22 @@ describe('analyzeCommand respawn progress terminal bridge', () => { expect(stderrWriteSpy).toHaveBeenCalledWith('\x1B[0K'); expect(stderrWriteSpy).toHaveBeenCalledWith('\n'); }); + + it('truncates wrapped progress writes without splitting ANSI escapes or surrogate pairs', async () => { + process.env.COLUMNS = '3'; + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + + await analyzeCommand(undefined, {}); + + const options = mocks.capturedBarOptions[0]; + options.terminal.write('ab\x1B[31mcd'); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('ab\x1B[31mc'); + + process.env.COLUMNS = '4'; + options.terminal.write('abc😀def'); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('abc'); + + options.terminal.write('abc😀def', true); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('abc😀def'); + }); });