diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 57a8f9f799..6e66231efc 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -1,7 +1,9 @@ import { createDbClient } from '@packrat/api/db'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; +import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; +import { toRecord } from '@packrat/guards'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; import { R2BucketService } from '../r2-bucket'; @@ -74,111 +76,220 @@ export async function processCatalogETL({ } let rowIndex = 0; - let fieldMap: Record = {}; - let isHeaderProcessed = false; const validItemsBatch: Partial[] = []; const invalidItemsBatch: NewInvalidItemLog[] = []; const validator = new CatalogItemValidator(); + const useJsonl = isJsonlFile(objectKey); - const parser = parse({ - relax_column_count: true, - relax_quotes: true, - skip_empty_lines: true, - skip_records_with_error: true, - on_skip: (err: Error) => { - const parserLine = (err as { lines?: number }).lines ?? rowIndex; - const parseErrorLog: NewInvalidItemLog = { - jobId, - errors: [{ field: 'csv_parse', reason: err.message }], - rawData: { parseError: err.message }, - rowIndex: parserLine, - }; - invalidItemsBatch.push(parseErrorLog); - console.warn( - `[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`, - ); - }, - }); - - (async () => { - // Non-first chunks: inject the header row so csv-parse sees a valid header, - // then skip the partial row at the chunk boundary (tail of the previous chunk). - if (injectedHeader) { - parser.write(`${injectedHeader}\n`); - } - let skipPartialRow = byteStart !== undefined && byteStart > 0; + if (useJsonl) { + // --- JSONL streaming path --- + // No csv-parse, no header injection. Each line is a JSON object. + let buffer = ''; + // The chunker snaps boundaries to newlines, so every chunk starts at a + // clean line boundary — no partial first-line skip needed for any chunk. + let firstLineSkipped = true; for await (const chunk of streamToText(r2Object.body)) { - let text = chunk; - - if (skipPartialRow) { - // Discard bytes up to and including the first newline — those bytes are - // the tail of the row that the previous chunk already processed. - const nl = text.indexOf('\n'); - if (nl === -1) continue; // entire buffer is still the partial row tail - text = text.slice(nl + 1); - skipPartialRow = false; - if (!text) continue; - } + buffer += chunk; + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; - // Respect backpressure: if the parser buffer is full, wait for drain before - // pushing more data. Without this, R2 fills the parser buffer for the entire - // file (up to 600 MB) before the main loop processes any rows → Worker OOM. - const ok = parser.write(text); - if (!ok) await new Promise((resolve) => parser.once('drain', resolve)); - } - parser.end(); - })(); - - for await (const record of parser) { - if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files - const row = record as string[]; - if (!isHeaderProcessed) { - fieldMap = row.reduce>((acc, header, idx) => { - acc[header.trim()] = idx; - return acc; - }, {}); - isHeaderProcessed = true; - console.log( - `🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`, - Object.keys(fieldMap), - ); - continue; - } + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + if (!firstLineSkipped) { + firstLineSkipped = true; + continue; // discard partial row at chunk boundary + } + + // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit + if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); + + let obj: Record; + try { + obj = toRecord(JSON.parse(trimmed)); + } catch (parseErr) { + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, + rowIndex, + }); + rowIndex++; + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } + continue; + } - const item = mapCsvRowToItem({ values: row, fieldMap }); + const item = mapJsonRowToItem(obj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; - if (item) { - const validatedItem = validator.validateItem(item); + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } + } + } - if (validatedItem.isValid) { - validItemsBatch.push(validatedItem.item); - } else { - const invalidItemLog = { + // Flush remaining buffer line (last line without trailing newline) + const lastLine = buffer.trim(); + if (lastLine && firstLineSkipped) { + try { + const obj = toRecord(JSON.parse(lastLine)); + const item = mapJsonRowToItem(obj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + } catch (parseErr) { + invalidItemsBatch.push({ jobId, - errors: validatedItem.errors, - rawData: validatedItem.item, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, rowIndex, - }; - invalidItemsBatch.push(invalidItemLog); + }); + rowIndex++; } } + } else { + // --- CSV path (unchanged) --- + let fieldMap: Record = {}; + let isHeaderProcessed = false; - rowIndex++; + const parser = parse({ + relax_column_count: true, + relax_quotes: true, + skip_empty_lines: true, + skip_records_with_error: true, + on_skip: (err) => { + const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex; + const message = err?.message ?? 'unknown parse error'; + const parseErrorLog: NewInvalidItemLog = { + jobId, + errors: [{ field: 'csv_parse', reason: message }], + rawData: { parseError: message }, + rowIndex: parserLine, + }; + invalidItemsBatch.push(parseErrorLog); + console.warn(`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${message}`); + }, + }); - // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files. - // totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress. - if (validItemsBatch.length >= BATCH_SIZE) { - await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); - validItemsBatch.length = 0; - } - // Flush invalid batch to DB every BATCH_SIZE rows. - // totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress. - if (invalidItemsBatch.length >= BATCH_SIZE) { - await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); - invalidItemsBatch.length = 0; + const writerPromise = (async () => { + // Non-first chunks: inject the header row so csv-parse sees a valid header, + // then skip the partial row at the chunk boundary (tail of the previous chunk). + if (injectedHeader) { + parser.write(`${injectedHeader}\n`); + } + let skipPartialRow = byteStart !== undefined && byteStart > 0; + + for await (const chunk of streamToText(r2Object.body)) { + let text = chunk; + + if (skipPartialRow) { + // Discard bytes up to and including the first newline — those bytes are + // the tail of the row that the previous chunk already processed. + const nl = text.indexOf('\n'); + if (nl === -1) continue; // entire buffer is still the partial row tail + text = text.slice(nl + 1); + skipPartialRow = false; + if (!text) continue; + } + + // Respect backpressure: if the parser buffer is full, wait for drain before + // pushing more data. Without this, R2 fills the parser buffer for the entire + // file (up to 600 MB) before the main loop processes any rows → Worker OOM. + const ok = parser.write(text); + if (!ok) await new Promise((resolve) => parser.once('drain', resolve)); + } + parser.end(); + })().catch((err) => { + parser.destroy(err instanceof Error ? err : new Error(String(err))); + throw err; + }); + + for await (const record of parser) { + if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files + const row = record as string[]; + if (!isHeaderProcessed) { + fieldMap = row.reduce>((acc, header, idx) => { + acc[header.trim()] = idx; + return acc; + }, {}); + isHeaderProcessed = true; + console.log( + `🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`, + Object.keys(fieldMap), + ); + continue; + } + + const item = mapCsvRowToItem({ values: row, fieldMap }); + + if (item) { + const validatedItem = validator.validateItem(item); + + if (validatedItem.isValid) { + validItemsBatch.push(validatedItem.item); + } else { + const invalidItemLog = { + jobId, + errors: validatedItem.errors, + rawData: validatedItem.item, + rowIndex, + }; + invalidItemsBatch.push(invalidItemLog); + } + } + + rowIndex++; + + // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files. + // totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress. + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + validItemsBatch.length = 0; + } + // Flush invalid batch to DB every BATCH_SIZE rows. + // totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress. + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + invalidItemsBatch.length = 0; + } } + + await writerPromise; } console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`); diff --git a/packages/api/src/utils/__tests__/json-utils.test.ts b/packages/api/src/utils/__tests__/json-utils.test.ts new file mode 100644 index 0000000000..cc78ac20c2 --- /dev/null +++ b/packages/api/src/utils/__tests__/json-utils.test.ts @@ -0,0 +1,223 @@ +import { describe, expect, it } from 'vitest'; +import { isJsonlFile, mapJsonRowToItem } from '../json-utils'; + +describe('json-utils', () => { + describe('isJsonlFile', () => { + it('returns true for .jsonl extension', () => { + expect(isJsonlFile('v2/brand/file.jsonl')).toBe(true); + }); + + it('returns true for .ndjson extension', () => { + expect(isJsonlFile('v2/brand/file.ndjson')).toBe(true); + }); + + it('returns true for uppercase extensions', () => { + expect(isJsonlFile('FILE.JSONL')).toBe(true); + expect(isJsonlFile('FILE.NDJSON')).toBe(true); + }); + + it('returns false for .csv extension', () => { + expect(isJsonlFile('v2/brand/file.csv')).toBe(false); + }); + + it('returns false for .json extension', () => { + expect(isJsonlFile('v2/brand/file.json')).toBe(false); + }); + }); + + describe('mapJsonRowToItem', () => { + it('maps basic string scalar fields', () => { + const result = mapJsonRowToItem({ + name: ' Trail Shoe ', + brand: 'Salomon', + model: 'XT-6', + color: 'Black', + size: 'M', + sku: 'SKU-001', + productSku: 'PROD-001', + seller: 'REI', + material: 'Mesh', + condition: 'new', + currency: 'USD', + productUrl: ' https://example.com ', + }); + + expect(result).toMatchObject({ + name: 'Trail Shoe', + brand: 'Salomon', + model: 'XT-6', + color: 'Black', + size: 'M', + sku: 'SKU-001', + productSku: 'PROD-001', + seller: 'REI', + material: 'Mesh', + condition: 'new', + currency: 'USD', + productUrl: 'https://example.com', + }); + }); + + it('strips newlines from description', () => { + const result = mapJsonRowToItem({ + description: 'Line one\nLine two\r\nLine three', + }); + expect(result?.description).toBe('Line one Line two Line three'); + }); + + it('maps reviewCount from number', () => { + const result = mapJsonRowToItem({ reviewCount: 42.9 }); + expect(result?.reviewCount).toBe(42); + }); + + it('maps reviewCount from string', () => { + const result = mapJsonRowToItem({ reviewCount: '128' }); + expect(result?.reviewCount).toBe(128); + }); + + it('defaults reviewCount to 0 for missing value', () => { + const result = mapJsonRowToItem({}); + expect(result?.reviewCount).toBe(0); + }); + + it('maps price from number', () => { + const result = mapJsonRowToItem({ price: 149.99 }); + expect(result?.price).toBe(149.99); + }); + + it('maps price from string', () => { + const result = mapJsonRowToItem({ price: '$129.00' }); + expect(result?.price).toBeCloseTo(129.0); + }); + + it('maps ratingValue from number', () => { + const result = mapJsonRowToItem({ ratingValue: 4.5 }); + expect(result?.ratingValue).toBe(4.5); + }); + + it('maps ratingValue from string', () => { + const result = mapJsonRowToItem({ ratingValue: '4.2' }); + expect(result?.ratingValue).toBeCloseTo(4.2); + }); + + it('sets ratingValue to null for invalid string', () => { + const result = mapJsonRowToItem({ ratingValue: 'N/A' }); + expect(result?.ratingValue).toBeNull(); + }); + + it('passes categories array through', () => { + const result = mapJsonRowToItem({ categories: ['Footwear', 'Trail Running'] }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('filters non-string values from categories array', () => { + const result = mapJsonRowToItem({ categories: ['Footwear', 42, null, 'Trail'] }); + expect(result?.categories).toEqual(['Footwear', 'Trail']); + }); + + it('splits categories from comma-separated string', () => { + const result = mapJsonRowToItem({ categories: 'Footwear, Trail Running' }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('parses categories from JSON array string', () => { + const result = mapJsonRowToItem({ categories: '["Footwear","Trail Running"]' }); + expect(result?.categories).toEqual(['Footwear', 'Trail Running']); + }); + + it('filters non-strings from JSON array string categories', () => { + const result = mapJsonRowToItem({ categories: '["Footwear",42,null]' }); + expect(result?.categories).toEqual(['Footwear']); + }); + + it('wraps unparseable categories string in array', () => { + const result = mapJsonRowToItem({ categories: 'Footwear' }); + expect(result?.categories).toEqual(['Footwear']); + }); + + it('passes images array through, filtering non-strings', () => { + const result = mapJsonRowToItem({ images: ['https://img1.jpg', 42, 'https://img2.jpg'] }); + expect(result?.images).toEqual(['https://img1.jpg', 'https://img2.jpg']); + }); + + it('maps weight from number with unit string', () => { + const result = mapJsonRowToItem({ weight: 280, weightUnit: 'g' }); + expect(result?.weight).toBeGreaterThan(0); + expect(result?.weightUnit).toBeDefined(); + }); + + it('maps weight from string', () => { + const result = mapJsonRowToItem({ weight: '1.5 lbs' }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('ignores weight of zero', () => { + const result = mapJsonRowToItem({ weight: 0 }); + expect(result?.weight).toBeUndefined(); + }); + + it('passes variants, links, reviews, qas arrays through', () => { + const variants = [{ color: 'Red' }]; + const links = [{ url: 'https://example.com' }]; + const reviews = [{ text: 'Great!' }]; + const qas = [{ question: 'Size?', answer: 'True to size' }]; + const result = mapJsonRowToItem({ variants, links, reviews, qas }); + expect(result?.variants).toBe(variants); + expect(result?.links).toBe(links); + expect(result?.reviews).toBe(reviews); + expect(result?.qas).toBe(qas); + }); + + it('passes faqs array through', () => { + const faqs = [{ question: 'Is it waterproof?', answer: 'Yes' }]; + const result = mapJsonRowToItem({ faqs }); + expect(result?.faqs).toBe(faqs); + }); + + it('passes techs object through', () => { + const techs = { 'Claimed Weight': '280g', Material: 'Mesh' }; + const result = mapJsonRowToItem({ techs }); + expect(result?.techs).toEqual(techs); + }); + + it('parses techs from JSON string', () => { + const result = mapJsonRowToItem({ + techs: '{"Claimed Weight":"280g","Material":"Mesh"}', + }); + expect(result?.techs).toEqual({ 'Claimed Weight': '280g', Material: 'Mesh' }); + }); + + it('falls back to weight from techs Claimed Weight field', () => { + const result = mapJsonRowToItem({ techs: { 'Claimed Weight': '280g' } }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('falls back to weight from techs weight field', () => { + const result = mapJsonRowToItem({ techs: { weight: '1.2 lbs' } }); + expect(result?.weight).toBeGreaterThan(0); + }); + + it('maps availability from valid string', () => { + const result = mapJsonRowToItem({ availability: 'in_stock' }); + expect(result?.availability).toBe('in_stock'); + }); + + it('ignores invalid availability value', () => { + const result = mapJsonRowToItem({ availability: 'InStock' }); + expect(result?.availability).toBeUndefined(); + }); + + it('returns empty item for empty input object', () => { + const result = mapJsonRowToItem({}); + expect(result).toBeDefined(); + expect(result?.reviewCount).toBe(0); + }); + + it('ignores non-string/non-number values for scalar fields', () => { + const result = mapJsonRowToItem({ name: 42, brand: null, price: [] }); + expect(result?.name).toBeUndefined(); + expect(result?.brand).toBeUndefined(); + expect(result?.price).toBeUndefined(); + }); + }); +}); diff --git a/packages/api/src/utils/json-utils.ts b/packages/api/src/utils/json-utils.ts new file mode 100644 index 0000000000..8b310ec3b8 --- /dev/null +++ b/packages/api/src/utils/json-utils.ts @@ -0,0 +1,206 @@ +import { parseFaqs, parsePrice, parseWeight, safeJsonParse } from '@packrat/api/utils/csv-utils'; +import type { NewCatalogItem } from '@packrat/db'; +import { isNumber, isObject, isString, toStringRecord } from '@packrat/guards'; +import { AvailabilitySchema, WeightUnitSchema } from '@packrat/schemas/constants'; + +// Module-level regex constant (Biome useTopLevelRegex) +const NEWLINE_CHARS = /[\r\n]+/g; + +/** + * Returns true if the R2 object key has a JSONL or NDJSON extension. + */ +export function isJsonlFile(objectKey: string): boolean { + const lower = objectKey.toLowerCase(); + return lower.endsWith('.jsonl') || lower.endsWith('.ndjson'); +} + +/** + * Maps a parsed JSON object (one line from a JSONL file) to a partial catalog item. + * Uses `unknown` with proper type narrowing — no `any`. + */ +export function mapJsonRowToItem(obj: Record): Partial | null { + const item: Partial = {}; + + // --- String scalar fields --- + const rawName = obj.name; + if (isString(rawName)) item.name = rawName.trim(); + + const rawProductUrl = obj.productUrl; + if (isString(rawProductUrl)) item.productUrl = rawProductUrl.trim(); + + const rawCurrency = obj.currency; + if (isString(rawCurrency)) item.currency = rawCurrency.trim(); + + const rawBrand = obj.brand; + if (isString(rawBrand)) item.brand = rawBrand.trim(); + + const rawModel = obj.model; + if (isString(rawModel)) item.model = rawModel.trim(); + + const rawColor = obj.color; + if (isString(rawColor)) item.color = rawColor.trim(); + + const rawSize = obj.size; + if (isString(rawSize)) item.size = rawSize.trim(); + + const rawSku = obj.sku; + if (isString(rawSku)) item.sku = rawSku.trim(); + + const rawProductSku = obj.productSku; + if (isString(rawProductSku)) item.productSku = rawProductSku.trim(); + + const rawSeller = obj.seller; + if (isString(rawSeller)) item.seller = rawSeller.trim(); + + const rawMaterial = obj.material; + if (isString(rawMaterial)) item.material = rawMaterial.trim(); + + const rawCondition = obj.condition; + if (isString(rawCondition)) item.condition = rawCondition.trim(); + + // --- Description: strip newline chars --- + const rawDescription = obj.description; + if (isString(rawDescription)) { + item.description = rawDescription.replace(NEWLINE_CHARS, ' ').trim(); + } + + // --- reviewCount: direct number or parse from string --- + const rawReviewCount = obj.reviewCount; + if (isNumber(rawReviewCount)) { + item.reviewCount = Math.trunc(rawReviewCount) || 0; + } else if (isString(rawReviewCount)) { + item.reviewCount = parseInt(rawReviewCount, 10) || 0; + } else { + item.reviewCount = 0; + } + + // --- price: direct number or parsePrice from string --- + const rawPrice = obj.price; + if (isNumber(rawPrice)) { + item.price = rawPrice; + } else if (isString(rawPrice)) { + item.price = parsePrice(rawPrice) ?? undefined; + } + + // --- ratingValue: direct number or parseFloat from string --- + const rawRatingValue = obj.ratingValue; + if (isNumber(rawRatingValue)) { + item.ratingValue = rawRatingValue; + } else if (isString(rawRatingValue)) { + const parsed = parseFloat(rawRatingValue); + item.ratingValue = Number.isNaN(parsed) ? null : parsed; + } + + // --- categories: array passthrough or split string --- + const rawCategories = obj.categories; + if (Array.isArray(rawCategories)) { + item.categories = rawCategories.filter((c): c is string => isString(c)); + } else if (isString(rawCategories) && rawCategories.trim()) { + const val = rawCategories.trim(); + try { + item.categories = val.startsWith('[') + ? (JSON.parse(val) as unknown[]).filter((c): c is string => isString(c)) + : val + .split(',') + .map((v) => v.trim()) + .filter(Boolean); + } catch { + item.categories = [val]; + } + } + + // --- images: array passthrough --- + const rawImages = obj.images; + if (Array.isArray(rawImages)) { + item.images = rawImages.filter((i): i is string => isString(i)); + } + + // --- weight + weightUnit --- + const rawWeight = obj.weight; + const rawWeightUnit = obj.weightUnit; + const unitStr = isString(rawWeightUnit) ? rawWeightUnit : undefined; + + if (isNumber(rawWeight) && rawWeight > 0) { + const { weight, unit } = parseWeight(String(rawWeight), unitStr); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } else if (isString(rawWeight) && parseFloat(rawWeight) > 0) { + const { weight, unit } = parseWeight(rawWeight, unitStr); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } + + // --- variants: passthrough as-is (already objects) --- + const rawVariants = obj.variants; + if (Array.isArray(rawVariants)) { + item.variants = rawVariants as NewCatalogItem['variants']; + } + + // --- links: passthrough --- + const rawLinks = obj.links; + if (Array.isArray(rawLinks)) { + item.links = rawLinks as NewCatalogItem['links']; + } + + // --- reviews: passthrough --- + const rawReviews = obj.reviews; + if (Array.isArray(rawReviews)) { + item.reviews = rawReviews as NewCatalogItem['reviews']; + } + + // --- qas: passthrough --- + const rawQas = obj.qas; + if (Array.isArray(rawQas)) { + item.qas = rawQas as NewCatalogItem['qas']; + } + + // --- faqs: array passthrough or parseFaqs from string --- + const rawFaqs = obj.faqs; + if (Array.isArray(rawFaqs)) { + item.faqs = rawFaqs as NewCatalogItem['faqs']; + } else if (isString(rawFaqs) && rawFaqs.trim()) { + try { + item.faqs = parseFaqs(rawFaqs); + } catch { + item.faqs = []; + } + } + + // --- techs: passthrough --- + const rawTechs = obj.techs; + if (isObject(rawTechs)) { + item.techs = toStringRecord(rawTechs); + } else if (isString(rawTechs) && rawTechs.trim()) { + try { + const parsed = safeJsonParse>(rawTechs); + item.techs = Array.isArray(parsed) ? {} : toStringRecord(parsed); + } catch { + item.techs = {}; + } + } + + // --- weight fallback from techs (same as CSV path) --- + if (!item.weight && item.techs && isObject(item.techs)) { + const techs = toStringRecord(item.techs); + const claimedWeight = techs['Claimed Weight'] ?? techs.weight; + if (claimedWeight) { + const { weight, unit } = parseWeight(claimedWeight); + item.weight = weight ?? undefined; + const parsedUnit = WeightUnitSchema.safeParse(unit); + item.weightUnit = parsedUnit.success ? parsedUnit.data : undefined; + } + } + + // --- availability: string → AvailabilitySchema.safeParse --- + const rawAvailability = obj.availability; + if (isString(rawAvailability) && rawAvailability.trim()) { + const parsedAvailability = AvailabilitySchema.safeParse(rawAvailability.trim()); + if (parsedAvailability.success) { + item.availability = parsedAvailability.data; + } + } + + return item; +} diff --git a/packages/api/src/workflows/catalog-etl-workflow.ts b/packages/api/src/workflows/catalog-etl-workflow.ts index 914239d303..24e4d07313 100644 --- a/packages/api/src/workflows/catalog-etl-workflow.ts +++ b/packages/api/src/workflows/catalog-etl-workflow.ts @@ -27,7 +27,9 @@ import { R2BucketService } from '@packrat/api/services/r2-bucket'; import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils'; import type { Env } from '@packrat/api/utils/env-validation'; import { setWorkerEnv } from '@packrat/api/utils/env-validation'; +import { isJsonlFile, mapJsonRowToItem } from '@packrat/api/utils/json-utils'; import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/db'; +import { toRecord } from '@packrat/guards'; import { parse } from 'csv-parse'; import { eq } from 'drizzle-orm'; import type { ChunkSpec } from './shared/chunkCsvForR2'; @@ -94,7 +96,7 @@ export async function processChunk({ const r2 = new R2BucketService({ env, bucketType: 'catalog' }); const isNonFirstChunk = chunk.chunkIndex > 0; - const injectedHeader = isNonFirstChunk ? await fetchHeaderRow(r2, chunk.objectKey) : ''; + const useJsonl = isJsonlFile(chunk.objectKey); const length = chunk.byteEnd - chunk.byteStart + 1; const obj = await r2.get(chunk.objectKey, { @@ -106,78 +108,199 @@ export async function processChunk({ const invalidItemsBatch: NewInvalidItemLog[] = []; const validator = new CatalogItemValidator(); - const parser = parse({ - relax_column_count: true, - skip_empty_lines: true, - }); - - const writerPromise = (async () => { - if (injectedHeader) { - parser.write(`${injectedHeader}\n`); - } - for await (const text of streamToText(obj.body)) { - const ok = parser.write(text); - if (!ok) { - await new Promise((resolve) => parser.once('drain', resolve)); - } - } - parser.end(); - })().catch((err) => { - parser.destroy(err instanceof Error ? err : new Error(String(err))); - throw err; - }); - let rowIndex = 0; let rowsValid = 0; let rowsInvalid = 0; - let fieldMap: Record = {}; - let isHeaderProcessed = false; - for await (const record of parser) { - if (rowIndex % 100 === 0) { - await new Promise((resolve) => setTimeout(resolve, 0)); - } - const row = record as string[]; + if (useJsonl) { + // --- JSONL streaming path --- + // The chunker snaps boundaries to newlines, so every chunk starts at a + // clean line boundary — no partial first-line skip needed for any chunk. + let buffer = ''; + let firstLineSkipped = true; - if (!isHeaderProcessed) { - fieldMap = {}; - for (const [idx, header] of row.entries()) { - fieldMap[header.trim()] = idx; + for await (const text of streamToText(obj.body)) { + buffer += text; + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + if (!firstLineSkipped) { + firstLineSkipped = true; + continue; // discard partial row at chunk boundary + } + + if (rowIndex % 100 === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + let parsedObj: Record; + try { + parsedObj = toRecord(JSON.parse(trimmed)); + } catch (parseErr) { + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, + rowIndex, + }); + rowIndex++; + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } + continue; + } + + const item = mapJsonRowToItem(parsedObj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + rowsValid += validItemsBatch.length; + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } } - isHeaderProcessed = true; - continue; } - const item = mapCsvRowToItem({ values: row, fieldMap }); - if (item) { - const validated = validator.validateItem(item); - if (validated.isValid) { - validItemsBatch.push(validated.item); - } else { + // Flush remaining buffer line (last line without trailing newline) + const lastLine = buffer.trim(); + if (lastLine && firstLineSkipped) { + try { + const parsedObj = toRecord(JSON.parse(lastLine)); + const item = mapJsonRowToItem(parsedObj); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + rowIndex++; + } catch (parseErr) { invalidItemsBatch.push({ jobId, - errors: validated.errors, - rawData: validated.item, + errors: [{ field: 'json_parse', reason: String(parseErr) }], + rawData: { parseError: String(parseErr) }, rowIndex, }); + rowIndex++; } } + } else { + // --- CSV path --- + const injectedHeader = isNonFirstChunk ? await fetchHeaderRow(r2, chunk.objectKey) : ''; - rowIndex++; + let fieldMap: Record = {}; + let isHeaderProcessed = false; - if (validItemsBatch.length >= BATCH_SIZE) { - await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); - rowsValid += validItemsBatch.length; - validItemsBatch.length = 0; - } - if (invalidItemsBatch.length >= BATCH_SIZE) { - await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); - rowsInvalid += invalidItemsBatch.length; - invalidItemsBatch.length = 0; + const parser = parse({ + relax_column_count: true, + relax_quotes: true, + skip_empty_lines: true, + skip_records_with_error: true, + on_skip: (err) => { + const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex; + const message = err?.message ?? 'unknown parse error'; + invalidItemsBatch.push({ + jobId, + errors: [{ field: 'csv_parse', reason: message }], + rawData: { parseError: message }, + rowIndex: parserLine, + }); + }, + }); + + const writerPromise = (async () => { + if (injectedHeader) { + parser.write(`${injectedHeader}\n`); + } + for await (const text of streamToText(obj.body)) { + const ok = parser.write(text); + if (!ok) { + await new Promise((resolve) => parser.once('drain', resolve)); + } + } + parser.end(); + })().catch((err) => { + parser.destroy(err instanceof Error ? err : new Error(String(err))); + throw err; + }); + + for await (const record of parser) { + if (rowIndex % 100 === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + const row = record as string[]; + + if (!isHeaderProcessed) { + fieldMap = {}; + for (const [idx, header] of row.entries()) { + fieldMap[header.trim()] = idx; + } + isHeaderProcessed = true; + continue; + } + + const item = mapCsvRowToItem({ values: row, fieldMap }); + if (item) { + const validated = validator.validateItem(item); + if (validated.isValid) { + validItemsBatch.push(validated.item); + } else { + invalidItemsBatch.push({ + jobId, + errors: validated.errors, + rawData: validated.item, + rowIndex, + }); + } + } + + rowIndex++; + + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + rowsValid += validItemsBatch.length; + validItemsBatch.length = 0; + } + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + rowsInvalid += invalidItemsBatch.length; + invalidItemsBatch.length = 0; + } } - } - await writerPromise; + await writerPromise; + } if (validItemsBatch.length > 0) { await processValidItemsBatch({ jobId, items: validItemsBatch, env });