Fix/etl memory and weight validation v2 #2383
…files Replace end-of-stream single flush with per-BATCH_SIZE flushes during the CSV parse loop. Valid and invalid item batches are written to DB every 100 rows, arrays are cleared, and totalProcessed is updated incrementally so progress is visible on long-running jobs. Remaining rows are flushed after the loop as before. Fixes evo (174K rows) and gearx (166K rows) jobs stuck in 'running' forever.
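The per-batch flush described in this commit can be sketched roughly as follows. This is a simplified, synchronous model, not the code from `processCatalogEtl.ts`: `createBatcher`, `BatchStats`, and the in-memory `written` sink are hypothetical names introduced for illustration, and the real flushes are awaited DB writes.

```typescript
// Hypothetical helper; names are illustrative and not taken from the PR.
interface BatchStats {
  flushes: number;        // number of times the sink was called
  totalProcessed: number; // running total, advanced after every flush
}

function createBatcher<T>(sink: (batch: T[]) => void, batchSize: number = 100) {
  const buffer: T[] = [];
  const stats: BatchStats = { flushes: 0, totalProcessed: 0 };

  function flush(): void {
    if (buffer.length === 0) return;       // nothing buffered
    sink(buffer.slice());                  // pass a copy so clearing below is safe
    stats.totalProcessed += buffer.length; // count what was actually written
    stats.flushes += 1;
    buffer.length = 0;                     // release the rows held in memory
  }

  return {
    // Called once per parsed row inside the parse loop.
    push(row: T): void {
      buffer.push(row);
      if (buffer.length >= batchSize) flush();
    },
    // Called once after the loop to write the remaining rows.
    finish(): BatchStats {
      flush();
      return stats;
    },
    buffered(): number {
      return buffer.length;
    },
  };
}

// Usage: 250 rows with a batch size of 100 give two full flushes plus a remainder.
const written: number[][] = [];
const batcher = createBatcher<number>((batch) => {
  written.push(batch);
});
for (let i = 0; i < 250; i++) batcher.push(i);
const stats = batcher.finish();
```

The point of the design is that at most `batchSize` rows are held in memory at any time, and the progress counter advances after each flush instead of once at the end of the stream.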
Clothing and footwear brands (kuhl, obozfootwear, prana) routinely omit weight data, causing 0% ingest rates. Weight is important for comparisons, but it is better to ingest items without it than to reject them entirely. Items missing weight are stored as-is and simply excluded from weight views. Weight and weightUnit remain nullable at the DB layer (no schema change).
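A minimal sketch of what "optional weight" means for ingestion and for weight views. The `ScrapedItem` shape, the `name` requirement, and both function names are assumptions made for this example; the actual required fields in `CatalogItemValidator.ts` are not shown on this page.

```typescript
// Hypothetical item shape; only weight/weightUnit being optional comes from the PR text.
interface ScrapedItem {
  name: string;
  weight?: number | null;
  weightUnit?: string | null;
}

// Returns validation errors; an empty list means the item is ingested.
function validateItem(item: ScrapedItem): string[] {
  const errors: string[] = [];
  if (item.name.trim() === "") errors.push("name is required"); // assumed requirement
  // Deliberately no check on weight or weightUnit: both are optional.
  return errors;
}

// Weight views only consider items that carry both a weight and a unit.
function hasWeight(item: ScrapedItem): boolean {
  return item.weight != null && item.weightUnit != null;
}

const items: ScrapedItem[] = [
  { name: "Trail shoe" },                          // no weight: ingested, hidden from weight views
  { name: "Tent", weight: 1200, weightUnit: "g" }, // full data: ingested and comparable
];
const ingested = items.filter((item) => validateItem(item).length === 0);
const weightView = ingested.filter(hasWeight);
```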
The ON CONFLICT DO UPDATE SET clause was using COALESCE(table.field, excluded.field), which meant existing non-null values could never be corrected by a fresh scrape. Changed to excluded.field directly so re-scraping always wins, allowing price, weight, availability, and other fields to be corrected when the source data changes. Exception: created_at retains COALESCE so the original creation timestamp is preserved across upserts.
…state Manually runnable script that fails any ETL job that has been in 'running' status for more than 30 minutes. Addresses jobs left stuck when the Worker crashes mid-stream (e.g. OOM) without reaching the error handler.
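The script itself is SQL; the rule it applies can be modeled in TypeScript as below. Only the `'running'` status and the 30-minute cutoff come from the commit message. The `EtlJob` shape, the `startedAt` field, and `failStuckJobs` are hypothetical.

```typescript
// Hypothetical job shape, used only to illustrate the staleness rule.
type JobStatus = "running" | "completed" | "failed";

interface EtlJob {
  id: string;
  status: JobStatus;
  startedAt: Date;
}

const STUCK_AFTER_MS = 30 * 60 * 1000; // 30 minutes

// Returns a copy of the job list with stale 'running' jobs marked as failed.
function failStuckJobs(jobs: EtlJob[], now: Date): EtlJob[] {
  return jobs.map((job): EtlJob => {
    const ageMs = now.getTime() - job.startedAt.getTime();
    const stuck = job.status === "running" && ageMs > STUCK_AFTER_MS;
    return stuck ? { ...job, status: "failed" } : job;
  });
}

const now = new Date("2025-01-01T12:00:00Z");
const jobs: EtlJob[] = [
  { id: "a", status: "running", startedAt: new Date("2025-01-01T11:00:00Z") },   // 60 min old
  { id: "b", status: "running", startedAt: new Date("2025-01-01T11:45:00Z") },   // 15 min old
  { id: "c", status: "completed", startedAt: new Date("2025-01-01T09:00:00Z") }, // already finished
];
const swept = failStuckJobs(jobs, now);
```

Only job `a` is affected: it is still `'running'` and older than the cutoff, while `b` is recent and `c` has already finished.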
Protect against scrapers returning weight=0 or weight<0 overwriting valid existing weight data. Weight and weight_unit now only update when the incoming value is a positive number; otherwise the stored value is preserved. All other fields continue to accept the fresh scraped value so stale data can still be corrected by re-scraping.
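The combined conflict rules from the commits above (fresh value wins, `created_at` is preserved, weight is only replaced by a positive value) can be modeled as a plain merge function. This is a sketch of the semantics only: `CatalogRow` and `mergeOnConflict` are hypothetical, and the real logic lives in the SQL `ON CONFLICT DO UPDATE` clause.

```typescript
// Hypothetical row shape with a small subset of columns.
interface CatalogRow {
  price: number | null;
  weight: number | null;
  weightUnit: string | null;
  createdAt: string | null;
}

// existing = the stored row, incoming = the freshly scraped row (SQL's `excluded`).
function mergeOnConflict(existing: CatalogRow, incoming: CatalogRow): CatalogRow {
  const incomingWeightValid = incoming.weight !== null && incoming.weight > 0;
  return {
    // Ordinary fields: the fresh scrape always wins.
    price: incoming.price,
    // Weight: take the new value only if it is positive, otherwise keep the stored one.
    weight: incomingWeightValid ? incoming.weight : (existing.weight ?? incoming.weight),
    // The unit follows the same decision so it stays in sync with the weight.
    weightUnit: incomingWeightValid
      ? incoming.weightUnit
      : (existing.weightUnit ?? incoming.weightUnit),
    // created_at: keep the original timestamp (COALESCE behaviour).
    createdAt: existing.createdAt ?? incoming.createdAt,
  };
}

const stored: CatalogRow = { price: 100, weight: 500, weightUnit: "g", createdAt: "2024-01-01" };
const badScrape: CatalogRow = { price: 90, weight: 0, weightUnit: "oz", createdAt: "2025-06-01" };
const goodScrape: CatalogRow = { price: 90, weight: 450, weightUnit: "g", createdAt: "2025-06-01" };

const afterBad = mergeOnConflict(stored, badScrape);   // price updated, weight kept
const afterGood = mergeOnConflict(stored, goodScrape); // price and weight both updated
```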
Pull request overview
This PR aims to improve catalog ETL robustness (memory usage + weight handling) and adjust downstream catalog upsert behavior to better handle missing/invalid weight data.
Changes:
- Flush ETL valid/invalid batches to the DB during streaming to reduce worker memory pressure.
- Make catalog ETL validation treat `weight`/`weightUnit` as optional and adjust upsert conflict behavior (notably around weight fields).
- Misc: add a SQL script to fail “stuck” ETL jobs; adjust Expo UI callsites to compile by casting `t` to `any`; tweak root TS config.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tsconfig.json | Adjusts TS compiler options (removes baseUrl, changes ignoreDeprecations). |
| packages/api/src/services/etl/processCatalogEtl.ts | Streams CSV rows and flushes valid/invalid batches periodically to avoid OOM. |
| packages/api/src/services/etl/CatalogItemValidator.ts | Changes ETL validation to no longer require weight / weightUnit. |
| packages/api/src/services/catalogService.ts | Modifies catalog item upsert conflict-update behavior, including special handling for weight fields. |
| packages/api/scripts/reset-stuck-etl-jobs.sql | Adds a manual script to mark long-running ETL jobs as failed. |
| apps/expo/app/(app)/recent-packs.tsx | Casts t to any when calling getRelativeTime. |
| apps/expo/app/(app)/current-pack/[id].tsx | Casts t to any when calling getRelativeTime. |
+      // Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files
+      if (validItemsBatch.length >= BATCH_SIZE) {
+        await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
+        await db
+          .update(etlJobs)
+          .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
+          .where(eq(etlJobs.id, jobId));
+        validItemsBatch.length = 0;
+      }
+      // Flush invalid batch to DB every BATCH_SIZE rows
+      if (invalidItemsBatch.length >= BATCH_SIZE) {
+        await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
+        await db
+          .update(etlJobs)
+          .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
+          .where(eq(etlJobs.id, jobId));
+        invalidItemsBatch.length = 0;
+      }
     }

-    console.log(`🔍 [TRACE] Streaming complete - processing batches`);
+    console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`);

-    const itemsProcessed = validItemsBatch.length + invalidItemsBatch.length;
+    // Flush remaining items after the stream ends
+    const remainingItems = validItemsBatch.length + invalidItemsBatch.length;

-    await db
-      .update(etlJobs)
-      .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${itemsProcessed}` })
-      .where(eq(etlJobs.id, jobId));
+    if (remainingItems > 0) {
+      await db
+        .update(etlJobs)
+        .set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${remainingItems}` })
+        .where(eq(etlJobs.id, jobId));
+    }
        sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight')} ELSE COALESCE(${catalogItems.weight}, excluded.${sql.identifier('weight')}) END`;
    } else if (col.name === 'weight_unit') {
      // weight_unit stays in sync with weight validity
      acc[col.name] =
        sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight_unit')} ELSE COALESCE(${catalogItems.weightUnit}, excluded.${sql.identifier('weight_unit')}) END`;
    } else {
      acc[col.name] = sql`excluded.${sql.identifier(col.name)}`;
@@ -45,7 +45,7 @@ function RecentPackCard({ pack }: { pack: Pack }) {
         </View>
         <Text variant="caption1" className="text-muted-foreground">
           {t('packs.lastUpdated', {
-            time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t),
+            time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any),
           })}

         <Text variant="subhead" className="mt-1 text-muted-foreground">
           {t('packs.lastUpdated', {
-            time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t),
+            time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any),
           })}
    // Additional validations
    // Note: weight and weightUnit are intentionally not required — clothing/footwear brands often
    // omit weight data. Items without weight are ingested but won't appear in weight comparisons.
…lidation-v2 Fix/etl memory and weight validation v2
Description
Closes #
Type of change
Area(s) affected
- `apps/expo`
- `packages/api`
- `apps/landing`
- `apps/guides`
- `.github/`
Testing
- … `curl` or Postman)
Screenshots / recordings
Pre-merge checklist
- `bun format && bun lint` passes with no errors
- `bun check-types` passes with no errors
- … `feat:`, `fix:`, `chore:`, etc.)