From c64cf9b4665e622cc81ae32006a789ffec3d3bec Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 09:56:58 -0600 Subject: [PATCH 01/10] =?UTF-8?q?=E2=9C=A8=20feat(etl):=20add=20JSONL/NDJS?= =?UTF-8?q?ON=20support=20to=20catalog=20ETL=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create json-utils.ts with isJsonlFile() and mapJsonRowToItem() helpers, then branch both the queue-path (processCatalogEtl) and workflow-path (catalog-etl-workflow) processors to stream JSONL when the object key ends in .jsonl/.ndjson. CSV path is unchanged. Also backports relax_quotes + on_skip to the workflow CSV parser to match the queue path. Co-Authored-By: Claude Sonnet 4.6 --- .../api/src/services/etl/processCatalogEtl.ts | 279 ++++++++++++------ packages/api/src/utils/json-utils.ts | 205 +++++++++++++ .../api/src/workflows/catalog-etl-workflow.ts | 230 +++++++++++---- 3 files changed, 572 insertions(+), 142 deletions(-) create mode 100644 packages/api/src/utils/json-utils.ts diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 57a8f9f799..405c3f493d 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -1,6 +1,7 @@ import { createDbClient } from '@packrat/api/db'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; +import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; @@ -74,110 +75,214 @@ export async function processCatalogETL({ } let rowIndex = 0; - let fieldMap: Record = {}; - let isHeaderProcessed = false; const validItemsBatch: Partial[] = []; const invalidItemsBatch: NewInvalidItemLog[] = []; const validator = new CatalogItemValidator(); + const useJsonl = isJsonlFile(objectKey); - const parser = parse({ - relax_column_count: true, - relax_quotes: true, - skip_empty_lines: true, - skip_records_with_error: true, - on_skip: (err: Error) => { - const parserLine = (err as { lines?: number }).lines ?? rowIndex; - const parseErrorLog: NewInvalidItemLog = { - jobId, - errors: [{ field: 'csv_parse', reason: err.message }], - rawData: { parseError: err.message }, - rowIndex: parserLine, - }; - invalidItemsBatch.push(parseErrorLog); - console.warn( - `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`, - ); - }, - }); - - (async () => { - // Non-first chunks: inject the header row so csv-parse sees a valid header, - // then skip the partial row at the chunk boundary (tail of the previous chunk). - if (injectedHeader) { - parser.write(`${injectedHeader}\n`); - } - let skipPartialRow = byteStart !== undefined && byteStart > 0; + if (useJsonl) { + // --- JSONL streaming path --- + // No csv-parse, no header injection. Each line is a JSON object. + let buffer = ''; + const skipPartialLine = byteStart !== undefined && byteStart > 0; + let firstLineSkipped = !skipPartialLine; for await (const chunk of streamToText(r2Object.body)) { - let text = chunk; - - if (skipPartialRow) { - // Discard bytes up to and including the first newline — those bytes are - // the tail of the row that the previous chunk already processed. - const nl = text.indexOf('\n'); - if (nl === -1) continue; // entire buffer is still the partial row tail - text = text.slice(nl + 1); - skipPartialRow = false; - if (!text) continue; - } + buffer += chunk; + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; - // Respect backpressure: if the parser buffer is full, wait for drain before - // pushing more data. Without this, R2 fills the parser buffer for the entire - // file (up to 600 MB) before the main loop processes any rows → Worker OOM. - const ok = parser.write(text); - if (!ok) await new Promise((resolve) => parser.once('drain', resolve)); - } - parser.end(); - })(); - - for await (const record of parser) { - if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files - const row = record as string[]; - if (!isHeaderProcessed) { - fieldMap = row.reduce>((acc, header, idx) => { - acc[header.trim()] = idx; - return acc; - }, {}); - isHeaderProcessed = true; - console.log( - `🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`, - Object.keys(fieldMap), - ); - continue; - } + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; - const item = mapCsvRowToItem({ values: row, fieldMap }); + if (!firstLineSkipped) { + firstLineSkipped = true; + continue; // discard partial row at chunk boundary + } - if (item) { - const validatedItem = validator.validateItem(item); + // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit + if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); - if (validatedItem.isValid) { - validItemsBatch.push(validatedItem.item); - } else { - const invalidItemLog = { + let obj: Record; + try { + obj = JSON.parse(trimmed) as Record; + } catch (parseErr) { + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, + rowIndex, + }); + rowIndex++; + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } + continue; + } + + const item = mapJsonRowToItem(obj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } + } + } + + // Flush remaining buffer line (last line without trailing newline) + const lastLine = buffer.trim(); + if (lastLine && firstLineSkipped) { + try { + const obj = JSON.parse(lastLine) as Record; + const item = mapJsonRowToItem(obj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + } catch (parseErr) { + invalidItemsBatch.push({ jobId, - errors: validatedItem.errors, - rawData: validatedItem.item, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, rowIndex, - }; - invalidItemsBatch.push(invalidItemLog); + }); + rowIndex++; } } + } else { + // --- CSV path (unchanged) --- + let fieldMap: Record = {}; + let isHeaderProcessed = false; + + const parser = parse({ + relax_column_count: true, + relax_quotes: true, + skip_empty_lines: true, + skip_records_with_error: true, + on_skip: (err: Error) => { + const parserLine = (err as { lines?: number }).lines ?? rowIndex; + const parseErrorLog: NewInvalidItemLog = { + jobId, + errors: [{ field: 'csv_parse', reason: err.message }], + rawData: { parseError: err.message }, + rowIndex: parserLine, + }; + invalidItemsBatch.push(parseErrorLog); + console.warn( + `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`, + ); + }, + }); - rowIndex++; + (async () => { + // Non-first chunks: inject the header row so csv-parse sees a valid header, + // then skip the partial row at the chunk boundary (tail of the previous chunk). + if (injectedHeader) { + parser.write(`${injectedHeader}\n`); + } + let skipPartialRow = byteStart !== undefined && byteStart > 0; - // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files. - // totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress. - if (validItemsBatch.length >= BATCH_SIZE) { - await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); - validItemsBatch.length = 0; - } - // Flush invalid batch to DB every BATCH_SIZE rows. - // totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress. - if (invalidItemsBatch.length >= BATCH_SIZE) { - await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); - invalidItemsBatch.length = 0; + for await (const chunk of streamToText(r2Object.body)) { + let text = chunk; + + if (skipPartialRow) { + // Discard bytes up to and including the first newline — those bytes are + // the tail of the row that the previous chunk already processed. + const nl = text.indexOf('\n'); + if (nl === -1) continue; // entire buffer is still the partial row tail + text = text.slice(nl + 1); + skipPartialRow = false; + if (!text) continue; + } + + // Respect backpressure: if the parser buffer is full, wait for drain before + // pushing more data. Without this, R2 fills the parser buffer for the entire + // file (up to 600 MB) before the main loop processes any rows → Worker OOM. + const ok = parser.write(text); + if (!ok) await new Promise((resolve) => parser.once('drain', resolve)); + } + parser.end(); + })(); + + for await (const record of parser) { + if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files + const row = record as string[]; + if (!isHeaderProcessed) { + fieldMap = row.reduce>((acc, header, idx) => { + acc[header.trim()] = idx; + return acc; + }, {}); + isHeaderProcessed = true; + console.log( + `🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`, + Object.keys(fieldMap), + ); + continue; + } + + const item = mapCsvRowToItem({ values: row, fieldMap }); + + if (item) { + const validatedItem = validator.validateItem(item); + + if (validatedItem.isValid) { + validItemsBatch.push(validatedItem.item); + } else { + const invalidItemLog = { + jobId, + errors: validatedItem.errors, + rawData: validatedItem.item, + rowIndex, + }; + invalidItemsBatch.push(invalidItemLog); + } + } + + rowIndex++; + + // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files. + // totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress. + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + validItemsBatch.length = 0; + } + // Flush invalid batch to DB every BATCH_SIZE rows. + // totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress. + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } } } diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts new file mode 100644 index 0000000000..c263e85b8c --- /dev/null +++ b/packages/api/src/utils/json-utils.ts @@ -0,0 +1,205 @@ +import type { NewCatalogItem } from '@packrat/db'; +import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; +import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from './csv-utils'; + +// Module-level regex constant (Biome useTopLevelRegex) +const NEWLINE_CHARS = /[\r\n]+/g; + +/** + * Returns true if the R2 object key has a JSONL or NDJSON extension. + */ +export function isJsonlFile(objectKey: string): boolean { + const lower = objectKey.toLowerCase(); + return lower.endsWith('.jsonl') || lower.endsWith('.ndjson'); +} + +/** + * Maps a parsed JSON object (one line from a JSONL file) to a partial catalog item. + * Uses `unknown` with proper type narrowing — no `any`. + */ +export function mapJsonRowToItem(obj: Record): Partial | null { + const item: Partial = {}; + + // --- String scalar fields --- + const rawName = obj.name; + if (typeof rawName === 'string') item.name = rawName.trim(); + + const rawProductUrl = obj.productUrl; + if (typeof rawProductUrl === 'string') item.productUrl = rawProductUrl.trim(); + + const rawCurrency = obj.currency; + if (typeof rawCurrency === 'string') item.currency = rawCurrency.trim(); + + const rawBrand = obj.brand; + if (typeof rawBrand === 'string') item.brand = rawBrand.trim(); + + const rawModel = obj.model; + if (typeof rawModel === 'string') item.model = rawModel.trim(); + + const rawColor = obj.color; + if (typeof rawColor === 'string') item.color = rawColor.trim(); + + const rawSize = obj.size; + if (typeof rawSize === 'string') item.size = rawSize.trim(); + + const rawSku = obj.sku; + if (typeof rawSku === 'string') item.sku = rawSku.trim(); + + const rawProductSku = obj.productSku; + if (typeof rawProductSku === 'string') item.productSku = rawProductSku.trim(); + + const rawSeller = obj.seller; + if (typeof rawSeller === 'string') item.seller = rawSeller.trim(); + + const rawMaterial = obj.material; + if (typeof rawMaterial === 'string') item.material = rawMaterial.trim(); + + const rawCondition = obj.condition; + if (typeof rawCondition === 'string') item.condition = rawCondition.trim(); + + // --- Description: strip newline chars --- + const rawDescription = obj.description; + if (typeof rawDescription === 'string') { + item.description = rawDescription.replace(NEWLINE_CHARS, ' ').trim(); + } + + // --- reviewCount: direct number or parse from string --- + const rawReviewCount = obj.reviewCount; + if (typeof rawReviewCount === 'number') { + item.reviewCount = Math.trunc(rawReviewCount) || 0; + } else if (typeof rawReviewCount === 'string') { + item.reviewCount = parseInt(rawReviewCount, 10) || 0; + } else { + item.reviewCount = 0; + } + + // --- price: direct number or parsePrice from string --- + const rawPrice = obj.price; + if (typeof rawPrice === 'number') { + item.price = rawPrice; + } else if (typeof rawPrice === 'string') { + item.price = parsePrice(rawPrice) ?? undefined; + } + + // --- ratingValue: direct number or parseFloat from string --- + const rawRatingValue = obj.ratingValue; + if (typeof rawRatingValue === 'number') { + item.ratingValue = rawRatingValue; + } else if (typeof rawRatingValue === 'string') { + const parsed = parseFloat(rawRatingValue); + item.ratingValue = Number.isNaN(parsed) ? null : parsed; + } + + // --- categories: array passthrough or split string --- + const rawCategories = obj.categories; + if (Array.isArray(rawCategories)) { + item.categories = rawCategories.filter((c): c is string => typeof c === 'string'); + } else if (typeof rawCategories === 'string' && rawCategories.trim()) { + const val = rawCategories.trim(); + try { + item.categories = val.startsWith('[') + ? JSON.parse(val) + : val + .split(',') + .map((v) => v.trim()) + .filter(Boolean); + } catch { + item.categories = [val]; + } + } + + // --- images: array passthrough --- + const rawImages = obj.images; + if (Array.isArray(rawImages)) { + item.images = rawImages.filter((i): i is string => typeof i === 'string'); + } + + // --- weight + weightUnit --- + const rawWeight = obj.weight; + const rawWeightUnit = obj.weightUnit; + const unitStr = typeof rawWeightUnit === 'string' ? rawWeightUnit : undefined; + + if (typeof rawWeight === 'number' && rawWeight > 0) { + const { weight, unit } = parseWeight(String(rawWeight), unitStr); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } else if (typeof rawWeight === 'string' && parseFloat(rawWeight) > 0) { + const { weight, unit } = parseWeight(rawWeight, unitStr); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } + + // --- variants: passthrough as-is (already objects) --- + const rawVariants = obj.variants; + if (Array.isArray(rawVariants)) { + item.variants = rawVariants as NewCatalogItem['variants']; + } + + // --- links: passthrough --- + const rawLinks = obj.links; + if (Array.isArray(rawLinks)) { + item.links = rawLinks as NewCatalogItem['links']; + } + + // --- reviews: passthrough --- + const rawReviews = obj.reviews; + if (Array.isArray(rawReviews)) { + item.reviews = rawReviews as NewCatalogItem['reviews']; + } + + // --- qas: passthrough --- + const rawQas = obj.qas; + if (Array.isArray(rawQas)) { + item.qas = rawQas as NewCatalogItem['qas']; + } + + // --- faqs: array passthrough or parseFaqs from string --- + const rawFaqs = obj.faqs; + if (Array.isArray(rawFaqs)) { + item.faqs = rawFaqs as NewCatalogItem['faqs']; + } else if (typeof rawFaqs === 'string' && rawFaqs.trim()) { + try { + item.faqs = parseFaqs(rawFaqs); + } catch { + item.faqs = []; + } + } + + // --- techs: passthrough --- + const rawTechs = obj.techs; + if (rawTechs !== null && typeof rawTechs === 'object' && !Array.isArray(rawTechs)) { + item.techs = rawTechs as Record; + } else if (typeof rawTechs === 'string' && rawTechs.trim()) { + try { + const parsed = safeJsonParse>(rawTechs); + item.techs = Array.isArray(parsed) ? {} : parsed; + } catch { + item.techs = {}; + } + } + + // --- weight fallback from techs (same as CSV path) --- + if (!item.weight && item.techs && typeof item.techs === 'object') { + const techs = item.techs as Record; + const claimedWeight = techs['Claimed Weight'] ?? techs.weight; + if (claimedWeight) { + const { weight, unit } = parseWeight(claimedWeight); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } + } + + // --- availability: string → AvailabilitySchema.safeParse --- + const rawAvailability = obj.availability; + if (typeof rawAvailability === 'string' && rawAvailability.trim()) { + const parsedAvailability = AvailabilitySchema.safeParse(rawAvailability.trim()); + if (parsedAvailability.success) { + item.availability = parsedAvailability.data; + } + } + + return item; +} diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index 914239d303..f9eeabacc2 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -27,6 +27,7 @@ import { R2BucketService } from '@packrat/api/services/r2-bucket'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { setWorkerEnv } from '@packrat/api/utils/env-validation'; +import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; @@ -94,7 +95,7 @@ export async function processChunk({ const r2 = new R2BucketService({ env, bucketType: 'catalog' }); const isNonFirstChunk = chunk.chunkIndex > 0; - const injectedHeader = isNonFirstChunk ? await fetchHeaderRow(r2, chunk.objectKey) : ''; + const useJsonl = isJsonlFile(chunk.objectKey); const length = chunk.byteEnd - chunk.byteStart + 1; const obj = await r2.get(chunk.objectKey, { @@ -106,78 +107,197 @@ export async function processChunk({ const invalidItemsBatch: NewInvalidItemLog[] = []; const validator = new CatalogItemValidator(); - const parser = parse({ - relax_column_count: true, - skip_empty_lines: true, - }); - - const writerPromise = (async () => { - if (injectedHeader) { - parser.write(`${injectedHeader}\n`); - } - for await (const text of streamToText(obj.body)) { - const ok = parser.write(text); - if (!ok) { - await new Promise((resolve) => parser.once('drain', resolve)); - } - } - parser.end(); - })().catch((err) => { - parser.destroy(err instanceof Error ? err : new Error(String(err))); - throw err; - }); - let rowIndex = 0; let rowsValid = 0; let rowsInvalid = 0; - let fieldMap: Record = {}; - let isHeaderProcessed = false; - for await (const record of parser) { - if (rowIndex % 100 === 0) { - await new Promise((resolve) => setTimeout(resolve, 0)); - } - const row = record as string[]; + if (useJsonl) { + // --- JSONL streaming path --- + let buffer = ''; + const skipPartialLine = isNonFirstChunk; + let firstLineSkipped = !skipPartialLine; - if (!isHeaderProcessed) { - fieldMap = {}; - for (const [idx, header] of row.entries()) { - fieldMap[header.trim()] = idx; + for await (const text of streamToText(obj.body)) { + buffer += text; + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + if (!firstLineSkipped) { + firstLineSkipped = true; + continue; // discard partial row at chunk boundary + } + + if (rowIndex % 100 === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + let parsedObj: Record; + try { + parsedObj = JSON.parse(trimmed) as Record; + } catch (parseErr) { + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, + rowIndex, + }); + rowIndex++; + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } + continue; + } + + const item = mapJsonRowToItem(parsedObj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + rowsValid += validItemsBatch.length; + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } } - isHeaderProcessed = true; - continue; } - const item = mapCsvRowToItem({ values: row, fieldMap }); - if (item) { - const validated = validator.validateItem(item); - if (validated.isValid) { - validItemsBatch.push(validated.item); - } else { + // Flush remaining buffer line (last line without trailing newline) + const lastLine = buffer.trim(); + if (lastLine && firstLineSkipped) { + try { + const parsedObj = JSON.parse(lastLine) as Record; + const item = mapJsonRowToItem(parsedObj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + } catch (parseErr) { invalidItemsBatch.push({ jobId, - errors: validated.errors, - rawData: validated.item, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, rowIndex, }); + rowIndex++; } } + } else { + // --- CSV path --- + const injectedHeader = isNonFirstChunk ? await fetchHeaderRow(r2, chunk.objectKey) : ''; - rowIndex++; + let fieldMap: Record = {}; + let isHeaderProcessed = false; - if (validItemsBatch.length >= BATCH_SIZE) { - await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); - rowsValid += validItemsBatch.length; - validItemsBatch.length = 0; - } - if (invalidItemsBatch.length >= BATCH_SIZE) { - await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); - rowsInvalid += invalidItemsBatch.length; - invalidItemsBatch.length = 0; + const parser = parse({ + relax_column_count: true, + relax_quotes: true, + skip_empty_lines: true, + skip_records_with_error: true, + on_skip: (err: Error) => { + const parserLine = (err as { lines?: number }).lines ?? rowIndex; + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'csv_parse', reason: err.message }], + rawData: { parseError: err.message }, + rowIndex: parserLine, + }); + }, + }); + + const writerPromise = (async () => { + if (injectedHeader) { + parser.write(`${injectedHeader}\n`); + } + for await (const text of streamToText(obj.body)) { + const ok = parser.write(text); + if (!ok) { + await new Promise((resolve) => parser.once('drain', resolve)); + } + } + parser.end(); + })().catch((err) => { + parser.destroy(err instanceof Error ? err : new Error(String(err))); + throw err; + }); + + for await (const record of parser) { + if (rowIndex % 100 === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + const row = record as string[]; + + if (!isHeaderProcessed) { + fieldMap = {}; + for (const [idx, header] of row.entries()) { + fieldMap[header.trim()] = idx; + } + isHeaderProcessed = true; + continue; + } + + const item = mapCsvRowToItem({ values: row, fieldMap }); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + + rowIndex++; + + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + rowsValid += validItemsBatch.length; + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } } - } - await writerPromise; + await writerPromise; + } if (validItemsBatch.length > 0) { await processValidItemsBatch({ jobId, items: validItemsBatch, env }); From 603d281658a8001333783ac1d6f937be197ba261 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 10:46:02 -0600 Subject: [PATCH 02/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(json-utils):?= =?UTF-8?q?=20use=20@packrat/guards,=20add=20unit=20tests=20for=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace raw typeof checks with isString/isNumber/isObject from @packrat/guards - Fixes custom lint rule violation (no-raw-typeof CI check) - Add json-utils.test.ts with 30+ tests covering all branches - Brings json-utils.ts line/statement coverage above 95% threshold Co-Authored-By: Claude Sonnet 4.6 --- .../src/utils/__tests__/json-utils.test.ts | 218 ++++++++++++++++++ packages/api/src/utils/json-utils.ts | 61 ++--- 2 files changed, 249 insertions(+), 30 deletions(-) create mode 100644 packages/api/src/utils/__tests__/json-utils.test.ts diff --git a/packages/api/src/utils/__tests__/json-utils.test.ts b/packages/api/src/utils/__tests__/json-utils.test.ts new file mode 100644 index 0000000000..fbe93ac2b2 --- /dev/null +++ b/packages/api/src/utils/__tests__/json-utils.test.ts @@ -0,0 +1,218 @@ +import { describe, expect, it } from 'vitest'; +import { isJsonlFile, mapJsonRowToItem } from '../json-utils'; + +describe('json-utils', () => { + describe('isJsonlFile', () => { + it('returns true for .jsonl extension', () => { + expect(isJsonlFile('v2/brand/file.jsonl')).toBe(true); + }); + + it('returns true for .ndjson extension', () => { + expect(isJsonlFile('v2/brand/file.ndjson')).toBe(true); + }); + + it('returns true for uppercase extensions', () => { + expect(isJsonlFile('FILE.JSONL')).toBe(true); + expect(isJsonlFile('FILE.NDJSON')).toBe(true); + }); + + it('returns false for .csv extension', () => { + expect(isJsonlFile('v2/brand/file.csv')).toBe(false); + }); + + it('returns false for .json extension', () => { + expect(isJsonlFile('v2/brand/file.json')).toBe(false); + }); + }); + + describe('mapJsonRowToItem', () => { + it('maps basic string scalar fields', () => { + const result = mapJsonRowToItem({ + name: ' Trail Shoe ', + brand: 'Salomon', + model: 'XT-6', + color: 'Black', + size: 'M', + sku: 'SKU-001', + productSku: 'PROD-001', + seller: 'REI', + material: 'Mesh', + condition: 'new', + currency: 'USD', + productUrl: ' https://example.com ', + }); + + expect(result).toMatchObject({ + name: 'Trail Shoe', + brand: 'Salomon', + model: 'XT-6', + color: 'Black', + size: 'M', + sku: 'SKU-001', + productSku: 'PROD-001', + seller: 'REI', + material: 'Mesh', + condition: 'new', + currency: 'USD', + productUrl: 'https://example.com', + }); + }); + + it('strips newlines from description', () => { + const result = mapJsonRowToItem({ + description: 'Line one\nLine two\r\nLine three', + }); + expect(result?.description).toBe('Line one Line two Line three'); + }); + + it('maps reviewCount from number', () => { + const result = mapJsonRowToItem({ reviewCount: 42.9 }); + expect(result?.reviewCount).toBe(42); + }); + + it('maps reviewCount from string', () => { + const result = mapJsonRowToItem({ reviewCount: '128' }); + expect(result?.reviewCount).toBe(128); + }); + + it('defaults reviewCount to 0 for missing value', () => { + const result = mapJsonRowToItem({}); + expect(result?.reviewCount).toBe(0); + }); + + it('maps price from number', () => { + const result = mapJsonRowToItem({ price: 149.99 }); + expect(result?.price).toBe(149.99); + }); + + it('maps price from string', () => { + const result = mapJsonRowToItem({ price: '$129.00' }); + expect(result?.price).toBeCloseTo(129.0); + }); + + it('maps ratingValue from number', () => { + const result = mapJsonRowToItem({ ratingValue: 4.5 }); + expect(result?.ratingValue).toBe(4.5); + }); + + it('maps ratingValue from string', () => { + const result = mapJsonRowToItem({ ratingValue: '4.2' }); + expect(result?.ratingValue).toBeCloseTo(4.2); + }); + + it('sets ratingValue to null for invalid string', () => { + const result = mapJsonRowToItem({ ratingValue: 'N/A' }); + expect(result?.ratingValue).toBeNull(); + }); + + it('passes categories array through', () => { + const result = mapJsonRowToItem({ categories: ['Footwear', 'Trail Running'] }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('filters non-string values from categories array', () => { + const result = mapJsonRowToItem({ categories: ['Footwear', 42, null, 'Trail'] }); + expect(result?.categories).toEqual(['Footwear', 'Trail']); + }); + + it('splits categories from comma-separated string', () => { + const result = mapJsonRowToItem({ categories: 'Footwear, Trail Running' }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('parses categories from JSON array string', () => { + const result = mapJsonRowToItem({ categories: '["Footwear","Trail Running"]' }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('wraps unparseable categories string in array', () => { + const result = mapJsonRowToItem({ categories: 'Footwear' }); + expect(result?.categories).toEqual(['Footwear']); + }); + + it('passes images array through, filtering non-strings', () => { + const result = mapJsonRowToItem({ images: ['https://img1.jpg', 42, 'https://img2.jpg'] }); + expect(result?.images).toEqual(['https://img1.jpg', 'https://img2.jpg']); + }); + + it('maps weight from number with unit string', () => { + const result = mapJsonRowToItem({ weight: 280, weightUnit: 'g' }); + expect(result?.weight).toBeGreaterThan(0); + expect(result?.weightUnit).toBeDefined(); + }); + + it('maps weight from string', () => { + const result = mapJsonRowToItem({ weight: '1.5 lbs' }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('ignores weight of zero', () => { + const result = mapJsonRowToItem({ weight: 0 }); + expect(result?.weight).toBeUndefined(); + }); + + it('passes variants, links, reviews, qas arrays through', () => { + const variants = [{ color: 'Red' }]; + const links = [{ url: 'https://example.com' }]; + const reviews = [{ text: 'Great!' }]; + const qas = [{ question: 'Size?', answer: 'True to size' }]; + const result = mapJsonRowToItem({ variants, links, reviews, qas }); + expect(result?.variants).toBe(variants); + expect(result?.links).toBe(links); + expect(result?.reviews).toBe(reviews); + expect(result?.qas).toBe(qas); + }); + + it('passes faqs array through', () => { + const faqs = [{ question: 'Is it waterproof?', answer: 'Yes' }]; + const result = mapJsonRowToItem({ faqs }); + expect(result?.faqs).toBe(faqs); + }); + + it('passes techs object through', () => { + const techs = { 'Claimed Weight': '280g', Material: 'Mesh' }; + const result = mapJsonRowToItem({ techs }); + expect(result?.techs).toEqual(techs); + }); + + it('parses techs from JSON string', () => { + const result = mapJsonRowToItem({ + techs: '{"Claimed Weight":"280g","Material":"Mesh"}', + }); + expect(result?.techs).toEqual({ 'Claimed Weight': '280g', Material: 'Mesh' }); + }); + + it('falls back to weight from techs Claimed Weight field', () => { + const result = mapJsonRowToItem({ techs: { 'Claimed Weight': '280g' } }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('falls back to weight from techs weight field', () => { + const result = mapJsonRowToItem({ techs: { weight: '1.2 lbs' } }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('maps availability from valid string', () => { + const result = mapJsonRowToItem({ availability: 'InStock' }); + expect(result?.availability).toBe('InStock'); + }); + + it('ignores invalid availability value', () => { + const result = mapJsonRowToItem({ availability: 'maybe' }); + expect(result?.availability).toBeUndefined(); + }); + + it('returns empty item for empty input object', () => { + const result = mapJsonRowToItem({}); + expect(result).toBeDefined(); + expect(result?.reviewCount).toBe(0); + }); + + it('ignores non-string/non-number values for scalar fields', () => { + const result = mapJsonRowToItem({ name: 42, brand: null, price: [] }); + expect(result?.name).toBeUndefined(); + expect(result?.brand).toBeUndefined(); + expect(result?.price).toBeUndefined(); + }); + }); +}); diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts index c263e85b8c..8f48ba4d8a 100644 --- a/packages/api/src/utils/json-utils.ts +++ b/packages/api/src/utils/json-utils.ts @@ -1,4 +1,5 @@ import type { NewCatalogItem } from '@packrat/db'; +import { isNumber, isObject, isString } from '@packrat/guards'; import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from './csv-utils'; @@ -22,52 +23,52 @@ export function mapJsonRowToItem(obj: Record): Partial): Partial): Partial typeof c === 'string'); - } else if (typeof rawCategories === 'string' && rawCategories.trim()) { + item.categories = rawCategories.filter((c): c is string => isString(c)); + } else if (isString(rawCategories) && rawCategories.trim()) { const val = rawCategories.trim(); try { item.categories = val.startsWith('[') @@ -111,20 +112,20 @@ export function mapJsonRowToItem(obj: Record): Partial typeof i === 'string'); + item.images = rawImages.filter((i): i is string => isString(i)); } // --- weight + weightUnit --- const rawWeight = obj.weight; const rawWeightUnit = obj.weightUnit; - const unitStr = typeof rawWeightUnit === 'string' ? rawWeightUnit : undefined; + const unitStr = isString(rawWeightUnit) ? rawWeightUnit : undefined; - if (typeof rawWeight === 'number' && rawWeight > 0) { + if (isNumber(rawWeight) && rawWeight > 0) { const { weight, unit } = parseWeight(String(rawWeight), unitStr); item.weight = weight ?? undefined; const parsedUnit = WeightUnitSchema.safeParse(unit); item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; - } else if (typeof rawWeight === 'string' && parseFloat(rawWeight) > 0) { + } else if (isString(rawWeight) && parseFloat(rawWeight) > 0) { const { weight, unit } = parseWeight(rawWeight, unitStr); item.weight = weight ?? undefined; const parsedUnit = WeightUnitSchema.safeParse(unit); @@ -159,7 +160,7 @@ export function mapJsonRowToItem(obj: Record): Partial): Partial; - } else if (typeof rawTechs === 'string' && rawTechs.trim()) { + } else if (isString(rawTechs) && rawTechs.trim()) { try { const parsed = safeJsonParse>(rawTechs); item.techs = Array.isArray(parsed) ? {} : parsed; @@ -181,7 +182,7 @@ export function mapJsonRowToItem(obj: Record): Partial; const claimedWeight = techs['Claimed Weight'] ?? techs.weight; if (claimedWeight) { @@ -194,7 +195,7 @@ export function mapJsonRowToItem(obj: Record): Partial Date: Thu, 21 May 2026 10:55:20 -0600 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(etl):=20repla?= =?UTF-8?q?ce=20unsafe=20casts=20with=20@packrat/guards,=20fix=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use toRecord() for JSON.parse results (catalog-etl-workflow, processCatalogEtl) - Use toStringRecord() for techs narrowing (json-utils) - Fix availability test value (in_stock not InStock) Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 5 +++-- packages/api/src/utils/__tests__/json-utils.test.ts | 6 +++--- packages/api/src/utils/json-utils.ts | 6 +++--- packages/api/src/workflows/catalog-etl-workflow.ts | 5 +++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 405c3f493d..9d22af7a20 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -1,4 +1,5 @@ import { createDbClient } from '@packrat/api/db'; +import { toRecord } from '@packrat/guards'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; @@ -107,7 +108,7 @@ export async function processCatalogETL({ let obj: Record; try { - obj = JSON.parse(trimmed) as Record; + obj = toRecord(JSON.parse(trimmed)); } catch (parseErr) { invalidItemsBatch.push({ jobId, @@ -154,7 +155,7 @@ export async function processCatalogETL({ const lastLine = buffer.trim(); if (lastLine && firstLineSkipped) { try { - const obj = JSON.parse(lastLine) as Record; + const obj = toRecord(JSON.parse(lastLine)); const item = mapJsonRowToItem(obj); if (item) { const validated = validator.validateItem(item); diff --git a/packages/api/src/utils/__tests__/json-utils.test.ts b/packages/api/src/utils/__tests__/json-utils.test.ts index fbe93ac2b2..b726a0588e 100644 --- a/packages/api/src/utils/__tests__/json-utils.test.ts +++ b/packages/api/src/utils/__tests__/json-utils.test.ts @@ -193,12 +193,12 @@ describe('json-utils', () => { }); it('maps availability from valid string', () => { - const result = mapJsonRowToItem({ availability: 'InStock' }); - expect(result?.availability).toBe('InStock'); + const result = mapJsonRowToItem({ availability: 'in_stock' }); + expect(result?.availability).toBe('in_stock'); }); it('ignores invalid availability value', () => { - const result = mapJsonRowToItem({ availability: 'maybe' }); + const result = mapJsonRowToItem({ availability: 'InStock' }); expect(result?.availability).toBeUndefined(); }); diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts index 8f48ba4d8a..3a15e3c348 100644 --- a/packages/api/src/utils/json-utils.ts +++ b/packages/api/src/utils/json-utils.ts @@ -1,5 +1,5 @@ import type { NewCatalogItem } from '@packrat/db'; -import { isNumber, isObject, isString } from '@packrat/guards'; +import { isNumber, isObject, isString, toStringRecord } from '@packrat/guards'; import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from './csv-utils'; @@ -171,7 +171,7 @@ export function mapJsonRowToItem(obj: Record): Partial; + item.techs = toStringRecord(rawTechs); } else if (isString(rawTechs) && rawTechs.trim()) { try { const parsed = safeJsonParse>(rawTechs); @@ -183,7 +183,7 @@ export function mapJsonRowToItem(obj: Record): Partial; + const techs = toStringRecord(item.techs); const claimedWeight = techs['Claimed Weight'] ?? techs.weight; if (claimedWeight) { const { weight, unit } = parseWeight(claimedWeight); diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index f9eeabacc2..e379a93ac5 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -24,6 +24,7 @@ import { BATCH_SIZE } from '@packrat/api/services/etl/processCatalogEtl'; import { processLogsBatch } from '@packrat/api/services/etl/processLogsBatch'; import { processValidItemsBatch } from '@packrat/api/services/etl/processValidItemsBatch'; import { R2BucketService } from '@packrat/api/services/r2-bucket'; +import { toRecord } from '@packrat/guards'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { setWorkerEnv } from '@packrat/api/utils/env-validation'; @@ -137,7 +138,7 @@ export async function processChunk({ let parsedObj: Record; try { - parsedObj = JSON.parse(trimmed) as Record; + parsedObj = toRecord(JSON.parse(trimmed)); } catch (parseErr) { invalidItemsBatch.push({ jobId, @@ -187,7 +188,7 @@ export async function processChunk({ const lastLine = buffer.trim(); if (lastLine && firstLineSkipped) { try { - const parsedObj = JSON.parse(lastLine) as Record; + const parsedObj = toRecord(JSON.parse(lastLine)); const item = mapJsonRowToItem(parsedObj); if (item) { const validated = validator.validateItem(item); From 2639f80fee823e0ece3483889389599db818efd6 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:13:30 -0600 Subject: [PATCH 04/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(etl):=20addre?= =?UTF-8?q?ss=20CR/Copilot=20comments=20=E2=80=94=20chunk=20skip,=20import?= =?UTF-8?q?s,=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove incorrect JSONL partial-line skip (chunker guarantees clean boundaries; skipPartialLine was dropping the first valid record per non-first chunk) - Fix @packrat/guards import order for Biome organizeImports (after @packrat/db) - Use @packrat/api/utils/csv-utils alias instead of relative path in json-utils - Filter non-strings from JSON-parsed categories array (parity with native branch) - Apply toStringRecord to safeJsonParse techs result (parity with native branch) - Add test for JSON array categories non-string filtering Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 7 ++++--- packages/api/src/utils/__tests__/json-utils.test.ts | 5 +++++ packages/api/src/utils/json-utils.ts | 6 +++--- packages/api/src/workflows/catalog-etl-workflow.ts | 7 ++++--- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 9d22af7a20..542ad9e452 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -1,9 +1,9 @@ import { createDbClient } from '@packrat/api/db'; -import { toRecord } from '@packrat/guards'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; +import { toRecord } from '@packrat/guards'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; import { R2BucketService } from '../r2-bucket'; @@ -86,8 +86,9 @@ export async function processCatalogETL({ // --- JSONL streaming path --- // No csv-parse, no header injection. Each line is a JSON object. let buffer = ''; - const skipPartialLine = byteStart !== undefined && byteStart > 0; - let firstLineSkipped = !skipPartialLine; + // The chunker snaps boundaries to newlines, so every chunk starts at a + // clean line boundary — no partial first-line skip needed for any chunk. + let firstLineSkipped = true; for await (const chunk of streamToText(r2Object.body)) { buffer += chunk; diff --git a/packages/api/src/utils/__tests__/json-utils.test.ts b/packages/api/src/utils/__tests__/json-utils.test.ts index b726a0588e..cc78ac20c2 100644 --- a/packages/api/src/utils/__tests__/json-utils.test.ts +++ b/packages/api/src/utils/__tests__/json-utils.test.ts @@ -125,6 +125,11 @@ describe('json-utils', () => { expect(result?.categories).toEqual(['Footwear', 'Trail Running']); }); + it('filters non-strings from JSON array string categories', () => { + const result = mapJsonRowToItem({ categories: '["Footwear",42,null]' }); + expect(result?.categories).toEqual(['Footwear']); + }); + it('wraps unparseable categories string in array', () => { const result = mapJsonRowToItem({ categories: 'Footwear' }); expect(result?.categories).toEqual(['Footwear']); diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts index 3a15e3c348..d20a5bd018 100644 --- a/packages/api/src/utils/json-utils.ts +++ b/packages/api/src/utils/json-utils.ts @@ -1,7 +1,7 @@ import type { NewCatalogItem } from '@packrat/db'; import { isNumber, isObject, isString, toStringRecord } from '@packrat/guards'; import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; -import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from './csv-utils'; +import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from '@packrat/api/utils/csv-utils'; // Module-level regex constant (Biome useTopLevelRegex) const NEWLINE_CHARS = /[\r\n]+/g; @@ -99,7 +99,7 @@ export function mapJsonRowToItem(obj: Record): Partial isString(c)) : val .split(',') .map((v) => v.trim()) @@ -175,7 +175,7 @@ export function mapJsonRowToItem(obj: Record): Partial>(rawTechs); - item.techs = Array.isArray(parsed) ? {} : parsed; + item.techs = Array.isArray(parsed) ? {} : toStringRecord(parsed); } catch { item.techs = {}; } diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index e379a93ac5..477f9e6069 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -24,12 +24,12 @@ import { BATCH_SIZE } from '@packrat/api/services/etl/processCatalogEtl'; import { processLogsBatch } from '@packrat/api/services/etl/processLogsBatch'; import { processValidItemsBatch } from '@packrat/api/services/etl/processValidItemsBatch'; import { R2BucketService } from '@packrat/api/services/r2-bucket'; -import { toRecord } from '@packrat/guards'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { setWorkerEnv } from '@packrat/api/utils/env-validation'; import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; +import { toRecord } from '@packrat/guards'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; import type { ChunkSpec } from './shared/chunkCsvForR2'; @@ -114,9 +114,10 @@ export async function processChunk({ if (useJsonl) { // --- JSONL streaming path --- + // The chunker snaps boundaries to newlines, so every chunk starts at a + // clean line boundary — no partial first-line skip needed for any chunk. let buffer = ''; - const skipPartialLine = isNonFirstChunk; - let firstLineSkipped = !skipPartialLine; + let firstLineSkipped = true; for await (const text of streamToText(obj.body)) { buffer += text; From 1b27205382d6cb176965ff578f729643d8ab2313 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:16:59 -0600 Subject: [PATCH 05/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(json-utils):?= =?UTF-8?q?=20correct=20Biome=20import=20sort=20order?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit @packrat/api/* → @packrat/db → @packrat/guards → @packrat/schemas/* Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/utils/json-utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts index d20a5bd018..8b310ec3b8 100644 --- a/packages/api/src/utils/json-utils.ts +++ b/packages/api/src/utils/json-utils.ts @@ -1,7 +1,7 @@ +import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from '@packrat/api/utils/csv-utils'; import type { NewCatalogItem } from '@packrat/db'; import { isNumber, isObject, isString, toStringRecord } from '@packrat/guards'; import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; -import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from '@packrat/api/utils/csv-utils'; // Module-level regex constant (Biome useTopLevelRegex) const NEWLINE_CHARS = /[\r\n]+/g; From 4af87df61ec59a0606691bc070b7deebae993ddb Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:29:48 -0600 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(etl):=20drop?= =?UTF-8?q?=20explicit=20err=20type=20on=20on=5Fskip=20to=20fix=20TS=20ove?= =?UTF-8?q?rload=20resolution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit csv-parse infers the correct CsvError type; explicit Error annotation caused the Options overload to be rejected, resolving to Callback instead. Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 2 +- packages/api/src/workflows/catalog-etl-workflow.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 542ad9e452..0deeb5cdb6 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -192,7 +192,7 @@ export async function processCatalogETL({ relax_quotes: true, skip_empty_lines: true, skip_records_with_error: true, - on_skip: (err: Error) => { + on_skip: (err) => { const parserLine = (err as { lines?: number }).lines ?? rowIndex; const parseErrorLog: NewInvalidItemLog = { jobId, diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index 477f9e6069..0583534506 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -227,7 +227,7 @@ export async function processChunk({ relax_quotes: true, skip_empty_lines: true, skip_records_with_error: true, - on_skip: (err: Error) => { + on_skip: (err) => { const parserLine = (err as { lines?: number }).lines ?? rowIndex; invalidItemsBatch.push({ jobId, From 534e3f6f23966fd44e753f1f2ec65c404b77281a Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:49:44 -0600 Subject: [PATCH 07/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix(etl):=20guard?= =?UTF-8?q?=20err=20possibly-undefined=20in=20on=5Fskip=20(TS18048)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit csv-parse types on_skip as (err: CsvError | undefined, ...) => void Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 7 ++++--- packages/api/src/workflows/catalog-etl-workflow.ts | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 0deeb5cdb6..d3c3252cde 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -193,11 +193,12 @@ export async function processCatalogETL({ skip_empty_lines: true, skip_records_with_error: true, on_skip: (err) => { - const parserLine = (err as { lines?: number }).lines ?? rowIndex; + const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex; + const message = err?.message ?? 'unknown parse error'; const parseErrorLog: NewInvalidItemLog = { jobId, - errors: [{ field: 'csv_parse', reason: err.message }], - rawData: { parseError: err.message }, + errors: [{ field: 'csv_parse', reason: message }], + rawData: { parseError: message }, rowIndex: parserLine, }; invalidItemsBatch.push(parseErrorLog); diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index 0583534506..24e4d07313 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -228,11 +228,12 @@ export async function processChunk({ skip_empty_lines: true, skip_records_with_error: true, on_skip: (err) => { - const parserLine = (err as { lines?: number }).lines ?? rowIndex; + const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex; + const message = err?.message ?? 'unknown parse error'; invalidItemsBatch.push({ jobId, - errors: [{ field: 'csv_parse', reason: err.message }], - rawData: { parseError: err.message }, + errors: [{ field: 'csv_parse', reason: message }], + rawData: { parseError: message }, rowIndex: parserLine, }); }, From cd4e13eec9993ba8e02d63513357dc9907acd60a Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:51:49 -0600 Subject: [PATCH 08/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix:=20use=20pre-?= =?UTF-8?q?computed=20`message`=20var=20in=20on=5Fskip=20console.warn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids TS18048 — `err` is possibly undefined at line 206; the `message` variable is already safely computed with optional chaining and a fallback. Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index d3c3252cde..b30a17d9f0 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -203,7 +203,7 @@ export async function processCatalogETL({ }; invalidItemsBatch.push(parseErrorLog); console.warn( - `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`, + `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${message}`, ); }, }); From 46da63e7e34a7db686395ab4050a94f4a98b91d6 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 11:53:29 -0600 Subject: [PATCH 09/10] =?UTF-8?q?=F0=9F=9B=A0=EF=B8=8F=20fix:=20collapse?= =?UTF-8?q?=20console.warn=20to=20single=20line=20for=20Biome=20formatter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The string fits within the 100-char line width; collapsing it removes the only Biome format error in the checks job. Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index b30a17d9f0..4af563a944 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -202,9 +202,7 @@ export async function processCatalogETL({ rowIndex: parserLine, }; invalidItemsBatch.push(parseErrorLog); - console.warn( - `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${message}`, - ); + console.warn(`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${message}`); }, }); From 3af10be11308f33a19fd78b852c9090abcfe5850 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 21 May 2026 12:14:40 -0600 Subject: [PATCH 10/10] fix(etl): capture csv pump promise to prevent silent hang on R2 errors Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/services/etl/processCatalogEtl.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 4af563a944..6e66231efc 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -206,7 +206,7 @@ export async function processCatalogETL({ }, }); - (async () => { + const writerPromise = (async () => { // Non-first chunks: inject the header row so csv-parse sees a valid header, // then skip the partial row at the chunk boundary (tail of the previous chunk). if (injectedHeader) { @@ -234,7 +234,10 @@ export async function processCatalogETL({ if (!ok) await new Promise((resolve) => parser.once('drain', resolve)); } parser.end(); - })(); + })().catch((err) => { + parser.destroy(err instanceof Error ? err : new Error(String(err))); + throw err; + }); for await (const record of parser) { if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files @@ -285,6 +288,8 @@ export async function processCatalogETL({ invalidItemsBatch.length = 0; } } + + await writerPromise; } console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`);