fix: prevent drain listener leak in relationship CSV streaming #818
Dynamically-created per-pair WriteStreams for relationship CSV splitting
default to Node.js's maxListeners limit of 10. On large repositories with
many relationship types, readline backpressure causes repeated
ws.once('drain', ...) calls that exceed this limit, flooding stderr with
MaxListenersExceededWarning messages.
This matches the existing pattern in csv-generator.ts where
BufferedCSVWriter already calls this.ws.setMaxListeners(50).
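The accumulation described above can be reproduced in isolation. The following is a minimal sketch, assuming a bare EventEmitter as a stand-in for one per-pair WriteStream; the helper name simulateUnguardedBackpressure is hypothetical and not part of the codebase.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical stand-in for one per-pair WriteStream; only the listener
// bookkeeping matters here, so a bare EventEmitter is enough.
function simulateUnguardedBackpressure(bufferedLines: number): number {
  const ws = new EventEmitter();
  ws.setMaxListeners(0); // demo only: keeps this sketch from printing the real warning

  for (let i = 0; i < bufferedLines; i++) {
    // Each buffered line that sees write() === false registers another listener.
    ws.once("drain", () => {
      /* the real code would call rl.resume() here */
    });
  }
  return ws.listenerCount("drain");
}

const pendingListeners = simulateUnguardedBackpressure(25);
// Compare the pending count against Node's default cap.
console.log(pendingListeners, pendingListeners > EventEmitter.defaultMaxListeners);
```

Without the setMaxListeners(0) line, Node would emit MaxListenersExceededWarning as soon as the count passes the default cap, which is the symptom reported here.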
|
Pull request overview
Reduces MaxListenersExceededWarning noise during large-repository analysis by increasing the EventEmitter listener cap on per-relationship-pair WriteStreams created during relationship CSV splitting.
Changes:
- Set ws.setMaxListeners(50) immediately after creating each per-pair relationship WriteStream in loadGraphToLbug().
Comments suppressed due to low confidence (1)
gitnexus/src/core/lbug/lbug-adapter.ts:292
- Raising the stream’s max listener cap suppresses the warning, but it doesn’t address the underlying cause: when ws.write() returns false, this code can still enqueue multiple once('drain') listeners for the same stream before rl.pause() takes effect. Consider coalescing backpressure handling so only a single pending drain-resume callback can be registered per WriteStream (e.g., track a per-stream waitingForDrain flag and only attach the listener once, clearing it on drain). This avoids unbounded listener growth and extra closures under heavy backpressure, rather than just increasing the threshold.
ws.setMaxListeners(50);
ws.write(relHeader + '\n');
pairWriteStreams.set(pairKey, ws);
relsByPairMeta.set(pairKey, { csvPath: pairCsvPath, rows: 0 });
}
|
@claude try to identify if there's a memory leak in the pipeline. The fix temporarily fixes the issue but I think, we are leaking open file descriptors or something like that. |
|
Claude finished @magyargergo's task in 2m 5s
Memory / FD Leak Analysis
|
| # | Issue | Severity | This PR fixes it? |
|---|---|---|---|
| 1 | No error handler on per-pair WriteStreams → hung Promise + all FDs open forever | 🔴 High | ❌ No |
| 2 | No waitingForDrain flag → N drain listeners accumulate per stream | 🟠 Medium | ❌ No (only masks symptom) |
| 3 | rl not closed in error handler → ReadStream FD may linger | 🟡 Low | ❌ No |
Recommendation: This PR should not be merged as-is. The setMaxListeners(50) change hides bug #2 without fixing it, and leaves bug #1 (the actual FD leak) entirely unaddressed. The correct fix is to:
- Add a ws.on('error', ...) / ws.once('error', reject) handler on each per-pair stream (and call rl.destroy() inside it to break the hung-Promise scenario)
- Guard ws.once('drain', ...) with a per-stream waitingForDrain Set so only one listener is ever queued at a time
- Add rl.close() in the readline error handler
The pattern in BufferedCSVWriter (csv-generator.ts lines 149–196) already does this correctly and should be the model.
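The second recommendation can be sketched as follows. This is an illustration under stated assumptions, not the project's code: FullStream is a hypothetical mock that always reports backpressure, and writeGuarded, pause, and resume are placeholder names for the write path and the readline controls.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical stand-in for a WriteStream that is always "full".
class FullStream extends EventEmitter {
  write(_chunk: string): boolean {
    return false; // always signal backpressure
  }
}

// Keys of streams that already have a pending drain listener.
const waitingForDrain = new Set<string>();

function writeGuarded(
  pairKey: string,
  ws: FullStream,
  line: string,
  pause: () => void,
  resume: () => void,
): void {
  const ok = ws.write(line + "\n");
  if (ok || waitingForDrain.has(pairKey)) return; // nothing new to register

  waitingForDrain.add(pairKey);
  pause();
  ws.once("drain", () => {
    waitingForDrain.delete(pairKey);
    resume();
  });
}
```

However many buffered lines reach the same stream while it is full, only the first one registers a listener; the rest are written to the stream buffer and return early.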
|
@MekayelAnik could you please look into this? |
Addresses review feedback from @magyargergo and Claude CI analysis:
Bug 1 (High): Add error handlers to per-pair WriteStreams. Previously, if a WriteStream errored (disk full, EMFILE) while rl was paused waiting for drain, the drain callback never fired, rl.resume() was never called, and the outer Promise hung forever, leaking all open file descriptors until process kill. Now each WriteStream gets an error handler that destroys all streams, closes the readline interface + its input ReadStream, and rejects the Promise.
Bug 2 (Medium): Add waitingForDrain Set to prevent drain listener accumulation. rl.pause() is not synchronous: buffered line events continue firing after pause(), and multiple lines targeting the same pairKey each added another ws.once('drain', ...) listener. This was the root cause of MaxListenersExceededWarning. Now a Set<string> tracks which streams are already waiting for drain. Only the first backpressure event registers the listener; subsequent lines for the same stream are silently skipped (they're already written to the stream buffer). This eliminates listener accumulation entirely and makes setMaxListeners(50) a safety net rather than a band-aid.
Bug 3 (Low): Close readline and destroy input ReadStream in error handler. Previously only the WriteStreams were destroyed on error, leaving the ReadStream FD to linger until GC.
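The Bug 1 fix in this commit message can be illustrated with a shared error handler that settles only once. This is a minimal sketch with assumed names: FakeStream, attachErrorCleanup, and closeInput are hypothetical, and closeInput stands for whatever closes the readline interface and its input ReadStream.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical minimal stream shape: an emitter that can be destroyed.
class FakeStream extends EventEmitter {
  destroyed = false;
  destroy(): void {
    this.destroyed = true;
  }
}

// Attaches one shared error handler to every stream. The first error tears
// everything down and rejects once; later errors are ignored.
function attachErrorCleanup(
  streams: FakeStream[],
  closeInput: () => void,
  reject: (err: Error) => void,
): void {
  let settled = false;
  const cleanup = (err: Error): void => {
    if (settled) return;
    settled = true;
    for (const ws of streams) ws.destroy();
    closeInput(); // e.g. rl.close() plus inputStream.destroy()
    reject(err);
  };
  for (const ws of streams) ws.on("error", cleanup);
}
```

Because the handler rejects the outer Promise directly, a stream failure no longer depends on a drain event that will never arrive.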
|
Thanks @magyargergo for the thorough analysis! The Claude CI review was spot-on — the initial I've pushed a second commit ( Bug 1 (High) — No error handler on per-pair WriteStreams ✅ FixedAdded a shared
Uses a Bug 2 (Medium) — No
|
|
@claude try to find potential bugs, performance issues, potential leaks and if it resolves the issue properly |
|
Claude finished @magyargergo's task in 2m 19s
Code Review: Bug, Leak & Performance Analysis
Verdict on the Previous 3 BugsAll 3 bugs identified in the prior Claude analysis are correctly fixed in commit
The original Remaining Bug Found —
|
CI Report: ✅ All checks passed
Test Results: ✅ All 6233 tests passed, 98 test(s) skipped
|
Could you please address the remaining findings? Also please make sure the ci is green. Plus we need to find a way to test this. |
|
@claude we need a way to test this reliably so it wont regress in the future |
|
Claude finished @magyargergo's task in 12m 59s
Regression Tests for Stream Backpressure Fixes
What Was Done
The tests use a lightweight CI NoteThe existing CI failure (
|
xkonjin left a comment
Looks good overall — the backpressure fix is solid and the drain-guard prevents listener leaks. A few notes:
1. Bug risk in cleanup: inputStream.destroy() and ws.destroy() can throw synchronously in rare edge cases (already-destroyed streams). Consider wrapping in try/catch or using ?.destroy() where appropriate.
2. Resource leak on normal close: The normal rl.on('close', ...) path resolves but does not explicitly close the per-pair pairWriteStreams. That should be fine because the subsequent for (const ws of pairWriteStreams.values()) ws.end() will drain them, but if resolve() throws (unlikely), the streams stay open. Not blocking.
3. Test coverage gap: The PR fixes a real production issue (hanging Promise on ws error during backpressure), but I don't see a test that simulates a WriteStream error event during rl.pause(). Adding a unit test that triggers ws.emit('error', new Error('disk full')) mid-backpressure would make this regression-proof.
4. setMaxListeners(50): The comment says "generous limit to avoid false warnings." This is pragmatic, but if a repo has >50 labels, could it still fire warnings? Should the limit scale with the number of unique label pairs instead of being hard-coded?
Happy to approve once there's either a test for the error-during-backpressure path or a brief note on why it's covered elsewhere.
- Remove setMaxListeners(50) entirely. The waitingForDrain guard guarantees at most 1 drain listener per stream at any time. Tested with 200 pairs x 500 lines (100k total): max listeners was always 1, zero warnings. No hard-coded limit needed.
- Wrap destroy() calls in cleanup() with try/catch so already-destroyed streams don't throw synchronously (addresses @xkonjin review point 1).
- Add ws.once('error', reject) to the ws.end() phase so flush errors during stream close properly reject instead of hanging Promise.all (addresses Claude CI Bug 3b finding).
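The hardened teardown described in the second point might look like the sketch below. The Destroyable interface and destroyAll helper are hypothetical names for illustration; the real cleanup() also closes readline and rejects the Promise.

```typescript
// Hypothetical shape: anything with a destroy() method.
interface Destroyable {
  destroy(): void;
}

// Destroys every resource, swallowing synchronous throws so one bad stream
// cannot stop the rest from being torn down. Returns how many calls threw.
function destroyAll(resources: Iterable<Destroyable>): number {
  let failures = 0;
  for (const resource of resources) {
    try {
      resource.destroy();
    } catch {
      failures++; // best effort: teardown continues regardless
    }
  }
  return failures;
}
```

Wrapping each call separately, instead of wrapping the whole loop, is what keeps a throw on one stream from leaving the remaining streams open.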
|
Thanks for the review @xkonjin. Pushed Point 1 (destroy() can throw) — Wrapped all Point 2 (resource leak on normal close) — Agreed, non-issue. The Point 3 (test coverage) — The Claude CI bot already wrote 8 regression tests covering this exact scenario (error-during-backpressure path). Working on getting those into the branch. Point 4 (setMaxListeners scaling) — Removed |
|
@magyargergo I've pushed
Regarding the CI — the For the regression tests — the Claude CI bot wrote 8 tests covering all the bug scenarios but couldn't push them (no write access to the fork). I can add those in a follow-up commit if you'd like, or they could be pulled from the CI run directly. |
|
There are no tests 🤔 Did you forget to git push? |
Covers all bugs fixed in this PR:
- Bug 1: WriteStream error rejects Promise and destroys all streams
- Bug 2: waitingForDrain guard keeps drain listeners at max 1 per stream
- Bug 3: cleanup() handles already-destroyed streams safely
Tests use a MockWriteStream with controllable backpressure and error injection to verify the exact patterns in loadGraphToLbug() without needing a real LadybugDB instance.
|
@magyargergo Tests are up now —
Uses a MockWriteStream with controllable backpressure and error injection so it doesn't need a real LadybugDB instance. Should run fine in the existing vitest setup. |
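For readers without the test file at hand, a mock of this kind could be sketched roughly as below. This is an assumption about its general shape, not the actual MockWriteStream from the PR; the blocked flag is named in a later commit message in this thread, while unblock and fail are hypothetical method names.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical mock; the real test file's MockWriteStream may differ.
class MockWriteStream extends EventEmitter {
  blocked = false; // when true, write() reports backpressure
  readonly chunks: string[] = [];

  write(chunk: string): boolean {
    this.chunks.push(chunk); // data is still buffered while blocked
    return !this.blocked;
  }

  // Clears backpressure and notifies any pending drain listener.
  unblock(): void {
    this.blocked = false;
    this.emit("drain");
  }

  // Injects a failure. Like any EventEmitter, this throws if no
  // 'error' listener is attached.
  fail(message: string): void {
    this.emit("error", new Error(message));
  }
}
```

Keeping the mock synchronous lets a test decide exactly when drain or error fires relative to the writes.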
|
Can you merge origin main into your branch? |
…rning-rel-streams
|
@magyargergo Done — merged upstream main into the branch. No conflicts. |
|
Can you please run prettier to format files? |
|
@magyargergo Done — ran prettier on both changed files. Pre-commit hooks (prettier, eslint, typecheck) all passed locally. |
|
Tests are failing:
The error tests were racing — readline finished reading the tiny CSV and resolved the Promise before setTimeout fired the error. Now the mock streams use blocked=true to trigger backpressure, keeping the Promise pending so the error fires while the split is still in progress.
|
@magyargergo Fixed the failing test in |
|
@claude run a last review before merging. Focus on clean code principles, performance bottlenecks and potential issues. |
|
Claude finished @magyargergo's task in 1m 40s
Final Pre-Merge Review
|
| Area | Assessment |
|---|---|
| cleanup() with settled flag | Correct: no double-reject |
| waitingForDrain Set guard (1 listener per stream) | Correct: root cause fixed |
| ws.end() + ws.once('error', reject) | Correct: flush errors now propagate |
| rl.close() + inputStream.destroy() in error path | Correct: no ReadStream FD leak |
| try/catch around ws.destroy() in cleanup | Correct: handles already-destroyed streams |
| Removal of setMaxListeners(50) | Correct: guard makes it unnecessary |
Summary
The production fix is solid. Two issues are worth resolving before merge:
- The multi-stream drain coordination bug (🟠): single-line fix, prevents memory growth on large repos with many simultaneously full streams
- Tests exercise a copy of production logic (🟠): extracting splitRelCsvByLabelPair as a real exported function would make the regression tests meaningful
The rest is clean and ready.
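The second suggestion amounts to dependency injection of the stream constructor. The sketch below shows the idea on a simplified, synchronous splitter; LineSink, SinkFactory, and splitByKey are hypothetical names, and the real splitRelCsvByLabelPair reads from a file through readline and handles backpressure, which is omitted here.

```typescript
// Assumed shapes for illustration; the real factory type may differ.
interface LineSink {
  write(chunk: string): boolean;
}
type SinkFactory = (path: string) => LineSink;

// Routes each line to a per-key sink, creating sinks through the injected
// factory so tests can substitute in-memory mocks for real files.
function splitByKey(
  lines: string[],
  keyOf: (line: string) => string,
  factory: SinkFactory,
): Map<string, number> {
  const sinks = new Map<string, LineSink>();
  const rows = new Map<string, number>();
  for (const line of lines) {
    const key = keyOf(line);
    let sink = sinks.get(key);
    if (!sink) {
      sink = factory(`rel_${key}.csv`);
      sinks.set(key, sink);
    }
    sink.write(line + "\n");
    rows.set(key, (rows.get(key) ?? 0) + 1);
  }
  return rows;
}
```

With the factory as a parameter, a test imports the production function and passes a mock factory, so the test and the shipped code cannot drift apart.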
xkonjin left a comment
Thanks for the fix — backpressure handling is much improved. A couple of things to tighten up before approving:
1. setMaxListeners is missing from the actual diff. The PR title and description say ws.setMaxListeners(50) was added, but the current diff only shows the backpressure guard, error handling, and tests. If the max-listeners bump was intentionally dropped, update the title/description. If it's meant to be there, add it back on line 292 after createWriteStream.
2. Cleanup double-rejects. In the ws.end() Promise, you attach an error listener with ws.once('error', reject) but remove it via removeListener('error', reject). Since once wraps the handler, removeListener won't match and the listener leaks. Use a named function or removeAllListeners('error') if you control the stream.
3. Test coverage for the actual adapter. The new test file mirrors the logic, but it's testing a mock reimplementation rather than loadGraphToLbug itself. Consider adding at least one integration-level test that exercises the real adapter (or refactors loadGraphToLbug so the split logic is importable and directly testable) to prevent drift between the mock and production code.
4. Hard-coded limit vs scaling. If you keep setMaxListeners(50), note that repos with >50 unique label pairs could still hit the warning on a single stream. A dynamic limit based on the max expected buffered lines (or the waitingForDrain guard alone) may be cleaner.
Happy to re-review once the title/description aligns with the diff and the listener leak is fixed.
ws.once() wraps the callback, so removeListener with the original function reference won't match. Switch to ws.on() with a named onError function so removeListener correctly detaches it after successful close.
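The named-handler pattern this commit switches to can be sketched as below. ClosableStream and endStream are hypothetical names, and the mock emits 'finish' synchronously from end(), whereas a real WriteStream emits it asynchronously after flushing. Whatever the matching rules for once() wrappers turn out to be in a given runtime, a named handler makes the attach and detach pair explicit.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical stand-in for a WriteStream being closed.
class ClosableStream extends EventEmitter {
  end(): void {
    this.emit("finish"); // simplification: real streams finish asynchronously
  }
}

function endStream(ws: ClosableStream): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Named handler: the same reference is used to attach and to detach.
    const onError = (err: Error): void => reject(err);
    ws.on("error", onError);
    ws.once("finish", () => {
      ws.removeListener("error", onError);
      resolve();
    });
    ws.end();
  });
}
```

After a successful close the stream carries no leftover error listener, and a flush error before 'finish' rejects the Promise instead of leaving Promise.all hanging.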
|
@xkonjin Thanks for catching the listener leak. Pushed setMaxListeners — Intentionally removed. The once/removeListener mismatch — Good catch, that was a real bug. Test coverage — Fair point about testing a mirror vs the real code. The tests verify the exact pattern used in Hard-coded limit — No longer applies since we removed |
|
@MekayelAnik there's some issues claude found, could you look into them please? |
1. Extract splitRelCsvByLabelPair as an exported function with optional wsFactory parameter for dependency injection. loadGraphToLbug now delegates to it. Tests import and call the real function instead of a local reimplementation.
2. Fix multi-stream drain coordination: rl.resume() is now guarded by waitingForDrain.size === 0, so readline only resumes when ALL backpressured streams have drained. Previously, any single stream draining would resume readline while other streams were still full, allowing unbounded buffer growth.
3. Export WriteStreamFactory type and RelCsvSplitResult interface for test consumption.
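The coordination rule in point 2 can be shown in a few lines. This is a sketch under assumptions: onBackpressure is a hypothetical helper, plain EventEmitters stand in for the per-pair WriteStreams, and resume stands for rl.resume().

```typescript
import { EventEmitter } from "node:events";

// Keys of streams that are currently full and have a pending drain listener.
const waitingForDrain = new Set<string>();

// Registers at most one drain listener per pairKey; resume() runs only once
// every backpressured stream has drained.
function onBackpressure(pairKey: string, ws: EventEmitter, resume: () => void): void {
  if (waitingForDrain.has(pairKey)) return;
  waitingForDrain.add(pairKey);
  ws.once("drain", () => {
    waitingForDrain.delete(pairKey);
    if (waitingForDrain.size === 0) resume();
  });
}
```

The size check is the single-line difference from the earlier guard: without it, the first stream to drain would restart reading while the others were still full.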
|
@magyargergo Pushed Multi-stream drain coordination — Tests exercise the real code — Extracted All pre-commit hooks (prettier, eslint, typecheck) passing locally. |
|
Thank you for your contribution! |
… RC, group sync - Take upstream splitRelCsvByLabelPair + tests (abhigyanpatwari#818/abhigyanpatwari#832); preserve fork closeLbugForPath and import evictPoolsForDbPath from pool-adapter. - Fix nightly-refresh evictPools import path to ../core/lbug/pool-adapter.js. - Includes abhigyanpatwari#818 drain fix, abhigyanpatwari#823 embeddings PK, abhigyanpatwari#825 RC workflow, abhigyanpatwari#827 manifest sync.

Problem
When analyzing large repositories, GitNexus floods stderr with hundreds of MaxListenersExceededWarning messages:
Root Cause
In loadGraphToLbug(), relationship CSV lines are split into per-label-pair files (rel_*.csv). When backpressure occurs (ws.write() returns false), the code calls rl.pause() and adds a ws.once('drain', ...) listener. However, rl.pause() is not synchronous: buffered line events continue firing, and each one targeting the same pair stream adds another drain listener before the first fires. This causes unbounded listener accumulation.
Additionally, per-pair WriteStreams had no error handlers, meaning a stream error (disk full, EMFILE) while waiting for drain would hang the Promise forever, leaking all open file descriptors.
Fix
- waitingForDrain guard: A Set<string> tracks which streams already have a pending drain listener. Only the first backpressure event registers the listener; subsequent writes to the same stream are skipped. This keeps drain listeners at no more than 1 per stream at all times, eliminating the warning entirely without needing setMaxListeners.
- ws.on('error', cleanup). The shared cleanup() function tears down all streams (with try/catch for already-destroyed streams), closes the readline interface and input ReadStream, and rejects the Promise.
- ws.end() phase: Uses ws.on('error', onError) with a named function so removeListener correctly matches after successful close. Prevents Promise.all from hanging on flush errors.
- rl.close() + inputStream.destroy() in error handler: Prevents ReadStream FD from lingering until GC.
Why no setMaxListeners?
Tested with 200 unique label pairs and 100k total lines: max drain listeners on any single stream was always 1, zero warnings fired. The waitingForDrain guard is the proper fix; setMaxListeners would just mask the symptom.
Testing
8 regression tests in test/unit/rel-csv-split.test.ts:
Tests use a MockWriteStream with controllable backpressure and error injection.