From 410bd7ca425681213bfa71dfada383c24ba3047f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 05:06:14 +0000 Subject: [PATCH 01/39] Initial plan From 108914f91a0e9f4136de2707a1193e240a56cbbc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 05:30:15 +0000 Subject: [PATCH 02/39] fix: skip worker-timeout files in sequential fallback and optimize TS capture node lookup Agent-Logs-Url: https://github.com/abhigyanpatwari/GitNexus/sessions/0e53743e-0600-4690-bd0d-198894daef58 --- .../languages/typescript/captures.ts | 59 +++++++++++++++++-- .../src/core/ingestion/parsing-processor.ts | 27 ++++++++- .../src/core/ingestion/workers/worker-pool.ts | 16 ++++- .../test/unit/parsing-worker-fallback.test.ts | 37 ++++++++++++ 4 files changed, 130 insertions(+), 9 deletions(-) diff --git a/gitnexus/src/core/ingestion/languages/typescript/captures.ts b/gitnexus/src/core/ingestion/languages/typescript/captures.ts index b82edcff81..06ff691a00 100644 --- a/gitnexus/src/core/ingestion/languages/typescript/captures.ts +++ b/gitnexus/src/core/ingestion/languages/typescript/captures.ts @@ -64,7 +64,10 @@ const CALL_TAGS = [ '@reference.call.constructor', ] as const; -function pickFirstDefined(grouped: CaptureMatch, tags: readonly string[]): Capture | undefined { +function pickFirstDefined( + grouped: Record, + tags: readonly string[], +): T | undefined { for (const tag of tags) { const cap = grouped[tag]; if (cap !== undefined) return cap; @@ -113,6 +116,27 @@ function shouldEmitReadMember(memberNode: SyntaxNode): boolean { } } +function findSelfOrAncestorOfType(node: SyntaxNode | undefined, type: string): SyntaxNode | null { + let current: SyntaxNode | null | undefined = node; + while (current !== undefined && current !== null) { + if (current.type === type) return current; + current = current.parent; + } + return null; +} + +function findSelfOrAncestorOfTypes( + node: SyntaxNode | undefined, + types: readonly string[], +): SyntaxNode | null { + let current: SyntaxNode | null | undefined = node; + while (current !== undefined && current !== null) { + if (types.includes(current.type)) return current; + current = current.parent; + } + return null; +} + export function emitTsScopeCaptures( sourceText: string, filePath: string, @@ -151,9 +175,11 @@ export function emitTsScopeCaptures( // `@`; we put it back so the central extractor's prefix lookups // (`@scope.`, `@declaration.`, …) work. const grouped: Record = {}; + const groupedNodes: Record = {}; for (const c of m.captures) { const tag = '@' + c.name; grouped[tag] = nodeToCapture(tag, c.node); + groupedNodes[tag] = c.node; } if (Object.keys(grouped).length === 0) continue; @@ -165,6 +191,10 @@ export function emitTsScopeCaptures( if (grouped['@import.statement'] !== undefined) { const stmtCapture = grouped['@import.statement']; const stmtNode = + findSelfOrAncestorOfTypes(groupedNodes['@import.statement'], [ + 'import_statement', + 'export_statement', + ]) ?? findNodeAtRange(tree.rootNode, stmtCapture.range, 'import_statement') ?? findNodeAtRange(tree.rootNode, stmtCapture.range, 'export_statement'); if (stmtNode !== null) { @@ -183,7 +213,9 @@ export function emitTsScopeCaptures( // `splitDynamicImport` branch consumes. if (grouped['@import.dynamic'] !== undefined) { const dynCapture = grouped['@import.dynamic']; - const callNode = findNodeAtRange(tree.rootNode, dynCapture.range, 'call_expression'); + const callNode = + findSelfOrAncestorOfType(groupedNodes['@import.dynamic'], 'call_expression') ?? + findNodeAtRange(tree.rootNode, dynCapture.range, 'call_expression'); if (callNode !== null) { const decomposed = splitImportStatement(callNode); for (const d of decomposed) out.push(d); @@ -197,7 +229,9 @@ export function emitTsScopeCaptures( // we rely on this emit-side filter so the query stays simple. if (grouped['@reference.read.member'] !== undefined) { const anchor = grouped['@reference.read.member']; - const memberNode = findNodeAtRange(tree.rootNode, anchor.range, 'member_expression'); + const memberNode = + findSelfOrAncestorOfType(groupedNodes['@reference.read.member'], 'member_expression') ?? + findNodeAtRange(tree.rootNode, anchor.range, 'member_expression'); if (memberNode === null || !shouldEmitReadMember(memberNode)) { continue; } @@ -209,8 +243,9 @@ export function emitTsScopeCaptures( // function_signature, so `parameterTypes` is populated when // available. const declAnchor = pickFirstDefined(grouped, FUNCTION_DECL_TAGS); + const declAnchorNode = pickFirstDefined(groupedNodes, FUNCTION_DECL_TAGS); if (declAnchor !== undefined) { - const fnNode = findFunctionNode(tree.rootNode, declAnchor.range); + const fnNode = findFunctionNode(tree.rootNode, declAnchor.range, declAnchorNode); if (fnNode !== null) { const arity = computeTsArityMetadata(fnNode); if (arity.parameterCount !== undefined) { @@ -256,8 +291,10 @@ export function emitTsScopeCaptures( // synthesizer would need to count `jsx_attribute` children of the // opening tag instead of `arguments`. const callAnchor = pickFirstDefined(grouped, CALL_TAGS); + const callAnchorNode = pickFirstDefined(groupedNodes, CALL_TAGS); if (callAnchor !== undefined && grouped['@reference.arity'] === undefined) { const callNode = + findSelfOrAncestorOfTypes(callAnchorNode, ['call_expression', 'new_expression']) ?? findNodeAtRange(tree.rootNode, callAnchor.range, 'call_expression') ?? findNodeAtRange(tree.rootNode, callAnchor.range, 'new_expression'); if (callNode !== null) { @@ -293,7 +330,11 @@ export function emitTsScopeCaptures( // lookup instead of synthesis — covered by `tsReceiverBinding`. const scopeFnAnchor = grouped['@scope.function']; if (scopeFnAnchor !== undefined) { - const fnNode = findFunctionNode(tree.rootNode, scopeFnAnchor.range); + const fnNode = findFunctionNode( + tree.rootNode, + scopeFnAnchor.range, + groupedNodes['@scope.function'], + ); if (fnNode !== null) { const synth = synthesizeTsReceiverBinding(fnNode); if (synth !== null) out.push(synth); @@ -518,7 +559,13 @@ function inferArgType(argNode: SyntaxNode): string { * The `@scope.function` anchor range covers the whole node, but the * tag alone doesn't identify which node type among the many TS * function-likes. */ -function findFunctionNode(rootNode: SyntaxNode, range: Capture['range']): SyntaxNode | null { +function findFunctionNode( + rootNode: SyntaxNode, + range: Capture['range'], + anchorNode?: SyntaxNode, +): SyntaxNode | null { + const fromAnchor = findSelfOrAncestorOfTypes(anchorNode, FUNCTION_NODE_TYPES); + if (fromAnchor !== null) return fromAnchor; for (const nodeType of FUNCTION_NODE_TYPES) { const n = findNodeAtRange(rootNode, range, nodeType); if (n !== null) return n; diff --git a/gitnexus/src/core/ingestion/parsing-processor.ts b/gitnexus/src/core/ingestion/parsing-processor.ts index 5cf398beeb..7a669504ea 100644 --- a/gitnexus/src/core/ingestion/parsing-processor.ts +++ b/gitnexus/src/core/ingestion/parsing-processor.ts @@ -37,7 +37,7 @@ import { } from './utils/template-arguments.js'; import type { LanguageProvider } from './language-provider.js'; import type { ParsedFile } from 'gitnexus-shared'; -import { WorkerPool } from './workers/worker-pool.js'; +import { WorkerPool, WorkerPoolDispatchError } from './workers/worker-pool.js'; import { logger } from '../logger.js'; import type { ParseWorkerResult, @@ -886,12 +886,37 @@ export const processParsing = async ( ); } catch (err) { const message = err instanceof Error ? err.message : String(err); + let fallbackFiles = files; + if (err instanceof WorkerPoolDispatchError && err.fallbackExcludePaths.length > 0) { + const excluded = new Set(err.fallbackExcludePaths); + fallbackFiles = files.filter((file) => !excluded.has(file.path)); + logger.warn( + { + skippedPaths: err.fallbackExcludePaths, + }, + 'Skipping worker-timeout files in sequential fallback:', + ); + reportProgress?.( + lastProgress, + files.length, + `Skipping ${files.length - fallbackFiles.length} worker-timeout file(s) in sequential fallback`, + ); + } logger.warn({ message }, 'Worker pool parsing stopped; continuing with sequential parser:'); reportProgress?.( lastProgress, files.length, `Sequential fallback after worker issue: ${message}`, ); + await processParsingSequential( + graph, + fallbackFiles, + symbolTable, + astCache, + scopeTreeCache, + reportProgress, + ); + return null; } } diff --git a/gitnexus/src/core/ingestion/workers/worker-pool.ts b/gitnexus/src/core/ingestion/workers/worker-pool.ts index 2113685676..0e5c3e75b7 100644 --- a/gitnexus/src/core/ingestion/workers/worker-pool.ts +++ b/gitnexus/src/core/ingestion/workers/worker-pool.ts @@ -29,6 +29,16 @@ export interface WorkerPoolOptions { timeoutBackoffFactor?: number; } +export class WorkerPoolDispatchError extends Error { + readonly fallbackExcludePaths: readonly string[]; + + constructor(message: string, fallbackExcludePaths: readonly string[] = []) { + super(message); + this.name = 'WorkerPoolDispatchError'; + this.fallbackExcludePaths = fallbackExcludePaths; + } +} + /** Message shapes sent back by worker threads. */ type WorkerOutgoingMessage = | { type: 'progress'; filesProcessed: number } @@ -336,13 +346,15 @@ export const createWorkerPool = ( return true; } + const stalledPath = itemPath(job.items[0]); void fail( - new Error( + new WorkerPoolDispatchError( `Worker ${workerIndex} parse job idle timeout after ${job.timeoutMs / 1000}s ` + - `(single item${itemPath(job.items[0]) ? `: ${itemPath(job.items[0])}` : ''}, ` + + `(single item${stalledPath ? `: ${stalledPath}` : ''}, ` + `${job.estimatedBytes} bytes, last progress: ${lastProgress}). ` + `Analyze will retry through sequential fallback. Increase with ` + `--worker-timeout or GITNEXUS_WORKER_SUB_BATCH_TIMEOUT_MS.`, + stalledPath ? [stalledPath] : [], ), ); return false; diff --git a/gitnexus/test/unit/parsing-worker-fallback.test.ts b/gitnexus/test/unit/parsing-worker-fallback.test.ts index d25fe938b6..ab434eb75c 100644 --- a/gitnexus/test/unit/parsing-worker-fallback.test.ts +++ b/gitnexus/test/unit/parsing-worker-fallback.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest'; import { createASTCache } from '../../src/core/ingestion/ast-cache.js'; import { processParsing } from '../../src/core/ingestion/parsing-processor.js'; import type { WorkerPool } from '../../src/core/ingestion/workers/worker-pool.js'; +import { WorkerPoolDispatchError } from '../../src/core/ingestion/workers/worker-pool.js'; import { createKnowledgeGraph } from '../../src/core/graph/graph.js'; import { createSymbolTable } from '../../src/core/ingestion/model/symbol-table.js'; @@ -41,4 +42,40 @@ describe('processParsing worker fallback', () => { graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), ).toBe(true); }); + + it('skips worker-timeout singleton files during sequential fallback', async () => { + const graph = createKnowledgeGraph(); + const progressDetails: string[] = []; + const workerPool: WorkerPool = { + size: 1, + dispatch: vi.fn(async () => { + throw new WorkerPoolDispatchError('injected worker idle timeout', ['src/stuck.ts']); + }), + terminate: vi.fn(async () => undefined), + }; + + const result = await processParsing( + graph, + [ + { path: 'src/stuck.ts', content: 'export function stuck() { return 0; }\n' }, + { path: 'src/a.ts', content: 'export function a() { return 1; }\n' }, + ], + createSymbolTable(), + createASTCache(), + createASTCache(), + (_current, _total, detail) => { + progressDetails.push(detail); + }, + workerPool, + ); + + expect(result).toBeNull(); + expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), + ).toBe(true); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'stuck'), + ).toBe(false); + }); }); From 823cc02f82382814fef2a6390b1d20d16fa88d00 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 05:32:06 +0000 Subject: [PATCH 03/39] refactor: clarify TS capture helpers after validation feedback Agent-Logs-Url: https://github.com/abhigyanpatwari/GitNexus/sessions/0e53743e-0600-4690-bd0d-198894daef58 --- .../languages/typescript/captures.ts | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/gitnexus/src/core/ingestion/languages/typescript/captures.ts b/gitnexus/src/core/ingestion/languages/typescript/captures.ts index 06ff691a00..93eb4f5990 100644 --- a/gitnexus/src/core/ingestion/languages/typescript/captures.ts +++ b/gitnexus/src/core/ingestion/languages/typescript/captures.ts @@ -64,10 +64,7 @@ const CALL_TAGS = [ '@reference.call.constructor', ] as const; -function pickFirstDefined( - grouped: Record, - tags: readonly string[], -): T | undefined { +function pickFirstCapture(grouped: CaptureMatch, tags: readonly string[]): Capture | undefined { for (const tag of tags) { const cap = grouped[tag]; if (cap !== undefined) return cap; @@ -75,6 +72,17 @@ function pickFirstDefined( return undefined; } +function pickFirstNode( + grouped: Record, + tags: readonly string[], +): SyntaxNode | undefined { + for (const tag of tags) { + const node = grouped[tag]; + if (node !== undefined) return node; + } + return undefined; +} + /** * Drop `@reference.read.member` matches whose underlying `member_expression` * is NOT actually a read context: @@ -117,8 +125,9 @@ function shouldEmitReadMember(memberNode: SyntaxNode): boolean { } function findSelfOrAncestorOfType(node: SyntaxNode | undefined, type: string): SyntaxNode | null { - let current: SyntaxNode | null | undefined = node; - while (current !== undefined && current !== null) { + if (node === undefined) return null; + let current: SyntaxNode | null = node; + while (current !== null) { if (current.type === type) return current; current = current.parent; } @@ -129,8 +138,9 @@ function findSelfOrAncestorOfTypes( node: SyntaxNode | undefined, types: readonly string[], ): SyntaxNode | null { - let current: SyntaxNode | null | undefined = node; - while (current !== undefined && current !== null) { + if (node === undefined) return null; + let current: SyntaxNode | null = node; + while (current !== null) { if (types.includes(current.type)) return current; current = current.parent; } @@ -242,8 +252,8 @@ export function emitTsScopeCaptures( // overloads — TypeScript supports overload signatures via // function_signature, so `parameterTypes` is populated when // available. - const declAnchor = pickFirstDefined(grouped, FUNCTION_DECL_TAGS); - const declAnchorNode = pickFirstDefined(groupedNodes, FUNCTION_DECL_TAGS); + const declAnchor = pickFirstCapture(grouped, FUNCTION_DECL_TAGS); + const declAnchorNode = pickFirstNode(groupedNodes, FUNCTION_DECL_TAGS); if (declAnchor !== undefined) { const fnNode = findFunctionNode(tree.rootNode, declAnchor.range, declAnchorNode); if (fnNode !== null) { @@ -290,8 +300,8 @@ export function emitTsScopeCaptures( // calls to disambiguate by props-arity, a JSX-aware arity // synthesizer would need to count `jsx_attribute` children of the // opening tag instead of `arguments`. - const callAnchor = pickFirstDefined(grouped, CALL_TAGS); - const callAnchorNode = pickFirstDefined(groupedNodes, CALL_TAGS); + const callAnchor = pickFirstCapture(grouped, CALL_TAGS); + const callAnchorNode = pickFirstNode(groupedNodes, CALL_TAGS); if (callAnchor !== undefined && grouped['@reference.arity'] === undefined) { const callNode = findSelfOrAncestorOfTypes(callAnchorNode, ['call_expression', 'new_expression']) ?? From 724bd2c415b705b180af52fc58a225e6d7dd3034 Mon Sep 17 00:00:00 2001 From: Gergo Magyar Date: Tue, 19 May 2026 08:08:53 +0100 Subject: [PATCH 04/39] fix(workers): exclude in-flight file on worker error/exit, not just singleton timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WorkerPoolDispatchError previously surfaced the stalled path only for the singleton-timeout final-fail branch. Worker `error` and `exit` events (and the msg-channel `error` reply) fell back to plain `Error`, so the sequential fallback re-attempted every file in the active job — re-hanging on the same pathological file when the worker crashed mid-parse. Lift the in-flight-file inference into `inFlightExcludePath(job, lastProgress)` and wire it into the three remaining in-pool failure sites. `lastProgress` is already in `runWorker` scope, so `items[lastProgress]` (the next file the worker was about to acknowledge) is the best single guess at the culprit; earlier files are still re-tried sequentially. Returns `[]` when no path is determinable (`lastProgress >= items.length`, or path missing/non-string) so sequential retries the whole job. Replacement-worker startup failures stay plain `Error` (no job context); the result-before-flush protocol bug stays plain `Error` (code fault, not file). Tests cover the three new exclusion paths plus a negative test confirming non-WorkerPoolDispatchError throws fall through to full sequential retry. --- .../src/core/ingestion/workers/worker-pool.ts | 42 ++++++- .../test/unit/parsing-worker-fallback.test.ts | 116 ++++++++++++++++++ 2 files changed, 154 insertions(+), 4 deletions(-) diff --git a/gitnexus/src/core/ingestion/workers/worker-pool.ts b/gitnexus/src/core/ingestion/workers/worker-pool.ts index 0e5c3e75b7..2ef895f2ee 100644 --- a/gitnexus/src/core/ingestion/workers/worker-pool.ts +++ b/gitnexus/src/core/ingestion/workers/worker-pool.ts @@ -142,6 +142,26 @@ function itemPath(item: unknown): string | undefined { return typeof path === 'string' ? path : undefined; } +/** + * Best-guess path of the file in flight when a worker dies mid-job. + * + * `lastProgress` is the number of files the worker has acknowledged via + * `progress` messages, so `items[lastProgress]` is the next file it was + * about to process — the most likely culprit when the worker crashes + * (OOM, native addon SIGSEGV) or reports an error. + * + * Excluding only this single path keeps the blast radius small: earlier + * files in the job get re-tried by the sequential fallback, and any + * pathological file gets the same skip treatment as the singleton- + * timeout path. Returns `[]` when no path is determinable so sequential + * retries the whole job. + */ +function inFlightExcludePath(job: WorkerJob, lastProgress: number): string[] { + if (lastProgress >= job.items.length) return []; + const path = itemPath(job.items[lastProgress]); + return path ? [path] : []; +} + function createJobs( items: TInput[], maxItems: number, @@ -435,7 +455,12 @@ export const createWorkerPool = ( } else if (msg.type === 'error') { settled = true; cleanup(); - void fail(new Error(`Worker ${workerIndex} error: ${msg.error}`)); + void fail( + new WorkerPoolDispatchError( + `Worker ${workerIndex} error: ${msg.error}`, + inFlightExcludePath(job, lastProgress), + ), + ); } else if (msg.type === 'result') { if (!waitingForFlush) { settled = true; @@ -456,7 +481,12 @@ export const createWorkerPool = ( if (!settled) { settled = true; cleanup(); - void fail(err); + void fail( + new WorkerPoolDispatchError( + `Worker ${workerIndex} error: ${err.message}`, + inFlightExcludePath(job, lastProgress), + ), + ); } }; @@ -464,9 +494,13 @@ export const createWorkerPool = ( if (!settled) { settled = true; cleanup(); + const excludes = inFlightExcludePath(job, lastProgress); + const inFlightSuffix = excludes.length > 0 ? ` (in-flight: ${excludes[0]})` : ''; void fail( - new Error( - `Worker ${workerIndex} exited with code ${code}. Likely OOM or native addon failure.`, + new WorkerPoolDispatchError( + `Worker ${workerIndex} exited with code ${code}. ` + + `Likely OOM or native addon failure${inFlightSuffix}.`, + excludes, ), ); } diff --git a/gitnexus/test/unit/parsing-worker-fallback.test.ts b/gitnexus/test/unit/parsing-worker-fallback.test.ts index ab434eb75c..8525e263b0 100644 --- a/gitnexus/test/unit/parsing-worker-fallback.test.ts +++ b/gitnexus/test/unit/parsing-worker-fallback.test.ts @@ -78,4 +78,120 @@ describe('processParsing worker fallback', () => { graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'stuck'), ).toBe(false); }); + + it('skips worker-error in-flight file during sequential fallback', async () => { + const graph = createKnowledgeGraph(); + const progressDetails: string[] = []; + const workerPool: WorkerPool = { + size: 1, + dispatch: vi.fn(async () => { + throw new WorkerPoolDispatchError('Worker 0 error: native crash', ['src/crashed.ts']); + }), + terminate: vi.fn(async () => undefined), + }; + + const result = await processParsing( + graph, + [ + { path: 'src/crashed.ts', content: 'export function crashed() { return 0; }\n' }, + { path: 'src/a.ts', content: 'export function a() { return 1; }\n' }, + ], + createSymbolTable(), + createASTCache(), + createASTCache(), + (_current, _total, detail) => { + progressDetails.push(detail); + }, + workerPool, + ); + + expect(result).toBeNull(); + expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), + ).toBe(true); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'crashed'), + ).toBe(false); + }); + + it('skips worker-exit in-flight file during sequential fallback', async () => { + const graph = createKnowledgeGraph(); + const progressDetails: string[] = []; + const workerPool: WorkerPool = { + size: 1, + dispatch: vi.fn(async () => { + throw new WorkerPoolDispatchError( + 'Worker 0 exited with code 134. Likely OOM or native addon failure (in-flight: src/oom.ts).', + ['src/oom.ts'], + ); + }), + terminate: vi.fn(async () => undefined), + }; + + const result = await processParsing( + graph, + [ + { path: 'src/oom.ts', content: 'export function oom() { return 0; }\n' }, + { path: 'src/a.ts', content: 'export function a() { return 1; }\n' }, + ], + createSymbolTable(), + createASTCache(), + createASTCache(), + (_current, _total, detail) => { + progressDetails.push(detail); + }, + workerPool, + ); + + expect(result).toBeNull(); + expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), + ).toBe(true); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'oom'), + ).toBe(false); + }); + + it('runs full sequential fallback when the worker pool throws a non-WorkerPoolDispatchError', async () => { + const graph = createKnowledgeGraph(); + const progressDetails: string[] = []; + const workerPool: WorkerPool = { + size: 1, + dispatch: vi.fn(async () => { + throw new Error('replacement worker failed'); + }), + terminate: vi.fn(async () => undefined), + }; + + const result = await processParsing( + graph, + [ + { path: 'src/keep.ts', content: 'export function keep() { return 0; }\n' }, + { path: 'src/a.ts', content: 'export function a() { return 1; }\n' }, + ], + createSymbolTable(), + createASTCache(), + createASTCache(), + (_current, _total, detail) => { + progressDetails.push(detail); + }, + workerPool, + ); + + expect(result).toBeNull(); + expect(progressDetails).toContain( + 'Sequential fallback after worker issue: replacement worker failed', + ); + expect( + progressDetails.some((d) => d.startsWith('Skipping ') && d.includes('worker-timeout file')), + ).toBe(false); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), + ).toBe(true); + expect( + graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'keep'), + ).toBe(true); + }); }); From 25af32c12131cec81faba3271e6238a01c748826 Mon Sep 17 00:00:00 2001 From: Gergo Magyar Date: Tue, 19 May 2026 08:56:03 +0100 Subject: [PATCH 05/39] fix(review): apply autofix feedback - Use cause-neutral "worker-excluded" label in skip messages and tests now that worker error/exit paths share the same exclusion contract as singleton-timeout (correctness + maintainability reviewers). - Add JSDoc to findSelfOrAncestorOfType{s} explaining the parent-walk short-circuit vs root-DFS fallback (maintainability reviewer). --- .../src/core/ingestion/languages/typescript/captures.ts | 5 +++++ gitnexus/src/core/ingestion/parsing-processor.ts | 4 ++-- gitnexus/test/unit/parsing-worker-fallback.test.ts | 8 ++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/gitnexus/src/core/ingestion/languages/typescript/captures.ts b/gitnexus/src/core/ingestion/languages/typescript/captures.ts index 93eb4f5990..22f48e80d9 100644 --- a/gitnexus/src/core/ingestion/languages/typescript/captures.ts +++ b/gitnexus/src/core/ingestion/languages/typescript/captures.ts @@ -124,6 +124,9 @@ function shouldEmitReadMember(memberNode: SyntaxNode): boolean { } } +/** Walks the parent chain from `node` (inclusive), returning the first node + * whose type matches, or null. Faster than `findNodeAtRange` when the caller + * already holds the anchor node — avoids re-scanning the tree from the root. */ function findSelfOrAncestorOfType(node: SyntaxNode | undefined, type: string): SyntaxNode | null { if (node === undefined) return null; let current: SyntaxNode | null = node; @@ -134,6 +137,8 @@ function findSelfOrAncestorOfType(node: SyntaxNode | undefined, type: string): S return null; } +/** Walks the parent chain from `node` (inclusive), returning the first node + * whose type is in the set, or null. Plural form of {@link findSelfOrAncestorOfType}. */ function findSelfOrAncestorOfTypes( node: SyntaxNode | undefined, types: readonly string[], diff --git a/gitnexus/src/core/ingestion/parsing-processor.ts b/gitnexus/src/core/ingestion/parsing-processor.ts index 7a669504ea..73a72aa228 100644 --- a/gitnexus/src/core/ingestion/parsing-processor.ts +++ b/gitnexus/src/core/ingestion/parsing-processor.ts @@ -894,12 +894,12 @@ export const processParsing = async ( { skippedPaths: err.fallbackExcludePaths, }, - 'Skipping worker-timeout files in sequential fallback:', + 'Skipping worker-excluded files in sequential fallback:', ); reportProgress?.( lastProgress, files.length, - `Skipping ${files.length - fallbackFiles.length} worker-timeout file(s) in sequential fallback`, + `Skipping ${files.length - fallbackFiles.length} worker-excluded file(s) in sequential fallback`, ); } logger.warn({ message }, 'Worker pool parsing stopped; continuing with sequential parser:'); diff --git a/gitnexus/test/unit/parsing-worker-fallback.test.ts b/gitnexus/test/unit/parsing-worker-fallback.test.ts index 8525e263b0..3308cf03f6 100644 --- a/gitnexus/test/unit/parsing-worker-fallback.test.ts +++ b/gitnexus/test/unit/parsing-worker-fallback.test.ts @@ -70,7 +70,7 @@ describe('processParsing worker fallback', () => { ); expect(result).toBeNull(); - expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect(progressDetails).toContain('Skipping 1 worker-excluded file(s) in sequential fallback'); expect( graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), ).toBe(true); @@ -106,7 +106,7 @@ describe('processParsing worker fallback', () => { ); expect(result).toBeNull(); - expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect(progressDetails).toContain('Skipping 1 worker-excluded file(s) in sequential fallback'); expect( graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), ).toBe(true); @@ -145,7 +145,7 @@ describe('processParsing worker fallback', () => { ); expect(result).toBeNull(); - expect(progressDetails).toContain('Skipping 1 worker-timeout file(s) in sequential fallback'); + expect(progressDetails).toContain('Skipping 1 worker-excluded file(s) in sequential fallback'); expect( graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), ).toBe(true); @@ -185,7 +185,7 @@ describe('processParsing worker fallback', () => { 'Sequential fallback after worker issue: replacement worker failed', ); expect( - progressDetails.some((d) => d.startsWith('Skipping ') && d.includes('worker-timeout file')), + progressDetails.some((d) => d.startsWith('Skipping ') && d.includes('worker-excluded file')), ).toBe(false); expect( graph.nodes.some((node) => node.label === 'Function' && node.properties.name === 'a'), From 3ffd6ad396c0e10fb12bf8176d56d45ad7c9ed5a Mon Sep 17 00:00:00 2001 From: Gergo Magyar Date: Tue, 19 May 2026 09:28:02 +0100 Subject: [PATCH 06/39] feat(workers): resilient + scalable worker pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restructures `createWorkerPool` so a single bad file no longer kills the pool for the rest of an analyze run. Five interlocking layers: 1. **Auto-respawn on error/exit** — worker death triggers `replaceWorker` on the same slot, bounded by `maxRespawnsPerSlot` (default 3). The slot is dropped from rotation when the budget is exhausted; other slots keep running. 2. **Circuit breaker** — replaces the permanent `poolBroken=true` with a consecutive-failure counter. The pool only trips after `consecutiveFailureThreshold` deaths (default `max(3, poolSize)`) with no successful job in between. A successful job resets the counter so transient bursts of bad files don't escalate. 3. **Session-scoped file quarantine** — paths identified as the in-flight file at the moment of a worker death are added to a `Set` on the pool. `dispatch()` filters quarantined items up front (they never reach a worker again this pool lifetime). Exposed via the new `WorkerPool.getQuarantinedPaths()` so callers can log/route them. `processParsing` surfaces the per-chunk quarantine summary alongside the existing fallback-exclusion log. 4. **Authoritative in-flight tracking** — `parse-worker.ts` emits `{type:'starting-file', path}` before each file. The pool tracks this per slot and uses it for crash attribution, falling back to the `items[lastProgress]` heuristic only when no starting-file has been observed (very-early crash, older worker build). Closes the reorder/race concerns raised by reviewers C1 and R3 in the earlier review run. 5. **Per-job cumulative timeout budget** — each `WorkerJob` tracks the total wall time spent across attempts/splits/retries. When the budget is exhausted (default 5x `subBatchIdleTimeoutMs`), the pool surfaces the in-flight path instead of letting exponential backoff balloon into multi-hour stalls. Cross-layer wiring: a new `wakeIdleSlots` helper kicks any non-busy live slot when items are requeued (after a death or split-retry), so a dropped slot doesn't strand work in the queue. `recoverAndResume` consolidates the per-job teardown shared by the three in-pool death sites (`error`, `exit`, msg-channel `error`). New env knobs: `GITNEXUS_WORKER_MAX_RESPAWNS_PER_SLOT`, `GITNEXUS_WORKER_MAX_CUMULATIVE_TIMEOUT_MS`, `GITNEXUS_WORKER_CONSECUTIVE_FAILURE_THRESHOLD`. New `WorkerPoolOptions.workerFactory` injection point for unit tests. Tests: 12 new unit tests using a FakeWorker mock cover quarantine seeding, slot-respawn, slot-drop after budget, breaker trip + reset, and quarantine filtering. Plus option-resolution tests for the three new env vars. All 19 worker-pool/-fallback/-options tests pass; full unit suite 6040 passed / 30 skipped / 0 failed. --- .../src/core/ingestion/parsing-processor.ts | 24 +- .../core/ingestion/workers/parse-worker.ts | 7 + .../src/core/ingestion/workers/worker-pool.ts | 563 +++++++++++++++--- .../test/unit/worker-pool-resilience.test.ts | 344 +++++++++++ 4 files changed, 859 insertions(+), 79 deletions(-) create mode 100644 gitnexus/test/unit/worker-pool-resilience.test.ts diff --git a/gitnexus/src/core/ingestion/parsing-processor.ts b/gitnexus/src/core/ingestion/parsing-processor.ts index 73a72aa228..a276a0ab4f 100644 --- a/gitnexus/src/core/ingestion/parsing-processor.ts +++ b/gitnexus/src/core/ingestion/parsing-processor.ts @@ -875,7 +875,7 @@ export const processParsing = async ( ); } try { - return await processParsingWithWorkers( + const data = await processParsingWithWorkers( graph, files, symbolTable, @@ -884,6 +884,28 @@ export const processParsing = async ( reportProgress, outRawResults, ); + // Session-scoped quarantine (worker-pool resilience Layer 3): surface + // any files this pool has decided are unsafe for workers so the + // operator can see what was skipped. The pool already filtered them + // out of dispatch; we only need to log + progress-report. Quarantine + // is session-scoped per pool instance — a fresh `createWorkerPool` + // call clears it. + const quarantineSet = new Set(workerPool.getQuarantinedPaths?.() ?? []); + if (quarantineSet.size > 0) { + const quarantinedInChunk = files.filter((file) => quarantineSet.has(file.path)); + if (quarantinedInChunk.length > 0) { + logger.warn( + { quarantinedFiles: quarantinedInChunk.map((file) => file.path) }, + `Worker quarantine: ${quarantinedInChunk.length} file(s) skipped in this chunk (cumulative pool quarantine: ${quarantineSet.size}).`, + ); + reportProgress?.( + lastProgress, + files.length, + `${quarantinedInChunk.length} worker-quarantined file(s) skipped`, + ); + } + } + return data; } catch (err) { const message = err instanceof Error ? err.message : String(err); let fallbackFiles = files; diff --git a/gitnexus/src/core/ingestion/workers/parse-worker.ts b/gitnexus/src/core/ingestion/workers/parse-worker.ts index da681b0700..2166ca9506 100644 --- a/gitnexus/src/core/ingestion/workers/parse-worker.ts +++ b/gitnexus/src/core/ingestion/workers/parse-worker.ts @@ -1401,6 +1401,13 @@ const processFileGroup = ( // Skip files larger than the max tree-sitter buffer (32 MB) if (getTreeSitterContentByteLength(file.content) > TREE_SITTER_MAX_BUFFER) continue; + // Authoritative in-flight signal for the pool: lets `WorkerPool` exclude + // exactly this file if the worker dies during parse/extract, instead of + // guessing from `items[lastProgress]` (which the language-grouped order + // here would defeat). The pool gracefully ignores this when running an + // older worker build that doesn't emit it. + if (parentPort) parentPort.postMessage({ type: 'starting-file', path: file.path }); + // Vue SFC preprocessing: extract