Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 193 additions & 87 deletions packages/api/src/services/etl/processCatalogEtl.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { createDbClient } from '@packrat/api/db';
import { toRecord } from '@packrat/guards';
import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils';
import type { Env } from '@packrat/api/utils/env-validation';
import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils';
import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db';
import { parse } from 'csv-parse';
import { eq } from 'drizzle-orm';
Expand Down Expand Up @@ -74,110 +76,214 @@ export async function processCatalogETL({
}

let rowIndex = 0;
let fieldMap: Record<string, number> = {};
let isHeaderProcessed = false;
const validItemsBatch: Partial<NewCatalogItem>[] = [];
const invalidItemsBatch: NewInvalidItemLog[] = [];

const validator = new CatalogItemValidator();
const useJsonl = isJsonlFile(objectKey);

const parser = parse({
relax_column_count: true,
relax_quotes: true,
skip_empty_lines: true,
skip_records_with_error: true,
on_skip: (err: Error) => {
const parserLine = (err as { lines?: number }).lines ?? rowIndex;
const parseErrorLog: NewInvalidItemLog = {
jobId,
errors: [{ field: 'csv_parse', reason: err.message }],
rawData: { parseError: err.message },
rowIndex: parserLine,
};
invalidItemsBatch.push(parseErrorLog);
console.warn(
`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`,
);
},
});

(async () => {
// Non-first chunks: inject the header row so csv-parse sees a valid header,
// then skip the partial row at the chunk boundary (tail of the previous chunk).
if (injectedHeader) {
parser.write(`${injectedHeader}\n`);
}
let skipPartialRow = byteStart !== undefined && byteStart > 0;
if (useJsonl) {
// --- JSONL streaming path ---
// No csv-parse, no header injection. Each line is a JSON object.
let buffer = '';
const skipPartialLine = byteStart !== undefined && byteStart > 0;
let firstLineSkipped = !skipPartialLine;

for await (const chunk of streamToText(r2Object.body)) {
let text = chunk;

if (skipPartialRow) {
// Discard bytes up to and including the first newline — those bytes are
// the tail of the row that the previous chunk already processed.
const nl = text.indexOf('\n');
if (nl === -1) continue; // entire buffer is still the partial row tail
text = text.slice(nl + 1);
skipPartialRow = false;
if (!text) continue;
}
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';

// Respect backpressure: if the parser buffer is full, wait for drain before
// pushing more data. Without this, R2 fills the parser buffer for the entire
// file (up to 600 MB) before the main loop processes any rows → Worker OOM.
const ok = parser.write(text);
if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve));
}
parser.end();
})();

for await (const record of parser) {
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files
const row = record as string[];
if (!isHeaderProcessed) {
fieldMap = row.reduce<Record<string, number>>((acc, header, idx) => {
acc[header.trim()] = idx;
return acc;
}, {});
isHeaderProcessed = true;
console.log(
`🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`,
Object.keys(fieldMap),
);
continue;
}
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

const item = mapCsvRowToItem({ values: row, fieldMap });
if (!firstLineSkipped) {
firstLineSkipped = true;
continue; // discard partial row at chunk boundary
}
Comment on lines +86 to +105

if (item) {
const validatedItem = validator.validateItem(item);
// Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1));

if (validatedItem.isValid) {
validItemsBatch.push(validatedItem.item);
} else {
const invalidItemLog = {
let obj: Record<string, unknown>;
try {
obj = toRecord(JSON.parse(trimmed));
} catch (parseErr) {
invalidItemsBatch.push({
jobId,
errors: [{ field: 'json_parse', reason: String(parseErr) }],
rawData: { parseError: String(parseErr) },
rowIndex,
});
rowIndex++;
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
continue;
}

const item = mapJsonRowToItem(obj);
if (item) {
const validated = validator.validateItem(item);
if (validated.isValid) {
validItemsBatch.push(validated.item);
} else {
invalidItemsBatch.push({
jobId,
errors: validated.errors,
rawData: validated.item,
rowIndex,
});
}
}
rowIndex++;

if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
}
}

// Flush remaining buffer line (last line without trailing newline)
const lastLine = buffer.trim();
if (lastLine && firstLineSkipped) {
try {
const obj = toRecord(JSON.parse(lastLine));
const item = mapJsonRowToItem(obj);
if (item) {
const validated = validator.validateItem(item);
if (validated.isValid) {
validItemsBatch.push(validated.item);
} else {
invalidItemsBatch.push({
jobId,
errors: validated.errors,
rawData: validated.item,
rowIndex,
});
}
}
rowIndex++;
} catch (parseErr) {
invalidItemsBatch.push({
jobId,
errors: validatedItem.errors,
rawData: validatedItem.item,
errors: [{ field: 'json_parse', reason: String(parseErr) }],
rawData: { parseError: String(parseErr) },
rowIndex,
};
invalidItemsBatch.push(invalidItemLog);
});
rowIndex++;
}
}
} else {
// --- CSV path (unchanged) ---
let fieldMap: Record<string, number> = {};
let isHeaderProcessed = false;

const parser = parse({
relax_column_count: true,
relax_quotes: true,
skip_empty_lines: true,
skip_records_with_error: true,
on_skip: (err: Error) => {
const parserLine = (err as { lines?: number }).lines ?? rowIndex;
const parseErrorLog: NewInvalidItemLog = {
jobId,
errors: [{ field: 'csv_parse', reason: err.message }],
rawData: { parseError: err.message },
rowIndex: parserLine,
};
invalidItemsBatch.push(parseErrorLog);
console.warn(
`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`,
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="packages/api/src/services/etl/processCatalogEtl.ts"
echo "== File =="
ls -l "$file"

echo "== Snippet around on_skip (lines ~170-240) =="
nl -ba "$file" | sed -n '150,260p'

echo "== Snippet around invalidItemsBatch flush points (search) =="
rg -n "invalidItemsBatch|push\\(parseErrorLog\\)|flush|EOF|for await \\(const record of parser\\)" "$file" || true

echo "== Find unawaited async IIFE / writer task =="
rg -n "IIFE|\\(async \\(\\) =>|void \\(async|async \\(\\) =>|new Promise|setImmediate|writer|append|writeStream|createWrite|stream|pipeline|then\\(" "$file" || true

echo "== Snippet around parser loop (around for-await) =="
lnum=$(rg -n "for await \\(const record of parser\\)" "$file" | head -n1 | cut -d: -f1 || true)
if [ -n "${lnum:-}" ]; then
  start=$((lnum-60)); end=$((lnum+80))
  nl -ba "$file" | sed -n "${start},${end}p"
fi

Repository: PackRat-AI/PackRat

Length of output: 281


🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="packages/api/src/services/etl/processCatalogEtl.ts"

echo "== rg locations =="
rg -n "on_skip" "$file" || true
rg -n "invalidItemsBatch" "$file" || true
rg -n "for await \\(const record of parser\\)" "$file" || true
rg -n "IIFE|void \\(async|\\(async \\(\\) =>|new Promise|then\\(|append\\(|writeStream|createWrite|pipeline|stream\\.write|writer" "$file" || true
rg -n "flush|EOF|end-of|close\\(|finish\\(" "$file" || true

echo "== Show on_skip block (±40 lines) =="
on_skip_line=$(rg -n "on_skip" "$file" | head -n1 | cut -d: -f1)
if [ -n "${on_skip_line:-}" ]; then
  start=$((on_skip_line-60)); end=$((on_skip_line+60))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

echo "== Show invalidItemsBatch section (±80 lines around first occurrence) =="
inv_line=$(rg -n "invalidItemsBatch" "$file" | head -n1 | cut -d: -f1)
if [ -n "${inv_line:-}" ]; then
  start=$((inv_line-80)); end=$((inv_line+160))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

echo "== Show parser for-await loop section (±80 lines) =="
loop_line=$(rg -n "for await \\(const record of parser\\)" "$file" | head -n1 | cut -d: -f1 || true)
if [ -n "${loop_line:-}" ]; then
  start=$((loop_line-80)); end=$((loop_line+140))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

Repository: PackRat-AI/PackRat

Length of output: 24727


Bound on_skip buffering and await the CSV writer task.

  • on_skip only pushes into invalidItemsBatch and never flushes; flushing happens only while consuming for await (const record of parser) and at the final post-loop flush, so a chunk/file with mostly or entirely malformed rows can grow invalidItemsBatch to total-row scale (risking OOM).
  • The CSV writer runs in an unawaited async IIFE (the streamToText(...)->parser.write(...)->parser.end() path), so failures there can bypass the outer try/catch and surface as unhandled promise rejections.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/services/etl/processCatalogEtl.ts` around lines 194 - 205,
The on_skip handler currently only pushes parse errors into invalidItemsBatch
(in processCatalogEtl.ts) and doesn't trigger a flush, which can let
invalidItemsBatch grow unbounded when many rows are malformed; also the CSV
writer launched via the streamToText -> parser.write(...) -> parser.end() path
runs in an unawaited async IIFE so its failures can become unhandled rejections.
Fix by making on_skip an async function that (a) increments/uses the same
batching logic as the main consumer (the for await (const record of parser)
flush threshold) and calls the same flush function when invalidItemsBatch
reaches the batch size, and (b) awaits the shared CSV writer promise instead of
letting its IIFE run detached (capture the promise returned by the writer task
created around streamToText/parser.write/parser.end and await it before
completing the outer try/catch), ensuring any writer errors propagate and the
batch is flushed timely from on_skip.

},
});

rowIndex++;
(async () => {
// Non-first chunks: inject the header row so csv-parse sees a valid header,
// then skip the partial row at the chunk boundary (tail of the previous chunk).
if (injectedHeader) {
parser.write(`${injectedHeader}\n`);
}
let skipPartialRow = byteStart !== undefined && byteStart > 0;

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files.
// totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress.
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows.
// totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress.
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
for await (const chunk of streamToText(r2Object.body)) {
let text = chunk;

if (skipPartialRow) {
// Discard bytes up to and including the first newline — those bytes are
// the tail of the row that the previous chunk already processed.
const nl = text.indexOf('\n');
if (nl === -1) continue; // entire buffer is still the partial row tail
text = text.slice(nl + 1);
skipPartialRow = false;
if (!text) continue;
}

// Respect backpressure: if the parser buffer is full, wait for drain before
// pushing more data. Without this, R2 fills the parser buffer for the entire
// file (up to 600 MB) before the main loop processes any rows → Worker OOM.
const ok = parser.write(text);
if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve));
}
parser.end();
})();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

for await (const record of parser) {
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files
const row = record as string[];
if (!isHeaderProcessed) {
fieldMap = row.reduce<Record<string, number>>((acc, header, idx) => {
acc[header.trim()] = idx;
return acc;
}, {});
isHeaderProcessed = true;
console.log(
`🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`,
Object.keys(fieldMap),
);
continue;
}

const item = mapCsvRowToItem({ values: row, fieldMap });

if (item) {
const validatedItem = validator.validateItem(item);

if (validatedItem.isValid) {
validItemsBatch.push(validatedItem.item);
} else {
const invalidItemLog = {
jobId,
errors: validatedItem.errors,
rawData: validatedItem.item,
rowIndex,
};
invalidItemsBatch.push(invalidItemLog);
}
}

rowIndex++;

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files.
// totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress.
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows.
// totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress.
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
}
}

Expand Down
Loading
Loading