Skip to content
Merged
211 changes: 148 additions & 63 deletions gitnexus/src/core/lbug/lbug-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,144 @@ import {
} from './schema.js';
import { streamAllCSVsToDisk } from './csv-generator.js';

// ---------------------------------------------------------------------------
// Relationship CSV splitting — extracted for testability (PR #818)
// ---------------------------------------------------------------------------

/** Factory for creating WriteStreams — injectable for testing. */
export type WriteStreamFactory = (filePath: string) => import('fs').WriteStream;

/** Result of splitting the relationship CSV into per-label-pair files. */
export interface RelCsvSplitResult {
relHeader: string;
relsByPairMeta: Map<string, { csvPath: string; rows: number }>;
pairWriteStreams: Map<string, import('fs').WriteStream>;
skippedRels: number;
totalValidRels: number;
}

/**
* Split a relationship CSV into per-label-pair files on disk.
*
* Streams the CSV line-by-line, routing each relationship to a file named
* `rel_{fromLabel}_{toLabel}.csv`. Handles backpressure correctly: only one
* drain listener per stream at a time, and readline resumes only when ALL
* backpressured streams have drained.
*
* @param csvPath Path to the combined relationship CSV
* @param csvDir Directory to write per-pair CSV files
* @param validTables Set of valid node table names
* @param getNodeLabel Function to extract the label from a node ID
* @param wsFactory Optional WriteStream factory (defaults to fs.createWriteStream)
*/
export const splitRelCsvByLabelPair = async (
csvPath: string,
csvDir: string,
validTables: Set<string>,
getNodeLabel: (id: string) => string,
wsFactory: WriteStreamFactory = (p) => createWriteStream(p, 'utf-8'),
): Promise<RelCsvSplitResult> => {
let relHeader = '';
const relsByPairMeta = new Map<string, { csvPath: string; rows: number }>();
const pairWriteStreams = new Map<string, import('fs').WriteStream>();
let skippedRels = 0;
let totalValidRels = 0;

await new Promise<void>((resolve, reject) => {
const inputStream = createReadStream(csvPath, 'utf-8');
const rl = createInterface({
input: inputStream,
crlfDelay: Infinity,
});

// Track which streams are already waiting for drain to prevent
// listener accumulation. rl.pause() is not synchronous — buffered
// line events continue firing after pause(), and without this guard
// each line targeting the same pairKey would add another drain listener.
const waitingForDrain = new Set<string>();

let settled = false;
const cleanup = (err: Error) => {
if (settled) return;
settled = true;
try {
rl.close();
} catch {}
try {
inputStream.destroy();
} catch {}
for (const ws of pairWriteStreams.values()) {
try {
ws.destroy();
} catch {}
}
reject(err);
};

let isFirst = true;
rl.on('line', (line) => {
if (isFirst) {
relHeader = line;
isFirst = false;
return;
}
if (!line.trim()) return;
const match = line.match(/"([^"]*)","([^"]*)"/);
if (!match) {
skippedRels++;
return;
}
const fromLabel = getNodeLabel(match[1]);
const toLabel = getNodeLabel(match[2]);
if (!validTables.has(fromLabel) || !validTables.has(toLabel)) {
skippedRels++;
return;
}
const pairKey = `${fromLabel}|${toLabel}`;
let ws = pairWriteStreams.get(pairKey);
if (!ws) {
const pairCsvPath = path.join(csvDir, `rel_${fromLabel}_${toLabel}.csv`);
ws = wsFactory(pairCsvPath);
// If any per-pair WriteStream errors (disk full, EMFILE, etc.),
// tear down everything and reject the Promise. Without this handler,
// a stream error while rl is paused waiting for drain would cause
// the drain callback to never fire and the Promise to hang forever.
ws.on('error', cleanup);
ws.write(relHeader + '\n');
pairWriteStreams.set(pairKey, ws);
relsByPairMeta.set(pairKey, { csvPath: pairCsvPath, rows: 0 });
}
const ok = ws.write(line + '\n');
relsByPairMeta.get(pairKey)!.rows++;
totalValidRels++;
// Handle backpressure: pause reading when the write buffer is full,
// resume when the stream drains. Prevents unbounded memory growth
// on repos with millions of relationships.
// Guard with waitingForDrain to ensure only one drain listener is
// registered per stream at a time — rl.pause() doesn't stop buffered
// line events immediately. Only resume when ALL streams have drained
// to avoid writing into still-full streams.
if (!ok && !waitingForDrain.has(pairKey)) {
waitingForDrain.add(pairKey);
rl.pause();
ws.once('drain', () => {
waitingForDrain.delete(pairKey);
if (waitingForDrain.size === 0) rl.resume();
});
}
});
rl.on('close', () => {
if (!settled) {
settled = true;
resolve();
}
});
rl.on('error', cleanup);
});

return { relHeader, relsByPairMeta, pairWriteStreams, skippedRels, totalValidRels };
};

let db: lbug.Database | null = null;
let conn: lbug.Connection | null = null;
let currentDbPath: string | null = null;
Expand Down Expand Up @@ -247,74 +385,21 @@ export const loadGraphToLbug = async (
}

// Bulk COPY relationships — split by FROM→TO label pair (LadybugDB requires it)
// Stream-read the relation CSV line by line and write directly to per-pair
// temp files on disk. This avoids accumulating potentially millions of CSV
// lines in memory which could exceed V8 Map or array limits on large repos.
let relHeader = '';
const relsByPairMeta = new Map<string, { csvPath: string; rows: number }>();
const pairWriteStreams = new Map<string, import('fs').WriteStream>();
let skippedRels = 0;
let totalValidRels = 0;

await new Promise<void>((resolve, reject) => {
const rl = createInterface({
input: createReadStream(csvResult.relCsvPath, 'utf-8'),
crlfDelay: Infinity,
});
let isFirst = true;
rl.on('line', (line) => {
if (isFirst) {
relHeader = line;
isFirst = false;
return;
}
if (!line.trim()) return;
const match = line.match(/"([^"]*)","([^"]*)"/);
if (!match) {
skippedRels++;
return;
}
const fromLabel = getNodeLabel(match[1]);
const toLabel = getNodeLabel(match[2]);
if (!validTables.has(fromLabel) || !validTables.has(toLabel)) {
skippedRels++;
return;
}
const pairKey = `${fromLabel}|${toLabel}`;
let ws = pairWriteStreams.get(pairKey);
if (!ws) {
const pairCsvPath = path.join(csvDir, `rel_${fromLabel}_${toLabel}.csv`);
ws = createWriteStream(pairCsvPath, 'utf-8');
ws.write(relHeader + '\n');
pairWriteStreams.set(pairKey, ws);
relsByPairMeta.set(pairKey, { csvPath: pairCsvPath, rows: 0 });
}
const ok = ws.write(line + '\n');
relsByPairMeta.get(pairKey)!.rows++;
totalValidRels++;
// Handle backpressure: pause reading when the write buffer is full,
// resume when the stream drains. Prevents unbounded memory growth
// on repos with millions of relationships.
if (!ok) {
rl.pause();
ws.once('drain', () => rl.resume());
}
});
rl.on('close', resolve);
rl.on('error', (err) => {
// Destroy all open write streams to avoid resource leaks
for (const ws of pairWriteStreams.values()) ws.destroy();
reject(err);
});
});
const { relHeader, relsByPairMeta, pairWriteStreams, skippedRels, totalValidRels } =
await splitRelCsvByLabelPair(csvResult.relCsvPath, csvDir, validTables, getNodeLabel);

// Close all per-pair write streams before COPY
await Promise.all(
Array.from(pairWriteStreams.values()).map(
(ws) =>
new Promise<void>((resolve, reject) =>
ws.end((err: Error | undefined) => (err ? reject(err) : resolve())),
),
new Promise<void>((resolve, reject) => {
const onError = (err: Error) => reject(err);
ws.on('error', onError);
ws.end(() => {
ws.removeListener('error', onError);
resolve();
});
}),
),
);

Expand Down
Loading
Loading