From a042457a82afc0e1ea24151a903af159ffcd6433 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 01:39:10 -0600 Subject: [PATCH 1/8] fix(etl): flush batches incrementally to prevent Worker OOM on large files Replace end-of-stream single flush with per-BATCH_SIZE flushes during the CSV parse loop. Valid and invalid item batches are written to DB every 100 rows, arrays are cleared, and totalProcessed is updated incrementally so progress is visible on long-running jobs. Remaining rows are flushed after the loop as before. Fixes evo (174K rows) and gearx (166K rows) jobs stuck in 'running' forever. --- .../api/src/services/etl/processCatalogEtl.ts | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 7bda4487e3..dfe396f2e5 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -107,16 +107,38 @@ export async function processCatalogETL({ } rowIndex++; + + // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files + if (validItemsBatch.length >= BATCH_SIZE) { + await processValidItemsBatch({ jobId, items: [...validItemsBatch], env }); + await db + .update(etlJobs) + .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` }) + .where(eq(etlJobs.id, jobId)); + validItemsBatch.length = 0; + } + // Flush invalid batch to DB every BATCH_SIZE rows + if (invalidItemsBatch.length >= BATCH_SIZE) { + await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env }); + await db + .update(etlJobs) + .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` }) + .where(eq(etlJobs.id, jobId)); + invalidItemsBatch.length = 0; + } } - console.log(`🔍 [TRACE] Streaming complete - processing batches`); + console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`); - const itemsProcessed = validItemsBatch.length + invalidItemsBatch.length; + // Flush remaining items after the stream ends + const remainingItems = validItemsBatch.length + invalidItemsBatch.length; - await db - .update(etlJobs) - .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${itemsProcessed}` }) - .where(eq(etlJobs.id, jobId)); + if (remainingItems > 0) { + await db + .update(etlJobs) + .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${remainingItems}` }) + .where(eq(etlJobs.id, jobId)); + } if (validItemsBatch.length > 0) { console.log(`🔍 [TRACE] Processing valid items batch - size: ${validItemsBatch.length}`); From 5fc3c5dc5a15a484683c506afde82ddbef77bc42 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 01:39:17 -0600 Subject: [PATCH 2/8] fix(etl): remove weight/weightUnit from required field validation Clothing and footwear brands (kuhl, obozfootwear, prana) routinely omit weight data, causing 0% ingest rates. Weight is important for comparisons but it is better to ingest items without it than to reject them entirely. Items missing weight are stored as-is and simply excluded from weight views. Weight and weightUnit remain nullable at the DB layer (no schema change). --- .../src/services/etl/CatalogItemValidator.ts | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/packages/api/src/services/etl/CatalogItemValidator.ts b/packages/api/src/services/etl/CatalogItemValidator.ts index 478b2c54a4..6788ba475d 100644 --- a/packages/api/src/services/etl/CatalogItemValidator.ts +++ b/packages/api/src/services/etl/CatalogItemValidator.ts @@ -31,23 +31,9 @@ export class CatalogItemValidator { }); } - if (!item.weight || !isNumber(item.weight) || item.weight <= 0) { - errors.push({ - field: 'weight', - reason: 'Weight is required and must be a positive number', - value: item.weight, - }); - } - - if (!item.weightUnit || !isString(item.weightUnit) || item.weightUnit.trim().length === 0) { - errors.push({ - field: 'weightUnit', - reason: 'Weight unit is required and must be a non-empty string', - value: item.weightUnit, - }); - } - // Additional validations + // Note: weight and weightUnit are intentionally not required — clothing/footwear brands often + // omit weight data. Items without weight are ingested but won't appear in weight comparisons. if (item.productUrl && !this.isValidUrl(item.productUrl)) { errors.push({ field: 'productUrl', From 6a6680981f2d7f46dab61ba9f6421bd5bfe9fe57 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 01:39:41 -0600 Subject: [PATCH 3/8] fix(etl): upsert overwrites stale data instead of preserving old values The ON CONFLICT DO UPDATE SET clause was using COALESCE(table.field, excluded.field) which means existing non-null values could never be corrected by a fresh scrape. Changed to excluded.field directly so re-scraping always wins, allowing price, weight, availability, and other fields to be corrected when the source data changes. Exception: created_at retains COALESCE so the original creation timestamp is preserved across upserts. --- packages/api/src/services/catalogService.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/api/src/services/catalogService.ts b/packages/api/src/services/catalogService.ts index a416d67a85..60f944e943 100644 --- a/packages/api/src/services/catalogService.ts +++ b/packages/api/src/services/catalogService.ts @@ -343,7 +343,13 @@ export class CatalogService { .onConflictDoUpdate({ target: catalogItems.sku, set: Object.values(columns).reduce>((acc, col) => { - acc[col.name] = sql.raw(`COALESCE(catalog_items.${col.name}, excluded."${col.name}")`); + // Preserve the original creation timestamp; overwrite everything else with the + // fresh scraped value so stale/wrong data can be corrected by re-scraping. + if (col.name === 'created_at') { + acc[col.name] = sql.raw(`COALESCE(catalog_items.${col.name}, excluded."${col.name}")`); + } else { + acc[col.name] = sql.raw(`excluded."${col.name}"`); + } return acc; }, {}), }) From e871f6495bdf453476d2ce5f0637386e5b10db81 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 01:39:46 -0600 Subject: [PATCH 4/8] chore(etl): add SQL script to reset zombie ETL jobs stuck in running state Manually runnable script that fails any ETL job that has been in 'running' status for more than 30 minutes. Addresses jobs left stuck when the Worker crashes mid-stream (e.g. OOM) without reaching the error handler. --- packages/api/scripts/reset-stuck-etl-jobs.sql | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 packages/api/scripts/reset-stuck-etl-jobs.sql diff --git a/packages/api/scripts/reset-stuck-etl-jobs.sql b/packages/api/scripts/reset-stuck-etl-jobs.sql new file mode 100644 index 0000000000..e63d312f6b --- /dev/null +++ b/packages/api/scripts/reset-stuck-etl-jobs.sql @@ -0,0 +1,6 @@ +-- Reset ETL jobs stuck in 'running' state for more than 30 minutes. +-- Run manually when zombie jobs are detected. +UPDATE etl_jobs +SET status = 'failed', completed_at = NOW() +WHERE status = 'running' + AND started_at < NOW() - INTERVAL '30 minutes'; From 53ffa42e24019e38a3c3777578b7df1f4783c9f9 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 09:57:37 -0600 Subject: [PATCH 5/8] fix(etl): use best-value-wins merge for weight on upsert conflict Protect against scrapers returning weight=0 or weight<0 overwriting valid existing weight data. Weight and weight_unit now only update when the incoming value is a positive number; otherwise the stored value is preserved. All other fields continue to accept the fresh scraped value so stale data can still be corrected by re-scraping. --- packages/api/src/services/catalogService.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/api/src/services/catalogService.ts b/packages/api/src/services/catalogService.ts index 60f944e943..00c5617501 100644 --- a/packages/api/src/services/catalogService.ts +++ b/packages/api/src/services/catalogService.ts @@ -343,10 +343,19 @@ export class CatalogService { .onConflictDoUpdate({ target: catalogItems.sku, set: Object.values(columns).reduce>((acc, col) => { - // Preserve the original creation timestamp; overwrite everything else with the - // fresh scraped value so stale/wrong data can be corrected by re-scraping. - if (col.name === 'created_at') { + if (col.name === 'id' || col.name === 'created_at') { + // Never overwrite PK or original creation timestamp acc[col.name] = sql.raw(`COALESCE(catalog_items.${col.name}, excluded."${col.name}")`); + } else if (col.name === 'weight') { + // Keep old weight if new weight is missing or invalid (0 / negative) + acc[col.name] = sql.raw( + `CASE WHEN excluded."weight" IS NOT NULL AND excluded."weight" > 0 THEN excluded."weight" ELSE COALESCE(catalog_items.weight, excluded."weight") END`, + ); + } else if (col.name === 'weight_unit') { + // weight_unit stays in sync with weight validity + acc[col.name] = sql.raw( + `CASE WHEN excluded."weight" IS NOT NULL AND excluded."weight" > 0 THEN excluded."weight_unit" ELSE COALESCE(catalog_items.weight_unit, excluded."weight_unit") END`, + ); } else { acc[col.name] = sql.raw(`excluded."${col.name}"`); } From 132c2a41086e599d9916107e14fae19d3c6ac074 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 11:57:53 -0600 Subject: [PATCH 6/8] fix(types): remove baseUrl, fix ignoreDeprecations, cast TFunction for TS6 compat --- apps/expo/app/(app)/current-pack/[id].tsx | 2 +- apps/expo/app/(app)/recent-packs.tsx | 4 ++-- tsconfig.json | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/apps/expo/app/(app)/current-pack/[id].tsx b/apps/expo/app/(app)/current-pack/[id].tsx index db114c3fe7..2a37709625 100644 --- a/apps/expo/app/(app)/current-pack/[id].tsx +++ b/apps/expo/app/(app)/current-pack/[id].tsx @@ -155,7 +155,7 @@ export default function CurrentPackScreen() { {t('packs.lastUpdated', { - time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t), + time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any), })} diff --git a/apps/expo/app/(app)/recent-packs.tsx b/apps/expo/app/(app)/recent-packs.tsx index 20fa2db13a..6ab73c53fb 100644 --- a/apps/expo/app/(app)/recent-packs.tsx +++ b/apps/expo/app/(app)/recent-packs.tsx @@ -34,7 +34,7 @@ function RecentPackCard({ pack }: { pack: Pack }) { {pack.totalWeight ?? 0} g - {getRelativeTime(pack.localCreatedAt ?? pack.createdAt, t)} + {getRelativeTime(pack.localCreatedAt ?? pack.createdAt, t as any)} @@ -45,7 +45,7 @@ function RecentPackCard({ pack }: { pack: Pack }) { {t('packs.lastUpdated', { - time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t), + time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any), })} diff --git a/tsconfig.json b/tsconfig.json index bd2a8c48e0..8d3088ab3a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,9 +1,8 @@ { "compilerOptions": { "allowJs": true, - "baseUrl": ".", "esModuleInterop": true, - "ignoreDeprecations": "6.0", + "ignoreDeprecations": "5.0", "jsx": "react-native", "lib": ["DOM", "ESNext"], "module": "preserve", From 2f980dc29d3f60fe3c014d83cb42681fdf28beab Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 12:01:26 -0600 Subject: [PATCH 7/8] refactor(etl): replace sql.raw with sql template and sql.identifier in upsert set --- packages/api/src/services/catalogService.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/api/src/services/catalogService.ts b/packages/api/src/services/catalogService.ts index 00c5617501..5fb5d15b13 100644 --- a/packages/api/src/services/catalogService.ts +++ b/packages/api/src/services/catalogService.ts @@ -345,19 +345,17 @@ export class CatalogService { set: Object.values(columns).reduce>((acc, col) => { if (col.name === 'id' || col.name === 'created_at') { // Never overwrite PK or original creation timestamp - acc[col.name] = sql.raw(`COALESCE(catalog_items.${col.name}, excluded."${col.name}")`); + acc[col.name] = sql`COALESCE(${col}, excluded.${sql.identifier(col.name)})`; } else if (col.name === 'weight') { // Keep old weight if new weight is missing or invalid (0 / negative) - acc[col.name] = sql.raw( - `CASE WHEN excluded."weight" IS NOT NULL AND excluded."weight" > 0 THEN excluded."weight" ELSE COALESCE(catalog_items.weight, excluded."weight") END`, - ); + acc[col.name] = + sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight')} ELSE COALESCE(${catalogItems.weight}, excluded.${sql.identifier('weight')}) END`; } else if (col.name === 'weight_unit') { // weight_unit stays in sync with weight validity - acc[col.name] = sql.raw( - `CASE WHEN excluded."weight" IS NOT NULL AND excluded."weight" > 0 THEN excluded."weight_unit" ELSE COALESCE(catalog_items.weight_unit, excluded."weight_unit") END`, - ); + acc[col.name] = + sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight_unit')} ELSE COALESCE(${catalogItems.weightUnit}, excluded.${sql.identifier('weight_unit')}) END`; } else { - acc[col.name] = sql.raw(`excluded."${col.name}"`); + acc[col.name] = sql`excluded.${sql.identifier(col.name)}`; } return acc; }, {}), From 40079759bd65164df2e11989beace72e24074c84 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Thu, 7 May 2026 12:11:34 -0600 Subject: [PATCH 8/8] fix(etl): increase stuck-job threshold from 30 min to 3 hours --- packages/api/scripts/reset-stuck-etl-jobs.sql | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/api/scripts/reset-stuck-etl-jobs.sql b/packages/api/scripts/reset-stuck-etl-jobs.sql index e63d312f6b..f5595b867e 100644 --- a/packages/api/scripts/reset-stuck-etl-jobs.sql +++ b/packages/api/scripts/reset-stuck-etl-jobs.sql @@ -1,6 +1,7 @@ --- Reset ETL jobs stuck in 'running' state for more than 30 minutes. +-- Reset ETL jobs stuck in 'running' state for more than 3 hours. +-- 3h accounts for large first-time imports (~500K rows + embedding generation). -- Run manually when zombie jobs are detected. UPDATE etl_jobs SET status = 'failed', completed_at = NOW() WHERE status = 'running' - AND started_at < NOW() - INTERVAL '30 minutes'; + AND started_at < NOW() - INTERVAL '3 hours';