diff --git a/CLAUDE.md b/CLAUDE.md index e86a7939f6..7846bc27c9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -195,6 +195,24 @@ Defined in root `tsconfig.json`: - Migrations: Drizzle Kit (`drizzle-kit`) - Embeddings: pgvector with 1536 dimensions +### Migration discipline (read before touching `packages/api/drizzle/`) + +1. **Always generate via drizzle-kit.** Edit `packages/api/src/db/schema.ts` (or `packages/db/src/schema.ts` for the shared workspace), then run from the API package: + + ```bash + cd packages/api && bun run db:generate + ``` + + Drizzle-kit emits a random-name file like `0048_loud_squirrel_girl.sql`. That random name is fine — keep it. The naming convention here is "whatever drizzle-kit gives you." + +2. **Do not rename a generated migration file.** The `meta/_journal.json` `tag` field, the migration SQL filename, and the snapshot filename all encode the migration identity together. Renaming any one of them (even with corresponding journal edits) makes the migration look hand-authored and creates drift that future drizzle-kit operations can mis-handle. + +3. **Do not hand-edit `meta/_journal.json`, `meta/*_snapshot.json`, or the generated SQL.** If the generated migration is wrong, fix the schema, delete the bad migration + snapshot + journal entry, and regenerate. Do not patch around it. + +4. **Collapse additive changes into one migration when they ship together** — fewer snapshot files in the diff, easier to revert as a unit. Splitting only makes sense when migrations need to land in separate releases. + +5. **Verify after generating.** Run `bunx drizzle-kit check` from `packages/api/` — it validates the snapshot chain is internally consistent. Run before pushing. + ## EAS Build Profiles | Profile | Use | Distribution | diff --git a/bun.lock b/bun.lock index 95b75188be..8c7e5a5b02 100644 --- a/bun.lock +++ b/bun.lock @@ -473,6 +473,7 @@ "@packrat/schemas": "workspace:*", "@packrat/types": "workspace:*", "@packrat/units": "workspace:*", + "@sentry/cloudflare": "^10.37.0", "@sinclair/typebox": "^0.34.15", "@types/nodemailer": "^6.4.17", "ai": "catalog:", @@ -1980,7 +1981,9 @@ "@sentry/cli-win32-x64": ["@sentry/cli-win32-x64@2.58.4", "", { "os": "win32", "cpu": "x64" }, "sha512-cSzN4PjM1RsCZ4pxMjI0VI7yNCkxiJ5jmWncyiwHXGiXrV1eXYdQ3n1LhUYLZ91CafyprR0OhDcE+RVZ26Qb5w=="], - "@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + "@sentry/cloudflare": ["@sentry/cloudflare@10.53.1", "", { "dependencies": { "@opentelemetry/api": "^1.9.1", "@sentry/core": "10.53.1" }, "peerDependencies": { "@cloudflare/workers-types": "^4.x" }, "optionalPeers": ["@cloudflare/workers-types"] }, "sha512-iSohVibGRAKg7zLUflfA2ePG69Uw6bqm6iCQLM18hoG2gT4DGigaKcjJmZLTfAtW1DInMCb0DYc/mltCznxMrQ=="], + + "@sentry/core": ["@sentry/core@10.53.1", "", {}, "sha512-XG4ezlkyuAPjBC5+9kXC94rXXuqYTw9NRhfaDHssbTFaGnqBR8vQX2UUgZfY7ucbeelRDGfBu1sywoU+mB04uA=="], "@sentry/hub": ["@sentry/hub@6.19.7", "", { "dependencies": { "@sentry/types": "6.19.7", "@sentry/utils": "6.19.7", "tslib": "^1.9.3" } }, "sha512-y3OtbYFAqKHCWezF0EGGr5lcyI2KbaXW2Ik7Xp8Mu9TxbSTuwTe4rTntwg8ngPjUQU3SUHzgjqVB8qjiGqFXCA=="], @@ -5090,6 +5093,16 @@ "@reduxjs/toolkit/immer": ["immer@11.1.8", "", {}, "sha512-/tbkHMW7y10Lx6i1crLjD4/OhNkRG+Fo7byZHtah0547nIeXYcpIXaUh0IAQY6gO5459qpGGYapcEOHtFXkIuA=="], + "@sentry-internal/browser-utils/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry-internal/feedback/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry-internal/replay/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry-internal/replay-canvas/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry/browser/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + "@sentry/cli/https-proxy-agent": ["https-proxy-agent@5.0.1", "", { "dependencies": { "agent-base": "6", "debug": "4" } }, "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA=="], "@sentry/cli/node-fetch": ["node-fetch@2.7.0", "", { "dependencies": { "whatwg-url": "^5.0.0" }, "peerDependencies": { "encoding": "^0.1.0" }, "optionalPeers": ["encoding"] }, "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A=="], @@ -5112,6 +5125,12 @@ "@sentry/node/tslib": ["tslib@1.14.1", "", {}, "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg=="], + "@sentry/react/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry/react-native/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + + "@sentry/types/@sentry/core": ["@sentry/core@10.37.0", "", {}, "sha512-hkRz7S4gkKLgPf+p3XgVjVm7tAfvcEPZxeACCC6jmoeKhGkzN44nXwLiqqshJ25RMcSrhfFvJa/FlBg6zupz7g=="], + "@sentry/utils/@sentry/types": ["@sentry/types@6.19.7", "", {}, "sha512-jH84pDYE+hHIbVnab3Hr+ZXr1v8QABfhx39KknxqKWr2l0oEItzepV0URvbEhB446lk/S/59230dlUUIBGsXbg=="], "@sentry/utils/tslib": ["tslib@1.14.1", "", {}, "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg=="], diff --git a/docs/audits/2026-05-16-etl-audit.md b/docs/audits/2026-05-16-etl-audit.md new file mode 100644 index 0000000000..84f1d69449 --- /dev/null +++ b/docs/audits/2026-05-16-etl-audit.md @@ -0,0 +1,183 @@ +# ETL Pipeline Audit — 2026-05-16 + +## Summary + +The catalog ETL pipeline works end-to-end and has been hardened through a recent series of fixes (OOM, CPU-time budget, atomic counters, byte-range chunking), but it is not production-ready: chunking + a single shared `jobId` produces double-counted `totalProcessed`, mis-marks jobs `completed` after the first chunk finishes, and lacks any dead-letter / retry policy at the queue layer. Catastrophic per-message failures silently swallow errors in `processQueueBatch` (`try/catch` with `console.error` only), so the queue happily acks bad chunks. The retry endpoint also re-queues only the original object key, ignoring multi-chunk jobs entirely. + +**Top 3 risks**: (1) cross-chunk job-status race (any one chunk's completion marks the entire job `completed`), (2) consumer swallows errors so failed messages never retry/DLQ, (3) retry endpoint and stuck-job sweep are incompatible with byte-range chunking. + +## Architecture + +``` +POST /api/catalog/etl ── api-key auth + │ body: { filename, chunks[], source, scraperRevision } + ▼ +1. INSERT etl_jobs (status='running') +2. For each objectKey: R2.head() → split into 20 MB byte-range chunks +3. queueCatalogETL → ETL_QUEUE.sendBatch (one message per chunk, same jobId) + +ETL_QUEUE (max_batch_size=1, max_concurrency=1) + ▼ +processQueueBatch + ▼ +processCatalogETL ── per chunk + ├── R2.get(key, {range}) ── stream body + ├── if non-first chunk: GET first 4 KB → extract header → inject; skip partial row + ├── csv-parse stream w/ backpressure (parser.write returns false → wait 'drain') + ├── yield every 100 rows (setTimeout(1)) + ├── flush at BATCH_SIZE=100 rows: + │ valid → processValidItemsBatch → mergeBySku → embeddings → catalogService.upsert → updateEtlJobProgress + │ invalid → processLogsBatch → invalid_item_logs → updateEtlJobProgress + └── on success: UPDATE etl_jobs SET status='completed' ◀── PROBLEM with multi-chunk jobs + on throw: UPDATE etl_jobs SET status='failed' + rethrow +``` + +Counters are atomic per call (`COALESCE(col, 0) + n` in SQL). Job rows are not. + +## Findings + +### [P0] Multi-chunk jobs are marked `completed` after the first chunk finishes +- **What**: All chunks for a single source file share one `jobId`; each chunk independently sets `status='completed'` on success. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:188-191`; chunks created in `packages/api/src/routes/catalog/index.ts:182-200`. +- **Why it matters**: A 100 MB file becomes 5 chunks → 5 messages → the first message to finish flips the job to `completed`, even though 80% of rows haven't been processed yet. The dashboard, `success_rate`, and any downstream check ("is the catalog refresh done?") fire prematurely. Subsequent chunks continue to mutate `totalProcessed/totalValid/totalInvalid`, so the row reads as `completed` with rising counters. +- **Recommendation**: Track per-chunk completion. Two options: (a) add a `chunks_total` and `chunks_completed` column; only set `completed` when `chunks_completed = chunks_total`. (b) give each chunk its own jobId and group by a parent `batch_id`. Option (a) is the smaller change. + +### [P0] `processQueueBatch` swallows errors — failed chunks never retry or DLQ +- **What**: Per-message exceptions are caught and logged but never rethrown; CF Queues auto-acks every message in the batch. +- **Where**: `packages/api/src/services/etl/queue.ts:50-60`. +- **Why it matters**: A transient DB error, OpenAI 429, or R2 read failure permanently loses the chunk. The job is marked `failed` (good) but the message is acked (bad) — there is no retry, no dead-letter queue, and `wrangler.jsonc` does not declare a `dead_letter_queue` or `max_retries`. Combined with the multi-chunk issue above, a single failure can corrupt the job state while other chunks succeed and mark it `completed`. +- **Recommendation**: Rethrow in the catch (or call `message.retry()` explicitly on the specific message). Add `dead_letter_queue` and `max_retries: 3` to the ETL queue consumer in `wrangler.jsonc:76-82`. Process messages with `for...of` calling `message.ack()` / `message.retry()` explicitly so partial-batch semantics are correct even though `max_batch_size=1` today. + +### [P1] Retry endpoint discards multi-chunk structure +- **What**: `POST /admin/etl/:jobId/retry` re-queues exactly one chunk built from `v2/${source}/${filename}` with no chunking. +- **Where**: `packages/api/src/routes/admin/analytics/catalog.ts:434-450`. +- **Why it matters**: If the original job was chunked (20 MB+ files), retry blasts the entire file at one Worker invocation, blowing past the 300s CPU-time limit that prompted the chunking work in the first place. Result: retries of any large failed job silently re-fail. +- **Recommendation**: Re-run the same R2.head + chunk-split logic the producer endpoint uses (lines 182-200). Extract that to a shared helper so both call sites stay in sync. + +### [P1] Stuck-job sweep is wall-clock based and incompatible with serial chunked jobs +- **What**: `POST /admin/etl/reset-stuck` flips any job in `running` for >30 min to `failed`. +- **Where**: `packages/api/src/routes/admin/analytics/catalog.ts:384-403`. +- **Why it matters**: With `max_concurrency=1` and 20 MB chunks each consuming most of a 300s CPU budget, a 500 MB file produces 25 chunks at up to ~5 minutes each → comfortably past 30 minutes. Healthy long jobs will be marked `failed`. The trigger should be "no progress for N minutes" (e.g., `totalProcessed` unchanged), not "started >30 min ago". +- **Recommendation**: Add `lastProgressAt` updated on each `updateEtlJobProgress` call; sweep on `lastProgressAt < now - 15min`. Or check `completedAt IS NULL AND startedAt < now - 2h` for the absolute floor. + +### [P1] First-chunk header injection assumes the first 4 KB contains a complete header +- **What**: For non-first chunks, the parser fetches `bytes 0-4095` and uses `headerText.split('\n')[0]` as the header row. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:53-58`. +- **Why it matters**: If the header row exceeds 4 KB (wide CSVs with 30+ columns and long names — possible here given the catalog schema has 25+ fields), `split('\n')[0]` returns a *truncated* header, so `fieldMap` silently maps the last column wrong. There is also no validation that the slice actually contained a newline before `byteEnd=4095`. +- **Recommendation**: Loop the range request (or use a streaming `until newline` reader). At minimum, throw if no `\n` appears in the first 4 KB so the failure is loud, not silent. + +### [P1] Partial-row skip can drop a valid full row when chunk boundary lands on a newline +- **What**: `skipPartialRow` discards everything up to and including the first `\n` after `byteStart`. If `byteStart` happens to be the first byte *after* a newline (i.e., the previous chunk's last byte is `\n`), the producing chunk processed the full row, and this chunk correctly starts on a row boundary — but the skip logic still throws away the first whole row. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:95-108`. +- **Why it matters**: Off-by-one row drop at every chunk boundary in worst case (data loss, not just dup). For 25-chunk file → potentially 24 lost catalog items. No test covers the boundary-aligned case. +- **Recommendation**: When splitting chunks at line 195-196 of `routes/catalog/index.ts`, do not split on arbitrary 20 MB offsets — peek at R2 with a short range request and align `byteEnd` to a newline so the skip logic is unnecessary, *or* skip only when the previous byte (range `byteStart-1`) was non-newline. + +### [P1] CSV row spanning chunk boundary is never reassembled +- **What**: A row beginning before `byteEnd` and ending after will be cut in half. The producing chunk parses a truncated row (likely fails validation); the next chunk discards the tail. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:95-108` (skip logic), `routes/catalog/index.ts:182-200` (chunk creation). +- **Why it matters**: Every chunk boundary loses (or invalidates) one row. Symptom would be `totalInvalid` rising by ~N per N-chunk job, with field-shaped errors. Severity depends on row width vs 20 MB. +- **Recommendation**: Same as above — align chunk boundaries to row boundaries in the producer. Alternatively, the producing chunk should fetch ~64 KB beyond `byteEnd` to complete its final row, and the next chunk skip logic stays. + +### [P2] `console.log`/`console.error` only — no structured logging, no Sentry +- **What**: Every log uses `console.log` with emoji prefixes; no Sentry integration in ETL paths despite Sentry being a documented monitoring tool. +- **Where**: All ETL files; verified by `grep -rn "Sentry|captureException" packages/api/src/services/etl/` → no results. Same applies to `packages/api/src/`. +- **Why it matters**: A stuck job cannot be debugged without paging through CF Workers logs by hand. No correlation IDs (other than jobId), no per-chunk structured fields (`byteStart`, `rowsProcessed`, `elapsed_ms`), no error categorization. Failures in `processLogsBatch` are caught and `console.error`-ed without rethrow (`packages/api/src/services/etl/processLogsBatch.ts:25-27`) — invalid logs can fail to write and nobody knows. +- **Recommendation**: Add a thin logger (`logger.info({ jobId, chunk: { byteStart, byteEnd }, event: 'chunk_start' })`). Call `Sentry.captureException(err, { tags: { jobId, objectKey } })` in the `processCatalogETL` catch block. + +### [P2] `processLogsBatch` swallows DB errors silently +- **What**: Catch logs to console and returns normally — caller has no idea logs were dropped. +- **Where**: `packages/api/src/services/etl/processLogsBatch.ts:25-27`. +- **Why it matters**: Invalid-item logs are the *only* forensic record of what failed validation. If the INSERT fails (Neon hiccup, payload size, FK violation), we lose visibility forever. The `updateEtlJobProgress` call is also inside the try, so `totalInvalid`/`totalProcessed` will be undercounted. +- **Recommendation**: Rethrow. Let the outer ETL catch flip the job to `failed` — the alternative is silent data quality erosion. + +### [P2] Embedding failure path silently drops embeddings without marking it +- **What**: When `generateManyEmbeddings` throws, items are upserted with `embedding=undefined` (i.e., NULL) but the job still reports as fully successful. +- **Where**: `packages/api/src/services/etl/processValidItemsBatch.ts:52-63`. +- **Why it matters**: No metric distinguishes "successful with embeddings" from "successful but degraded". The `/admin/embeddings` route reports coverage but cannot attribute the drop to a specific job. A backfill is required to recover, and there is no automatic re-queue. +- **Recommendation**: Add a `totalEmbeddingFailures` column on `etl_jobs`, increment it in the fallback path, and surface in the admin dashboard. Optionally enqueue the affected SKUs into `EMBEDDINGS_QUEUE` from the fallback for automatic backfill. + +### [P2] `parser.end()` is called inside a fire-and-forget IIFE — errors are unhandled +- **What**: The async writer is invoked as `(async () => { ... })()` with no `.catch()`. Any stream read error or `parser.write` throw becomes an unhandled rejection. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:89-117`. +- **Why it matters**: In CF Workers, unhandled rejections can terminate the isolate. More commonly the outer `for await (const record of parser)` loop will just hang on a stalled parser if the writer rejected. The job will sit in `running` until the stuck-job sweep notices. +- **Recommendation**: Wrap in an explicit promise: `const writerPromise = (async () => { ... })().catch(err => parser.destroy(err));` and `await writerPromise` after the `for await` loop. Surface the error to the outer catch. + +### [P2] `setTimeout(resolve, 1)` every 100 rows is a fragile yield mechanism +- **What**: Used to yield to event loop / give GC a chance. +- **Where**: `packages/api/src/services/etl/processCatalogEtl.ts:120`. +- **Why it matters**: `setTimeout` consumes wall-clock budget. Workers have a 30s wall-clock per invocation (separate from `cpu_ms`). At 1ms × 600 yields per 60k-row chunk = 0.6s — fine today, but the comment mentions a previous "per-row yield hits the CF Worker wall-clock limit". The thresholds are tightly coupled and undocumented. +- **Recommendation**: Replace with `await scheduler.yield()` (CF supports it) or `await new Promise(setImmediate)`-equivalent. Add a unit test that verifies a 100k-row CSV completes within wall-clock. + +### [P2] `BATCH_SIZE = 100` is exported but reads inconsistent with comment/runtime +- **What**: `processCatalogEtl.ts:13` exports `BATCH_SIZE = 100`. The catalog OpenAI embedding API supports 1000+ per call, so this is conservative; meanwhile the queue's `batchSize` for `sendBatch` is hard-coded at 100 (`queue.ts:17`) for an unrelated reason (max batch size from CF). Reusing the symbol `100` for two different concepts is fragile. +- **Where**: `processCatalogEtl.ts:13`, `queue.ts:17`. +- **Recommendation**: Rename to `ITEM_FLUSH_BATCH_SIZE` and `CF_QUEUE_BATCH_SIZE`, hoist both to a shared constants file. + +### [P3] `mergeItemsBySku` logs change diff on every merge — unbounded console output +- **What**: Logs a `🔄 Merged SKU` line for every SKU collision with every changed field. +- **Where**: `packages/api/src/services/etl/mergeItemsBySku.ts:34-48`. +- **Why it matters**: On a 500 MB CSV with many duplicate SKUs across chunks, this can produce millions of log lines, polluting CF logs and possibly hitting `logpush` quotas. +- **Recommendation**: Aggregate into a single per-batch summary or gate behind a debug flag. + +### [P3] Validator: no URL scheme check, no length limits, no SKU charset rules +- **What**: `isValidUrl` allows any `new URL()`-parseable input (e.g., `mailto:`, `javascript:`, `file:`). +- **Where**: `packages/api/src/services/etl/CatalogItemValidator.ts:60-67`. +- **Why it matters**: `productUrl` is rendered in the mobile app and on the guides site. A scraper bug could inject `javascript:` URLs that survive to the UI. +- **Recommendation**: Restrict to `http:`/`https:`. Add length caps (`name` ≤ 500, `description` ≤ 50k, `sku` matches `[A-Za-z0-9_.\-/]+`). + +### [P3] Soft-delete is not handled by the upsert +- **What**: `catalogItems` has no `deletedAt` column (verified — grep returns nothing). CLAUDE.md notes "Soft deletes for all user content" but catalog items are scraper-controlled, so this may be intentional. However, an item that disappears from the source CSV is never marked unavailable. +- **Where**: `packages/api/src/db/schema.ts:132-215`; `packages/api/src/services/catalogService.ts:337-407`. +- **Why it matters**: The catalog grows monotonically. Discontinued products keep their `availability` from the last successful upsert. There is no "items present in last job but not in this one → mark out-of-stock" reconciliation. +- **Recommendation**: After a successful ETL, run `UPDATE catalog_items SET availability='OutOfStock' WHERE NOT EXISTS (SELECT 1 FROM catalog_item_etl_jobs WHERE catalog_item_id = id AND etl_job_id IN (last N jobs for this source))`. Or accept the limitation and document it. + +### [P3] No invalid-items retention policy +- **What**: `invalid_item_logs` grows forever; no TTL/sweep. +- **Where**: `packages/api/src/db/schema.ts:481-490`. +- **Why it matters**: Each bad row stores `raw_data` as JSONB plus an `errors` array — a single bad upload can write hundreds of MB to Neon. +- **Recommendation**: Add a scheduled task (or CF Cron Trigger) to drop logs >90 days. + +### [P3] No runbook / deploy docs +- **What**: No `docs/runbooks/etl.md`. `grep "etl|ETL"` in `README.md`/`docs/` returns only stale plan files. +- **Where**: N/A (missing). +- **Recommendation**: Write a 1-page runbook: how to trigger an ETL, how to inspect queue depth (`wrangler queues list/info packrat-etl-queue`), how to retry a failed job, how to drain the queue (`wrangler queues consumer remove`), how to interpret `success_rate`. Reference admin endpoints `/admin/etl/*`. + +## Test Coverage Gaps + +Tests cover the happy path with mocked R2 and globally-mocked DB. The following are **not** tested: + +- **Byte-range chunk processing** — no test sets `byteStart`/`byteEnd` in the message. The injected-header fetch, partial-row skip, and boundary off-by-ones (P1 above) are entirely uncovered. +- **Multi-message job (same jobId, multiple chunks)** — no integration test exercises the "two chunks complete sequentially" path, so the P0 premature-completion bug is invisible to CI. +- **Header > 4 KB** — see P1 finding. +- **Row spanning chunk boundary** — see P1 finding. +- **Embedding service failure path** — `processValidItemsBatch.test` mocks the rejection but does not assert that items were upserted without embeddings (the actual fallback behavior). +- **`processLogsBatch` DB failure** — no test for the swallowed-error case. +- **Backpressure** — `parser.write` returning `false` and waiting on `'drain'` is not unit-testable with the current mock (whole CSV emitted in one chunk). +- **Yield/wall-clock budget** — no test asserts a 100k-row CSV completes under wall-clock. +- **`processQueueBatch`** — no direct test; the per-message catch-and-swallow (P0) is untested. +- **Retry endpoint** — no integration test verifies the retry produces a new running job and a queue send. +- **Stuck-job sweep** — no test for the 30-minute cutoff. +- **Concurrent updates to same job row** — no race-condition test (e.g., two batches calling `updateEtlJobProgress` interleaved). Atomicity at the SQL level is good but a parallel-batch test would lock it in. +- **`mergeItemsBySku` cross-chunk SKU collisions** — merging happens within a single batch; SKUs duplicated across batches (or across chunks) hit the DB upsert path, not the merge path. No test for that. +- **Header injection — wrong column ordering** — what if the source CSV has a BOM, or quoted headers with commas inside? + +## Production Readiness Checklist + +- [ ] Multi-chunk job completion tracked correctly (chunks_total / chunks_completed columns) — addresses P0 #1 +- [ ] Queue consumer rethrows on per-message failure; DLQ + max_retries configured in `wrangler.jsonc` — addresses P0 #2 +- [ ] Retry endpoint chunks large files the same way the producer does — addresses P1 #1 +- [ ] Stuck-job sweep keyed on `lastProgressAt`, not `startedAt` — addresses P1 #2 +- [ ] Chunk boundaries aligned to row boundaries in the producer (or reassembly in the consumer) — addresses P1 #3 and P1 #4 +- [ ] Header injection validates first 4 KB contains a `\n`; tested with wide CSV — addresses P1 #5 +- [ ] Sentry integration in ETL paths with `jobId`/`objectKey` tags — addresses P2 #1 +- [ ] `processLogsBatch` rethrows on DB failure — addresses P2 #2 +- [ ] Embedding fallback tracked via counter and visible in admin dashboard — addresses P2 #3 +- [ ] Writer IIFE error attached to outer flow — addresses P2 #4 +- [ ] Yield mechanism uses `scheduler.yield()` and has a wall-clock test — addresses P2 #5 +- [ ] Rename ambiguous `BATCH_SIZE` constants — addresses P2 #6 +- [ ] `mergeItemsBySku` summary log instead of per-SKU — addresses P3 #1 +- [ ] Validator enforces `http(s):` scheme and length caps — addresses P3 #2 +- [ ] Discontinued-item reconciliation strategy chosen and documented — addresses P3 #3 +- [ ] `invalid_item_logs` retention policy — addresses P3 #4 +- [ ] Runbook checked in at `docs/runbooks/etl.md` — addresses P3 #5 +- [ ] Test coverage added for all gaps listed above diff --git a/docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md b/docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md new file mode 100644 index 0000000000..6df4f8b89c --- /dev/null +++ b/docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md @@ -0,0 +1,1063 @@ +--- +title: "fix: ETL pipeline audit remediation" +type: fix +status: superseded +supersededBy: docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md +supersededReason: "Pivoted execution engine from Cloudflare Queues + outbox to Cloudflare Workflows on 2026-05-20. Workflows natively provides the durable-step + idempotency + retry + state semantics that ~8 of the 15 units in this plan were manually reconstructing. The audit findings about CSV correctness, validator hardening, observability, retention, and runbook remain real and carry into the successor plan; the queue-as-state-machine subplot is dropped." +date: 2026-05-19 +deepened: 2026-05-19 +origin: docs/audits/2026-05-16-etl-audit.md +--- + +# fix: ETL pipeline audit remediation + +## Summary + +Remediate the catalog ETL pipeline against every finding in the 2026-05-16 audit (2 P0, 5 P1, 6 P2, 3 P3), correct two stale assumptions the audit made about Cloudflare runtime APIs, add bucket-vs-job reconciliation (both an admin-triggered tool and automatic post-job verification), and add a "re-ingest from the top" recovery path for jobs the buggy stuck-job sweep has already corrupted. Delivered as one master plan in four sequenced phases — schema + P0 blockers first, then chunking correctness, then observability + reconciliation, then hardening + runbook. + +--- + +## Problem Frame + +The pipeline ingests scraper CSVs from R2 (`packrat-scrapy-bucket`) into Neon Postgres via a Cloudflare Queue consumer. It is currently silently incorrect: live prod admin data (192 runs / 74 failed = 38% failure rate) shows seven large jobs from 2026-05-14 marked `failed` with identical `completedAt` timestamps — the wall-clock-based stuck-job sweep firing on healthy long jobs — while the dashboard reports `successRate: 100%` on those same failed jobs. Audit `docs/audits/2026-05-16-etl-audit.md` enumerates the structural causes: a single shared `jobId` across byte-range chunks lets the first finishing chunk flip the parent job to `completed`, per-message exceptions are swallowed (no DLQ, no retry), byte-range chunk boundaries silently drop or invalidate rows that span them, retries discard chunking entirely, and there is no Sentry / structured logging anywhere in the ETL path. + +The user's stated concern — *"some [data] is missing or falsely labeling as success"* — is corroborated on both ends: `completed` jobs can be premature (P0 #1), and `failed` jobs can be false failures (P1 #2). Either way the catalog count `totalItemsIngested: 304,431` cannot currently be trusted. + +--- + +## Requirements + +- R1. **No chunk causes premature job completion.** A multi-chunk job transitions to `completed` only when every chunk has succeeded. +- R2. **Per-message queue failures retry and ultimately DLQ.** No exception thrown by chunk processing is silently swallowed. +- R3. **Stuck-job sweep is progress-based, not wall-clock-based.** Healthy long-running jobs are not falsely marked `failed`. +- R4. **Chunk boundaries do not drop or invalidate rows.** Every row in the source CSV is processed exactly once. +- R5. **Retry / repair endpoints chunk the same way the producer does.** Retrying a large file does not single-shot it. +- R6. **CSV header injection for non-first chunks is correct or fails loudly.** No silent column misalignment. +- R7. **Every ETL job has post-ingestion verification.** R2 row count is compared to `totalProcessed` and the result is observable; significant deltas are surfaced. +- R8. **Operators can trigger a "from scratch" repair of any historical job** without invoking the original producer endpoint. +- R9. **Failures emit Sentry events with structured context.** Operators can debug a stuck job without paging through raw Worker logs. +- R10. **Embedding-fallback degradation is observable.** A job that completed without embeddings is distinguishable from a fully-successful one. +- R11. **Validator rejects unsafe URLs and oversize fields.** Mobile/web cannot be tricked into rendering `javascript:` URLs from the catalog. +- R12. **`invalid_item_logs` retention is bounded.** A bad upload cannot fill Neon storage indefinitely. +- R13. **A documented runbook exists for ETL operations.** A new on-caller can trigger / inspect / retry / drain without reading source. +- R14. **Test coverage exists for every behavior in R1–R12.** Specifically including the cases the global queue-mock in `packages/api/test/setup.ts` currently hides. + +--- + +## Scope Boundaries + +- The plan does not raise `max_concurrency` above 1 for the ETL queue. Concurrency bump is blocked on per-chunk idempotency keys that this plan introduces; the actual bump is a follow-up after this lands and bakes. +- The plan does not add a DLQ to the embeddings queue. ETL queue DLQ only. +- The plan does not migrate or rewrite the existing `etl_jobs` row data for the 7 historical jobs falsely marked `failed`. The repair-from-scratch endpoint introduced in U6 is the mechanism operators will use; the actual recovery run is operational, not a code unit. +- The plan does not change the producer endpoint's authentication, the source CSV schema, or the scraper revision pinning. +- The plan does not introduce a new ETL Worker — the current `packages/api` Elysia Worker continues to host both the HTTP routes and the queue consumer. +- The plan does not address `apps/landing` / `apps/guides` / `apps/expo` consumers of catalog data even when bucket-vs-job reconciliation finds drift. Surfacing inconsistencies is in scope; downstream cache invalidation is not. + +### Deferred to Follow-Up Work + +- **Concurrency bump on `packrat-etl-queue` consumer**: separate PR after this plan ships and per-chunk idempotency is verified in production for ≥2 weeks. +- **Embeddings-queue DLQ + retry policy**: separate plan; same shape as ETL DLQ work in U3, but a distinct surface. +- **Catalog reconciliation across multiple historical jobs**: only per-job reconciliation is in scope. Historical cross-source rollup ("did we lose 5% of the catalog last quarter?") is a separate analytics workstream. +- **Soft-delete / discontinued-item reconciliation** (audit P3 #3): documented as accepted limitation in the runbook (catalog is scraper-controlled, not user content). A future plan can add `availability='OutOfStock'` reconciliation if business requirements emerge. +- **CLI subcommand surface in `packages/cli/src/commands/admin/etl.ts`**: U12 wires the new admin endpoints into the existing CLI command file. Broader CLI ergonomics work is out of scope. + +--- + +## Context & Research + +### Relevant Code and Patterns + +- **Producer endpoint:** `packages/api/src/routes/catalog/index.ts:229-293` — `POST /catalog/etl`, R2 head + 20 MB chunking at `:253-271`. Chunk creation logic to extract into a shared helper used by U6. +- **Queue producer:** `packages/api/src/services/etl/queue.ts:6-41` — `queueCatalogETL`; uses `sendBatch` with `batchSize: 100` (CF queue per-call cap). +- **Queue consumer dispatch:** `packages/api/src/services/etl/queue.ts:43-61` — `processQueueBatch` with the swallowed catch at `:50-60`. **This is the core P0 #2 surface.** +- **Per-chunk processor:** `packages/api/src/services/etl/processCatalogEtl.ts` — header injection (`:50-58`), partial-row skip (`:95-108`), batch flush (`:120-187`), per-chunk completion (`:188-191`), per-chunk failure (`:201-204`). +- **Atomic counter pattern (mirror this):** `packages/api/src/services/etl/updateEtlJobProgress.ts:16-23` — `sql\`COALESCE(${col}, 0) + ${n}\``. New `chunks_completed` / `total_embedding_failures` increments use the same idiom; the "set status=completed when chunks_completed+1 == chunks_total" branch uses a single `UPDATE ... SET ... WHERE` with a `CASE` expression in the same transaction. +- **Embeddings queue pattern (mirror this):** `packages/api/src/services/catalogService.ts:461-507` — consumer rethrows on failure so CF Queue retries fire. ETL consumer must adopt the same shape. +- **Admin routing pattern:** `packages/api/src/routes/admin/index.ts:117-237` mounts the admin prefix; `:230-237` enforces `adminAuthGuard` on every sub-route. New endpoints in `packages/api/src/routes/admin/analytics/catalog.ts` inherit the guard. +- **R2 access (S3-API not Workers binding):** `packages/api/src/services/r2-bucket.ts:193-360` — `R2BucketService({ env, bucketType: 'catalog' })` wraps `@aws-sdk/client-s3` against the R2 S3 endpoint. `r2.head(key)` and `r2.get(key, { range: { offset, length } })` are the surface. Range format `bytes=offset-(offset+length-1)` at `:675-691`. +- **Schema location:** `packages/db/src/schema.ts:446-510` — `etlJobs`, `invalidItemLogs`, `catalogItemEtlJobs`, status enum at `:460`. **Audit cites a stale path (`packages/api/src/db/schema.ts`); the file was extracted into the `packages/db` package — see merge `b14f4dbd5`.** +- **Drizzle migration location:** `packages/api/drizzle/NNNN_.sql` + `meta/NNNN_snapshot.json` + `_journal.json`. Latest is `0047_cute_bloodscream.sql`; new migrations land at `0048` and `0049` (split per Drizzle Kit's enum-add constraint). Generated via `bun run --cwd packages/api db:generate`. Custom linter at `scripts/lint/check-drizzle-migrations.ts` runs in `lint:custom`. +- **Existing ETL integration test:** `packages/api/test/etl.test.ts` — mocks `R2BucketService` per-test, uses real Postgres via wsproxy at `localhost:5434`. Setup at `packages/api/test/setup.ts:535-572` globally mocks both `queueCatalogETL` and `processQueueBatch` (lines `:544-551`) — this is precisely *why* the per-message swallow in P0 #2 is invisible to CI today, and U14 must un-mock to cover it. +- **Wrangler config:** `packages/api/wrangler.jsonc:65-89` (prod queues) and `:161-194` (dev). Currently `max_batch_size: 1, max_concurrency: 1`, **no `dead_letter_queue`, no `max_retries`** on either consumer. Queue routing handler at `packages/api/src/index.ts:109-124`. +- **Admin CLI surface:** `packages/cli/src/commands/admin/etl.ts` already exists. New endpoints in U6 and U12 add corresponding subcommands. + +### Institutional Learnings + +- `docs/solutions/` has no prior ETL, Cloudflare Queues, R2 byte-range, or Sentry-in-Workers learnings — only an unrelated Better Auth CLI note and an Android UI bug. This remediation is greenfield from an institutional-knowledge standpoint, which makes it a strong `/ce-compound` target after each phase ships. + +### External References + +- **Cloudflare Queues — ack/retry semantics:** `message.ack()` / `message.retry({ delaySeconds })` / `ackAll()` / `retryAll()` documented at . Throwing fails the un-acked remainder of the batch. `retryDelaySeconds` max is 24h per . +- **Cloudflare Queues — DLQ:** `dead_letter_queue` (string name) + `max_retries` (default 3, max 100) in the consumer block per . +- **Cloudflare Workers Scheduler:** Only `scheduler.wait(ms)` is documented at . **`scheduler.yield()` does not exist** — the audit P2 #5 recommendation is wrong on this. Use `await scheduler.wait(0)` instead. +- **Wall-clock limit:** Queue consumer wall-clock cap is **15 minutes**, not 30 seconds, per . The audit's "30 s wall-clock" framing under P2 #5 is stale. +- **Sentry on Cloudflare:** Prefer the first-party `@sentry/cloudflare` over toucan-js. Wrap via `Sentry.withSentry(optsFn, { fetch, queue })` per . Queue instrumentation guidance at . +- **Drizzle enum-add limitation:** `ALTER TYPE … ADD VALUE` inside the same transaction as code that uses the new value fails. Split migrations. Tracked at . +- **R2 range reads with AWS SDK:** R2's S3 API fully supports the `Range` header — `GetObjectCommand({ Range: 'bytes=0-1023' })` behaves identically to S3 per . + +--- + +## Key Technical Decisions + +- **Track chunk completion via two new columns (`chunks_total`, `chunks_completed`) on the existing `etl_jobs` row, gated by a per-chunk idempotency table `etl_job_chunks(job_id, chunk_index, completed_at)` with PK on `(job_id, chunk_index)`.** Rationale: even at `max_concurrency: 1` today, Cloudflare Queues are *at-least-once* — a chunk whose DB writes succeed but whose ack is lost will be redelivered, which would double-increment a naive `chunks_completed = chunks_completed + 1` and either crash through `chunks_total` or transition the job to `completed` while a sibling chunk is still pending. The idempotency table makes the increment a deterministic side-effect of `INSERT … ON CONFLICT (job_id, chunk_index) DO NOTHING RETURNING 1`; the counter only bumps when the insert created a new row. This was originally scoped as a follow-up under "Deferred" but the deepening pass surfaced it as a correctness prerequisite — pulled forward into U1/U2. +- **No new `partial` enum value on `etl_job_status`.** Embedding-fallback degradation is observable via `total_embedding_failures > 0` on a `completed` row. Adding an enum value would force the audit P2 #3 split into two migrations (Drizzle Kit limitation) and complicate every admin filter without observable benefit. +- **Use `@sentry/cloudflare` (first-party), not toucan-js as the audit suggested.** Toucan still works but is no longer the recommended Sentry path on Workers as of 2026. `withSentry({ fetch, queue })` wraps both entry points in one call; no manual `waitUntil` plumbing needed. +- **Use `await scheduler.wait(0)` for yielding, not the non-existent `scheduler.yield()`.** Audit P2 #5 is corrected here. +- **Stuck-job sweep keyed on `last_progress_at < now() - interval '15 minutes'` AND `status = 'running'`,** not on `started_at`. The 15-min figure derives from the actual CF Queue consumer wall-clock cap (15 min), not the audit's stale 30 s/30 min framing. With per-chunk progress updates writing `last_progress_at`, any chunk making real progress is safe; only truly stalled jobs flip to `failed`. +- **Row-boundary alignment happens in the producer**, not the consumer. The producer's `r2.head(key)` flow does an extra small range read on each chunk-end region (e.g., 64 KB) to find the last `\n` and emits chunks with newline-aligned `byteEnd`. This eliminates both the partial-row skip bug (P1 #4) and the row-spanning-chunk bug (P1 #5) in one place. Consumer's `skipPartialRow` logic is removed. +- **CSV header re-read with bounded loop, not a fixed 4 KB slice.** For non-first chunks, the consumer fetches `[0, 4096)`, and if no `\n` appears, expands to `[0, 16384)`, then `[0, 65536)`. If still no newline, throw — header is malformed. Eliminates P1 #3 silent column misalignment. +- **Per-chunk idempotency key is `(jobId, chunkIndex)`** — added to `CatalogETLMessage`. Even though `max_concurrency: 1` means de-facto serialization today, threading the key now unblocks the future concurrency bump without another migration. +- **DLQ is a dedicated new queue `packrat-etl-dlq`** with a minimal consumer that captures the failure to Sentry, persists a row to a new `etl_dlq_events` table for forensics, and acks. The DLQ does *not* attempt to re-process — it's an event sink + visibility tool. +- **Reconciliation runs as both a manual admin endpoint and an automatic post-job step, with the automatic step on its own queue.** Manual endpoint stays synchronous (operator-explicit, scoped to one job). Automatic step is dispatched as a queue message to a new `packrat-etl-reconcile-queue` on the final-chunk completion transition, *not* via `ctx.waitUntil` — `waitUntil` shares the queue invocation's wall-clock budget, which for a multi-GB CSV exceeds the 15-min cap when added on top of the chunk's own processing time. The reconcile consumer streams the file in 100 MB byte-range windows with progress checkpointed to a transient column so retries resume. The consumer's `INSERT … RETURNING` includes `verified_at IS NULL` as an idempotency gate so a redelivered reconcile message is a no-op. Warning threshold remains `> max(10, ceil(0.01 * total_processed))`. +- **Repair-from-scratch endpoint creates a NEW `etl_jobs` row and links it to the old via a new nullable `superseded_by_job_id` column with `ON DELETE SET NULL` and a paired `superseded_at timestamp`.** No mutation of the old row's counters — preserves audit trail and lets the dashboard show "originally failed, repaired by job X". `ON DELETE SET NULL` (not `CASCADE`) so deleting one row never silently nukes a chain of repair attempts. A CHECK constraint prevents self-reference (`superseded_by_job_id != id`). The runbook procedure (U15) requires verifying R2 source presence + ETag match before invoking repair, so an overwritten source cannot silently re-ingest the wrong file. +- **Structured logger lives at `packages/api/src/utils/logger.ts`** as a thin wrapper around `console.*` for now, accepting a `LogContext` (jobId, chunkIndex, r2Key, etc.) and emitting JSON-prefixed lines. Sentry breadcrumbs piggyback on the same call surface. Not a full logger framework — that's a separate decision. + +--- + +## Open Questions + +### Resolved During Planning + +- **Should the chunk completion track go on `etl_jobs` columns alone, or be paired with a per-chunk idempotency table?** Resolved during deepening: both. `etl_jobs.{chunks_total, chunks_completed}` are the counters; `etl_job_chunks(job_id, chunk_index)` is the idempotency gate that makes the increment safe under at-least-once delivery. See Key Technical Decisions. +- **Should embedding-fallback get a new enum value `partial`?** Resolved: no — use `total_embedding_failures` counter on a `completed` row. +- **Toucan-js or `@sentry/cloudflare`?** Resolved: `@sentry/cloudflare`. See External References. +- **Wall-clock budget for the stuck-job sweep cutoff?** Resolved: `last_progress_at < now() - interval '15 minutes'`, matching the actual queue-consumer wall-clock cap. +- **Should the row-boundary alignment happen in producer or consumer?** Resolved: producer. Single source of truth for chunk boundaries. +- **Should auto-reconcile use `ctx.waitUntil` or its own queue?** Resolved during deepening: dedicated queue (`packrat-etl-reconcile-queue`) with resumable byte-range streaming. `waitUntil` shares the chunk consumer's wall-clock budget, which fails at multi-GB files. +- **Should the DLQ consumer's INSERT + status UPDATE be transactional?** Resolved during deepening: yes, single `db.transaction()`. Same for the sweep's UPDATE + sentinel-event INSERT. +- **Should the migration split into 0048a/0048b/0048c?** Resolved during deepening: no — at ~200 rows, the single-migration approach is fine. Splitting becomes correct when `etl_jobs` exceeds ~100k rows, and the migration header carries a comment to revisit at that scale. + +### Deferred to Implementation + +- **Exact Drizzle migration sequencing within Phase 1.** All six columns + the partial index + the new `etl_dlq_events` table can land in a single migration `0048` since none touch the enum. Whether to split `superseded_by_job_id` (added later in U6) into its own migration `0049` or include it in `0048` is decided at U1 implementation. Either way the enum stays untouched in this plan. +- **`@sentry/cloudflare` instrumentation depth for the queue consumer.** The exact `Sentry.startSpan` attributes per queue message (some attributes are conventional, some are CF-specific) get finalized when U8 lands. +- **Sentry sampling rate** for the queue consumer. Default to `tracesSampleRate: 0.1` and tune in production; not a plan-time decision. +- **Exact threshold for "significant" reconciliation delta** that triggers a Sentry warning vs informational event. Default: `> max(10, ceil(0.01 * total_processed))` rows of delta. Tunable in production. +- **Cron schedule for `invalid_item_logs` retention sweep.** Daily at 09:00 UTC unless ops has a quieter window. + +--- + +## High-Level Technical Design + +> *This illustrates the intended approach and is directional guidance for review, not implementation specification. The implementing agent should treat it as context, not code to reproduce.* + +```text +Producer ─── POST /catalog/etl ──┐ + │ + ▼ + ┌─────────────────────────────────────────────┐ + │ chunkCsvForR2(key) (NEW shared helper) │ + │ 1. r2.head(key) -> size │ + │ 2. for each 20 MB window: │ + │ peek (next 64 KB) to find last '\n' │ + │ emit chunk with byteEnd = newline-1 │ + │ 3. tag each chunk: { jobId, chunkIndex, │ + │ chunksTotal, byteRange } + └─────────────────────────────────────────────┘ + │ + INSERT etl_jobs + (status='running', + chunks_total=N, + chunks_completed=0) + │ + ETL_QUEUE.sendBatch(chunks) + │ + ▼ + ┌─────────────────────────────────────────────┐ + │ processQueueBatch (REWRITE) │ + │ for message of batch: │ + │ try { │ + │ processCatalogETL(msg) │ + │ message.ack() │ + │ } catch (err) { │ + │ Sentry.captureException(err, {...}) │ + │ message.retry({ delaySeconds: 30 }) │ + │ } │ + └─────────────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────────────────────────┐ + │ processCatalogETL (per chunk) │ + │ r2.get(key, range) -> stream │ + │ if chunkIndex > 0: re-fetch header │ + │ (expand 4K→16K→64K, throw if no '\n') │ + │ parse rows (csv-parse, backpressure) │ + │ per 100 rows: scheduler.wait(0) │ + │ flush valid -> processValidItemsBatch │ + │ (embedding fallback increments │ + │ total_embedding_failures atomically) │ + │ flush invalid -> processLogsBatch │ + │ (now RETHROWS on DB failure) │ + │ on success: │ + │ UPDATE etl_jobs │ + │ SET chunks_completed = chunks_completed+1, + │ last_progress_at = now(), │ + │ status = CASE │ + │ WHEN chunks_completed+1 │ + │ = chunks_total │ + │ THEN 'completed' │ + │ ELSE status │ + │ END │ + │ if completed (in same txn): │ + │ enqueue ReconcileMessage to │ + │ packrat-etl-reconcile-queue │ + └─────────────────────────────────────────────┘ + │ + (on completion transition) + ▼ + ┌─────────────────────────────────────────────┐ + │ processReconcileBatch │ + │ reconcileJob(jobId, resumeFromByte=0): │ + │ if verified_at IS NOT NULL: ack │ + │ stream 100 MB byte-range windows │ + │ checkpoint to │ + │ verified_row_count_partial │ + │ if budget low: throw ResumeError │ + │ (consumer re-enqueues) │ + │ on EOF: UPDATE verified_at, count │ + │ if delta > threshold: Sentry warning │ + └─────────────────────────────────────────────┘ + │ + (on any thrown error after retries) + ▼ + packrat-etl-dlq + │ + ▼ + ┌─────────────────────────────────────────────┐ + │ dlqConsumer │ + │ Sentry.captureException │ + │ INSERT etl_dlq_events │ + │ ack │ + └─────────────────────────────────────────────┘ + +Background (CF Cron): + stuck-job sweep: status='running' AND last_progress_at < now()-15min + -> status='failed', emit Sentry warning + invalid-log retention: DELETE FROM invalid_item_logs WHERE created_at < now()-90d +``` + +--- + +## Implementation Units + +### U1. Schema migration: chunk tracking, idempotency table, progress timestamp, embedding failures, reconciliation columns, DLQ events table, constraint hardening + +**Goal:** Add the columns, tables, indexes, and constraints that the rest of the plan reads and writes. Lands first so every subsequent unit can compile and migrate against a known schema. Single migration `0048` is acceptable at the current ~200-row scale of `etl_jobs`; splitting into multiple migrations is unnecessary engineering at this size (revisit if `etl_jobs` exceeds ~100k rows). + +**Requirements:** R1, R3, R7, R8, R10 + +**Dependencies:** None + +**Files:** +- Modify: `packages/db/src/schema.ts` (add columns to `etlJobs`; add new `etlJobChunks` table; add new `etlDlqEvents` table; add UNIQUE constraint to `catalogItemEtlJobs`; export all) +- Create: `packages/api/drizzle/0048_etl_chunking_and_observability.sql` +- Create: `packages/api/drizzle/meta/0048_snapshot.json` (generated) +- Modify: `packages/api/drizzle/meta/_journal.json` (generated) +- Test: `packages/api/test/db-schema-etl.test.ts` (new — schema smoke test asserting columns exist with expected defaults; uses the existing Docker Postgres wsproxy setup at `localhost:5434`) + +**Approach:** +- Columns added to `etl_jobs`: + - `chunks_total integer` (nullable — single-chunk legacy jobs leave it null) + - `chunks_completed integer DEFAULT 0 NOT NULL` + - `last_progress_at timestamp` (nullable initially; backfilled to `started_at` for legacy rows in the same migration) + - `total_embedding_failures integer DEFAULT 0 NOT NULL` + - `verified_at timestamp` (nullable) + - `verified_row_count integer` (nullable) + - `verified_row_count_partial integer` (nullable — checkpoint for resumable reconcile in U10) + - `superseded_by_job_id text` (nullable, FK to `etl_jobs.id` `ON DELETE SET NULL`) + - `superseded_at timestamp` (nullable — paired with `superseded_by_job_id` so the timeline survives even after FK cleanup) + - `source_etag text` (nullable — captured on producer insert from `r2.head(objectKey).etag`; U6's repair endpoint uses this for failure-closed source verification) + - `source_last_modified timestamp` (nullable — same capture; redundant with etag but cheap) +- CHECK constraints on `etl_jobs`: + - `etl_jobs_chunks_completed_lte_total CHECK (chunks_total IS NULL OR chunks_completed <= chunks_total)` — fail loudly on over-count. + - `etl_jobs_no_self_supersede CHECK (superseded_by_job_id IS NULL OR superseded_by_job_id <> id)` — prevent self-referential repair loop. +- New indexes on `etl_jobs`: + - Partial: `etl_jobs_running_progress_idx` on `(status, last_progress_at)` `WHERE status = 'running'` — for the U5 stuck-job sweep. + - Partial: `etl_jobs_unverified_idx` on `(verified_at)` `WHERE status = 'completed' AND verified_at IS NULL` — for the U10 watchdog scan. + - `etl_jobs_superseded_by_idx` on `(superseded_by_job_id)` — for the admin dashboard's "is this job superseded?" lookup. +- New table `etl_job_chunks` (per-chunk idempotency, see Key Technical Decisions): + - `job_id text NOT NULL` (FK to `etl_jobs.id` `ON DELETE CASCADE`) + - `chunk_index integer NOT NULL` + - `completed_at timestamp DEFAULT now() NOT NULL` + - `PRIMARY KEY (job_id, chunk_index)` +- New table `etl_dlq_events`: `id text PK`, `job_id text` (FK, nullable, `ON DELETE SET NULL`), `chunk_index integer`, `message_body jsonb`, `error_message text`, `error_stack text`, `attempts integer`, `source text` (one of `consumer`, `sweep`; defaults to `consumer`), `created_at timestamp DEFAULT now() NOT NULL`. Index on `created_at`. +- Modification to `catalog_item_etl_jobs`: add `UNIQUE (catalog_item_id, etl_job_id)` so a redelivered chunk's upsert can use `ON CONFLICT DO NOTHING` and not produce duplicate provenance rows. +- Backfill: `UPDATE etl_jobs SET last_progress_at = started_at WHERE last_progress_at IS NULL`. Safe — `etl_jobs` is ~200 rows; sub-100ms on Neon. +- Drizzle generator: `bun run --cwd packages/api db:generate` then verify the SQL file matches the design. **Verify Drizzle Kit emits `DEFAULT 0 NOT NULL` literally in the SQL** — Drizzle sometimes drops the SQL-side default and keeps only the JS-side, which would break inserts from in-flight old workers during a rolling deploy. **Do NOT touch the `etl_job_status` enum in this migration** — no new enum value is needed (see Key Technical Decisions). +- Drizzle Kit does not auto-emit `CONCURRENTLY` for indexes. At 200 rows the index build is instant so `CONCURRENTLY` is nice-to-have, not blocking. If the table grows >100k rows before this lands, hand-edit the generated SQL to use `CREATE INDEX CONCURRENTLY IF NOT EXISTS` and split each index into its own statement-breakpoint block. + +**Patterns to follow:** +- Existing `etl_jobs` definition at `packages/db/src/schema.ts:460-479` for column shape and import style. +- Migration `0027_past_madrox.sql` (added `scraper_revision` + index) for the "add column + partial index" pattern. +- `scripts/lint/check-drizzle-migrations.ts` runs in `lint:custom`; the new migration must pass it. + +**Test scenarios:** +- Happy path: After migration runs against a populated test DB, all 8 new `etl_jobs` columns are present with the documented defaults; `etl_job_chunks` and `etl_dlq_events` exist; the three new partial/normal indexes are queryable (`EXPLAIN SELECT ... WHERE status='running' ...` uses the running-progress index; the unverified index serves the watchdog). +- Happy path: `INSERT INTO etl_job_chunks (job_id, chunk_index) VALUES ('j1', 0)` succeeds; a duplicate insert returns no row via `ON CONFLICT DO NOTHING RETURNING 1` and the table still contains exactly one row. +- Edge case: Legacy rows have `chunks_total = NULL` and `last_progress_at` backfilled to `started_at`. +- Edge case: `chunks_completed DEFAULT 0` is correctly applied to existing rows (verify with a row that has `chunks_completed = 0` post-migration). The generated SQL must literally include `DEFAULT 0 NOT NULL` — assert via SQL `information_schema.columns`. +- Edge case: `UNIQUE (catalog_item_id, etl_job_id)` on `catalog_item_etl_jobs` prevents a duplicate-insert (returns conflict). +- Error path: Attempting to insert a row with `chunks_completed > chunks_total` violates the CHECK constraint and errors clearly. +- Error path: Attempting to set `superseded_by_job_id = id` violates the no-self-supersede CHECK. +- Error path: Re-running the migration on an already-migrated DB is a no-op (Drizzle's migration log handles this; smoke-test the up/down via `bun run --cwd packages/api db:migrate`). +- Edge case: Down-migration cleanly drops the new columns/tables on a DB with no Phase 2+ data. **Once Phase 2 ships and writes start landing in the new columns, the migration is forward-only** — document in the migration header comment. + +**Verification:** +- `bun run --cwd packages/api db:migrate` applies cleanly against a fresh Docker Postgres + against a Postgres seeded with current-prod-shape `etl_jobs` rows. +- `bunx drizzle-kit check` (run from `packages/api/`) validates the snapshot chain is internally consistent — run this before pushing any migration change. +- `bun lint:custom` passes on the new migration. +- `bun test:api:unit` includes the new schema test and it passes. + +--- + +### U2. P0 #1 fix: chunk-completion lifecycle in producer + consumer + +**Goal:** A multi-chunk job's `status` transitions to `completed` only after every chunk has finished. Premature completion eliminated. + +**Requirements:** R1 + +**Dependencies:** U1 + +**Files:** +- Modify: `packages/api/src/routes/catalog/index.ts` (producer endpoint sets `chunks_total` on `etl_jobs` insert and tags each `CatalogETLMessage` with `chunkIndex` and `chunksTotal`) +- Modify: `packages/api/src/services/etl/types.ts` (extend `CatalogETLMessage.data` with `chunkIndex: number` and `chunksTotal: number`; `byteStart`/`byteEnd` remain) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (rewrite the `:188-191` success-path UPDATE to use the `CASE` expression that flips status only when `chunks_completed + 1 = chunks_total`; also update `last_progress_at` on every counter write) +- Modify: `packages/api/src/services/etl/updateEtlJobProgress.ts` (include `last_progress_at: sql\`now()\`` in the update set so every progress write refreshes the sweep timestamp) +- Test: `packages/api/test/etl-chunk-completion.test.ts` (new) + +**Approach:** +- Producer: compute `chunks` first, then `INSERT etl_jobs (..., chunks_total) VALUES (..., ${chunks.length})` — a single round-trip including `chunks_total`. Then `sendBatch` with each message carrying `chunkIndex` 0..N-1 and `chunksTotal: N`. Setting `chunks_total` in the initial INSERT (rather than a separate follow-up UPDATE) eliminates a window where a chunk consumer could observe `chunks_total IS NULL` and silently fail the `chunks_completed + 1 = chunks_total` CASE comparison. +- Consumer success path runs inside a single Drizzle `db.transaction()`: + 1. `INSERT INTO etl_job_chunks (job_id, chunk_index) VALUES ($1, $2) ON CONFLICT (job_id, chunk_index) DO NOTHING RETURNING 1` — the idempotency gate. If no row returned, this is a redelivery; skip the increment, ack the message, return. + 2. If the insert created a row, run the atomic UPDATE: `UPDATE etl_jobs SET chunks_completed = chunks_completed + 1, last_progress_at = now(), status = CASE WHEN chunks_completed + 1 = chunks_total THEN 'completed' ELSE status END, completed_at = CASE WHEN chunks_completed + 1 = chunks_total THEN now() ELSE completed_at END WHERE id = $1 AND status = 'running' RETURNING status, chunks_completed, chunks_total`. + 3. The `WHERE status = 'running'` gate prevents clobbering a row the U5 sweep has already flipped to `failed` (status-flip-flop hazard). + 4. If the returned row shows the transition to `completed`, *and* this transaction was the one that created the chunk-row in step 1, send a message to `packrat-etl-reconcile-queue` (see U10) for the auto-reconcile. +- On per-chunk failure: the consumer no longer flips the parent job to `failed` immediately. Instead it lets the message throw / retry. The parent job only flips to `failed` via (a) DLQ consumer when retries are exhausted, or (b) the stuck-job sweep (U5). +- Single-chunk legacy jobs: when `chunks_total IS NULL`, the `etl_job_chunks` insert still gates the increment; legacy rows backfilled to `chunks_total = 1` migrate cleanly. Backwards-compatible with any in-flight legacy messages. +- The CHECK constraint `chunks_completed <= chunks_total` from U1 is the loud-failure safety net — if the idempotency gate ever leaks (e.g., a code bug bypasses the chunk-table insert), the next `UPDATE` errors with a constraint violation rather than silently corrupting the counter. + +**Patterns to follow:** +- Atomic SQL update idiom at `packages/api/src/services/etl/updateEtlJobProgress.ts:16-23`. +- Drizzle transaction shape: `await db.transaction(async (tx) => { ... })`. + +**Test scenarios:** +- Happy path: 5-chunk job; chunks 0..3 complete successfully → status remains `running` with `chunks_completed = 4`; chunk 4 completes → status flips to `completed`, `completed_at` set, `etl_job_chunks` has 5 rows. +- Happy path (idempotency): Chunk 2 succeeds, ack lost, CF redelivers → second attempt's `INSERT … ON CONFLICT DO NOTHING RETURNING` returns no row → increment is skipped → `chunks_completed` increments exactly once over the two deliveries. +- Edge case: Chunks complete out of order (chunk 3 finishes before chunk 1) → status flips only when all five have incremented; the `etl_job_chunks` rows record actual completion order. +- Edge case: Single-chunk legacy job (`chunks_total = 1`) → flips to `completed` on its one success; `etl_job_chunks` has 1 row. +- Edge case: Sweep flips job to `failed` mid-flight; the next chunk's UPDATE `WHERE … AND status = 'running'` returns zero rows → transaction sees the conflict, logs warning, lets the operator route to repair-from-scratch. +- Error path: One chunk throws; other chunks succeed → parent job stays `running` while CF Queue retries the failed chunk; if retries exhaust, DLQ consumer (U3) handles state transition. +- Error path: CHECK constraint trips (hypothetical leaked-idempotency bug) → UPDATE errors loudly, chunk retries, no silent corruption. +- Integration: With `R2BucketService` mocked to return a small CSV split into 3 chunks via `byteRange`, the full producer→queue→consumer cycle ends in exactly one `status=completed` transition for the parent job AND exactly one reconcile message enqueued. +- Integration (idempotency at scale): Replay every chunk message twice → `etl_job_chunks` has exactly `chunks_total` rows, counters match, status = `completed`. + +**Verification:** +- Re-running `etl.test.ts` plus the new test under `bun test:api` shows no `status='completed'` write until `chunks_completed = chunks_total`. +- A manual prod-shape replay (`POST /catalog/etl` against the dev Worker with a CSV that produces ≥3 chunks) shows the dashboard's `successRate` remain at the running state until all chunks finish. + +--- + +### U3. P0 #2 fix: explicit ack/retry + DLQ wiring + +**Goal:** No per-message exception is silently swallowed. Failures retry; exhausted retries land in a dedicated DLQ that emits Sentry events and persists for forensics. + +**Requirements:** R2, R9 + +**Dependencies:** U1 (for `etl_dlq_events` table) + +**Files:** +- Modify: `packages/api/src/services/etl/queue.ts` (rewrite `processQueueBatch` for explicit per-message ack/retry; remove the swallow at `:50-60`) +- Create: `packages/api/src/services/etl/processDlqEvent.ts` (DLQ consumer; INSERT into `etl_dlq_events`, capture Sentry exception, ack) +- Modify: `packages/api/src/index.ts` (extend the `queue()` switch at `:109-124` with arms for `packrat-etl-dlq` and `packrat-etl-dlq-dev`) +- Modify: `packages/api/wrangler.jsonc` (declare `packrat-etl-dlq` and `packrat-etl-dlq-dev` as producer + consumer; add `dead_letter_queue: "packrat-etl-dlq"` and `max_retries: 3` to the ETL consumer block at `:78-82` and dev equivalent at `:178-182`) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (when a chunk's processing throws, also UPDATE `last_progress_at` and increment a transient `last_error_at` if useful — see Approach for trade-off; primary work is removing the per-chunk `status='failed'` write at `:201-204` since the DLQ consumer is now responsible for state transition) +- Test: `packages/api/test/etl-queue-retry.test.ts` (new — covers the global-mock blind spot in `setup.ts:544-551`) + +**Approach:** +- Rewrite `processQueueBatch`: + ```text + for (const message of batch.messages) { + try { + await processCatalogETL({ message: message.body, env }); + message.ack(); + } catch (err) { + logger.error('etl.chunk.failed', { jobId, chunkIndex, err }); + Sentry.captureException(err, { tags: { jobId, chunkIndex, r2Key }, contexts: { queue: { messageId: message.id, attempts: message.attempts } } }); + message.retry({ delaySeconds: 30 }); + } + } + ``` + (Sentry wiring lives in U8; in U3 the call sites are added as no-ops that U8 fills in.) +- DLQ consumer reads from `packrat-etl-dlq` and, inside a single `db.transaction()`, performs: (1) `INSERT INTO etl_dlq_events (… source = 'consumer')` capturing `{ jobId, chunkIndex, message_body, error_message, error_stack, attempts }`, (2) `UPDATE etl_jobs SET status = 'failed', completed_at = now() WHERE id = $1 AND status = 'running'` — the `WHERE status = 'running'` clause is the no-op gate that prevents racing the U5 sweep. `Sentry.captureException` fires *before* the transaction (so the event survives even if the DB transaction rolls back) with tags `{ jobId, chunkIndex, r2Key }`. The `error_stack` field is contractually free of raw CSV row data — only structural error messages — to avoid accidental PII capture (documented at the call site). +- Wrangler config additions: + ```text + // producer + { "queue": "packrat-etl-dlq", "binding": "ETL_DLQ" } + // consumer + { "queue": "packrat-etl-dlq", "max_batch_size": 10, "max_batch_timeout": 30 } + // on the existing ETL consumer: + "dead_letter_queue": "packrat-etl-dlq", + "max_retries": 3 + ``` + Same shape applied to `*-dev` queues. +- The removal of the per-chunk `status='failed'` write at `processCatalogEtl.ts:201-204` is critical — leaving it would race with the DLQ consumer's state transition. +- `processCatalogETL` rethrows on any internal failure (it already does); no behavioral change other than the consumer's catch now retries instead of swallowing. + +**Patterns to follow:** +- Embeddings consumer pattern at `packages/api/src/services/catalogService.ts:461-507` for the rethrow shape. +- Existing `queue()` dispatch at `packages/api/src/index.ts:109-124` for the new DLQ arm. + +**Test scenarios:** +- Happy path: Single message processes successfully → `message.ack()` called exactly once; no retry; no DLQ row. +- Error path: Transient throw (simulated R2 5xx) → first call: `message.retry({ delaySeconds: 30 })` and no DLQ; second call succeeds → ack. Total DLQ rows = 0. +- Error path: Permanent throw (4 attempts all fail) → exhausts `max_retries: 3` → message routed to `packrat-etl-dlq` → DLQ consumer inserts row in `etl_dlq_events` with `attempts = 4`, captures Sentry, flips `etl_jobs.status = 'failed'`. +- Integration: Un-mock `processQueueBatch` (override `setup.ts:544-551` per-file with `vi.doUnmock`) and exercise the real consumer against an in-memory queue stub. +- Edge case: Two messages in a batch, first throws and second succeeds (this should not happen at `max_batch_size: 1` but the code path supports it) → first retries, second acks; no cross-contamination of state. + +**Verification:** +- New test passes with the per-message catch removed; passes with the catch present too (so the test actually proves the new behavior). +- `bun test:api` overall still green. +- Inspecting `packrat-etl-dlq` queue depth in `wrangler queues info packrat-etl-dlq-dev` after a forced failure shows zero (because the DLQ consumer drains immediately). + +--- + +### U4. Sweep cleanup: remove the broken wall-clock stuck-job sweep before U5 replaces it + +**Goal:** Take the existing `POST /admin/etl/reset-stuck` endpoint out of production rotation before U5's progress-based replacement lands, to stop new false-failures while the rest of Phase 2 ships. + +**Requirements:** R3 + +**Dependencies:** None (independent of U1; this is a code removal) + +**Files:** +- Modify: `packages/api/src/routes/admin/analytics/catalog.ts` (remove or guard the `POST /admin/etl/reset-stuck` route at `:384-409`; if removed, also remove from the OpenAPI spec) +- Modify: `packages/cli/src/commands/admin/etl.ts` (drop any subcommand wired to the removed endpoint) +- Test: `packages/api/test/admin-etl-routes.test.ts` (new or extend existing — assert the route returns 410 Gone or is absent) + +**Approach:** +- Two options, both acceptable: + - **Remove the route entirely.** Anyone calling it gets a 404. Cleanest. Recommended if no automation depends on it. + - **Replace the route body with a 410 Gone response** that links to the runbook (added in U15) and the new sweep design from U5. Use if there's any concern about external automation calling it. +- Existing endpoint logic at `:384-409` does `UPDATE etl_jobs SET status='failed' WHERE status='running' AND started_at < now() - interval '30 minutes'`. This is the SQL that wrongly failed the 7 jobs on 2026-05-14. +- This unit ships before U5 lands the replacement, so for a short window there is no automated sweep at all. Acceptable because stuck-job recovery in that window is operational (U15 runbook documents the manual SQL). + +**Patterns to follow:** +- Existing admin route removal pattern (none in repo as of this writing); fall back to standard Elysia route definition omission. + +**Test scenarios:** +- Happy path: `POST /admin/etl/reset-stuck` returns 410 (or 404 if removed) — test asserts on the chosen behavior. +- Edge case: Admin CLI subcommand for the old endpoint no longer exists (or returns a clear "removed, see runbook" message). + +**Verification:** +- `bun test:api` passes with the new assertion. +- Manual `curl` against dev Worker returns the chosen status code. + +--- + +### U5. P1 #2 fix: progress-based stuck-job sweep + +**Goal:** Replace the wall-clock-based sweep with one that uses `last_progress_at` so healthy long jobs (e.g., 50,100-row `evo` file) are not falsely failed. + +**Requirements:** R3 + +**Dependencies:** U1 (for `last_progress_at`), U2 (for the `last_progress_at` write-on-progress), U4 (so the old sweep is gone first) + +**Files:** +- Create: `packages/api/src/services/etl/sweepStuckJobs.ts` (the sweep function — pure DB logic, no HTTP) +- Modify: `packages/api/src/routes/admin/analytics/catalog.ts` (new `POST /admin/etl/sweep-stuck` endpoint that calls `sweepStuckJobs` and returns the affected rows; for manual triggering) +- Modify: `packages/api/wrangler.jsonc` (declare a CF Cron Trigger for the sweep, e.g., `*/5 * * * *`) +- Modify: `packages/api/src/index.ts` (add `scheduled()` handler that invokes `sweepStuckJobs` on the cron event; if a `scheduled` handler doesn't yet exist, add one) +- Test: `packages/api/test/etl-stuck-job-sweep.test.ts` (new) + +**Approach:** +- Sweep runs inside a single `db.transaction()`: + 1. `UPDATE etl_jobs SET status='failed', completed_at = now() WHERE status='running' AND COALESCE(last_progress_at, started_at) < now() - interval '15 minutes' RETURNING id, source, filename, started_at, last_progress_at, chunks_total, chunks_completed`. (The `COALESCE` defends against any legacy row that somehow escaped the U1 backfill.) + 2. For each returned row, `INSERT INTO etl_dlq_events (job_id, error_message, source) VALUES ($1, 'sweep:no_progress', 'sweep')` so the forensic table is the single source of truth for *every* failed transition — whether triggered by the consumer DLQ or by the sweep. `chunk_index = NULL` in sweep-sourced events. +- Returned rows also feed a Sentry warning event per affected job (`level: warning`, tags `{ jobId, source: 'sweep' }`, extra includes `chunks_completed/chunks_total` so the operator immediately sees how far the job got). +- 15-minute interval matches the CF Queue consumer wall-clock cap. Any chunk making real progress writes `last_progress_at = now()` (via U2's modification to `updateEtlJobProgress`), so this only catches truly stalled jobs. +- CF Cron Trigger every 5 minutes (configurable via env if needed). The cron handler is idempotent — the partial index from U1 keeps the query cheap even at thousands of jobs. Wrangler config shape: `"triggers": { "crons": ["*/5 * * * *"] }` — top-level `triggers` object wrapping a `crons` array, not a bare top-level `crons` key. +- Manual admin endpoint exists for on-demand sweep — useful during incident response. + +**Patterns to follow:** +- Admin route structure at `packages/api/src/routes/admin/analytics/catalog.ts` for the new endpoint. +- CF Cron Triggers config in `wrangler.jsonc` (the repo has none today — this is the first; reference ). + +**Test scenarios:** +- Happy path: Insert a job with `status='running'`, `last_progress_at = now() - 30min` → sweep flips it to `failed`. +- Edge case: Insert a job with `status='running'`, `last_progress_at = now() - 5min` → sweep leaves it alone (within budget). +- Edge case: Insert a job with `last_progress_at = NULL` (somehow — legacy row that escaped backfill) → COALESCE the column with `started_at` in the WHERE clause so it still gets evaluated. +- Edge case: 50,100-row job in progress — chunks write `last_progress_at = now()` every 100 rows → sweep never fires on it. +- Integration: Cron-event simulation calls the same code path as the admin endpoint; both return identical results for the same DB state. +- Error path: Sweep query fails (DB down) → caller observes the error; Sentry captures; cron does not silently mask. + +**Verification:** +- After running the sweep against a DB with the seeded test cases, exactly the long-stalled rows are affected. +- `bun test:api` includes the new test and passes. +- Dev cron schedule fires (`wrangler dev --test-scheduled`) and exercises the handler. + +--- + +### U6. P1 #1 fix: shared chunking helper + retry endpoint + repair-from-scratch endpoint + +**Goal:** Both retry and repair use the same producer chunking logic. The repair endpoint creates a brand-new `etl_jobs` row linked to the broken historical one — directly enabling the operational recovery of the 7 wrongly-`failed` jobs from 2026-05-14. + +**Requirements:** R5, R8 + +**Dependencies:** U1 (for `superseded_by_job_id`), U2 (for `chunks_total` write semantics) + +**Files:** +- Create: `packages/api/src/services/etl/chunkCsvForR2.ts` (extracted shared helper: takes `objectKey`, returns an array of `{ chunkIndex, chunksTotal, byteStart, byteEnd }` with newline-aligned boundaries — newline alignment itself ships in U7) +- Modify: `packages/api/src/routes/catalog/index.ts` (replace inline chunking at `:253-271` with a call to `chunkCsvForR2`) +- Modify: `packages/api/src/routes/admin/analytics/catalog.ts` (rewrite `POST /admin/etl/:jobId/retry` at `:413-470` to use `chunkCsvForR2`; add new `POST /admin/etl/:jobId/repair-from-scratch`) +- Modify: `packages/api/src/services/etl/queue.ts` (extend `queueCatalogETL` to accept pre-computed chunks rather than constructing them — or accept either, with the chunk-construction path migrating to the shared helper) +- Modify: `packages/cli/src/commands/admin/etl.ts` (add `retry ` subcommand if not present, plus new `repair-from-scratch ` subcommand) +- Test: `packages/api/test/etl-retry-repair.test.ts` (new) + +**Approach:** +- `chunkCsvForR2(objectKey, r2, options?)`: signature returns `Promise`. Calls `r2.head(objectKey)`, splits into 20 MB windows. Newline-alignment lives in U7 but the shape lands here so U7 is a fill-in. +- Retry endpoint (`POST /admin/etl/:jobId/retry`): looks up `(source, filename, scraperRevision)` from the existing job, generates a fresh `jobId`, INSERTs a new `etl_jobs` row with `chunks_total = chunkCsvForR2(...).length`, sets `superseded_by_job_id = ` on the new row only if the original is `failed`, sends batch. +- Repair-from-scratch (`POST /admin/etl/:jobId/repair-from-scratch`): same behavior as retry but always sets `superseded_by_job_id` and `superseded_at = now()` on the new row, and always re-reads the full file (even if the original was `completed`). Use case: an operator suspects a `completed` job is undercount; repair recreates from scratch. +- **R2 ETag verification (failure-closed)**: before creating the new job row, both endpoints call `r2.head(objectKey)` and compare the returned `etag` (and `lastModified`) against the original job's recorded values. If the original job has no `etag` stored (legacy rows), require an explicit `?force=true` query flag. If the `etag` differs (source was overwritten by a later scrape), return 409 Conflict with a clear message naming both etags — never silently re-ingest a different file under the same path. (This implies adding `source_etag text` and `source_last_modified timestamp` to `etl_jobs` — fold into U1's column list if not already, or capture as a follow-up here.) +- Both endpoints accept an optional `?dryRun=true` query that returns the planned chunk spec without enqueuing anything — operator preview. +- The 7 historical jobs from 2026-05-14 will be recovered by calling repair-from-scratch on each of them once Phase 1+2 ships. U15 runbook documents the operator procedure including the ETag verification step. + +**Patterns to follow:** +- Admin route structure at `packages/api/src/routes/admin/analytics/catalog.ts:178-235` for response shape. +- Existing retry endpoint at `:413-470` for the basic flow (just don't replicate the broken single-chunk behavior). + +**Test scenarios:** +- Happy path: Retry a failed job with a 50 MB source file → 3 chunks created via `chunkCsvForR2`, 3 messages sent, new `etl_jobs` row has `chunks_total = 3`, `superseded_by_job_id` matches original. +- Happy path: Repair-from-scratch a `completed` job with apparent undercount → new job created with `superseded_by_job_id` set; original row untouched. +- Edge case: Retry a single-chunk legacy job (file size < 20 MB) → 1 chunk, `chunks_total = 1`, behaves identically to the producer endpoint. +- Edge case: Retry on a job whose `filename` no longer exists in R2 → endpoint returns 404 with a clear message; no new `etl_jobs` row. +- Edge case: `?dryRun=true` returns the planned chunk spec; no DB writes, no queue sends. +- Integration: Repair-from-scratch on a 50,100-row file (the `evo` case) produces the expected ~3 chunks, all enqueued, and after the full pipeline completes the new job's `total_processed` matches the file's actual row count. +- Covers AE: the 7 jobs from 2026-05-14 can each be repaired by calling repair-from-scratch — verified manually post-deploy. + +**Verification:** +- Both endpoints documented in the OpenAPI spec emitted by `@elysiajs/openapi`. +- CLI subcommands invoke the endpoints with proper auth. +- `bun test:api` passes the new integration test. + +--- + +### U7. P1 #3 + P1 #4 + P1 #5 fix: row-boundary-aligned chunks + robust header injection + +**Goal:** No row is silently dropped, invalidated, or split across chunks. Wide-CSV headers (>4 KB) fail loudly instead of silently misaligning columns. + +**Requirements:** R4, R6 + +**Dependencies:** U6 (for `chunkCsvForR2`) + +**Files:** +- Modify: `packages/api/src/services/etl/chunkCsvForR2.ts` (implement newline alignment — for each 20 MB window, read the next 64 KB tail, find the last `\n`, snap `byteEnd` to the byte before that newline) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (remove `skipPartialRow` at `:95-108`; rewrite header injection at `:50-58` with a bounded expand loop 4K→16K→64K; throw a typed error if no newline in 64 KB) +- Test: `packages/api/test/etl-chunk-boundaries.test.ts` (new) + +**Approach:** +- Newline alignment in producer: + - For each chunk window `[start, start + 20MB)`: + - Read `[start + 20MB - 64KB, start + 20MB)`. + - Find the index of the last `\n` in that slice. + - If found: `byteEnd = (start + 20MB - 64KB) + lastNewlineIndex`. The next chunk's `byteStart = byteEnd + 1`. + - If not found in 64 KB (extremely unlikely with normal CSV row sizes): throw `ChunkBoundaryError` immediately, surfacing to Sentry and aborting the job creation. Caller is told the file has a row larger than 64 KB. + - Last chunk: `byteEnd = file.size - 1`. +- Header re-fetch in consumer (for `chunkIndex > 0`): + ```text + let headerSlice = await r2.get(key, { range: { offset: 0, length: 4096 }}).then(b => b.text()); + let nlIdx = headerSlice.indexOf('\n'); + if (nlIdx === -1) { + headerSlice = await r2.get(key, { range: { offset: 0, length: 16384 }}).then(b => b.text()); + nlIdx = headerSlice.indexOf('\n'); + } + if (nlIdx === -1) { + headerSlice = await r2.get(key, { range: { offset: 0, length: 65536 }}).then(b => b.text()); + nlIdx = headerSlice.indexOf('\n'); + } + if (nlIdx === -1) throw new EtlHeaderError(`No newline in first 64 KB of ${key} — malformed header`); + const headerRow = headerSlice.slice(0, nlIdx); + ``` +- Since chunks are now newline-aligned, `skipPartialRow` is no longer needed — the consumer can stream the chunk body directly into the parser after prepending the header. +- BOM handling: if the first byte of the header slice is `0xEF 0xBB 0xBF`, strip it before extracting the header row. Same treatment for the first chunk. + +**Patterns to follow:** +- R2 byte-range read pattern at `packages/api/src/services/etl/processCatalogEtl.ts:54, 71`. +- Typed-error pattern: extend whatever the repo uses for domain errors (typically `Error` subclasses in `packages/api/src/utils/errors.ts`). + +**Test scenarios:** +- Happy path: 5 MB file, 1 chunk → no boundary logic exercised; row count matches actual. +- Happy path: 60 MB file, 3 chunks; rows of varying width; all `byteEnd` values land immediately before a `\n`; total row count across chunks = file row count. +- Edge case: Chunk boundary lands exactly on a newline character (`source[byteEnd] === '\n'`) → still aligned; next chunk starts on next row; no dropped row. +- Edge case: Header row of 4500 bytes (just over 4 KB) → re-fetch expands to 16 KB, succeeds; columns mapped correctly. +- Edge case: Header row of 50 KB (one absurdly wide CSV) → re-fetch expands to 64 KB, succeeds. +- Edge case: BOM at start of file → stripped from header extraction in both chunk-0 and re-fetch paths. +- Error path: File with no newline in first 64 KB → throws `EtlHeaderError`; job marked `failed` via DLQ (U3). +- Error path: Row larger than 64 KB encountered at a chunk boundary → producer throws `ChunkBoundaryError`; no job created. +- Integration: A real CSV from prod (anonymized fixture in `packages/api/test/fixtures/`) splits into multiple chunks; sum of consumer-reported `totalProcessed` across all chunks equals `wc -l fixture.csv - 1` (subtract header). +- Covers AE: A 50,100-row file (the `evo` shape) ingested via the new chunking logic shows `total_processed = 50100`, `total_valid + total_invalid = 50100`, no missing rows. + +**Verification:** +- Manual run on a real prod fixture file with `wc -l` cross-check matches the job's `total_processed`. +- `bun test:api` passes the new fixture-driven test. +- Sentry catches the malformed-header case during the next dev exercise. + +--- + +### U8. Sentry wiring via `@sentry/cloudflare` + +**Goal:** Every uncaught exception in the API Worker — including queue-consumer paths — emits a Sentry event with structured tags. Operators can debug a stuck job without paging through raw Worker logs. + +**Requirements:** R9 + +**Dependencies:** None (independent; can start in parallel with Phase 1 but lands in Phase 3) + +**Files:** +- Modify: `packages/api/package.json` (add `@sentry/cloudflare` dependency; pin to a specific version) +- Modify: `packages/api/src/index.ts` (wrap the Worker default export with `Sentry.withSentry({...}, { fetch, queue })`; pass the Sentry options factory that reads `env.SENTRY_DSN`) +- Modify: `packages/api/src/utils/env-validation.ts` (no schema change — `SENTRY_DSN` is already declared at `:9, 94`; verify it's required vs optional and adjust accordingly so dev doesn't break without a DSN) +- Modify: `packages/api/wrangler.jsonc` (add `upload_source_maps: true` at the top level) +- Modify: `packages/api/src/services/etl/queue.ts` (fill in the `Sentry.captureException(...)` call site that U3 stubbed) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (Sentry breadcrumbs at chunk-start, batch-flush, and chunk-end; `Sentry.startSpan` around the chunk lifecycle) +- Create: `packages/api/src/utils/logger.ts` (the thin structured logger — accepts `LogContext`, emits JSON-prefixed `console.log` lines, also calls `Sentry.addBreadcrumb` when Sentry is initialized) +- Modify: All `packages/api/src/services/etl/*.ts` console calls migrated to `logger.{info,warn,error}` (mechanical change — sweeps across the ETL files) +- Test: `packages/api/test/sentry-instrumentation.test.ts` (new — mocks `@sentry/cloudflare` and asserts captureException/breadcrumb call shape) + +**Approach:** +- `withSentry({ fetch, queue })` wraps the existing default export at `packages/api/src/index.ts`. The Sentry options factory reads `env.SENTRY_DSN`, `env.ENVIRONMENT`, sets `tracesSampleRate: 0.1`. +- Queue consumer instrumentation per : + - `Sentry.startSpan({ op: 'queue.process', name: 'etl-chunk', attributes: { 'messaging.message.id': msg.id, 'messaging.message.retry.count': msg.attempts, 'jobId': msg.body.id, 'chunkIndex': msg.body.data.chunkIndex } }, async () => { ... })`. + - `Sentry.captureException(err, { tags: { jobId, chunkIndex, r2Key }, contexts: { queue: { messageId, attempts } } })` inside the catch. +- DLQ consumer (from U3) gets the same treatment. +- `logger.ts`: ~30 lines. Functions: `info(event, ctx)`, `warn(event, ctx)`, `error(event, ctx, err?)`. Emits a JSON line; if Sentry is initialized, also calls `Sentry.addBreadcrumb({ category: event, data: ctx, level })`. +- Source maps: `upload_source_maps: true` works with Wrangler 4.x and `compatibility_date: 2025-06-01`. + +**Patterns to follow:** +- No existing Sentry initialization in `packages/api` — this is the first. +- Reference Sentry-in-CF guidance: . + +**Test scenarios:** +- Happy path: Successful chunk → one `startSpan` invocation, breadcrumbs at chunk-start/flush/end, no `captureException`. +- Error path: Chunk throws → `captureException` called once with expected tags; span marks status `internal_error`. +- Edge case: `SENTRY_DSN` empty (dev without secret) → no Sentry calls fire; logger still emits lines; no crash. +- Edge case: Logger called before Sentry initialized (cold-start race) → graceful no-op on breadcrumb path; logger.info still emits the line. +- Integration: A real Sentry test project receives events from `bun api` dev-server when forced failures are triggered. + +**Verification:** +- Dev `bun api` cold start logs the Sentry init line. +- A forced chunk failure produces a Sentry event visible in the project. +- All `packages/api/src/services/etl/*.ts` files have zero `console.*` references (`grep -rn 'console\.' packages/api/src/services/etl/` returns nothing). + +--- + +### U9. P2 #2 + P2 #3 + P2 #4 fix: error propagation + embedding-failure observability + IIFE error handling + +**Goal:** Three related but smaller correctness issues that all share the theme "errors should not vanish silently." + +**Requirements:** R2, R10 + +**Dependencies:** U1 (for `total_embedding_failures`), U8 (so the new error sites can `Sentry.captureException`) + +**Files:** +- Modify: `packages/api/src/services/etl/processLogsBatch.ts` (rethrow on DB failure at `:25-27`; remove the swallow) +- Modify: `packages/api/src/services/etl/processValidItemsBatch.ts` (in the embedding-fallback path at `:52-63`, atomically increment `etl_jobs.total_embedding_failures` before upserting; surface a Sentry warning event with `jobId` and the affected SKU count; do not throw) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (wrap the writer IIFE at `:89-117` in an explicit promise: `const writerPromise = (async () => { ... })().catch(err => parser.destroy(err)); ...; await writerPromise.catch(err => { throw err })` so unhandled rejections become outer-flow throws) +- Modify: `packages/api/src/routes/admin/analytics/catalog.ts` (extend the admin job-list response to include `totalEmbeddingFailures` so dashboards can surface degradation) +- Test: `packages/api/test/etl-error-propagation.test.ts` (new) + +**Approach:** +- `processLogsBatch`: catch block currently logs and returns. Replace with `throw err`. The outer `processCatalogETL` catch already exists and the chunk will retry/DLQ correctly via U3. +- Embedding fallback: at `processValidItemsBatch.ts:52-63`, on `generateManyEmbeddings` throw: + ```text + await db.update(etlJobs).set({ totalEmbeddingFailures: sql`COALESCE(${etlJobs.totalEmbeddingFailures}, 0) + ${items.length}` }).where(eq(etlJobs.id, jobId)); + logger.warn('etl.embedding.fallback', { jobId, skuCount: items.length }); + Sentry.captureMessage('etl.embedding.fallback', { level: 'warning', tags: { jobId }, extra: { skuCount: items.length } }); + // continue with upsert; embedding stays NULL + ``` +- IIFE wrap pattern: + ```text + const writerPromise = (async () => { ... })() + .catch(err => { parser.destroy(err); throw err; }); + // ... for await loop ... + await writerPromise; + ``` + Any rejection in the writer now propagates to the outer try/catch in `processCatalogETL` and triggers retry/DLQ via U3. +- Admin response extension: extend the existing `GET /admin/analytics/catalog/etl` route's select shape to include `totalEmbeddingFailures` and update the response Zod schema if one is declared. + +**Patterns to follow:** +- Atomic update idiom at `packages/api/src/services/etl/updateEtlJobProgress.ts:16-23`. +- Admin route response shape at `packages/api/src/routes/admin/analytics/catalog.ts:178-235`. + +**Test scenarios:** +- Happy path (embedding fallback): Embedding service throws → SKUs upserted with `embedding=NULL`; `total_embedding_failures` increments by exactly `items.length`; Sentry warning event fires once per batch (not per SKU). +- Happy path (logs rethrow): `processLogsBatch` DB INSERT fails → exception propagates to outer catch → chunk retried by CF Queue. +- Happy path (IIFE wrap): Writer throws inside the async IIFE → parser destroyed; outer `for await` loop terminates; outer catch fires; chunk retried. +- Edge case: Multiple consecutive embedding batches in one chunk all fall back → counter increments cumulatively; Sentry warnings fire once per batch, not once per chunk. +- Edge case: Mixed batch — some SKUs embed, then fallback kicks in for the rest → counter increments only for the failed batch's SKU count. +- Integration: Admin endpoint response includes `totalEmbeddingFailures` field for every job; the prod-shape dashboard query still parses cleanly. + +**Verification:** +- New test passes with the rethrow / wrap / counter increments in place. +- `bun test:api` overall green. +- Dev admin endpoint `GET /admin/analytics/catalog/etl?limit=5` returns the new field. + +--- + +### U10. Reconciliation: admin endpoint + automatic post-job verification (via dedicated queue) + CLI subcommand + +**Goal:** Every ETL completion writes a verification row count; operators can also trigger reconciliation on any job on demand. Surfaces the user's "missing or falsely labeling" concern as a first-class observable signal. Auto-reconciliation runs on its own queue, not via `ctx.waitUntil`, so multi-GB files do not exceed the queue invocation's 15-min wall-clock. + +**Requirements:** R7 + +**Dependencies:** U1 (for `verified_at`, `verified_row_count`, `verified_row_count_partial`), U2 (for the completion transition that enqueues the reconcile message), U8 (for Sentry warnings on delta) + +**Files:** +- Create: `packages/api/src/services/etl/reconcileJob.ts` (pure function: given a `jobId` and optional `resumeFromByte`, stream the R2 source in 100 MB byte-range windows, count newlines, checkpoint progress, finalize verification on EOF, return delta) +- Create: `packages/api/src/services/etl/processReconcileBatch.ts` (queue consumer for `packrat-etl-reconcile-queue`; calls `reconcileJob`; handles retry/resume) +- Modify: `packages/api/src/services/etl/queue.ts` (extend producer to enqueue reconcile messages; type `ReconcileMessage { jobId: string; resumeFromByte?: number }`) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts` (on the final-chunk completion transition from U2, enqueue a `ReconcileMessage` to `packrat-etl-reconcile-queue` *inside the same transaction* as the status flip so a row can never transition to `completed` without an enqueued reconcile) +- Modify: `packages/api/src/index.ts` (extend the `queue()` switch with an arm for `packrat-etl-reconcile-queue` and `packrat-etl-reconcile-queue-dev`) +- Modify: `packages/api/wrangler.jsonc` (declare `packrat-etl-reconcile-queue` and `packrat-etl-reconcile-queue-dev` as producer + consumer with its own `dead_letter_queue: 'packrat-etl-dlq'` and `max_retries: 3`) +- Modify: `packages/api/src/routes/admin/analytics/catalog.ts` (add `POST /admin/etl/:jobId/reconcile` — calls `reconcileJob` synchronously; for small/medium files returns inline; for large files returns 202 Accepted and enqueues to the reconcile queue with the existing job id) +- Modify: `packages/cli/src/commands/admin/etl.ts` (add `reconcile ` subcommand) +- Modify: admin list endpoint response shape (include `verifiedAt`, `verifiedRowCount`, and `verifiedRowCountPartial` so the dashboard surfaces it) +- Test: `packages/api/test/etl-reconciliation.test.ts` (new) + +**Approach:** +- `reconcileJob(jobId, resumeFromByte = 0)`: + 1. Read `(filename, total_processed, verified_at, verified_row_count_partial)` from `etl_jobs`. If `verified_at IS NOT NULL`, return early — idempotent no-op for redelivered messages. + 2. `r2.head(key)` → `fileSize`. + 3. From `resumeFromByte` (or `verified_row_count_partial`'s checkpoint byte position if set), read 100 MB byte-range windows. For each window: + - Count `\n` bytes in the window. + - Add to running `lineCount`. + - On the last window, subtract 1 for the header row. + - Every 5 windows (500 MB processed) or when elapsed time > 10 min: `UPDATE etl_jobs SET verified_row_count_partial = $lineCount` (checkpoint), then throw a typed `ReconcileResumeError` carrying the current byte offset so the queue retry re-enqueues with `resumeFromByte` advanced. Wall-clock budget reset. + 4. On EOF: `UPDATE etl_jobs SET verified_at = now(), verified_row_count = $lineCount, verified_row_count_partial = NULL WHERE id = $1 AND verified_at IS NULL` (idempotency gate). + 5. Compute `delta = lineCount - total_processed`. If `abs(delta) > max(10, ceil(0.01 * lineCount))`: `Sentry.captureMessage('etl.reconciliation.delta', { level: 'warning', tags: { jobId }, extra: { delta, expected: lineCount, actual: total_processed } })`. + 6. Return `{ jobId, expectedRowCount: lineCount, actualRowCount: total_processed, delta, withinThreshold }`. +- `processReconcileBatch` (queue consumer): + - For each message: try `reconcileJob(msg.jobId, msg.resumeFromByte)` → on success `ack()`. On `ReconcileResumeError`: enqueue a new message with the advanced offset and `ack()` the current one. On any other error: `retry({ delaySeconds: 60 })`. +- Auto-trigger: in U2's completion transaction, after the status flip to `completed`, enqueue `{ jobId, resumeFromByte: 0 }` to `packrat-etl-reconcile-queue`. Because both writes are in the same transaction, a row can never be `completed` without an enqueued reconcile. +- Manual endpoint (`POST /admin/etl/:jobId/reconcile`): + - For files where `fileSize < 200 MB`: call `reconcileJob` synchronously and return the result inline. + - For files ≥ 200 MB: enqueue to `packrat-etl-reconcile-queue` and return 202 with a "poll the job for `verified_at`" message. + - Optional `?force=true` query: clear `verified_at` first and re-enqueue (operator override for a re-verify). +- CLI subcommand: `packrat-admin etl reconcile ` → wraps the endpoint, polls until `verifiedAt` is set or timeout. +- The 7 historical jobs from 2026-05-14 can each be reconciled retroactively via this endpoint *before* deciding to repair (U6). Confirms the suspicion that they processed partial data before being swept. + +**Patterns to follow:** +- Queue consumer pattern from U3 (per-message ack/retry, DLQ wired). +- Streaming-count pattern: `for await (const chunk of body)` and accumulate `chunk.filter(byte => byte === 0x0A).length`. + +**Test scenarios:** +- Happy path: Job with `total_processed = 100`, R2 file has 101 lines (100 rows + header) → delta = 0; `verified_at` set; no Sentry warning. +- Happy path: Job with `total_processed = 1000`, R2 file has 1006 lines (1005 rows + header) → delta = 5; within threshold; no warning. +- Edge case: Job with `total_processed = 50000`, R2 file has 50100 lines + header → delta = 100; threshold = `max(10, 500)` = 500; within threshold; no warning. (The 50,100 case stays informational.) +- Edge case (the real case): Job with `total_processed = 400`, R2 file has 50101 lines (50100 rows + header) — what the `campmor`-shape failures looked like → delta = 49700; way over threshold; Sentry warning fires. +- Edge case (resume): A 1.5 GB file forces three resume-error checkpoints; each resume picks up at the right byte offset; final `verified_row_count` matches the true row count. +- Edge case (idempotency): A redelivered reconcile message with `resumeFromByte = 0` against a job that already has `verified_at` set — `reconcileJob` returns early without re-reading the file. +- Error path: R2 object missing → `reconcileJob` throws a typed error; queue consumer retries with backoff; after exhausting `max_retries: 3`, the DLQ captures it. +- Edge case: Job with `total_processed = NULL` (legacy stuck-job-sweep casualty) → reconcileJob computes delta as `expected - 0 = expected`; the warning carries useful context for diagnosing the historical job. +- Integration: Auto-verify fires exactly once per job, enqueued atomically with the completion transition; it does not fire for intermediate chunk completions; it does not fire twice on a redelivered final chunk (idempotency comes from the `etl_job_chunks` gate in U2). + +**Verification:** +- New test passes. +- Calling the endpoint on a real dev-seeded job returns the documented shape (inline for small files, 202 + queued for large). +- The chunk-completion transaction either commits both the status flip and the reconcile enqueue, or neither (verify with a forced enqueue failure mid-transaction). + +--- + +### U11. Quality-of-life: scheduler.wait, BATCH_SIZE rename, mergeBySku log aggregation + +**Goal:** Three tiny correctness/cleanliness wins that share a maintenance flavor and ship together. + +**Requirements:** R9 (log volume), and audit P2 #5, P2 #6, P3 #1 + +**Dependencies:** U8 (for the logger surface used by the aggregated merge summary) + +**Files:** +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts:120` (replace `setTimeout(resolve, 1)` with `await scheduler.wait(0)`) +- Create: `packages/api/src/services/etl/constants.ts` (new — exports `ITEM_FLUSH_BATCH_SIZE = 100` and `CF_QUEUE_BATCH_SIZE = 100`) +- Modify: `packages/api/src/services/etl/processCatalogEtl.ts:13` and `packages/api/src/services/etl/queue.ts:17` (import from the new constants module instead of declaring inline) +- Modify: `packages/api/src/services/etl/mergeItemsBySku.ts:34-48` (replace per-SKU `console.log` with a per-batch summary `logger.info('etl.merge.summary', { jobId, mergedSkuCount, totalChangedFields })`) +- Test: `packages/api/test/etl-yield-and-constants.test.ts` (new — minimal; mostly behavior-preservation) + +**Approach:** +- `await scheduler.wait(0)` is the documented Workers Scheduler API. `scheduler.yield()` does not exist (corrected from audit P2 #5). +- The constants module is dead-simple — two exports — but the rename surfaces intent at the call site and ends the ambiguity the audit flagged at P2 #6. +- The mergeBySku aggregation accumulates change counts across one batch (already a natural unit) and logs once at the end. No per-SKU lines. + +**Patterns to follow:** +- Module organization mirrors `packages/api/src/services/etl/types.ts` for a constants file. + +**Test scenarios:** +- Behavior preservation: A 10,000-row chunk completes at least as fast as before with `scheduler.wait(0)` (regression check, not a strict assertion). +- Happy path (merge log): A batch with 50 SKU merges → exactly one log line emitted, summarizing the batch. +- Edge case: A batch with 0 merges → no log line. + +**Verification:** +- `grep -rn "setTimeout\(.*1.*\)" packages/api/src/services/etl/` returns nothing. +- `grep -rn "BATCH_SIZE\s*=" packages/api/src/services/etl/` returns only the new constants. +- A real ETL run on dev with 1k duplicate SKUs produces 1 merge summary line, not 1000. + +--- + +### U12. Validator hardening: URL scheme + length caps + SKU charset + +**Goal:** Eliminate the audit P3 #2 attack surface — `javascript:` URLs and oversize fields cannot enter the catalog. + +**Requirements:** R11 + +**Dependencies:** None (independent; can land any time after Phase 1) + +**Files:** +- Modify: `packages/api/src/services/etl/CatalogItemValidator.ts` (rewrite `isValidUrl` at `:60-67`; add length caps and SKU regex) +- Test: `packages/api/test/etl-validator.test.ts` (new or extend existing) + +**Approach:** +- `isValidUrl`: parse with `new URL()`; reject any scheme other than `http:` and `https:`. Reject URLs longer than 2048 chars. +- Length caps (rejects, not truncates): `name ≤ 500`, `description ≤ 50000`, `brand ≤ 200`, `category ≤ 200`. +- SKU regex: `/^[A-Za-z0-9_.\-\/]+$/`; max length 200. +- Each rejection produces a structured invalid-item log entry with the specific reason — surfaces in the existing `/admin/etl/:jobId/failures` endpoint. + +**Patterns to follow:** +- Existing validator structure at `packages/api/src/services/etl/CatalogItemValidator.ts`. +- Invalid-log shape at `packages/api/src/services/etl/processLogsBatch.ts`. + +**Test scenarios:** +- Happy path: Valid `https://example.com/product/123` URL accepted. +- Error path: `javascript:alert(1)` URL rejected with reason `INVALID_URL_SCHEME`. +- Error path: `mailto:foo@bar` rejected with `INVALID_URL_SCHEME`. +- Error path: URL of 3000 chars rejected with `URL_TOO_LONG`. +- Edge case: Name of exactly 500 chars accepted; 501 chars rejected. +- Edge case: SKU `ABC-123_/test.sku` accepted; SKU `