diff --git a/gitnexus/scripts/build.js b/gitnexus/scripts/build.js index ec7f67cf4c..e651708e37 100644 --- a/gitnexus/scripts/build.js +++ b/gitnexus/scripts/build.js @@ -18,6 +18,22 @@ const ROOT = path.resolve(__dirname, '..'); const SHARED_ROOT = path.resolve(ROOT, '..', 'gitnexus-shared'); const DIST = path.join(ROOT, 'dist'); const SHARED_DEST = path.join(DIST, '_shared'); +const DEFAULT_BUILD_TIMEOUT_MS = 300_000; + +function getBuildTimeoutMs() { + const raw = process.env.GITNEXUS_BUILD_TIMEOUT_MS; + if (raw === undefined || raw.trim() === '') return DEFAULT_BUILD_TIMEOUT_MS; + + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed) && parsed > 0) return parsed; + + console.warn( + `[build] ignoring invalid GITNEXUS_BUILD_TIMEOUT_MS=${JSON.stringify(raw)}; using ${DEFAULT_BUILD_TIMEOUT_MS}ms`, + ); + return DEFAULT_BUILD_TIMEOUT_MS; +} + +const BUILD_TIMEOUT_MS = getBuildTimeoutMs(); // ── 1. Build gitnexus-shared ─────────────────────────────────────── console.log('[build] compiling gitnexus-shared…'); @@ -25,11 +41,11 @@ const tscCmd = process.platform === 'win32' ? path.join('node_modules', '.bin', 'tsc.cmd') : path.join('node_modules', '.bin', 'tsc'); -execSync(tscCmd, { cwd: SHARED_ROOT, stdio: 'inherit', timeout: 120_000 }); +execSync(tscCmd, { cwd: SHARED_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // ── 2. Build gitnexus ────────────────────────────────────────────── console.log('[build] compiling gitnexus…'); -execSync(tscCmd, { cwd: ROOT, stdio: 'inherit', timeout: 120_000 }); +execSync(tscCmd, { cwd: ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // ── 3. Copy shared dist ──────────────────────────────────────────── console.log('[build] copying shared module into dist/_shared…'); @@ -82,9 +98,9 @@ if (fs.existsSync(path.join(WEB_ROOT, 'package.json'))) { console.log('[build] building gitnexus-web…'); if (!fs.existsSync(path.join(WEB_ROOT, 'node_modules'))) { console.log('[build] installing gitnexus-web dependencies…'); - execSync('npm ci', { cwd: WEB_ROOT, stdio: 'inherit', timeout: 120_000 }); + execSync('npm ci', { cwd: WEB_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); } - execSync('npm run build', { cwd: WEB_ROOT, stdio: 'inherit', timeout: 120_000 }); + execSync('npm run build', { cwd: WEB_ROOT, stdio: 'inherit', timeout: BUILD_TIMEOUT_MS }); // Copy dist → gitnexus/web/ (shipped in the npm package) fs.rmSync(WEB_DEST, { recursive: true, force: true }); diff --git a/gitnexus/src/cli/analyze.ts b/gitnexus/src/cli/analyze.ts index 6ea04df05d..29253f969c 100644 --- a/gitnexus/src/cli/analyze.ts +++ b/gitnexus/src/cli/analyze.ts @@ -9,7 +9,7 @@ */ import path from 'path'; -import { execFileSync } from 'child_process'; +import { spawn } from 'child_process'; import v8 from 'v8'; import cliProgress from 'cli-progress'; import { closeLbug } from '../core/lbug/lbug-adapter.js'; @@ -37,6 +37,7 @@ import { isHfDownloadFailure } from '../core/embeddings/hf-env.js'; // previous behaviour silently swallowed stack traces and made #1169 // indistinguishable from a no-op success on Windows. const realStderrWrite = process.stderr.write.bind(process.stderr); +const realStdoutWrite = process.stdout.write.bind(process.stdout); const writeFatalToStderr = (label: string, err: unknown): void => { const isErr = err instanceof Error; @@ -78,15 +79,274 @@ const HEAP_FLAG = `--max-old-space-size=${RESPAWN_HEAP_MB}`; /** Increase default stack size (KB) to prevent stack overflow on deep class hierarchies. */ const STACK_KB = 4096; const STACK_FLAG = `--stack-size=${STACK_KB}`; +const RESPAWN_OUTPUT_TAIL_CHARS = 1024 * 1024; +const RESPAWN_PROGRESS_ENV = 'GITNEXUS_RESPAWN_PROGRESS_TTY'; + +interface CliProgressTerminal { + cursorSave(): void; + cursorRestore(): void; + cursor(enabled: boolean): void; + lineWrapping(enabled: boolean): void; + cursorTo(x?: number | null, y?: number | null): void; + cursorRelative(dx?: number | null, dy?: number | null): void; + cursorRelativeReset(): void; + clearRight(): void; + clearLine(): void; + clearBottom(): void; + newline(): void; + write(s: string, rawWrite?: boolean): void; + isTTY(): boolean; + getWidth(): number; +} + +const terminalColumns = (): number => { + const parsed = Number(process.env.COLUMNS); + return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 80; +}; + +const ANSI_ESCAPE_PATTERN = + /\x1B(?:\[[0-?]*[ -/]*[@-~]|\][^\x07]*(?:\x07|\x1B\\)|[PX^_][\s\S]*?\x1B\\|[78]|[@-Z\\-_])/y; + +interface IntlSegmenterLike { + segment(input: string): Iterable<{ segment: string }>; +} + +type IntlWithOptionalSegmenter = typeof Intl & { + Segmenter?: new ( + locales?: string | string[], + options?: { granularity?: 'grapheme' }, + ) => IntlSegmenterLike; +}; + +const splitGraphemes = (text: string): string[] => { + const Segmenter = (Intl as IntlWithOptionalSegmenter).Segmenter; + if (Segmenter) { + return Array.from( + new Segmenter(undefined, { granularity: 'grapheme' }).segment(text), + (s) => s.segment, + ); + } + return Array.from(text); +}; + +const isZeroWidthCodePoint = (codePoint: number): boolean => + codePoint === 0x200d || + (codePoint >= 0x0300 && codePoint <= 0x036f) || + (codePoint >= 0x1ab0 && codePoint <= 0x1aff) || + (codePoint >= 0x1dc0 && codePoint <= 0x1dff) || + (codePoint >= 0x20d0 && codePoint <= 0x20ff) || + (codePoint >= 0xfe00 && codePoint <= 0xfe0f) || + (codePoint >= 0xfe20 && codePoint <= 0xfe2f); + +const isWideCodePoint = (codePoint: number): boolean => + codePoint >= 0x1100 && + (codePoint <= 0x115f || + codePoint === 0x2329 || + codePoint === 0x232a || + (codePoint >= 0x2e80 && codePoint <= 0xa4cf && codePoint !== 0x303f) || + (codePoint >= 0xac00 && codePoint <= 0xd7a3) || + (codePoint >= 0xf900 && codePoint <= 0xfaff) || + (codePoint >= 0xfe10 && codePoint <= 0xfe19) || + (codePoint >= 0xfe30 && codePoint <= 0xfe6f) || + (codePoint >= 0xff00 && codePoint <= 0xff60) || + (codePoint >= 0xffe0 && codePoint <= 0xffe6) || + (codePoint >= 0x1f300 && codePoint <= 0x1faff) || + (codePoint >= 0x20000 && codePoint <= 0x3fffd)); + +const visibleColumns = (text: string): number => { + let columns = 0; + for (const char of Array.from(text)) { + const codePoint = char.codePointAt(0); + if (codePoint === undefined || isZeroWidthCodePoint(codePoint)) continue; + columns += isWideCodePoint(codePoint) ? 2 : 1; + } + return columns; +}; + +const readAnsiEscapeAt = (text: string, index: number): string | undefined => { + ANSI_ESCAPE_PATTERN.lastIndex = index; + return ANSI_ESCAPE_PATTERN.exec(text)?.[0]; +}; + +const truncateAnsiToColumns = (text: string, maxColumns: number): string => { + if (!Number.isFinite(maxColumns) || maxColumns <= 0) return ''; + + let output = ''; + let columns = 0; + let index = 0; + + while (index < text.length) { + const escape = readAnsiEscapeAt(text, index); + if (escape) { + output += escape; + index += escape.length; + continue; + } + + const nextEscapeIndex = text.indexOf('\x1B', index); + const plainEnd = nextEscapeIndex === -1 ? text.length : nextEscapeIndex; + const plainText = text.slice(index, plainEnd); + + for (const segment of splitGraphemes(plainText)) { + const width = visibleColumns(segment); + if (width > 0 && columns + width > maxColumns) return output; + output += segment; + columns += width; + } + + index = plainEnd; + } + + return output; +}; + +const createAnsiPipeTerminal = (stream: NodeJS.WriteStream): CliProgressTerminal => { + let linewrap = true; + let dy = 0; + const write = (s: string): void => { + stream.write(s); + }; + const moveVertical = (delta: number): void => { + if (delta > 0) write(`\x1B[${delta}B`); + else if (delta < 0) write(`\x1B[${Math.abs(delta)}A`); + }; + + return { + cursorSave: () => write('\x1B7'), + cursorRestore: () => write('\x1B8'), + cursor: (enabled) => write(enabled ? '\x1B[?25h' : '\x1B[?25l'), + lineWrapping: (enabled) => { + linewrap = enabled; + write(enabled ? '\x1B[?7h' : '\x1B[?7l'); + }, + cursorTo: (x = null, y = null) => { + if (typeof y === 'number' && typeof x === 'number') { + write(`\x1B[${y + 1};${x + 1}H`); + return; + } + if (typeof x === 'number') { + write(x === 0 ? '\r' : `\x1B[${x + 1}G`); + } + }, + cursorRelative: (dx = null, nextDy = null) => { + if (typeof dx === 'number' && dx !== 0) { + write(dx > 0 ? `\x1B[${dx}C` : `\x1B[${Math.abs(dx)}D`); + } + if (typeof nextDy === 'number' && nextDy !== 0) { + dy += nextDy; + moveVertical(nextDy); + } + }, + cursorRelativeReset: () => { + moveVertical(-dy); + write('\r'); + dy = 0; + }, + clearRight: () => write('\x1B[0K'), + clearLine: () => write('\x1B[2K'), + clearBottom: () => write('\x1B[0J'), + newline: () => { + write('\n'); + dy++; + }, + write: (s, rawWrite = false) => { + const width = terminalColumns(); + write(linewrap && rawWrite === false ? truncateAnsiToColumns(s, width) : s); + }, + isTTY: () => true, + getWidth: terminalColumns, + }; +}; + +const shouldBridgeRespawnProgressTty = (): boolean => + process.stderr.isTTY === true || process.stdout.isTTY === true; + +interface RespawnExit { + status?: number | null; + signal?: NodeJS.Signals | null; + stdout?: string; + stderr?: string; + message?: string; +} + +const appendOutputTail = (tail: string, chunk: unknown): string => { + const text = Buffer.isBuffer(chunk) + ? chunk.toString('utf8') + : typeof chunk === 'string' + ? chunk + : String(chunk ?? ''); + if (!text) return tail; + const next = tail + text; + return next.length > RESPAWN_OUTPUT_TAIL_CHARS ? next.slice(-RESPAWN_OUTPUT_TAIL_CHARS) : next; +}; + +/** + * Run the respawned analyzer while teeing child output through to the parent + * and keeping a bounded tail for crash classification. + * + * `execFileSync(..., { stdio: 'inherit' })` preserved live progress but hid + * stderr/stdout from the parent on abnormal exits. That made every + * SIGABRT/status-134 child look like an output-less V8 heap OOM, even when the + * terminal had already shown a native crash such as + * `libc++abi: ... Napi::Error`. Piped streams plus an explicit tee keeps the UX + * and gives `childProcessLikelyOom` the evidence it needs. + */ +const runRespawnedAnalyze = ( + args: readonly string[], + env: NodeJS.ProcessEnv, +): Promise => + new Promise((resolve) => { + let stdout = ''; + let stderr = ''; + let settled = false; + const finish = (exit: RespawnExit): void => { + if (settled) return; + settled = true; + resolve(exit); + }; + + const child = spawn(process.execPath, [...args], { + stdio: ['inherit', 'pipe', 'pipe'], + env, + }); + + child.stdout?.on('data', (chunk) => { + stdout = appendOutputTail(stdout, chunk); + realStdoutWrite(chunk); + }); + child.stderr?.on('data', (chunk) => { + stderr = appendOutputTail(stderr, chunk); + realStderrWrite(chunk); + }); + child.on('error', (err) => { + finish({ + status: 1, + signal: null, + stdout, + stderr, + message: err instanceof Error ? err.message : String(err), + }); + }); + child.on('close', (status, signal) => { + finish({ + status, + signal, + stdout, + stderr, + message: `Command failed: ${process.execPath} ${args.join(' ')}`, + }); + }); + }); /** * Heuristic for "child re-exec likely died from V8 OOM". * - * Platform-independent detection is best-effort: V8/Node usually emit - * stable heap-exhaustion phrases in stderr/message across Linux/macOS/Windows - * (for example "JavaScript heap out of memory" or "Reached heap limit"), - * while some environments only expose status/signal (e.g. 134/SIGABRT). - * We combine both text signatures and process-exit signatures. + * Platform-independent detection is best-effort: V8/Node usually emit stable + * heap-exhaustion phrases in stderr/message across Linux/macOS/Windows (for + * example "JavaScript heap out of memory" or "Reached heap limit"). When the + * child produced no output at all, we still treat status 134/SIGABRT as likely + * heap OOM. If stderr/stdout contains a native crash diagnostic, the output + * evidence wins and we do not print heap guidance. */ const childProcessLikelyOom = (err: unknown): boolean => { if (!err || typeof err !== 'object') return false; @@ -122,6 +382,31 @@ const childProcessLikelyOom = (err: unknown): boolean => { return e.status === 134 || e.signal === 'SIGABRT'; }; +const childProcessLikelyNativeAbort = (err: unknown): boolean => { + if (!err || typeof err !== 'object') return false; + const e = err as { + stderr?: unknown; + stdout?: unknown; + message?: unknown; + }; + const hasNativeAbortSignature = (v: unknown): boolean => { + const text = ( + Buffer.isBuffer(v) ? v.toString('utf8') : typeof v === 'string' ? v : '' + ).toLowerCase(); + if (!text) return false; + return ( + text.includes('napi::error') || + text.includes('libc++abi: terminating') || + text.includes('abort trap') || + text.includes('native stack') || + text.includes('native worker') || + text.includes('native binding') + ); + }; + + return [e.message, e.stderr, e.stdout].some((v) => hasNativeAbortSignature(v)); +}; + const forceHeapOOMForTestIfEnabled = (): void => { if (process.env.GITNEXUS_TEST_FORCE_HEAP_OOM !== '1') return; // Allocate JS strings (not Buffers) so pressure lands on V8 heap itself. @@ -131,7 +416,7 @@ const forceHeapOOMForTestIfEnabled = (): void => { }; /** Re-exec the process with a 16GB heap and larger stack if we're currently below that. */ -function ensureHeap(): boolean { +async function ensureHeap(): Promise { const nodeOpts = process.env.NODE_OPTIONS || ''; if (nodeOpts.includes('--max-old-space-size')) return false; @@ -143,13 +428,15 @@ function ensureHeap(): boolean { const cliFlags = [HEAP_FLAG]; if (!nodeOpts.includes('--stack-size')) cliFlags.push(STACK_FLAG); - try { - execFileSync(process.execPath, [...cliFlags, ...process.argv.slice(1)], { - stdio: 'inherit', - env: { ...process.env, NODE_OPTIONS: `${nodeOpts} ${HEAP_FLAG}`.trim() }, - }); - } catch (e: unknown) { - if (childProcessLikelyOom(e)) { + const childArgs = [...cliFlags, ...process.argv.slice(1)]; + const childEnv = { + ...process.env, + NODE_OPTIONS: `${nodeOpts} ${HEAP_FLAG}`.trim(), + }; + if (shouldBridgeRespawnProgressTty()) childEnv[RESPAWN_PROGRESS_ENV] = '1'; + const childExit = await runRespawnedAnalyze(childArgs, childEnv); + if (childExit.status !== 0 || childExit.signal) { + if (childProcessLikelyOom(childExit)) { cliError( ` Analysis likely ran out of memory.\n` + ` Retry with a larger heap if your machine allows it:\n` + @@ -158,11 +445,18 @@ function ensureHeap(): boolean { ` If this persists, it may be a native crash unrelated to heap size.\n`, { recoveryHint: 'heap-oom-respawn' }, ); + } else if (childProcessLikelyNativeAbort(childExit)) { + cliError( + ` Analysis aborted in a native worker or native binding path.\n` + + ` Try one of these recovery paths:\n` + + ` gitnexus analyze --workers 0\n` + + ` npm uninstall -g gitnexus && npm install -g gitnexus@latest\n` + + ` Use Node 22 LTS if you are on a newer non-LTS runtime.\n`, + { recoveryHint: 'native-worker-abort' }, + ); } const status = - typeof e === 'object' && e !== null && 'status' in e && typeof e.status === 'number' - ? e.status - : 1; + typeof childExit.status === 'number' && childExit.status !== 0 ? childExit.status : 1; process.exitCode = status; } return true; @@ -185,6 +479,7 @@ const ANALYZE_CLI_ENV_KEYS = [ 'GITNEXUS_EMBEDDING_BATCH_SIZE', 'GITNEXUS_EMBEDDING_SUB_BATCH_SIZE', 'GITNEXUS_EMBEDDING_DEVICE', + 'GITNEXUS_ANALYZE_PROGRESS_ACTIVE', ] as const; type AnalyzeEnvSnapshot = Record<(typeof ANALYZE_CLI_ENV_KEYS)[number], string | undefined>; @@ -292,7 +587,7 @@ export const shouldGenerateCommunitySkillFiles = ( ): boolean => Boolean(options?.skills && pipelineResult && !options?.indexOnly); export const analyzeCommand = async (inputPath?: string, options?: AnalyzeOptions) => { - if (ensureHeap()) return; + if (await ensureHeap()) return; forceHeapOOMForTestIfEnabled(); // Install fatal handlers immediately after re-exec resolution so any @@ -515,19 +810,25 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): } // ── CLI progress bar setup ───────────────────────────────────────── - const bar = new cliProgress.SingleBar( - { - format: ' {bar} {percentage}% | {phase}', - barCompleteChar: '\u2588', - barIncompleteChar: '\u2591', - hideCursor: true, - barGlue: '', - autopadding: true, - clearOnComplete: false, - stopOnComplete: false, - }, - cliProgress.Presets.shades_grey, - ); + const barOptions: cliProgress.Options & { terminal?: CliProgressTerminal } = { + format: ' {bar} {percentage}% | {phase}', + barCompleteChar: '\u2588', + barIncompleteChar: '\u2591', + hideCursor: true, + barGlue: '', + autopadding: true, + clearOnComplete: false, + stopOnComplete: false, + }; + if (process.env[RESPAWN_PROGRESS_ENV] === '1' && process.stderr.isTTY !== true) { + // Heap respawn pipes stderr so the parent can classify native/OOM crashes. + // The parent was a real TTY when it opted into this env var, so forward + // ANSI cursor controls through the pipe instead of cli-progress' non-TTY + // newline mode. That keeps one-line redraw UX while retaining stderr tail + // capture for diagnostics. + barOptions.terminal = createAnsiPipeTerminal(process.stderr); + } + const bar = new cliProgress.SingleBar(barOptions, cliProgress.Presets.shades_grey); bar.start(100, 0, { phase: 'Initializing...' }); @@ -561,7 +862,7 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): // eslint-disable-next-line no-console -- intentional console-routing for progress bar UX const origError = console.error.bind(console); let barCurrentValue = 0; - const barLog = (...args: any[]) => { + const barLog = (...args: unknown[]) => { process.stdout.write('\x1b[2K\r'); origLog(args.map((a) => (typeof a === 'string' ? a : String(a))).join(' ')); bar.update(barCurrentValue); @@ -571,6 +872,7 @@ const analyzeCommandImpl = async (inputPath?: string, options?: AnalyzeOptions): console.warn = barLog; // eslint-disable-next-line no-console -- intentional console-routing for progress bar UX console.error = barLog; + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = '1'; // Track elapsed time per phase let lastPhaseLabel = 'Initializing...'; diff --git a/gitnexus/src/core/ingestion/filesystem-walker.ts b/gitnexus/src/core/ingestion/filesystem-walker.ts index c30ea43216..9ba959ea64 100644 --- a/gitnexus/src/core/ingestion/filesystem-walker.ts +++ b/gitnexus/src/core/ingestion/filesystem-walker.ts @@ -23,6 +23,21 @@ export interface FilePath { } const READ_CONCURRENCY = 32; +const ANALYZE_PROGRESS_ACTIVE_ENV = 'GITNEXUS_ANALYZE_PROGRESS_ACTIVE'; + +const warnLargeFileSkip = (message: string): void => { + if (process.env[ANALYZE_PROGRESS_ACTIVE_ENV] === '1') { + // analyze.ts routes console.warn through the progress bar logger while + // the bar is active. Emitting the operator-facing large-file notice there + // avoids raw pino NDJSON corrupting the one-line progress display in the + // heap-respawn child, whose stderr is intentionally piped for crash + // classification. + // eslint-disable-next-line no-console -- intentionally routed by analyze progress UI + console.warn(message); + return; + } + logger.warn(message); +}; /** * Phase 1: Scan repository — stat files to get paths + sizes, no content loaded. @@ -76,7 +91,9 @@ export const walkRepositoryPaths = async ( const isDefault = maxFileSizeBytes === DEFAULT_MAX_FILE_SIZE_BYTES; const isOverrideUnset = !process.env.GITNEXUS_MAX_FILE_SIZE; const suffix = isDefault ? ', likely generated/vendored' : ''; - logger.warn(` Skipped ${skippedLarge} large files (>${maxFileSizeBytes / 1024}KB${suffix})`); + warnLargeFileSkip( + ` Skipped ${skippedLarge} large files (>${maxFileSizeBytes / 1024}KB${suffix})`, + ); // Always show at least the first few paths so users can diagnose why // edges are missing from a specific file (issue #1659). The full list is @@ -88,17 +105,19 @@ export const walkRepositoryPaths = async ( const showAll = isVerboseIngestionEnabled() || skippedLargePaths.length <= SKIPPED_PREVIEW_CAP; const preview = showAll ? skippedLargePaths : skippedLargePaths.slice(0, SKIPPED_PREVIEW_CAP); for (const p of preview) { - logger.warn(` - ${p}`); + warnLargeFileSkip(` - ${p}`); } if (!showAll) { const remaining = skippedLargePaths.length - SKIPPED_PREVIEW_CAP; - logger.warn(` ...and ${remaining} more (set GITNEXUS_VERBOSE=1 to list them all)`); + warnLargeFileSkip(` ...and ${remaining} more (set GITNEXUS_VERBOSE=1 to list them all)`); } // Only hint about the env var when the user has not set it at all. An // explicit GITNEXUS_MAX_FILE_SIZE=512 happens to resolve to the same // bytes as the default but the operator clearly already knows the knob. if (isDefault && isOverrideUnset) { - logger.warn(` Set GITNEXUS_MAX_FILE_SIZE= to include files above the default cap.`); + warnLargeFileSkip( + ` Set GITNEXUS_MAX_FILE_SIZE= to include files above the default cap.`, + ); } } diff --git a/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts b/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts index 46945d74c9..10e4557d24 100644 --- a/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts +++ b/gitnexus/src/core/ingestion/pipeline-phases/parse-impl.ts @@ -48,7 +48,7 @@ import { ASTCache, createASTCache } from '../ast-cache.js'; import { type PipelineProgress, getLanguageFromFilename } from 'gitnexus-shared'; import { readFileContents } from '../filesystem-walker.js'; import { isLanguageAvailable } from '../../tree-sitter/parser-loader.js'; -import { createWorkerPool } from '../workers/worker-pool.js'; +import { createWorkerPool, WorkerPoolInitializationError } from '../workers/worker-pool.js'; import type { WorkerPool } from '../workers/worker-pool.js'; import type { ExtractedAssignment, @@ -252,20 +252,23 @@ export async function runChunkedParseAndResolve( const MIN_BYTES_FOR_WORKERS = options?.workerThresholdsForTest?.minBytes ?? 512 * 1024; const totalBytes = parseableScanned.reduce((s, f) => s + f.size, 0); - // Create worker pool once, reuse across chunks. + // Create worker pool lazily, reuse across cache-miss chunks. // // `workerPoolSize === 0` is a programmatic equivalent of `skipWorkers: // true` per the `PipelineOptions.workerPoolSize` contract. Short- - // circuiting here avoids constructing a useless pool that rejects - // every dispatch (with a `Worker pool parsing stopped` warn log per - // chunk) just to fall back to the sequential path via the error - // catch — the gate honors the docstring directly. - let workerPool: WorkerPool | undefined; - if ( + // circuiting here avoids constructing a useless pool. The pool is + // intentionally NOT created before parse-cache lookup: a warm-cache + // all-hit run should replay cached worker output without loading + // parse-worker.js or any tree-sitter/N-API native bindings. + const shouldUseWorkers = !options?.skipWorkers && options?.workerPoolSize !== 0 && - (totalParseable >= MIN_FILES_FOR_WORKERS || totalBytes >= MIN_BYTES_FOR_WORKERS) - ) { + (totalParseable >= MIN_FILES_FOR_WORKERS || totalBytes >= MIN_BYTES_FOR_WORKERS); + let workerPool: WorkerPool | undefined; + let workerPoolDisabled = false; + const getOrCreateWorkerPool = (): WorkerPool | undefined => { + if (!shouldUseWorkers || workerPoolDisabled) return undefined; + if (workerPool) return workerPool; try { // U20.U3 test-only injection: integration tests pass a custom // worker script URL via `workerUrlForTest` (mirrors the @@ -296,13 +299,16 @@ export async function runChunkedParseAndResolve( } } workerPool = createWorkerPool(workerUrl, options?.workerPoolSize); + return workerPool; } catch (err) { + workerPoolDisabled = true; logger.warn( { err: (err as Error).message }, 'Worker pool creation failed, using sequential fallback:', ); + return undefined; } - } + }; let filesParsedSoFar = 0; @@ -418,12 +424,18 @@ export async function runChunkedParseAndResolve( // never saw the log (M3 from PR #1693 review). const chunkStartMs: number | null = verboseThroughputLog ? Date.now() : null; - const chunkContents = await chunkContentPromises[chunkIdx]!; + const chunkContentPromise = chunkContentPromises[chunkIdx]; + if (!chunkContentPromise) { + throw new Error(`Missing prefetched parse chunk ${chunkIdx + 1}/${numChunks}`); + } + const chunkContents = await chunkContentPromise; chunkContentPromises[chunkIdx] = undefined; // release the in-memory copy startChunkPrefetch(chunkIdx + parseChunkConcurrency); - const chunkFiles = chunkPaths - .filter((p) => chunkContents.has(p)) - .map((p) => ({ path: p, content: chunkContents.get(p)! })); + const chunkFiles: Array<{ path: string; content: string }> = []; + for (const p of chunkPaths) { + const content = chunkContents.get(p); + if (content !== undefined) chunkFiles.push({ path: p, content }); + } // Compute the chunk's content-hash signature (if cache available). let chunkHash: string | null = null; @@ -436,7 +448,7 @@ export async function runChunkedParseAndResolve( } let chunkWorkerData: WorkerExtractedData | null; - const cachedRaw = chunkHash ? parseCache!.entries.get(chunkHash) : undefined; + const cachedRaw = chunkHash && parseCache ? parseCache.entries.get(chunkHash) : undefined; // Track every chunk hash we touched so the orchestrator can // prune stale entries (chunks whose composition no longer @@ -450,7 +462,7 @@ export async function runChunkedParseAndResolve( chunkWorkerData = mergeChunkResults(graph, symbolTable, cachedRaw); if (isDev) { logger.info( - `📦 parse-cache HIT: chunk ${chunkIdx + 1}/${numChunks} (${chunkFiles.length} files, ${chunkHash!.slice(0, 8)})`, + `📦 parse-cache HIT: chunk ${chunkIdx + 1}/${numChunks} (${chunkFiles.length} files, ${chunkHash?.slice(0, 8) ?? 'unknown'})`, ); } // Progress update so UI advances even on a cache hit. @@ -474,33 +486,61 @@ export async function runChunkedParseAndResolve( // them under the chunk hash for the next run. chunkCacheMisses++; const rawResults: ParseWorkerResult[] = []; - chunkWorkerData = await processParsing( - graph, - chunkFiles, - symbolTable, - astCache, - scopeTreeCache, - (current, _total, filePath) => { - const globalCurrent = filesParsedSoFar + current; - // Parse phase covers 20-70 (M2). Deferred extraction handles 70-95. - const parsingProgress = 20 + (globalCurrent / totalParseable) * 50; - onProgress({ - phase: 'parsing', - percent: Math.round(parsingProgress), - message: `Parsing chunk ${chunkIdx + 1}/${numChunks}...`, - detail: filePath, - stats: { - filesProcessed: globalCurrent, - totalFiles: totalParseable, - nodesCreated: graph.nodeCount, - }, - }); - }, - workerPool, - // Capture raw results only when we have a cache to write to — - // otherwise we'd retain extra arrays for nothing. - parseCache && chunkHash ? rawResults : undefined, - ); + const progressForChunk = (current: number, _total: number, filePath: string) => { + const globalCurrent = filesParsedSoFar + current; + // Parse phase covers 20-70 (M2). Deferred extraction handles 70-95. + const parsingProgress = 20 + (globalCurrent / totalParseable) * 50; + onProgress({ + phase: 'parsing', + percent: Math.round(parsingProgress), + message: `Parsing chunk ${chunkIdx + 1}/${numChunks}...`, + detail: filePath, + stats: { + filesProcessed: globalCurrent, + totalFiles: totalParseable, + nodesCreated: graph.nodeCount, + }, + }); + }; + const activeWorkerPool = getOrCreateWorkerPool(); + try { + chunkWorkerData = await processParsing( + graph, + chunkFiles, + symbolTable, + astCache, + scopeTreeCache, + progressForChunk, + activeWorkerPool, + // Capture raw results only when we have a cache to write to — + // otherwise we'd retain extra arrays for nothing. + parseCache && chunkHash && activeWorkerPool ? rawResults : undefined, + ); + } catch (err) { + if (!(err instanceof WorkerPoolInitializationError)) throw err; + logger.warn( + { + err: err.message, + readinessFailures: err.readinessFailures, + }, + 'Worker pool initialization failed, using sequential fallback:', + ); + rawResults.length = 0; + workerPoolDisabled = true; + const failedPool = workerPool; + workerPool = undefined; + await failedPool?.terminate().catch(() => undefined); + chunkWorkerData = await processParsing( + graph, + chunkFiles, + symbolTable, + astCache, + scopeTreeCache, + progressForChunk, + undefined, + undefined, + ); + } // Persist the raw results for this chunk hash. Sequential path // doesn't populate rawResults (it writes directly to graph), so // small repos without worker pool simply don't cache. That's fine. @@ -843,9 +883,11 @@ export async function runChunkedParseAndResolve( const cachedSequentialChunkFiles: Array> = []; for (const chunkPaths of sequentialChunkPaths) { const chunkContents = await readFileContents(repoPath, chunkPaths); - const chunkFiles = chunkPaths - .filter((p) => chunkContents.has(p)) - .map((p) => ({ path: p, content: chunkContents.get(p)! })); + const chunkFiles: Array<{ path: string; content: string }> = []; + for (const p of chunkPaths) { + const content = chunkContents.get(p); + if (content !== undefined) chunkFiles.push({ path: p, content }); + } cachedSequentialChunkFiles.push(chunkFiles); astCache = createASTCache(chunkFiles.length); const sequentialHeritage = await extractExtractedHeritageFromFiles(chunkFiles, astCache); diff --git a/gitnexus/src/core/ingestion/workers/worker-pool.ts b/gitnexus/src/core/ingestion/workers/worker-pool.ts index fc032bd565..d29b352ab4 100644 --- a/gitnexus/src/core/ingestion/workers/worker-pool.ts +++ b/gitnexus/src/core/ingestion/workers/worker-pool.ts @@ -235,6 +235,20 @@ export class WorkerPoolDispatchError extends Error { } } +export class WorkerPoolInitializationError extends WorkerPoolDispatchError { + readonly readinessFailures: readonly string[]; + + constructor( + message: string, + quarantinedPaths: readonly string[] = [], + readinessFailures: readonly string[] = [], + ) { + super(message, quarantinedPaths); + this.name = 'WorkerPoolInitializationError'; + this.readinessFailures = readinessFailures; + } +} + /** Message shapes sent back by worker threads. */ type WorkerOutgoingMessage = | { type: 'progress'; filesProcessed: number } @@ -592,6 +606,7 @@ export const createWorkerPool = ( // 1100+ LOC of pool plumbing. Public worker-pool API is unchanged — // `getQuarantinedPaths()` still returns the same defensive copy. const quarantine = createQuarantine(); + const initialReadinessFailures: string[] = []; // Per-slot consecutive-failure counter (F6): replaces the prior pool-wide // scalar so a chronically-failing slot trips the breaker on its own // failure streak instead of being masked by another slot's successes. @@ -636,6 +651,7 @@ export const createWorkerPool = ( try { await waitForWorkerReady(w); } catch (err) { + initialReadinessFailures.push(err instanceof Error ? err.message : String(err)); logger.warn( { workerIndex: i, @@ -673,7 +689,15 @@ export const createWorkerPool = ( } if (items.length === 0) return []; if (activeSlots.size === 0) { - throw new WorkerPoolDispatchError('Worker pool has no active workers', []); + const detail = + initialReadinessFailures.length > 0 + ? ` after initial ready handshake: ${initialReadinessFailures.join('; ')}` + : ''; + throw new WorkerPoolInitializationError( + `Worker pool has no active workers${detail}`, + [], + initialReadinessFailures, + ); } // Layer 3: filter out quarantined paths so a known-bad file never reaches diff --git a/gitnexus/test/integration/filesystem-walker.test.ts b/gitnexus/test/integration/filesystem-walker.test.ts index 1d743a8989..8f934573c7 100644 --- a/gitnexus/test/integration/filesystem-walker.test.ts +++ b/gitnexus/test/integration/filesystem-walker.test.ts @@ -438,6 +438,28 @@ describe('filesystem-walker', () => { .filter((r) => String(r.msg ?? '').includes('GITNEXUS_MAX_FILE_SIZE=')); expect(hint.length).toBe(0); }); + + it('routes large-file notices through console.warn while analyze progress is active', async () => { + const originalProgressActive = process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE; + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + try { + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = '1'; + await walkRepositoryPaths(sizeDir); + const messages = warnSpy.mock.calls.map(([msg]) => String(msg)); + expect(messages.some((m) => m.includes('Skipped 1 large files'))).toBe(true); + expect(messages.some((m) => m.includes(BIG_FILE))).toBe(true); + expect(cap.records().filter((r) => String(r.msg ?? '').includes('Skipped '))).toHaveLength( + 0, + ); + } finally { + warnSpy.mockRestore(); + if (originalProgressActive === undefined) { + delete process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE; + } else { + process.env.GITNEXUS_ANALYZE_PROGRESS_ACTIVE = originalProgressActive; + } + } + }); }); describe('large file skip preview cap (#1659)', () => { diff --git a/gitnexus/test/integration/worker-pool.test.ts b/gitnexus/test/integration/worker-pool.test.ts index 5c5f11944e..b8740957ba 100644 --- a/gitnexus/test/integration/worker-pool.test.ts +++ b/gitnexus/test/integration/worker-pool.test.ts @@ -394,7 +394,7 @@ describe('worker pool integration', () => { const { parentPort } = require('node:worker_threads'); const markerPath = ${JSON.stringify(markerPath)}; if (fs.existsSync(markerPath)) { - throw new Error('simulated startup crash'); + process.exit(1); } parentPort.on('message', (msg) => { if (msg && msg.type === 'sub-batch') { diff --git a/gitnexus/test/unit/analyze-heap-respawn.test.ts b/gitnexus/test/unit/analyze-heap-respawn.test.ts index 2f094ddbfd..9a2db16594 100644 --- a/gitnexus/test/unit/analyze-heap-respawn.test.ts +++ b/gitnexus/test/unit/analyze-heap-respawn.test.ts @@ -1,11 +1,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { EventEmitter } from 'node:events'; -const execFileSyncMock = vi.fn(); +const spawnMock = vi.fn(); const getHeapStatisticsMock = vi.fn(); vi.mock('child_process', async () => { const actual = await vi.importActual('child_process'); - return { ...actual, execFileSync: execFileSyncMock }; + return { ...actual, spawn: spawnMock }; }); vi.mock('v8', () => ({ @@ -18,33 +19,99 @@ vi.mock('../../src/core/lbug/lbug-adapter.js', () => ({ closeLbug: vi.fn(async () => undefined), })); +const mockSpawnExit = ({ + status = 0, + signal = null, + stdout = '', + stderr = '', +}: { + status?: number | null; + signal?: NodeJS.Signals | null; + stdout?: string | Buffer; + stderr?: string | Buffer; +} = {}) => { + spawnMock.mockImplementationOnce(() => { + const child = new EventEmitter() as EventEmitter & { + stdout: EventEmitter; + stderr: EventEmitter; + }; + child.stdout = new EventEmitter(); + child.stderr = new EventEmitter(); + queueMicrotask(() => { + if (stdout) child.stdout.emit('data', stdout); + if (stderr) child.stderr.emit('data', stderr); + child.emit('close', status, signal); + }); + return child; + }); +}; + +const setStreamIsTTY = (stream: NodeJS.WriteStream, value: boolean): (() => void) => { + const descriptor = Object.getOwnPropertyDescriptor(stream, 'isTTY'); + Object.defineProperty(stream, 'isTTY', { configurable: true, value }); + return () => { + if (descriptor) Object.defineProperty(stream, 'isTTY', descriptor); + else delete (stream as NodeJS.WriteStream & { isTTY?: boolean }).isTTY; + }; +}; + describe('analyzeCommand heap respawn', () => { let initialNodeOptions: string | undefined; + let stdoutWriteSpy: ReturnType; + let stderrWriteSpy: ReturnType; + let restoreStdoutIsTTY: (() => void) | undefined; + let restoreStderrIsTTY: (() => void) | undefined; beforeEach(() => { initialNodeOptions = process.env.NODE_OPTIONS; vi.resetModules(); - execFileSyncMock.mockReset(); + spawnMock.mockReset(); getHeapStatisticsMock.mockReset(); process.exitCode = undefined; + stdoutWriteSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + stderrWriteSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); }); afterEach(() => { + restoreStdoutIsTTY?.(); + restoreStderrIsTTY?.(); + restoreStdoutIsTTY = undefined; + restoreStderrIsTTY = undefined; + stdoutWriteSpy.mockRestore(); + stderrWriteSpy.mockRestore(); if (initialNodeOptions === undefined) delete process.env.NODE_OPTIONS; else process.env.NODE_OPTIONS = initialNodeOptions; }); - it('re-execs analyze with 16GB heap when no max-old-space-size is present', async () => { + it('re-execs analyze with 16GB heap and bridges progress redraw when parent is a TTY', async () => { delete process.env.NODE_OPTIONS; + restoreStderrIsTTY = setStreamIsTTY(process.stderr, true); getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit(); const { analyzeCommand } = await import('../../src/cli/analyze.js'); await analyzeCommand(undefined, {}); - expect(execFileSyncMock).toHaveBeenCalledTimes(1); - const [, args, opts] = execFileSyncMock.mock.calls[0]; + expect(spawnMock).toHaveBeenCalledTimes(1); + const [, args, opts] = spawnMock.mock.calls[0]; expect(args).toContain('--max-old-space-size=16384'); expect(opts.env.NODE_OPTIONS).toContain('--max-old-space-size=16384'); + expect(opts.env.GITNEXUS_RESPAWN_PROGRESS_TTY).toBe('1'); + }); + + it('does not force ANSI progress when the parent output is not a TTY', async () => { + delete process.env.NODE_OPTIONS; + restoreStdoutIsTTY = setStreamIsTTY(process.stdout, false); + restoreStderrIsTTY = setStreamIsTTY(process.stderr, false); + getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit(); + + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + await analyzeCommand(undefined, {}); + + expect(spawnMock).toHaveBeenCalledTimes(1); + const [, , opts] = spawnMock.mock.calls[0]; + expect(opts.env.GITNEXUS_RESPAWN_PROGRESS_TTY).toBeUndefined(); }); it('does not re-exec when NODE_OPTIONS already defines max-old-space-size', async () => { @@ -54,18 +121,13 @@ describe('analyzeCommand heap respawn', () => { const { analyzeCommand } = await import('../../src/cli/analyze.js'); await analyzeCommand('/__gitnexus_nonexistent__', {}); - expect(execFileSyncMock).not.toHaveBeenCalled(); + expect(spawnMock).not.toHaveBeenCalled(); }); it('prints heap guidance when respawned analyze exits with likely OOM', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('child failed') as Error & { status?: number; signal?: string }; - err.status = undefined; - err.signal = 'SIGABRT'; - throw err; - }); + mockSpawnExit({ status: null, signal: 'SIGABRT' }); const { _captureLogger } = await import('../../src/core/logger.js'); const cap = _captureLogger(); @@ -89,18 +151,12 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child stderr contains heap OOM signature', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: Buffer; - }; - err.status = 1; - err.signal = undefined; - err.stderr = Buffer.from( + mockSpawnExit({ + status: 1, + signal: null, + stderr: Buffer.from( 'FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory', - ); - throw err; + ), }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -118,16 +174,10 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child stdout contains heap OOM signature', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stdout?: string; - }; - err.status = 1; - err.signal = undefined; - err.stdout = 'FATAL ERROR: JavaScript heap out of memory'; - throw err; + mockSpawnExit({ + status: 1, + signal: null, + stdout: 'FATAL ERROR: JavaScript heap out of memory', }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -145,19 +195,7 @@ describe('analyzeCommand heap respawn', () => { it('prints heap guidance when child exits 134 without output', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: string; - stdout?: string; - }; - err.status = 134; - err.signal = undefined; - err.stderr = ''; - err.stdout = ''; - throw err; - }); + mockSpawnExit({ status: 134, signal: null, stderr: '', stdout: '' }); const { _captureLogger } = await import('../../src/core/logger.js'); const cap = _captureLogger(); @@ -174,16 +212,10 @@ describe('analyzeCommand heap respawn', () => { it('does not print heap guidance for non-OOM child failures with output', async () => { delete process.env.NODE_OPTIONS; getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); - execFileSyncMock.mockImplementationOnce(() => { - const err = new Error('Command failed') as Error & { - status?: number; - signal?: string; - stderr?: Buffer; - }; - err.status = 2; - err.signal = undefined; - err.stderr = Buffer.from('parser failed: invalid token'); - throw err; + mockSpawnExit({ + status: 2, + signal: null, + stderr: Buffer.from('parser failed: invalid token'), }); const { _captureLogger } = await import('../../src/core/logger.js'); @@ -197,4 +229,30 @@ describe('analyzeCommand heap respawn', () => { ); cap.restore(); }); + + it('does not print heap guidance when a SIGABRT child emitted a native N-API crash', async () => { + delete process.env.NODE_OPTIONS; + getHeapStatisticsMock.mockReturnValue({ heap_size_limit: 512 * 1024 * 1024 }); + mockSpawnExit({ + status: 134, + signal: null, + stderr: Buffer.from('libc++abi: terminating due to uncaught exception of type Napi::Error'), + }); + + const { _captureLogger } = await import('../../src/core/logger.js'); + const cap = _captureLogger(); + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + await analyzeCommand(undefined, {}); + + expect(process.exitCode).toBe(134); + expect(cap.records().some((r) => r.msg.includes('Analysis likely ran out of memory.'))).toBe( + false, + ); + expect(cap.records().some((r) => r.msg.includes('Analysis aborted in a native worker'))).toBe( + true, + ); + expect(cap.records().some((r) => r.recoveryHint === 'native-worker-abort')).toBe(true); + expect(stderrWriteSpy).toHaveBeenCalled(); + cap.restore(); + }); }); diff --git a/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts new file mode 100644 index 0000000000..de90ae4679 --- /dev/null +++ b/gitnexus/test/unit/analyze-respawn-progress-terminal.test.ts @@ -0,0 +1,147 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +interface CapturedTerminal { + cursorTo(x?: number | null, y?: number | null): void; + lineWrapping(enabled: boolean): void; + clearRight(): void; + newline(): void; + write(s: string, rawWrite?: boolean): void; + isTTY(): boolean; +} + +interface CapturedBarOptions { + noTTYOutput?: boolean; + notTTYSchedule?: number; + terminal?: CapturedTerminal; +} + +const mocks = vi.hoisted(() => ({ + runFullAnalysisMock: vi.fn(), + capturedBarOptions: [] as CapturedBarOptions[], +})); + +vi.mock('cli-progress', () => ({ + default: { + SingleBar: vi.fn(function (options: CapturedBarOptions) { + mocks.capturedBarOptions.push(options); + return { + start: vi.fn(), + update: vi.fn(), + stop: vi.fn(), + }; + }), + Presets: { shades_grey: {} }, + }, +})); + +vi.mock('../../src/core/run-analyze.js', () => ({ + runFullAnalysis: mocks.runFullAnalysisMock, +})); + +vi.mock('../../src/core/lbug/lbug-adapter.js', () => ({ + closeLbug: vi.fn(async () => undefined), +})); + +vi.mock('../../src/storage/repo-manager.js', () => ({ + getStoragePaths: vi.fn(() => ({ storagePath: '.gitnexus', lbugPath: '.gitnexus/lbug' })), + getGlobalRegistryPath: vi.fn(() => 'registry.json'), + RegistryNameCollisionError: class RegistryNameCollisionError extends Error {}, + AnalysisNotFinalizedError: class AnalysisNotFinalizedError extends Error {}, + assertAnalysisFinalized: vi.fn(async () => undefined), +})); + +vi.mock('../../src/storage/git.js', () => ({ + getGitRoot: vi.fn(() => '/repo'), + hasGitDir: vi.fn(() => true), +})); + +vi.mock('../../src/core/ingestion/utils/max-file-size.js', () => ({ + getMaxFileSizeBannerMessage: vi.fn(() => null), +})); + +const setStreamIsTTY = (stream: NodeJS.WriteStream, value: boolean): (() => void) => { + const descriptor = Object.getOwnPropertyDescriptor(stream, 'isTTY'); + Object.defineProperty(stream, 'isTTY', { configurable: true, value }); + return () => { + if (descriptor) Object.defineProperty(stream, 'isTTY', descriptor); + else delete (stream as NodeJS.WriteStream & { isTTY?: boolean }).isTTY; + }; +}; + +describe('analyzeCommand respawn progress terminal bridge', () => { + const ORIGINAL_NODE_OPTIONS = process.env.NODE_OPTIONS; + const ORIGINAL_RESPAWN_PROGRESS = process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; + const ORIGINAL_COLUMNS = process.env.COLUMNS; + let restoreStderrIsTTY: (() => void) | undefined; + let stdoutWriteSpy: ReturnType; + let stderrWriteSpy: ReturnType; + + beforeEach(() => { + vi.resetModules(); + mocks.runFullAnalysisMock.mockReset(); + mocks.capturedBarOptions.length = 0; + mocks.runFullAnalysisMock.mockResolvedValue({ + repoName: 'repo', + repoPath: '/repo', + stats: {}, + alreadyUpToDate: true, + }); + process.exitCode = undefined; + process.env.NODE_OPTIONS = '--max-old-space-size=8192'; + process.env.GITNEXUS_RESPAWN_PROGRESS_TTY = '1'; + restoreStderrIsTTY = setStreamIsTTY(process.stderr, false); + stdoutWriteSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + stderrWriteSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); + }); + + afterEach(() => { + stdoutWriteSpy.mockRestore(); + stderrWriteSpy.mockRestore(); + restoreStderrIsTTY?.(); + restoreStderrIsTTY = undefined; + if (ORIGINAL_NODE_OPTIONS === undefined) delete process.env.NODE_OPTIONS; + else process.env.NODE_OPTIONS = ORIGINAL_NODE_OPTIONS; + if (ORIGINAL_RESPAWN_PROGRESS === undefined) delete process.env.GITNEXUS_RESPAWN_PROGRESS_TTY; + else process.env.GITNEXUS_RESPAWN_PROGRESS_TTY = ORIGINAL_RESPAWN_PROGRESS; + if (ORIGINAL_COLUMNS === undefined) delete process.env.COLUMNS; + else process.env.COLUMNS = ORIGINAL_COLUMNS; + }); + + it('uses an ANSI terminal shim instead of cli-progress non-TTY newline mode', async () => { + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + + await analyzeCommand(undefined, {}); + + expect(mocks.capturedBarOptions).toHaveLength(1); + const options = mocks.capturedBarOptions[0]; + expect(options.noTTYOutput).toBeUndefined(); + expect(options.notTTYSchedule).toBeUndefined(); + expect(options.terminal).toBeDefined(); + expect(options.terminal.isTTY()).toBe(true); + + options.terminal.cursorTo(0, null); + options.terminal.clearRight(); + options.terminal.newline(); + expect(stderrWriteSpy).toHaveBeenCalledWith('\r'); + expect(stderrWriteSpy).toHaveBeenCalledWith('\x1B[0K'); + expect(stderrWriteSpy).toHaveBeenCalledWith('\n'); + }); + + it('truncates wrapped progress writes without splitting ANSI escapes or surrogate pairs', async () => { + process.env.COLUMNS = '3'; + const { analyzeCommand } = await import('../../src/cli/analyze.js'); + + await analyzeCommand(undefined, {}); + + const options = mocks.capturedBarOptions[0]; + options.terminal.write('ab\x1B[31mcd'); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('ab\x1B[31mc'); + + process.env.COLUMNS = '4'; + options.terminal.write('abc😀def'); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('abc'); + + options.terminal.write('abc😀def', true); + expect(stderrWriteSpy).toHaveBeenLastCalledWith('abc😀def'); + }); +}); diff --git a/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts b/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts new file mode 100644 index 0000000000..44242b02d0 --- /dev/null +++ b/gitnexus/test/unit/parse-impl-worker-lazy-cache.test.ts @@ -0,0 +1,244 @@ +/** + * Regression coverage for native-worker startup on warm parse-cache runs. + * + * A cache-hit chunk must replay cached worker output without spawning the + * parse-worker. Spawning workers on a warm cache hit still loads tree-sitter + * native bindings at top level, which was the root trigger for intermittent + * `libc++abi ... Napi::Error` crashes in linked local builds. + */ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { pathToFileURL } from 'node:url'; + +import { createKnowledgeGraph } from '../../src/core/graph/graph.js'; +import { runChunkedParseAndResolve } from '../../src/core/ingestion/pipeline-phases/parse-impl.js'; +import { computeChunkHash, fileContentHash } from '../../src/storage/parse-cache.js'; +import type { ParseWorkerResult } from '../../src/core/ingestion/workers/parse-worker.js'; + +const emptyWorkerResult = (filePath: string, name: string): ParseWorkerResult => ({ + nodes: [ + { + id: `Function:${filePath}:${name}`, + label: 'Function', + properties: { + name, + filePath, + startLine: 1, + endLine: 1, + language: 'typescript', + }, + }, + ], + relationships: [], + symbols: [], + imports: [], + calls: [], + assignments: [], + heritage: [], + routes: [], + fetchCalls: [], + decoratorRoutes: [], + toolDefs: [], + ormQueries: [], + constructorBindings: [], + fileScopeBindings: [], + parsedFiles: [], + skippedLanguages: {}, + fileCount: 1, +}); + +const writeReadyWorker = (workerPath: string, markerPath: string): void => { + fs.writeFileSync( + workerPath, + ` +const fs = require('node:fs'); +const { parentPort } = require('node:worker_threads'); +fs.writeFileSync(${JSON.stringify(markerPath)}, 'spawned'); +parentPort.postMessage({ type: 'ready' }); +parentPort.on('message', () => {}); +`, + ); +}; + +const writeResultWorker = (workerPath: string, markerPath: string): void => { + fs.writeFileSync( + workerPath, + ` +const fs = require('node:fs'); +const { parentPort } = require('node:worker_threads'); +const decoder = new TextDecoder('utf-8'); +fs.writeFileSync(${JSON.stringify(markerPath)}, 'spawned'); +parentPort.postMessage({ type: 'ready' }); +const accumulated = { + nodes: [], relationships: [], symbols: [], imports: [], calls: [], assignments: [], heritage: [], + routes: [], fetchCalls: [], decoratorRoutes: [], toolDefs: [], ormQueries: [], constructorBindings: [], + fileScopeBindings: [], parsedFiles: [], skippedLanguages: {}, fileCount: 0, +}; +parentPort.on('message', (msg) => { + if (msg && msg.type === 'sub-batch') { + for (const file of msg.files) { + const filePath = file.path; + const name = filePath.split('/').pop().replace(/\\.ts$/, ''); + accumulated.nodes.push({ + id: 'Function:' + filePath + ':' + name, + label: 'Function', + properties: { name, filePath, startLine: 1, endLine: 1, language: 'typescript' }, + }); + accumulated.fileCount++; + // Decode to exercise the same transfer-list shape as production. + if (file.content && typeof file.content !== 'string') decoder.decode(file.content); + } + parentPort.postMessage({ type: 'progress', filesProcessed: accumulated.fileCount }); + parentPort.postMessage({ type: 'sub-batch-done' }); + return; + } + if (msg && msg.type === 'flush') parentPort.postMessage({ type: 'result', data: accumulated }); +}); +`, + ); +}; + +const writeExitBeforeReadyWorker = (workerPath: string): void => { + fs.writeFileSync(workerPath, `process.exit(1);\n`); +}; + +describe('parse-impl worker pool lazy startup', () => { + let tempDir = ''; + let repoDir = ''; + + beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'parse-impl-worker-lazy-cache-')); + repoDir = path.join(tempDir, 'repo'); + fs.mkdirSync(repoDir, { recursive: true }); + }); + + afterEach(() => { + if (tempDir) fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('does not spawn a parse worker when every chunk is served from parse cache', async () => { + const rel = 'src/cached.ts'; + const content = 'export function cached() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + const parseCache = { + version: 'test', + entries: new Map([ + [chunkHash, [emptyWorkerResult(rel, 'cached')]], + ]), + usedKeys: new Set(), + }; + + const markerPath = path.join(tempDir, 'worker-spawned.marker'); + const workerPath = path.join(tempDir, 'ready-worker.js'); + writeReadyWorker(workerPath, markerPath); + + const graph = createKnowledgeGraph(); + await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(fs.existsSync(markerPath)).toBe(false); + expect(parseCache.usedKeys.has(chunkHash)).toBe(true); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'cached')).toBe(true); + }); + + it('spawns the parse worker lazily on the first cache miss and stores raw results', async () => { + const rel = 'src/miss.ts'; + const content = 'export function miss() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const markerPath = path.join(tempDir, 'worker-spawned.marker'); + const workerPath = path.join(tempDir, 'result-worker.js'); + writeResultWorker(workerPath, markerPath); + + const parseCache = { + version: 'test', + entries: new Map(), + usedKeys: new Set(), + }; + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + + const graph = createKnowledgeGraph(); + await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(fs.existsSync(markerPath)).toBe(true); + expect(parseCache.entries.has(chunkHash)).toBe(true); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'miss')).toBe(true); + }); + + it('falls back to sequential parsing when initial workers exit before ready', async () => { + const rel = 'src/fallback.ts'; + const content = 'export function fallback() { return 1; }\n'; + const full = path.join(repoDir, rel); + fs.mkdirSync(path.dirname(full), { recursive: true }); + fs.writeFileSync(full, content); + + const workerPath = path.join(tempDir, 'exit-before-ready-worker.js'); + writeExitBeforeReadyWorker(workerPath); + + const parseCache = { + version: 'test', + entries: new Map(), + usedKeys: new Set(), + }; + const chunkHash = computeChunkHash([{ filePath: rel, contentHash: fileContentHash(content) }]); + + const graph = createKnowledgeGraph(); + const result = await runChunkedParseAndResolve( + graph, + [{ path: rel, size: fs.statSync(full).size }], + [rel], + 1, + repoDir, + Date.now(), + () => {}, + { + workerThresholdsForTest: { minFiles: 1, minBytes: 1 }, + workerUrlForTest: pathToFileURL(workerPath), + workerPoolSize: 1, + parseCache, + }, + ); + + expect(result.usedWorkerPool).toBe(false); + expect(parseCache.usedKeys.has(chunkHash)).toBe(true); + expect(parseCache.entries.has(chunkHash)).toBe(false); + expect(Array.from(graph.nodes.values()).some((n) => n.properties.name === 'fallback')).toBe( + true, + ); + }); +});