Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 198 additions & 87 deletions packages/api/src/services/etl/processCatalogEtl.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { createDbClient } from '@packrat/api/db';
import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils';
import type { Env } from '@packrat/api/utils/env-validation';
import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils';
import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db';
import { toRecord } from '@packrat/guards';
import { parse } from 'csv-parse';
import { eq } from 'drizzle-orm';
import { R2BucketService } from '../r2-bucket';
Expand Down Expand Up @@ -74,111 +76,220 @@ export async function processCatalogETL({
}

let rowIndex = 0;
let fieldMap: Record<string, number> = {};
let isHeaderProcessed = false;
const validItemsBatch: Partial<NewCatalogItem>[] = [];
const invalidItemsBatch: NewInvalidItemLog[] = [];

const validator = new CatalogItemValidator();
const useJsonl = isJsonlFile(objectKey);

const parser = parse({
relax_column_count: true,
relax_quotes: true,
skip_empty_lines: true,
skip_records_with_error: true,
on_skip: (err: Error) => {
const parserLine = (err as { lines?: number }).lines ?? rowIndex;
const parseErrorLog: NewInvalidItemLog = {
jobId,
errors: [{ field: 'csv_parse', reason: err.message }],
rawData: { parseError: err.message },
rowIndex: parserLine,
};
invalidItemsBatch.push(parseErrorLog);
console.warn(
`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`,
);
},
});

(async () => {
// Non-first chunks: inject the header row so csv-parse sees a valid header,
// then skip the partial row at the chunk boundary (tail of the previous chunk).
if (injectedHeader) {
parser.write(`${injectedHeader}\n`);
}
let skipPartialRow = byteStart !== undefined && byteStart > 0;
if (useJsonl) {
// --- JSONL streaming path ---
// No csv-parse, no header injection. Each line is a JSON object.
let buffer = '';
// The chunker snaps boundaries to newlines, so every chunk starts at a
// clean line boundary — no partial first-line skip is needed for any chunk.
// The flag below therefore starts as true, which keeps the skip branch inactive.
let firstLineSkipped = true;

for await (const chunk of streamToText(r2Object.body)) {
let text = chunk;

if (skipPartialRow) {
// Discard bytes up to and including the first newline — those bytes are
// the tail of the row that the previous chunk already processed.
const nl = text.indexOf('\n');
if (nl === -1) continue; // entire buffer is still the partial row tail
text = text.slice(nl + 1);
skipPartialRow = false;
if (!text) continue;
}
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';

// Respect backpressure: if the parser buffer is full, wait for drain before
// pushing more data. Without this, R2 fills the parser buffer for the entire
// file (up to 600 MB) before the main loop processes any rows → Worker OOM.
const ok = parser.write(text);
if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve));
}
parser.end();
})();

for await (const record of parser) {
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files
const row = record as string[];
if (!isHeaderProcessed) {
fieldMap = row.reduce<Record<string, number>>((acc, header, idx) => {
acc[header.trim()] = idx;
return acc;
}, {});
isHeaderProcessed = true;
console.log(
`🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`,
Object.keys(fieldMap),
);
continue;
}
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

if (!firstLineSkipped) {
firstLineSkipped = true;
continue; // discard partial row at chunk boundary
}
Comment on lines +86 to +105

// Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1));

let obj: Record<string, unknown>;
try {
obj = toRecord(JSON.parse(trimmed));
} catch (parseErr) {
invalidItemsBatch.push({
jobId,
errors: [{ field: 'json_parse', reason: String(parseErr) }],
rawData: { parseError: String(parseErr) },
rowIndex,
});
rowIndex++;
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
continue;
}

const item = mapCsvRowToItem({ values: row, fieldMap });
const item = mapJsonRowToItem(obj);
if (item) {
const validated = validator.validateItem(item);
if (validated.isValid) {
validItemsBatch.push(validated.item);
} else {
invalidItemsBatch.push({
jobId,
errors: validated.errors,
rawData: validated.item,
rowIndex,
});
}
}
rowIndex++;

if (item) {
const validatedItem = validator.validateItem(item);
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
}
}

if (validatedItem.isValid) {
validItemsBatch.push(validatedItem.item);
} else {
const invalidItemLog = {
// Flush remaining buffer line (last line without trailing newline)
const lastLine = buffer.trim();
if (lastLine && firstLineSkipped) {
try {
const obj = toRecord(JSON.parse(lastLine));
const item = mapJsonRowToItem(obj);
if (item) {
const validated = validator.validateItem(item);
if (validated.isValid) {
validItemsBatch.push(validated.item);
} else {
invalidItemsBatch.push({
jobId,
errors: validated.errors,
rawData: validated.item,
rowIndex,
});
}
}
rowIndex++;
} catch (parseErr) {
invalidItemsBatch.push({
jobId,
errors: validatedItem.errors,
rawData: validatedItem.item,
errors: [{ field: 'json_parse', reason: String(parseErr) }],
rawData: { parseError: String(parseErr) },
rowIndex,
};
invalidItemsBatch.push(invalidItemLog);
});
rowIndex++;
}
}
} else {
// --- CSV streaming path (csv-parse, with header injection for non-first chunks) ---
let fieldMap: Record<string, number> = {};
let isHeaderProcessed = false;

rowIndex++;
const parser = parse({
relax_column_count: true,
relax_quotes: true,
skip_empty_lines: true,
skip_records_with_error: true,
on_skip: (err) => {
const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex;
const message = err?.message ?? 'unknown parse error';
const parseErrorLog: NewInvalidItemLog = {
jobId,
errors: [{ field: 'csv_parse', reason: message }],
rawData: { parseError: message },
rowIndex: parserLine,
};
invalidItemsBatch.push(parseErrorLog);
console.warn(`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${message}`);
},
});

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files.
// totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress.
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows.
// totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress.
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
const writerPromise = (async () => {
// Non-first chunks: inject the header row so csv-parse sees a valid header,
// then skip the partial row at the chunk boundary (tail of the previous chunk).
if (injectedHeader) {
parser.write(`${injectedHeader}\n`);
}
let skipPartialRow = byteStart !== undefined && byteStart > 0;

for await (const chunk of streamToText(r2Object.body)) {
let text = chunk;

if (skipPartialRow) {
// Discard bytes up to and including the first newline — those bytes are
// the tail of the row that the previous chunk already processed.
const nl = text.indexOf('\n');
if (nl === -1) continue; // entire buffer is still the partial row tail
text = text.slice(nl + 1);
skipPartialRow = false;
if (!text) continue;
}

// Respect backpressure: if the parser buffer is full, wait for drain before
// pushing more data. Without this, R2 fills the parser buffer for the entire
// file (up to 600 MB) before the main loop processes any rows → Worker OOM.
const ok = parser.write(text);
if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve));
}
parser.end();
})().catch((err) => {
parser.destroy(err instanceof Error ? err : new Error(String(err)));
throw err;
});

for await (const record of parser) {
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files
const row = record as string[];
if (!isHeaderProcessed) {
fieldMap = row.reduce<Record<string, number>>((acc, header, idx) => {
acc[header.trim()] = idx;
return acc;
}, {});
isHeaderProcessed = true;
console.log(
`🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`,
Object.keys(fieldMap),
);
continue;
}

const item = mapCsvRowToItem({ values: row, fieldMap });

if (item) {
const validatedItem = validator.validateItem(item);

if (validatedItem.isValid) {
validItemsBatch.push(validatedItem.item);
} else {
const invalidItemLog = {
jobId,
errors: validatedItem.errors,
rawData: validatedItem.item,
rowIndex,
};
invalidItemsBatch.push(invalidItemLog);
}
}

rowIndex++;

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files.
// totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress.
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows.
// totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress.
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
invalidItemsBatch.length = 0;
}
}

await writerPromise;
}

console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`);
Expand Down
Loading
Loading