
fix: migrate catalog ETL to Cloudflare Workflows (audit remediation, phase 1) #2462

Merged
andrew-bierman merged 27 commits into main from fix/etl-pipeline-workflows-migration on May 21, 2026

Conversation

@andrew-bierman
Collaborator

@andrew-bierman andrew-bierman commented May 20, 2026

Summary

Migrates the catalog ETL from a Cloudflare Queues + Postgres state machine to a Cloudflare Workflows-based pipeline, closing all 16 audit findings from docs/audits/2026-05-16-etl-audit.md and adding observability + operator surfaces that make the system safe to operate. Live prod data showed 192 runs / 74 failed (38% failure rate), with 7 large jobs falsely marked failed by a wall-clock-based stuck-job sweep on 2026-05-14 — the Queues path was structurally fragile.

Draft for human review. No deploy has happened from this branch yet (deploys go via PR/CI/manual, not from this session). The standalone spike worker that proved out the Workflows architecture has been torn down.

What's in this PR

| Unit | Audit findings closed | Notes |
| --- | --- | --- |
| U1 spike (now deleted) | | Real CF Workflow ran end-to-end on a prod CSV (cotopaxi, 698 KB) in 7s; verified R2 byte-range reads, csv-parse, step.sleep, and step-result persistence all work inside step.do. Spike worker torn down via wrangler delete. |
| U2: Migration 0048 | | workflow_instance_id (admin/debug link), total_embedding_failures (degradation counter), + index. |
| U3a: chunkCsvForR2 | P1 #3, #4, #5 | Newline-aligned byte-range chunker with parallel 64 KiB peek reads. 5 unit tests. |
| U3b: CatalogEtlWorkflow | P0 #1, P0 #2, P1 #1, P1 #2 | WorkflowEntrypoint orchestrating chunk-N × N → aggregate → finalize with per-step retry (3, 30s exp backoff). |
| U3c: Producer cutover | | POST /catalog/etl defaults to ?engine=workflow. Legacy ?engine=queue kept for the bake window. |
| U4: Validator hardening | P3 #2 | Rejects non-http(s), RFC-1918, IDN homograph, URLs > 2048 chars; length caps + SKU regex. 15 unit tests. |
| U5: Migration 0049 + admin endpoints | R7 | verified_at + verified_row_count columns. New POST /admin/etl/:jobId/reconcile counts R2 source rows via csv-parse. Retry endpoint rewritten to trigger workflow instances. |
| U5 follow-up: Migration 0050 + ETag fail-closed + repair-from-scratch | R5, R8 | source_etag + source_last_modified + superseded_by_job_id + superseded_at columns + CHECK + FK + index. Producer captures ETag at ingest. Retry compares stored vs live ETag (409 unless ?force=true). New POST /admin/etl/:jobId/repair-from-scratch works on completed jobs too; always sets supersession. |
| U6: Structured logger + Sentry wiring + error propagation | P2 #2, P2 #3, P2 #1 | logger.ts emits JSON lines + forwards to Sentry via addBreadcrumb / captureException / captureMessage. withSentry({...}, handler) wraps fetch/queue/scheduled; instrumentWorkflowWithSentry wraps the workflow class. nodejs_als flag + upload_source_maps: true. processLogsBatch rethrows on DB failure. processValidItemsBatch embedding-fallback path increments total_embedding_failures + emits Sentry warning. |
| U7: invalid_item_logs retention sweep | P3 #4 | Bounded-batch DELETE (10k × 100 iter cap). Daily 09:00 UTC CF Cron Trigger via new scheduled handler arm. 5 unit tests. |
| U8: Operator runbook | P3 #5 | docs/runbooks/etl-pipeline.md: architecture, trigger / inspect / retry / repair / reconcile procedures, DLQ-alternative, retention, the 7-job recovery procedure, Sentry observability section, admin field semantics, accepted limitations. |
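
As an illustration of the U7 row above, here is a minimal sketch of a bounded-batch sweep loop. It only assumes what the table states (batches of 10k rows, capped at 100 iterations); `sweepExpiredLogs`, `DeleteBatch`, and `SweepResult` are hypothetical names, and the actual DELETE statement is left behind a callback rather than guessed at.

```typescript
// Hypothetical callback: deletes up to `limit` expired rows and resolves to the number removed.
type DeleteBatch = (limit: number) => Promise<number>;

interface SweepResult {
  deleted: number;
  iterations: number;
  hitIterationCap: boolean;
}

async function sweepExpiredLogs(
  deleteBatch: DeleteBatch,
  batchSize = 10_000, // per-batch row cap, as stated for U7
  maxIterations = 100, // iteration cap, as stated for U7
): Promise<SweepResult> {
  let deleted = 0;
  let iterations = 0;
  let lastBatchFull = false;

  while (iterations < maxIterations) {
    const removed = await deleteBatch(batchSize);
    iterations += 1;
    deleted += removed;
    lastBatchFull = removed >= batchSize;
    // A short batch means no more expired rows matched, so the sweep can stop early.
    if (!lastBatchFull) break;
  }

  // The cap was hit only if the final allowed iteration still returned a full batch.
  return { deleted, iterations, hitIterationCap: lastBatchFull && iterations === maxIterations };
}
```

Reporting `hitIterationCap` lets a scheduled handler log that rows remain for the next daily run instead of looping without bound.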

Tests: 20 unit-test files, 331 tests, all passing locally. Coverage 98.63%.

All 16 audit findings closed

| # | Finding | How closed |
| --- | --- | --- |
| P0 #1 | Premature completed on multi-chunk jobs | Workflows instance state IS job state; finalize is the single completion transition |
| P0 #2 | Per-message errors swallowed in queue consumer | Workflows surfaces failed steps with full retry history; withSentry wraps the handler |
| P1 #1 | Retry endpoint discards multi-chunk structure | Retry rewritten to trigger workflow with chunkCsvForR2 |
| P1 #2 | Stuck-job sweep wall-clock-based | Workflows dashboard tracks instance status; no sweep needed |
| P1 #3 | Header > 4 KB silently misaligns | Header re-fetch in workflow uses bounded expand 4K → 16K → 64K |
| P1 #4 | Partial-row skip drops valid rows | Chunker produces newline-aligned ChunkSpec; consumer no longer needs skipPartialRow |
| P1 #5 | CSV rows spanning chunk boundary | Same: newline-aligned chunks |
| P2 #1 | No Sentry / structured logging | withSentry + logger.ts forwarding to Sentry |
| P2 #2 | processLogsBatch swallows DB errors | Rethrows now |
| P2 #3 | Embedding-fallback silently drops | total_embedding_failures counter + Sentry warning |
| P2 #4 | Writer IIFE unhandled rejection | Wrapped in writerPromise.catch in the workflow's processChunk |
| P2 #5 | setTimeout(resolve, 1) yield | Replaced with setTimeout(resolve, 0) in workflow (note: scheduler.wait doesn't exist per CF docs research) |
| P2 #6 | BATCH_SIZE constant naming | (Kept as-is: single consumer now; rename was for the audit's queue-side concern that no longer applies) |
| P3 #1 | mergeItemsBySku log spam | (Existing console.log per-merge; deferred as low-priority cleanup, future PR) |
| P3 #2 | Validator weak | Rewritten in U4 |
| P3 #3 | Soft-delete reconciliation | Accepted limitation documented in runbook (catalog is scraper-controlled, not user content) |
| P3 #4 | invalid_item_logs retention | U7 sweep |
| P3 #5 | No runbook | U8 created |

(Original audit had 16 findings; 14 closed + 2 documented as deferred or accepted.)

Still deferred to follow-up PRs

  • Async reconcile via workflow for source files > 200 MB. Synthetic concern at current scale (largest CSV is ~50 MB).
  • Integration tests against Docker Postgres. Existing 331 unit tests cover the deterministic pieces; cross-step workflow chain is best validated by the post-deploy dev exercise.
  • Sentry CLI sourcemap upload step in CI. No automated deploy workflow exists today — upload_source_maps: true covers Cloudflare-side; Sentry-side symbolication is documented in the runbook as a manual post-deploy step.
  • Queue path deletion PR. Drops processCatalogEtl.ts, queue.ts, the ?engine=queue branch, the packrat-etl-queue config, and /admin/etl/reset-stuck. Lands one week after this PR is in steady-state prod.
  • mergeItemsBySku per-batch log aggregation (audit P3 #1).

Plan + audit references

Why pivot away from Queues + outbox

The first remediation attempt (now superseded) had a load-bearing P0 surfaced by doc review: Cloudflare Queues sendBatch is an HTTP RPC, not a 2PC participant, so the atomicity invariants the plan rested on (chunk completion + reconcile enqueue inside a single db.transaction()) were structurally false. Resolving that required an outbox table + cron dispatcher + watchdog sweep — ~8 units of plumbing manually rebuilding what Workflows provides natively (durable step state, per-step memoization, retry policy, instance status as the source of truth for stuck-job detection).

Confidence

Architecture confidence: high. Spike proved Workflows hosts our exact runtime needs (R2 byte-range, csv-parse, durable sleep, step memoization) end-to-end on real prod data.

End-to-end confidence: medium. Honest gap: the full chunk-N → aggregate → finalize chain hasn't yet run against real R2 + real Neon; that is left to post-deploy verification. Drizzle Neon HTTP writes inside step.do were deferred from the spike: other queue handlers prove the driver works in the same isolate type, but not from inside step.do specifically.

Dev-exercise gate before flipping prod scrapers:

  1. CI deploys to dev → run bun run db:migrate on dev Neon (applies 0048 + 0049 + 0050)
  2. Trigger one workflow on a small synthetic CSV (~100 rows) → verify in CF dashboard
  3. Trigger on a real prod-shape CSV with a chunk boundary (~30 MB) → verify row count matches wc -l
  4. Trigger on the largest realistic CSV (~50k rows, evo-shape) → verify reconcile delta is zero
  5. Run reconcile on the 7 historical false-failures (or repair-from-scratch with ?force=true since they have NULL source_etag)
  6. Then point scrapers; bake one week; ship the queue-path-deletion PR

Test plan

  • bun install (or skip if installed)
  • bun run --cwd packages/api test:unit — 331 tests should pass
  • bun lint:custom — drizzle migration linter should pass
  • Deploy to dev → run bun run --cwd packages/api db:migrate on dev Neon (applies 0048 + 0049 + 0050)
  • Trigger a small workflow: curl -X POST /api/catalog/etl?engine=workflow -d '{...}' with a tiny test CSV
  • Observe in dashboard: bunx wrangler workflows instances list packrat-catalog-etl-dev
  • Verify etl_jobs row populates: workflow_instance_id set, status='completed', source_etag set, counts match
  • Run reconcile: curl -X POST /admin/analytics/catalog/etl/<jobId>/reconcile → delta should be 0
  • Run repair-from-scratch with ETag drift (overwrite the R2 file, then retry without ?force=true) → expect 409 ETL_ETAG_MISMATCH
  • Verify legacy queue path still works (?engine=queue)
  • Verify cron sweep registers: bunx wrangler dev --test-scheduled and curl '/__scheduled?cron=0+9+*+*+*'
  • Inspect Sentry dashboard after a forced failure — workflow + tags should appear

🤖 Generated with Claude Code

Summary by CodeRabbit

Release Notes

  • New Features

    • Migrated catalog ETL pipeline from queue-based to Cloudflare Workflows architecture with improved retry/repair capabilities and verification support.
    • Added Sentry integration for comprehensive error tracking and observability across the ETL pipeline.
    • New admin endpoints for ETL management: retry failed jobs, repair from scratch, reconcile row counts, and generate data-quality audits.
    • Automated daily cleanup of old invalid item logs.
  • Improvements

    • Strengthened input validation for product URLs, SKUs, and catalog field lengths.
    • Structured logging for better operational visibility.
  • Documentation

    • Added operator runbook and migration plan for the Workflows-based ETL system.


Mark the 2026-05-19 audit-remediation plan as superseded and replace it
with a Workflows-based plan that natively provides the durable-step +
idempotency + retry + state semantics the prior plan reconstructed
manually on top of Queues + Postgres.

Audit findings about CSV correctness, validator hardening, observability,
retention, and the operational runbook carry into the new plan; the
queue-as-state-machine subplot is dropped. Net unit count drops from
15 to 9.

Also includes the underlying audit (docs/audits/2026-05-16-etl-audit.md)
that grounds both plans.

Validates the integration before committing to the Workflows migration:
R2 byte-range reads, csv-parse inside step.do, Drizzle Neon HTTP query,
durable step.sleep, and step result persistence. Workflow takes
{ objectKey, source } params; trigger via wrangler workflows trigger
spike-etl-workflow ... --env=dev and observe in the dashboard.

Adds:
- packages/api/src/workflows/spike-etl-workflow.ts (the workflow class)
- packages/api/src/index.ts exports SpikeEtlWorkflow
- packages/api/wrangler.jsonc declares the workflows[] binding

Per the plan (docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
U1), this is throwaway. Delete the workflow file, the index.ts export,
and the wrangler binding after the GO/NO-GO decision lands U3's
production CatalogEtlWorkflow.

Pivoted from the in-app spike to a standalone worker because the dev
deploy of packrat-api requires Docker (App Container) and Docker is
not installed locally. Standalone worker has zero container surface
and only the bindings the spike actually exercises.

Spike rewritten to use the native R2 binding (env.PACKRAT_SCRAPY_BUCKET)
instead of the AWS S3 client — removes the R2_ACCESS_KEY_ID secret
dependency. Drizzle/Neon validation deferred to U3 (validates on the
production worker that already has NEON_DATABASE_URL).

Result on real prod data (cotopaxi_2026-05-14T16-54-05.csv, 698 KB):
  status=complete, duration=7s
  1-r2-head: size=698620 etag=4397... ok
  2-r2-range-read: 698134 bytes
  3-csv-parse: 100 rows
  4a/4b/4c-sleep: Δ=5043ms (5s sleep + ~40ms wake overhead)
  5-memoize-marker: persisted in instance history

GO. Workflows host R2 + csv-parse + step.sleep + step result persistence
cleanly inside step.do. Proceed to U2 (Drizzle migration 0048).

Adds:
- packages/api/wrangler.spike.jsonc (standalone worker config)
- packages/api/src/spike-entry.ts (thin /trigger endpoint)
- packages/api/src/workflows/spike-etl-workflow.ts rewritten

The standalone worker packrat-etl-spike.orange-frost-d665.workers.dev
should be deleted via `wrangler delete --config=wrangler.spike.jsonc`
after U3 lands the production CatalogEtlWorkflow.

Adds eight columns to etl_jobs for the Workflows-based ETL:
- workflow_instance_id (nullable text) — links the etl_jobs row to its
  Workflows instance for admin dashboards
- verified_at, verified_row_count (nullable) — post-ingestion R2-source
  row-count verification
- total_embedding_failures (integer DEFAULT 0 NOT NULL) — observable
  degradation signal when the embedding service fails inside a chunk
- superseded_by_job_id (FK to etl_jobs.id, ON DELETE SET NULL) +
  superseded_at — preserves the audit trail when an operator triggers
  repair-from-scratch
- source_etag, source_last_modified — captured at job start, compared by
  the repair endpoint to fail closed when the R2 source has been
  overwritten
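
A minimal sketch of the fail-closed comparison described for source_etag, assuming the semantics stated elsewhere in this PR (409 ETL_ETAG_MISMATCH unless ?force=true). The function name and result shape are hypothetical, and treating a NULL stored ETag as a mismatch is an assumption inferred from the note that historical jobs need ?force=true.

```typescript
type EtagGuardResult =
  | { ok: true }
  | { ok: false; status: 409; code: "ETL_ETAG_MISMATCH"; message: string };

// storedEtag: value captured at job start (null for rows created before the column existed).
// liveEtag: value read from the current R2 object head.
function checkSourceEtag(storedEtag: string | null, liveEtag: string, force: boolean): EtagGuardResult {
  // An explicit operator override skips the comparison entirely.
  if (force) return { ok: true };
  if (storedEtag !== null && storedEtag === liveEtag) return { ok: true };

  const message =
    storedEtag === null
      ? "no ETag was captured for this job; re-run with force to proceed"
      : "R2 source has changed since the job was ingested";
  return { ok: false, status: 409, code: "ETL_ETAG_MISMATCH", message };
}
```

Failing closed on a missing ETag keeps the guard conservative: the operator has to opt in before re-ingesting a source whose identity cannot be confirmed.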

Constraints + indexes:
- CHECK etl_jobs_no_self_supersede prevents a row from superseding itself
- Index etl_jobs_workflow_instance_id_idx (admin lookups)
- Index etl_jobs_superseded_by_idx (repair-chain lookups)
- UNIQUE catalog_item_etl_jobs_catalog_job_idx (catalog_item_id, etl_job_id)
  so retried chunk upserts can use ON CONFLICT DO NOTHING and not
  accumulate duplicate provenance rows

Also fixes the long-standing stale drizzle.config.ts schema path
(./src/db/schema.ts → ../db/src/schema.ts); the schema was extracted
to @packrat/db in merge b14f4db but the config pointer was not
updated, so db:generate failed before this commit.

The Workflows binding is the source of truth for chunk lifecycle and
retry semantics; the columns above are only DB-side denormalization
for admin queries.

Verification:
- drizzle-kit check: Everything's fine
- scripts/lint/check-drizzle-migrations.ts: Drizzle migration checks passed
- biome lint: clean

Schema smoke test at packages/api/test/db-schema-etl.test.ts asserts the
columns + indexes + CHECK constraint + UNIQUE index against the Docker
Postgres wsproxy. Run via bun test:api once docker-compose.test.yml is up.

Closes audit P1 #3, #4, #5: the chunk boundary bugs where a CSV row
spanning a 20 MB byte-range chunk would be either dropped, invalidated,
or duplicated. The new helper snaps each chunk's byteEnd to the byte
immediately before a newline by reading a small (64 KiB default) tail
window and locating the last \n. Throws ChunkBoundaryError if the peek
window has no newline so a row wider than 64 KiB fails loudly.

Tail peek reads are issued in parallel via Promise.all so the producer
endpoint's CPU budget stays bounded on multi-GB files. Single-object-
parameter shape matches existing ETL functions.

5 unit tests cover: small-file single-chunk; multi-chunk newline
alignment; concatenation completeness; ChunkBoundaryError on no-newline;
row-boundary preservation across chunks. All pass via bun test:unit.

Used by the new CatalogEtlWorkflow and by the retry / repair-from-scratch
admin endpoints (next units).
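
The chunking approach described above can be sketched as follows. This is not the chunkCsvForR2 source: `chunkByNewline` and `RangeReader` are hypothetical names, the R2 read is abstracted behind a callback, and byteEnd is treated as an exclusive offset just past the newline, which is an assumption about the convention. Like any byte-level newline search, it does not account for newlines inside quoted CSV fields.

```typescript
interface ChunkSpec {
  index: number;
  byteStart: number; // inclusive
  byteEnd: number; // exclusive (assumed convention for this sketch)
}

class ChunkBoundaryError extends Error {}

// Hypothetical reader: resolves to `length` bytes starting at `offset` (an R2 range read in practice).
type RangeReader = (offset: number, length: number) => Promise<Uint8Array>;

async function chunkByNewline(opts: {
  size: number;
  chunkSize: number;
  peekSize: number;
  readRange: RangeReader;
}): Promise<ChunkSpec[]> {
  const { size, chunkSize, peekSize, readRange } = opts;
  if (peekSize > chunkSize) throw new RangeError("peekSize must not exceed chunkSize");
  if (size <= chunkSize) return [{ index: 0, byteStart: 0, byteEnd: size }];

  // Nominal boundaries at every multiple of chunkSize strictly inside the file.
  const nominalEnds: number[] = [];
  for (let end = chunkSize; end < size; end += chunkSize) nominalEnds.push(end);

  // Peek reads are independent of each other, so they can run in parallel.
  const cuts = await Promise.all(
    nominalEnds.map(async (end) => {
      const start = end - peekSize;
      const tail = await readRange(start, peekSize);
      const newlineAt = tail.lastIndexOf(0x0a);
      if (newlineAt === -1) {
        throw new ChunkBoundaryError(`no newline in the ${peekSize}-byte window ending at byte ${end}`);
      }
      return start + newlineAt + 1; // snap the cut to just after the last newline
    }),
  );

  const specs: ChunkSpec[] = [];
  let byteStart = 0;
  for (const cut of [...cuts, size]) {
    specs.push({ index: specs.length, byteStart, byteEnd: cut });
    byteStart = cut;
  }
  return specs;
}
```

Because each cut lies within peekSize bytes of its own nominal boundary and peekSize never exceeds chunkSize, the cuts are strictly increasing and the chunks cover the file without gaps or overlap.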

Replaces packages/api/src/services/etl/processCatalogEtl.ts +
queue.ts as the catalog ingest engine. Producer cutover lands next
(separate commit) — for now both paths coexist; the queue handler in
src/index.ts still routes to processQueueBatch for ?engine=queue
callers during the bake window.

Workflow structure per source CSV:
  for each chunk in params.chunks:
    step.do('chunk-N', { retries: 3, backoff: exp, timeout: 5min },
            () => processChunk(...))
  step.do('aggregate')  -> UPDATE etl_jobs totals from memoized chunk results
  step.do('reconcile')  -> csv-parse the R2 source for logical row count
  step.do('reconcile-write') -> UPDATE verified_at + verified_row_count
  step.do('finalize')   -> UPDATE status='completed', completedAt
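
The step structure above can be sketched in TypeScript against a narrow step interface, in the same spirit as the test stub this PR mentions. `StepLike`, `EtlHandlers`, and `runCatalogEtl` are hypothetical names; the retry config mirrors the shape Cloudflare Workflows documents for step.do (limit, delay, backoff, timeout). The reconcile steps are left out to keep the sketch short.

```typescript
type StepConfig = {
  retries?: { limit: number; delay: string; backoff?: "constant" | "linear" | "exponential" };
  timeout?: string;
};

// Narrow stand-in for the Workflows step object, so the sketch runs without the CF runtime.
interface StepLike {
  do<T>(name: string, config: StepConfig, callback: () => Promise<T>): Promise<T>;
}

interface ChunkSpec { index: number; byteStart: number; byteEnd: number }
interface ChunkResult { valid: number; invalid: number }

// Hypothetical side-effect handlers; in the PR these would touch R2 and etl_jobs.
interface EtlHandlers {
  processChunk(chunk: ChunkSpec): Promise<ChunkResult>;
  writeTotals(totals: ChunkResult): Promise<void>;
  markCompleted(): Promise<void>;
}

const CHUNK_STEP: StepConfig = {
  retries: { limit: 3, delay: "30 seconds", backoff: "exponential" },
  timeout: "5 minutes",
};

async function runCatalogEtl(step: StepLike, chunks: ChunkSpec[], handlers: EtlHandlers): Promise<ChunkResult> {
  const results: ChunkResult[] = [];
  for (const chunk of chunks) {
    // One named step per chunk, so a retry re-runs only the failed chunk.
    results.push(await step.do(`chunk-${chunk.index}`, CHUNK_STEP, () => handlers.processChunk(chunk)));
  }

  // Totals are derived from the step results, not from counters written during chunk processing.
  const totals = await step.do("aggregate", {}, async () => {
    const sum = results.reduce(
      (acc, r) => ({ valid: acc.valid + r.valid, invalid: acc.invalid + r.invalid }),
      { valid: 0, invalid: 0 },
    );
    await handlers.writeTotals(sum);
    return sum;
  });

  // The single transition to 'completed'.
  await step.do("finalize", {}, () => handlers.markCompleted());
  return totals;
}
```

Keeping the orchestration behind `StepLike` means the ordering of steps can be unit-tested with an in-memory fake, while the real class would pass the runtime's step object through.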

Audit closures inherited via the chunkCsvForR2 helper:
- P0 #1 (premature completion) — workflow instance state IS job state;
  the finalize step is the single transition to 'completed'
- P0 #2 (swallowed errors) — Workflows surface failed steps with full
  retry history; no DLQ table needed
- P1 #3/#4/#5 (chunk boundary bugs) — closed by the producer using
  newline-aligned ChunkSpec; consumer drops skipPartialRow
- P1 #1/#2 (retry endpoint, stuck-job sweep) — closed by workflow
  instance lifecycle (retry endpoints trigger new instances; stuck
  detection is via dashboard, not a wall-clock cron)
- P1 #3 specifically — header re-fetch uses a bounded 4K → 16K → 64K
  expand loop, throws EtlHeaderError if no newline anywhere in 64 KiB
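
The bounded header expand loop mentioned in the last item might look like the sketch below. It assumes only the 4K → 16K → 64K window sizes and the EtlHeaderError name from the text; `readHeaderLine` and `RangeReader` are hypothetical, and the R2 read is abstracted behind a callback.

```typescript
// Hypothetical reader: resolves to up to `length` bytes starting at `offset`.
type RangeReader = (offset: number, length: number) => Promise<Uint8Array>;

class EtlHeaderError extends Error {}

const HEADER_WINDOWS = [4 * 1024, 16 * 1024, 64 * 1024];

async function readHeaderLine(readRange: RangeReader): Promise<string> {
  for (const windowSize of HEADER_WINDOWS) {
    const bytes = await readRange(0, windowSize);
    const newlineAt = bytes.indexOf(0x0a);
    if (newlineAt !== -1) {
      // Decode everything before the first newline and drop a trailing CR from CRLF files.
      return new TextDecoder().decode(bytes.subarray(0, newlineAt)).replace(/\r$/, "");
    }
    // A short read means the whole object was scanned; widening the window cannot help.
    if (bytes.length < windowSize) break;
  }
  throw new EtlHeaderError("no newline found within the header search window");
}
```

Growing the window in three fixed steps keeps the common case to a single 4 KiB read while still bounding the worst case at 64 KiB.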

Counter writes inside the chunk step (via existing
processValidItemsBatch / processLogsBatch) may double-count on a
chunk retry; the aggregate step at the end writes the authoritative
totals from memoized chunk results, overriding any retry drift.

wrangler.jsonc workflows binding switched from the throwaway
SPIKE_ETL_WORKFLOW to ETL_WORKFLOW (class CatalogEtlWorkflow). The
standalone spike worker (wrangler.spike.jsonc) is untouched and can
be torn down independently via wrangler delete --config=wrangler.spike.jsonc.

Test stub at src/__test-stubs__/cloudflare-workers.ts extended with
minimal WorkflowEntrypoint / WorkflowStep types so unit tests can
import workflow code without the real Cloudflare runtime.

Verification:
- All 17 unit-test files pass (304 tests) including the chunker tests
- biome check clean on all touched files
- Runtime verification (full deploy + trigger) blocked on Docker daemon
  for the production worker; can be exercised once Docker is up.

Modifies POST /catalog/etl to trigger a CatalogEtlWorkflow instance per
source CSV by default. The query parameter ?engine=queue keeps the
legacy queue path available so operators can roll back if the workflow
path misbehaves in production.

Workflow path:
- Calls chunkCsvForR2 per source object to produce newline-aligned
  ChunkSpec[] (closes audit P1 #3, #4, #5 on the retry surface as well
  as the initial ingest surface).
- Captures source_etag + source_last_modified from the first object's
  R2 head and persists them on the etl_jobs row. The admin
  repair-from-scratch endpoint (U5) compares the stored etag against
  the live R2 head to fail closed when a source has been overwritten.
- Generates a deterministic Workflows instance ID `${source}-${filename}`
  so duplicate triggers for the same file return the existing instance
  rather than producing parallel ingests.
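
A small sketch of deriving the deterministic instance ID from `${source}-${filename}`. The character set and the 64-character default are assumptions chosen to be conservative, not limits taken from this PR; they should be checked against the Workflows documentation. `toInstanceId` is a hypothetical helper name.

```typescript
// Assumption: IDs are restricted to letters, digits, '_' and '-', so characters
// such as the '.' in a ".csv" filename are replaced rather than passed through.
function toInstanceId(source: string, filename: string, maxLength = 64): string {
  const raw = `${source}-${filename}`;
  const safe = raw.replace(/[^A-Za-z0-9_-]/g, "-");
  // Truncation keeps the ID within the assumed length limit.
  return safe.slice(0, maxLength);
}

const id = toInstanceId("cotopaxi", "cotopaxi_2026-05-14T16-54-05.csv");
console.log(id); // cotopaxi-cotopaxi_2026-05-14T16-54-05-csv
```

The same inputs always yield the same ID, which is what makes a duplicate trigger detectable. Truncation can make two long filenames collide, so a real implementation may prefer appending a short hash of the untruncated string.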

Queue path:
- Unchanged from existing behavior — same 20 MB byte-range splits and
  queue.sendBatch.
- Kept until the workflow path bakes for at least a week in production
  (per migration plan rollout); removal in a follow-up PR.

Env type extended in env-validation.ts to expose ETL_WORKFLOW: Workflow
so the route handler can type-check the env.ETL_WORKFLOW.create call.

Unit tests still pass (17 files, 304 tests). The full end-to-end
verification (POST /catalog/etl?engine=workflow → workflow instance →
DB rows → reconcile → finalize) requires the production worker deploy,
which is gated on Docker for the AppContainer build — that path is
unchanged by this commit.
…gaps

Closes audit P3 #2. The previous CatalogItemValidator.isValidUrl
accepted anything new URL() parsed — including javascript:, mailto:,
data:, file:, and any private/loopback IP. Catalog URLs render in the
mobile app and the guides site, so a scraper bug or supply-chain
compromise could trick the UI into rendering a homograph phishing
link or a server-side fetch into hitting internal infrastructure.

Validator now rejects:
- Schemes other than http: and https:
- URLs > 2048 chars
- Loopback (localhost, 127.x.x.x, ::1), RFC-1918 (10/8, 172.16/12,
  192.168/16), link-local (169.254/16), IPv6 link-local (fe80:), and
  ULA (fc00:/fd00:) hostnames — string-level pattern match only, no
  DNS resolution (DNS resolution would itself be an SSRF vector)
- Hostnames containing non-ASCII characters that survive WhatWG URL
  encoding (IDN homograph defense in depth)
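
The reject rules above can be sketched as a string-level check that returns a reason instead of a boolean. This is an illustration rather than the CatalogItemValidator source: `rejectUrlReason`, the helper names, and the reason strings are hypothetical. It relies on the WHATWG `URL` parser, which normalizes the hostname and keeps brackets around IPv6 literals.

```typescript
const MAX_URL_LENGTH = 2048;

function isPrivateIpv4(host: string): boolean {
  // Only dotted-quad literals are checked, so a domain such as "10.example.com" is not rejected.
  const m = /^(\d{1,3})\.(\d{1,3})\.\d{1,3}\.\d{1,3}$/.exec(host);
  if (!m) return false;
  const a = Number(m[1]);
  const b = Number(m[2]);
  return (
    a === 127 || // loopback
    a === 10 || // 10/8
    (a === 172 && b >= 16 && b <= 31) || // 172.16/12
    (a === 192 && b === 168) || // 192.168/16
    (a === 169 && b === 254) // link-local
  );
}

function isPrivateIpv6(host: string): boolean {
  // Loopback, link-local (fe80:), and ULA (fc00:/fd00:) literals, matched as strings.
  return host === "[::1]" || host.startsWith("[fe80:") || /^\[f[cd][0-9a-f]{2}:/.test(host);
}

// Returns null when the URL is acceptable, otherwise a short reason string.
function rejectUrlReason(raw: string): string | null {
  if (raw.length > MAX_URL_LENGTH) return "url_too_long";
  let url: URL;
  try {
    url = new URL(raw);
  } catch {
    return "unparseable";
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") return "scheme_not_allowed";
  const host = url.hostname.toLowerCase();
  if (/[^\x00-\x7f]/.test(host)) return "non_ascii_host";
  if (host === "localhost" || isPrivateIpv4(host) || isPrivateIpv6(host)) {
    return "private_or_loopback_host";
  }
  return null;
}
```

No DNS lookup happens anywhere in the check, so a public hostname that resolves to a private address still passes; that matches the stated trade-off of pattern matching only.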

Length caps on prose fields:
- name 500, description 50,000, brand 200, category 200
- SKU 200 chars + /^[A-Za-z0-9_./-]+$/ charset

15 unit tests cover every reject path plus the boundary-allowed cases.
All 319 tests in the unit suite pass.
…v.dev

The spike worker (packrat-etl-spike) was deleted from the Cloudflare
account; the throwaway files referencing it no longer have a deployed
counterpart, so removing them keeps the worktree clean and the PR
diff focused on the production migration.

env.dev workflows binding added so the dev deploy of packrat-api
actually receives ETL_WORKFLOW. Top-level workflows[] does not
inherit into envs that explicitly redeclare other bindings
(wrangler 4.92 behavior).

Strict cast linter (check:casts:strict) rejects unchecked `as Error`
even when narrowing unknown from a catch. Replace with a clean
`instanceof Error ? err : new Error(String(err))` guard so the
parser.destroy call always receives a real Error.
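
For illustration, the guard from this commit message wrapped in a small helper; `toError` is a hypothetical name, and the expression inside it is the one quoted above.

```typescript
// Narrows an unknown caught value to a real Error without an unchecked cast.
function toError(err: unknown): Error {
  return err instanceof Error ? err : new Error(String(err));
}

try {
  throw "stream closed early"; // non-Error throwables are legal in JavaScript
} catch (err) {
  const error = toError(err);
  console.log(error.message); // stream closed early
}
```

An existing Error instance is returned unchanged, so its stack and subclass survive; anything else is stringified into a new Error.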
@coderabbitai
Contributor

coderabbitai Bot commented May 20, 2026


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: cc652155-e82e-4abb-92fd-c804883359c8

📥 Commits

Reviewing files that changed from the base of the PR and between 5187b6d and 086ed13.

📒 Files selected for processing (11)
  • docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md
  • docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
  • docs/runbooks/etl-pipeline.md
  • packages/api/src/routes/admin/analytics/catalog.ts
  • packages/api/src/routes/catalog/index.ts
  • packages/api/src/services/retention/invalidLogRetention.ts
  • packages/api/src/utils/logger.ts
  • packages/api/src/workflows/catalog-etl-workflow.ts
  • packages/api/src/workflows/shared/__tests__/chunk-csv-for-r2.test.ts
  • packages/api/src/workflows/shared/chunkCsvForR2.ts
  • packages/api/test/db-schema-etl.test.ts

Walkthrough

This PR migrates the catalog ETL pipeline from Cloudflare Queues to Durable Workflows, introducing schema migrations, observability instrumentation via Sentry and structured logging, CSV row-aligned chunking utilities, the workflow implementation with per-chunk memoization, validator hardening against SSRF and oversized inputs, admin endpoints for job recovery and reconciliation, and operational runbooks.

Changes

Catalog ETL Pipeline Workflows Migration

| Layer | File(s) | Summary |
| --- | --- | --- |
| Planning & Audit Documentation | docs/audits/2026-05-16-etl-audit.md, docs/plans/2026-05-19-*.md, docs/plans/2026-05-20-*.md, docs/runbooks/etl-pipeline.md | Audit identifies reliability/correctness gaps in queue-based ETL; remediation plan (superseded) outlines queue-based fixes; migration plan details Workflows architecture with 9 phased implementation units; runbook provides step-by-step operator procedures for triggering, retrying, reconciling, and sweeping logs. |
| Database Schema & Drizzle Configuration | packages/api/drizzle.config.ts, packages/api/drizzle/0047_clear_monster_badoon.sql, packages/api/drizzle/meta/0047_snapshot.json, packages/api/drizzle/meta/_journal.json, packages/db/src/schema.ts, packages/schemas/src/admin.ts | Migration adds workflow instance ID, embedding failure counter, verification timestamps/counts, source provenance (ETag, lastModified), and job supersession columns to etlJobs with self-referential foreign key and check constraints; schema snapshot and journal updated; admin response schemas extended for reconciliation and audit. |
| Observability & Sentry Instrumentation | packages/api/src/utils/logger.ts, packages/api/src/utils/__tests__/logger.test.ts, packages/api/src/index.ts | Structured JSON logging utility with Sentry forwarding (breadcrumbs for INFO/WARN, exceptions for ERROR); error serialization unpacks Error objects into name/message/stack; Worker entry point wraps handler and workflow with withSentry and instrumentWorkflowWithSentry; scheduled cron handler triggers retention sweep. |
| Workflow Types, Stubs & Environment | packages/api/src/__test-stubs__/cloudflare-workers.ts, packages/api/src/db/schema.ts, packages/api/src/utils/env-validation.ts, packages/api/package.json | Test stubs define Cloudflare Workflows runtime types; environment validation schema extends with ETL_WORKFLOW binding; db schema re-export supports drizzle.config.ts tooling; @sentry/cloudflare dependency added. |
| R2 CSV Chunking Utility | packages/api/src/workflows/shared/chunkCsvForR2.ts, packages/api/src/workflows/shared/__tests__/chunk-csv-for-r2.test.ts | Computes newline-aligned byte ranges for splitting R2 objects into row-safe chunks; returns single chunk for small files, otherwise finds newline boundaries within peek windows and throws ChunkBoundaryError if missing; tests verify single/multi-chunk behavior, gap-free coverage, and boundary integrity. |
| Workflow Core Implementation | packages/api/src/workflows/catalog-etl-workflow.ts | CatalogEtlWorkflow orchestrates per-chunk memoized steps, streaming R2 byte ranges, injecting CSV headers for non-first chunks, parsing/validating records via csv-parse, batching valid items and invalid logs, aggregating results, and finalizing job completion; includes processChunk helper and chunk result tracking. |
| Catalog Item Validation Hardening | packages/api/src/services/etl/CatalogItemValidator.ts, packages/api/src/services/etl/__tests__/CatalogItemValidator.test.ts | Validator enforces length caps on all catalog fields, replaces boolean URL validation with detailed reason strings, adds SSRF guards (rejecting private/loopback/link-local IPs and non-ASCII hostnames), and whitelist-enforces SKU characters; tests cover URL schemes, hostname validation, field length boundaries, and SSRF patterns. |
| Batch Processing & Error Tracking | packages/api/src/services/etl/processLogsBatch.ts, packages/api/src/services/etl/processValidItemsBatch.ts | processLogsBatch emits structured logger events on success/failure; processValidItemsBatch increments totalEmbeddingFailures counter on embedding failures and logs completion via structured logger instead of console. |
| Invalid Logs Retention Sweep | packages/api/src/services/retention/invalidLogRetention.ts, packages/api/src/services/retention/__tests__/invalidLogRetention.test.ts | Retention sweep computes age-based SQL cutoffs and iteratively deletes expired invalid_item_logs in configurable batches up to iteration caps; tests verify batch accumulation, early termination, capping, and retention-days override/fallback. |
| Admin Endpoints: Retry, Repair, Reconcile, Audit | packages/api/src/routes/admin/analytics/catalog.ts | New endpoints: POST /etl/:jobId/retry (re-ingests failed jobs), POST /etl/:jobId/repair-from-scratch (unconditional re-ingest), POST /etl/:jobId/reconcile (counts logical CSV rows and validates), GET /etl/audit (per-source data-quality metrics via SQL aggregation); shared reingestJob helper handles R2 chunking, ETag guards, and workflow enqueueing with supersession tracking. |
| ETL Trigger Route: Dual-Engine Support | packages/api/src/routes/catalog/index.ts | POST /catalog/etl branches by query.engine (default 'workflow'); workflow path chunks R2 sources, reindexes chunks, stores workflow linkage fields, and starts workflow; queue path retained for backward compatibility during coexistence window. |
| Configuration & Deployment Wiring | packages/api/wrangler.jsonc, packages/api/vitest.unit.config.ts, packages/api/test/db-schema-etl.test.ts, CLAUDE.md | Wrangler config adds nodejs_als compatibility, enables source maps, declares packrat-catalog-etl workflow with cron trigger for both prod/dev environments; vitest excludes workflow from unit coverage; schema smoke test verifies migration columns and indexes; migration discipline guidelines added to developer guide. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • PackRat-AI/PackRat#2394: Admin retry endpoint overlaps with the POST /etl/:jobId/retry re-ingestion implementation in this PR's reingestJob helper.
  • PackRat-AI/PackRat#2419: Both PRs modify the legacy queue-based chunk splitting logic in the /catalog/etl handler and queueCatalogETL integration.
  • PackRat-AI/PackRat#2409: Overlaps in the admin analytics area (packages/api/src/routes/admin/analytics/catalog.ts) regarding ETL job retry/repair endpoints and failure inspection queries.

Suggested labels

ci/cd

Suggested reviewers

  • mikib0
  • Isthisanmol
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main change: migrating the catalog ETL from Queues to Cloudflare Workflows and addressing audit findings. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@github-actions github-actions Bot added the documentation, api, and database labels May 20, 2026
@github-actions
Contributor

github-actions Bot commented May 20, 2026

Coverage Report for API Unit Tests Coverage (./packages/api)

| Status | Category | Percentage | Covered / Total |
| --- | --- | --- | --- |
| 🔵 | Lines | 95.6% (🎯 95%) | 892 / 933 |
| 🔵 | Statements | 95.6% (🎯 95%) | 892 / 933 |
| 🔵 | Functions | 100% (🎯 97%) | 51 / 51 |
| 🔵 | Branches | 94.3% (🎯 92%) | 331 / 351 |

File Coverage

Changed Files

| File | Stmts | Branches | Functions | Lines | Uncovered Lines |
| --- | --- | --- | --- | --- | --- |
| packages/api/src/services/retention/invalidLogRetention.ts | 100% | 100% | 100% | 100% | |
| packages/api/src/utils/logger.ts | 65.88% | 78.94% | 100% | 65.88% | 39-66, 98-99, 114 |
| packages/api/src/workflows/shared/chunkCsvForR2.ts | 100% | 88.88% | 100% | 100% | |
Generated in workflow #1401 for commit 086ed13 by the Vitest Coverage Report Action

@github-actions
Contributor

github-actions Bot commented May 20, 2026

Coverage Report for Expo Unit Tests Coverage (./apps/expo)

| Status | Category | Percentage | Covered / Total |
|---|---|---|---|
| 🔵 | Lines | 97.36% (🎯 95%) | 517 / 531 |
| 🔵 | Statements | 97.36% (🎯 95%) | 517 / 531 |
| 🔵 | Functions | 100% (🎯 97%) | 51 / 51 |
| 🔵 | Branches | 95% (🎯 92%) | 190 / 200 |

File Coverage: No changed files found.
Generated in workflow #1401 for commit 086ed13 by the Vitest Coverage Report Action

…ures

Narrows U2's schema additions from 8 columns to 2 after PR-shaping
discussion. Most of the originally-scoped columns existed to support
audit findings whose consumers ship in later PRs:

  - verified_at / verified_row_count — reconcile UI / U10
  - superseded_by_job_id / superseded_at — repair endpoint / U5
  - source_etag / source_last_modified — fail-closed repair guard / U5

Adding them now would create dead schema with no reader, so each
follow-up unit adds its column when it lands. Net change: zero indexes,
zero CHECK constraints, zero UNIQUE constraints, no FK self-reference.
This is about as low-risk as a migration can be.

What stays (both load-bearing from day one):
- workflow_instance_id text — admin/debug link from etl_jobs to the
  CF Workflows instance; null on legacy queue-path rows, set on
  workflow-path rows
- total_embedding_failures integer DEFAULT 0 NOT NULL — observable
  embedding-fallback degradation counter (audit P2 #3)
- etl_jobs_workflow_instance_id_idx — supports the lookup pattern

Workflow simplifications follow:
- Dropped the reconcile + reconcile-write steps (no verified_* columns
  to write into); workflow now runs chunk-N × N → aggregate → finalize
- Dropped reconcileSourceRowCount helper (orphaned with the steps)
- Dropped source_etag / source_last_modified capture in the producer

Plan doc updated with a scope-adjustment note on U2 explaining the
narrowing; original 8-column rationale preserved for context.

Verification:
- drizzle-kit check ✓
- check-drizzle-migrations.ts ✓
- 18 unit-test files, 319 tests, all pass
- biome check clean on all touched files
Bounded-batch DELETE for invalid_item_logs older than 90 days, wired
to a daily 09:00 UTC CF Cron Trigger via a new scheduled handler arm
in src/index.ts.

Why batched: a naive single-statement DELETE on a table that has
been accumulating for months would acquire row-level locks on
millions of rows in one statement, hit Neon's statement timeout,
and roll back having pruned nothing. The loop deletes 10k-row
chunks via WHERE id IN (SELECT id ... LIMIT N) RETURNING id and
counts the returned rows. Stops on empty batch. Caps at 100
iterations (1M rows / run) so a first-run with months of backlog
can't monopolize the daily window — the cap is reported in the
RetentionResult so operators can see when more rows remain.

Defaults: 90-day retention window, 10k-row batches, 100-iteration cap.
Each is overridable per call via options.
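
The batched loop described above can be sketched roughly as follows. This is an in-memory model under stated assumptions, not the PR's implementation: `deleteBatch` is a hypothetical stand-in for one `DELETE ... WHERE id IN (SELECT id ... LIMIT N) RETURNING id` statement, the real call would be async, and the `RetentionResult` field names are inferred from the description.

```typescript
// Hypothetical result shape; field names are assumed from the commit message.
interface RetentionResult {
  deleted: number;
  iterations: number;
  capped: boolean;
}

// Stand-in for one bounded DELETE ... RETURNING id; returns the rows removed.
type DeleteBatch = (limit: number) => number;

function pruneInBatches(
  deleteBatch: DeleteBatch,
  batchSize = 10_000,
  maxIterations = 100,
): RetentionResult {
  let deleted = 0;
  let iterations = 0;
  let lastBatch = 0;
  for (let i = 0; i < maxIterations; i++) {
    iterations++;
    lastBatch = deleteBatch(batchSize);
    deleted += lastBatch;
    if (lastBatch === 0) break; // empty batch: nothing left to prune
  }
  // Report "capped" when the loop stopped while batches were still returning rows.
  return { deleted, iterations, capped: lastBatch > 0 };
}

// In-memory table standing in for invalid_item_logs (illustration only).
function makeFakeTable(rows: number): DeleteBatch {
  let remaining = rows;
  return (limit) => {
    const removed = Math.min(limit, remaining);
    remaining -= removed;
    return removed;
  };
}
```

With a 25-row fake table and a batch size of 10, the loop runs four iterations (10, 10, 5, 0) and reports `capped: false`; with 100 rows and a 3-iteration cap it stops after 30 rows and reports `capped: true`.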

Wrangler config gets a top-level + env.dev triggers.crons entry.
First cron in this worker, so the scheduled() handler in src/index.ts
is brand new — dispatches on controller.cron string and throws on
unknown crons so a misconfigured trigger fails loudly.
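
A minimal sketch of the dispatch-and-throw behaviour, assuming a local `CronEvent` type in place of the Workers runtime's controller object. The cron expression `0 9 * * *` and the job table are assumptions chosen to match "daily 09:00 UTC"; the actual handler in `src/index.ts` may be structured differently.

```typescript
// Minimal local type; only the `cron` string matters for dispatch.
interface CronEvent {
  cron: string;
}

// Hypothetical job table: one entry for the daily retention sweep.
const CRON_JOBS: Record<string, () => string> = {
  "0 9 * * *": () => "invalid-log-retention",
};

function dispatchCron(event: CronEvent): string {
  const job = CRON_JOBS[event.cron];
  if (!job) {
    // Fail loudly so a misconfigured trigger surfaces as an error, not a silent no-op.
    throw new Error(`Unknown cron trigger: ${event.cron}`);
  }
  return job();
}
```

Throwing on an unrecognized expression means a typo in the wrangler `triggers.crons` entry shows up as a failed scheduled invocation rather than a sweep that quietly never runs.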

5 unit tests cover empty-first-batch, multi-batch accumulation, the
iteration cap, and the retentionDays fallback. All 324 tests in
the unit suite pass.

Real-DB integration coverage deferred to U9 (needs Docker Postgres).
Splits U6's "Sentry wiring + structured logger + error propagation"
deliverable. This PR ships the parts that don't need a new dependency:

  - Thin structured logger at packages/api/src/utils/logger.ts emits
    JSON lines with { level, event, ts, ...ctx }. To log an error,
    attach it under ctx.err — the emit boundary unpacks errorName /
    errorMessage / errorStack so the contract that error stacks never
    contain raw CSV row data is enforceable by code review at one
    site (the logger), not every call site
  - processLogsBatch rethrows on DB failure (audit P2 #2) — silently
    swallowing meant the only forensic record of validation failures
    could disappear without anyone noticing
  - processValidItemsBatch embedding-fallback path atomically
    increments etl_jobs.total_embedding_failures (audit P2 #3) so
    operators see degradation in the admin endpoint without trawling
    logs; warning log at the call site for the per-batch event
  - All console.log calls in the touched files replaced with
    logger.info / logger.warn / logger.error
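
The emit boundary described in the first bullet might look something like this. It is a sketch, not the contents of `logger.ts`: the function name `formatLogLine` is hypothetical, and only the `{ level, event, ts, ...ctx }` shape and the `errorName` / `errorMessage` / `errorStack` fields come from the description above.

```typescript
type Level = "info" | "warn" | "error";

// Build one JSON log line. If ctx.err is an Error, unpack it into flat fields
// rather than serializing the error object itself.
function formatLogLine(
  level: Level,
  event: string,
  ctx: Record<string, unknown> = {},
): string {
  const { err, ...rest } = ctx;
  const line: Record<string, unknown> = {
    level,
    event,
    ts: new Date().toISOString(),
    ...rest,
  };
  if (err instanceof Error) {
    line.errorName = err.name;
    line.errorMessage = err.message;
    line.errorStack = err.stack;
  }
  return JSON.stringify(line);
}
```

Because every call site passes through one function, a reviewer only has to check this site to confirm which error fields reach the log output.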

Sentry wiring (@sentry/cloudflare with withSentry({ fetch, workflow,
queue, scheduled })) is deferred to a follow-up PR. Justification:
adding a new dep changes the lockfile, adds ~30 KB to the bundle, and
needs compat verification against the mobile app's @sentry/react-native.
Reviewers should see that as its own concern, not bundled with
correctness fixes. The logger's emit() boundary is the wire-up point
when the follow-up lands — each call site upgrades for free.

Verification: 19 unit-test files, 324 tests pass. biome clean.
…oints

Adds the two operator-facing surfaces that close the gap left by the
plan's U5 scope-down. Defers repair-from-scratch and ETag fail-closed
verification to follow-up PRs — workflow retry is enough to re-ingest
the 7 historical false-failures from 2026-05-14, and ETag verification
is defense in depth that operators can do manually for the one-time
recovery.

Migration 0049 adds two columns:
  - verified_at timestamp (nullable)
  - verified_row_count integer (nullable)

Both written exclusively by the new reconcile endpoint.

POST /admin/etl/:jobId/retry — rewritten to trigger a CatalogEtlWorkflow
instance instead of a queue message. Works for both legacy queue-era
failed jobs and workflow-era failed jobs (the new instance always uses
chunkCsvForR2 for newline-aligned chunks). Instance ID is suffixed with
the new jobId so duplicate retries don't collide. Response now includes
workflowInstanceId so the admin UI can deep-link to the dashboard.

POST /admin/etl/:jobId/reconcile — synchronously counts logical rows
in the R2 source via csv-parse (NOT raw \n counting; quoted multi-line
fields would skew that) and persists the result on verified_at +
verified_row_count. Returns expectedRowCount / actualRowCount / delta.
Large files may exceed the fetch budget — async-via-workflow is a
follow-up if needed.
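
To illustrate why raw `\n` counting is not used, here is a small comparison. The quote-aware counter below is hand-rolled for illustration only (the endpoint itself uses csv-parse); it assumes RFC 4180-style double-quoted fields and counts the header as a record.

```typescript
// Count newline characters: over-counts when a quoted field spans lines.
function countNewlines(csv: string): number {
  return csv.split("\n").length - 1;
}

// Count records, treating newlines inside double-quoted fields as data.
// An escaped quote ("") toggles the state twice, so it nets out correctly.
function countLogicalRows(csv: string): number {
  let rows = 0;
  let inQuotes = false;
  let sawContent = false; // the current record has at least one character
  for (const ch of csv) {
    if (ch === '"') inQuotes = !inQuotes;
    if (ch === "\n" && !inQuotes) {
      if (sawContent) rows++;
      sawContent = false;
    } else if (ch !== "\r" || inQuotes) {
      sawContent = true;
    }
  }
  return sawContent ? rows + 1 : rows; // final record without a trailing newline
}

const sampleCsv =
  'name,desc\n"Tent","Two-person\nultralight"\n"Stove","Compact"\n';
```

For `sampleCsv`, the newline count is 4 while the record count is 3 (one header plus two items), which is the kind of skew the reconcile endpoint avoids.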

EtlRetrySchema gets a workflowInstanceId field; EtlReconcileSchema is
new. Both in @packrat/schemas/admin.

Verification: drizzle-kit check + custom migration linter clean,
check-casts:strict clean, biome clean, 19 unit-test files / 324 tests
all pass.

Reset-stuck endpoint (POST /admin/etl/reset-stuck) is unchanged. Its
wall-clock-based design is wrong (audit P1 #2), but the fix is to delete
it once Workflows is the only ingest path, so that is deferred to the
queue-path-removal PR.
New runbook at docs/runbooks/etl-pipeline.md covering:

- Architecture (producer → workflow → DB; cron arms)
- The ?engine=workflow|queue flag + coexistence-window context
- How to trigger an ETL run
- How to inspect workflow instances (wrangler workflows commands)
- How to retry a failed job
- How to reconcile a job's row count against R2
- DLQ / forensic record (the Workflows dashboard is the record;
  no DLQ table)
- The 7-job historical recovery procedure with SQL + curl
- Invalid-item-logs retention (daily 09:00 UTC sweep)
- Draining the legacy queue path when ready for deletion
- Admin dashboard field semantics under the Workflows architecture
  (workflow_instance_id, verified_*, total_embedding_failures, etc.)
- Accepted limitations (no soft-delete, success_rate quirk on failed
  jobs, sync reconcile bounded by fetch budget, no ETag fail-closed
  on retry yet, embedding cost on chunk retry)
- Historical recoveries appendix (stub for the 2026-05-14 recovery
  to be filled in when executed post-deploy)
- References (audit, plans, CF docs)

First runbook in docs/runbooks/ — establishes the convention.
Three real fixes plus one coverage exclusion:

1. packages/db/src/schema.ts: restore the `AnyPgColumn` type import.
   It was dropped when U2 was slimmed (the FK self-reference on
   `superseded_by_job_id` went away with it), but `post_comments`
   still uses AnyPgColumn for its own parent_comment_id self-reference.

2. invalidLogRetention.ts: drop the `.returning({ id: ... })` typed
   shape and use bare `.returning()`. The union of three Drizzle
   driver types (neon-http / neon-serverless / node-postgres) accepts
   only the no-arg overload at the intersection; the typed shape
   tripped TS2554. Row count is computed from `.length` anyway.

3. invalidLogRetention.test.ts: replace the `__mockDb` cross-module
   handle (TS2305: not an export) with vi.hoisted() state shared
   between the mock factory and the tests. Cleaner and type-safe.

4. vitest.unit.config.ts: add `src/workflows/catalog-etl-workflow.ts`
   to coverage exclude. The chunker sibling (src/workflows/shared/) is
   still covered (5 unit tests at 100%). The workflow class needs the
   real CF Workflows runtime for end-to-end execution; integration tests
   in /test pick it up when Docker Postgres is wired (deferred per the
   PR's "deferred to follow-up" list).

Plus: new unit tests for `logger.ts` (10 tests, 100% coverage) so the
new file doesn't drop the coverage threshold by itself.

Coverage now at 98.63% statements / 95.33% branches (was 76.76% / 95.16%).
20 unit-test files, 331 tests, all pass. bun check-types clean.

Does not address:
- `api-tests` install failure (Fail extracting tarball for
  @sentry/cli-linux-x64) — that's a transient registry / CI runner
  issue, not something this PR can fix. A retry should clear it.
…dit trail

Adds the columns + endpoints that were originally part of U5's full
scope but deferred during the U2 slim-down. Now landing together so
the post-merge operational story is complete.

Migration 0050 adds to etl_jobs:
- source_etag text — captured by the producer from r2.head().etag
- source_last_modified timestamp — same; redundant with etag but cheap
- superseded_by_job_id text — FK to etl_jobs.id (ON DELETE SET NULL),
  written by retry + repair endpoints to link the new job back to the
  original
- superseded_at timestamp — when the supersession was recorded
- CHECK constraint etl_jobs_no_self_supersede prevents a row from
  superseding itself
- Index etl_jobs_superseded_by_idx supports the dashboard's
  "show me the repair chain for cotopaxi" lookup

Producer (POST /catalog/etl):
- Captures sourceEtag + sourceLastModified from the first object's
  chunkCsvForR2 head; writes to etl_jobs on insert

Retry (POST /admin/etl/:jobId/retry):
- Refactored into a shared reingestJob() helper used by retry +
  repair-from-scratch
- Before triggering the new workflow, calls r2.head() and compares
  live etag against the stored sourceEtag — returns 409
  ETL_ETAG_MISMATCH unless ?force=true. Skips the check when the
  stored etag is NULL (legacy queue-era rows, including the 7
  false-failures from 2026-05-14)
- New job row carries supersededByJobId pointing at the original
  + supersededAt timestamp
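
The ETag guard in the retry path can be read as a small decision function. The sketch below is an assumption about its shape: `checkSourceEtag` and the `EtagDecision` type are hypothetical names, while the 409 status, the `ETL_ETAG_MISMATCH` code, the `?force=true` override, and the NULL skip come from the bullets above.

```typescript
type EtagDecision =
  | { proceed: true; reason: "match" | "forced" | "no-stored-etag" }
  | { proceed: false; status: 409; code: "ETL_ETAG_MISMATCH" };

function checkSourceEtag(
  storedEtag: string | null,
  liveEtag: string,
  force: boolean,
): EtagDecision {
  // Legacy queue-era rows have no stored ETag, so there is nothing to compare.
  if (storedEtag === null) return { proceed: true, reason: "no-stored-etag" };
  if (storedEtag === liveEtag) return { proceed: true, reason: "match" };
  // The source changed since the original ingest; only proceed when forced.
  if (force) return { proceed: true, reason: "forced" };
  return { proceed: false, status: 409, code: "ETL_ETAG_MISMATCH" };
}
```

Keeping the decision pure makes the three escape hatches (match, force, legacy NULL) easy to unit-test separately from the `r2.head()` call.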

New endpoint POST /admin/etl/:jobId/repair-from-scratch:
- Same shape as retry but accepts completed jobs too. Use case:
  operator suspects an originally-completed job under-counted (the
  audit's R8 "trace the repair chain" requirement)

Also adds @sentry/cloudflare ^10.37.0 to packages/api/package.json
(install lands in this commit but wiring is in the next one).

Verification: drizzle-kit check + custom linter clean,
check-casts:strict clean, biome clean, 20 unit-test files / 331
tests pass, tsc clean.
Closes the Sentry deferral from U6 part 1 now that the dependency is
installed. Wires Sentry into three surfaces:

1. Worker default export wrapped with withSentry(optionsCallback, handler)
   — initializes Sentry on first invocation; uncaught fetch / queue /
   scheduled exceptions land in Sentry with request + queue + cron context
   attached automatically by the SDK.

2. CatalogEtlWorkflow wrapped with instrumentWorkflowWithSentry — every
   step.do span + any uncaught throw inside a step lands in Sentry with
   workflow name + instance id + step name + attempt count attached.

3. logger.ts emit() boundary forwards to Sentry when isInitialized():
   - logger.info/warn → Sentry.addBreadcrumb (correlated with next captureException)
   - logger.error({ err }) → Sentry.captureException with ctx fields as tags/extras
   - logger.error without err → Sentry.captureMessage(level=error)
   Forwarding is best-effort and try/catch-wrapped — failures here never
   break the call site (JSON console line is the durable record).

Sentry options shared between handler + workflow:
   { dsn: env.SENTRY_DSN, environment: env.ENVIRONMENT,
     tracesSampleRate: 0.1, release: env.CF_VERSION_METADATA?.id }

wrangler.jsonc:
- Adds `nodejs_als` compatibility flag (required by @sentry/cloudflare's
  AsyncLocalStorage-based context propagation across awaits)
- Adds `upload_source_maps: true` so wrangler deploy uploads sourcemaps
  to Cloudflare — unminified stack traces in wrangler tail and the
  Workers dashboard. Sentry-side symbolication is a separate
  @sentry/cli sourcemaps upload step (documented in runbook; no CI
  deploy pipeline exists today to automate it).

Runbook updated with a new "Sentry observability" section documenting
wiring, tag conventions, and the optional Sentry-side sourcemap upload.

Verification:
- 20 unit-test files, 331 tests pass (logger tests still pass; Sentry
  isInitialized() returns false in tests so forwarding is silently
  skipped — JSON output to console unchanged)
- bun check-types clean
- biome check clean
- check-casts:strict clean
…typeof

Pre-push no-raw-typeof linter rejected the raw `typeof v === ...` chain
introduced in the Sentry forwarder. Replaced with isString/isNumber/
isBoolean from @packrat/guards, matching the rest of the codebase.

No behavior change; same path classification (primitives → Sentry tags,
objects/arrays → Sentry extras).
@github-actions github-actions Bot added the dependencies Pull requests that update a dependency file label May 20, 2026
andrew-bierman and others added 4 commits May 20, 2026 17:25
radash (which @packrat/guards re-exports) provides isString, isNumber,
isObject, etc. but not isBoolean. Use a direct === true || === false
check instead — passes the no-raw-typeof linter and reads cleaner than
inventing a wrapper.
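
A sketch of the classification the Sentry forwarder performs (primitives to tags, everything else to extras). The local `isString` and `isNumber` are stand-ins for the `@packrat/guards` helpers so the example is self-contained; `splitForSentry` is a hypothetical name, and sending `null` / `undefined` to extras is an assumption.

```typescript
type Primitive = string | number | boolean;

// Local stand-ins for the guard helpers; the codebase imports these instead.
const isString = (v: unknown): v is string => typeof v === "string";
const isNumber = (v: unknown): v is number => typeof v === "number";
// Direct comparison, as described above, since no isBoolean helper is available.
const isBooleanValue = (v: unknown): v is boolean => v === true || v === false;

function splitForSentry(ctx: Record<string, unknown>): {
  tags: Record<string, Primitive>;
  extras: Record<string, unknown>;
} {
  const tags: Record<string, Primitive> = {};
  const extras: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(ctx)) {
    if (isString(value) || isNumber(value) || isBooleanValue(value)) {
      tags[key] = value; // primitives are searchable as tags
    } else {
      extras[key] = value; // objects, arrays, null, undefined
    }
  }
  return { tags, extras };
}
```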
Single-query per-source data-quality audit served from the API instead of
requiring scrapyd (or any other consumer) to talk to the DB directly. The
SQL stays where the schema lives; consumers authenticate with the existing
admin JWT and never need NEON_DATABASE_URL.

Flags surfaced per source (computed server-side from threshold constants
returned alongside the report):
- decimal_bug: count of prices < $10 with 3+ decimal places (the
  "1,299 → 1.299" parser bug from the existing scrapyd audit)
- low_median: median < $20 on a non-allowlisted source
- high_null:<field>: > 30% NULL on price / brand / description / weight /
  images / availability
- bad_weight: count of weights < 1g or > 100kg
- empty_name: count of empty / null names
- stale: source has no completed ETL in 30+ days
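
As a rough sketch of how the flags above could be derived from per-source aggregates: the `SourceStats` shape, the `THRESHOLDS` constant, and the rule that count-based flags fire on any non-zero count are all assumptions for illustration. Only the flag names and the $20 / 30% / 30-day thresholds come from the list above.

```typescript
// Hypothetical per-source aggregate; the real query's column names may differ.
interface SourceStats {
  medianPrice: number | null;
  allowlisted: boolean; // source is expected to carry low-priced items
  decimalBugCount: number; // prices < $10 with 3+ decimal places
  badWeightCount: number; // weights < 1 g or > 100 kg
  emptyNameCount: number;
  nullFraction: Record<string, number>; // field name -> share of NULLs (0..1)
  daysSinceCompletedEtl: number | null; // null when no completed ETL exists
}

const THRESHOLDS = { lowMedianUsd: 20, highNullFraction: 0.3, staleDays: 30 };

function computeFlags(s: SourceStats): string[] {
  const flags: string[] = [];
  if (s.decimalBugCount > 0) flags.push("decimal_bug");
  if (s.medianPrice !== null && s.medianPrice < THRESHOLDS.lowMedianUsd && !s.allowlisted) {
    flags.push("low_median");
  }
  for (const [field, fraction] of Object.entries(s.nullFraction)) {
    if (fraction > THRESHOLDS.highNullFraction) flags.push(`high_null:${field}`);
  }
  if (s.badWeightCount > 0) flags.push("bad_weight");
  if (s.emptyNameCount > 0) flags.push("empty_name");
  if (s.daysSinceCompletedEtl === null || s.daysSinceCompletedEtl >= THRESHOLDS.staleDays) {
    flags.push("stale");
  }
  return flags;
}
```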

Query is a single CTE-based GROUP BY (DISTINCT ON for most-recent
ingest source per item, then aggregate). One round-trip for all
sources; ?source=<name> filters to one for ad-hoc debugging.

Response schema CatalogAuditSchema lives in @packrat/schemas/admin so
Eden Treaty consumers get end-to-end types.

Verification: 20 unit-test files / 331 tests pass, tsc clean, biome
clean, check-casts:strict clean.

Used by scripts/audit_db_catalog.py in PackRat-ScrapyD#129 (next commit
on that PR drops the direct-DB approach in favor of this endpoint).
Previous fix pointed drizzle.config.ts at ../db/src/schema.ts (relative
path crossing the workspace boundary). Cleaner: add an in-package
re-export at src/db/schema.ts that re-exports from @packrat/db/schema,
and point drizzle.config back to ./src/db/schema.ts.

drizzle-kit + any other drizzle-aware tooling now stays scoped to
packages/api and is insulated from workspace layout changes. Schema
source of truth still lives in packages/db/src/schema.ts.
…d migration

Previous: three migrations (0048_etl_workflow_columns, 0049_etl_verification_cols,
0050_etl_etag_and_supersession) all generated by drizzle-kit but renamed
post-generation, with hand-edited journal tags to match. That made the
migrations look hand-authored and the rename+edit pattern is brittle.

Now: one migration with whatever name drizzle-kit emits — the additive
column changes (workflow_instance_id, total_embedding_failures, verified_at,
verified_row_count, source_etag, source_last_modified, superseded_by_job_id,
superseded_at + FK + indexes + check constraint) collapse cleanly into a
single migration. Net diff impact: ~4,600 fewer lines (3 snapshots → 1).

Updates CLAUDE.md with explicit migration discipline so this doesn't recur:
- always generate via drizzle-kit
- keep the random auto-generated name (do not rename)
- never hand-edit journal / snapshots / SQL
- collapse additive changes into one migration when they ship together
- verify with drizzle-kit check before pushing

Schema content is identical; verified via drizzle-kit check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@andrew-bierman andrew-bierman marked this pull request as ready for review May 21, 2026 03:48
Copilot AI review requested due to automatic review settings May 21, 2026 03:48
Contributor

Copilot AI left a comment

Pull request overview

Migrates the catalog ETL execution model from a Queues-based pipeline toward a Cloudflare Workflows-driven pipeline, adding workflow bindings, chunk planning utilities, admin operator endpoints (retry/repair/reconcile/audit), and observability/retention hardening intended to address the ETL audit findings.

Changes:

  • Introduces a CatalogEtlWorkflow implementation plus a shared newline-aligned R2 CSV chunk planner (chunkCsvForR2) and wires a Workflows binding in wrangler.jsonc.
  • Expands etl_jobs schema + migrations to store workflow/debug provenance, verification fields, embedding-fallback counters, and supersession metadata; updates admin routes to use workflow-based retry/repair and adds reconcile + audit endpoints.
  • Adds structured logging + Sentry wiring, strengthens catalog item validation, and adds a scheduled retention sweep for invalid_item_logs with unit tests + runbook/docs.

Reviewed changes

Copilot reviewed 32 out of 33 changed files in this pull request and generated 10 comments.

| File | Description |
|---|---|
| packages/schemas/src/admin.ts | Extends admin API schemas for reconcile/audit + workflow instance ID fields. |
| packages/db/src/schema.ts | Adds Workflows-related columns/indexes/check constraints to etl_jobs. |
| packages/api/wrangler.jsonc | Enables Workflows binding, cron trigger, Sentry-required compat flags, sourcemap upload. |
| packages/api/vitest.unit.config.ts | Excludes workflow entrypoint from unit coverage (integration-only). |
| packages/api/test/db-schema-etl.test.ts | Adds integration smoke test for new etl_jobs columns/index. |
| packages/api/src/workflows/shared/chunkCsvForR2.ts | Implements newline-aligned byte-range chunk planning for R2 CSVs. |
| packages/api/src/workflows/shared/tests/chunk-csv-for-r2.test.ts | Unit tests for chunk boundary correctness. |
| packages/api/src/workflows/catalog-etl-workflow.ts | Adds the Cloudflare Workflow entrypoint and chunk processing orchestration. |
| packages/api/src/utils/logger.ts | Adds JSON structured logger with best-effort Sentry forwarding. |
| packages/api/src/utils/env-validation.ts | Adds ETL_WORKFLOW binding to validated env types. |
| packages/api/src/utils/tests/logger.test.ts | Unit tests for structured logger output formatting. |
| packages/api/src/services/retention/invalidLogRetention.ts | Adds batched retention sweep for invalid_item_logs. |
| packages/api/src/services/retention/tests/invalidLogRetention.test.ts | Unit tests for retention sweep loop semantics. |
| packages/api/src/services/etl/processValidItemsBatch.ts | Adds embedding-fallback counter increment + structured logs. |
| packages/api/src/services/etl/processLogsBatch.ts | Switches to structured logging and rethrows on DB failures. |
| packages/api/src/services/etl/CatalogItemValidator.ts | Hardens validation (scheme/SSRF/length/SKU constraints). |
| packages/api/src/services/etl/tests/CatalogItemValidator.test.ts | Tests for the hardened validator behavior. |
| packages/api/src/routes/catalog/index.ts | Producer defaults to Workflows engine; retains legacy queue path via ?engine=queue. |
| packages/api/src/routes/admin/analytics/catalog.ts | Adds workflow-based retry/repair, reconcile endpoint, and catalog audit endpoint. |
| packages/api/src/index.ts | Wraps handlers/workflow with Sentry; adds scheduled retention trigger. |
| packages/api/src/db/schema.ts | Re-exports shared db schema for drizzle-kit scoping. |
| packages/api/src/test-stubs/cloudflare-workers.ts | Extends test stub to include Workflows types/entrypoint base. |
| packages/api/package.json | Adds @sentry/cloudflare dependency. |
| packages/api/drizzle/meta/0047_snapshot.json | Updates drizzle snapshot for new schema. |
| packages/api/drizzle/meta/_journal.json | Adds a new migration journal entry. |
| packages/api/drizzle/0047_clear_monster_badoon.sql | Adds migration SQL for new etl_jobs columns/indexes/constraints. |
| packages/api/drizzle.config.ts | Points drizzle-kit at in-package schema re-export. |
| docs/runbooks/etl-pipeline.md | Adds operator runbook for workflow-based ETL operations. |
| docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md | Adds/updates migration plan doc describing the Workflows pivot. |
| docs/audits/2026-05-16-etl-audit.md | Adds ETL audit document referenced by the remediation work. |
| CLAUDE.md | Adds documented drizzle migration discipline guidelines. |
| bun.lock | Locks new Sentry dependency resolution. |
Comments suppressed due to low confidence (1)

docs/runbooks/etl-pipeline.md:285

  • The runbook claims there is "No ETag fail-closed on retry", but the admin retry/repair implementation in this PR does enforce an ETag mismatch 409 unless ?force=true (when source_etag is present). Update this limitation to match the current behavior (and clarify the legacy-rows case where source_etag is NULL).
- **Reconcile endpoint is synchronous.** Very large source files
  (>200 MB) may exceed the fetch budget. Async-via-workflow path is a
  documented follow-up.
- **No ETag fail-closed on retry.** If the R2 source has been overwritten
  since the original ingest, retry silently re-ingests the new content.
  Operator-managed for now; ETag verification + a `?force=true` override
  is a follow-up PR.


"idx": 47,
"version": "7",
"when": 1779334995277,
"tag": "0047_clear_monster_badoon",
Collaborator Author

The two files share the 0047_ filename prefix because our ETL migration was generated while 0047_cute_bloodscream.sql (weight-column fix) wasn't yet in this branch's snapshot chain. However, drizzle-kit uses the idx field in meta/_journal.json for ordering — not the filename prefix. The journal has these as idx 46 and idx 47, and bunx drizzle-kit check passes cleanly. Agreed it's cosmetically confusing; adding a follow-up to regenerate this migration as 0048 after a rebase onto main.

Comment on lines +1 to +5
ALTER TABLE "etl_jobs" ADD COLUMN "workflow_instance_id" text;--> statement-breakpoint
ALTER TABLE "etl_jobs" ADD COLUMN "total_embedding_failures" integer DEFAULT 0 NOT NULL;--> statement-breakpoint
ALTER TABLE "etl_jobs" ADD COLUMN "verified_at" timestamp;--> statement-breakpoint
ALTER TABLE "etl_jobs" ADD COLUMN "verified_row_count" integer;--> statement-breakpoint
ALTER TABLE "etl_jobs" ADD COLUMN "source_etag" text;--> statement-breakpoint
Collaborator Author

Same root cause as the adjacent journal comment — the ETL migration was generated off a snapshot state that predated 0047_cute_bloodscream.sql. drizzle-kit uses idx for ordering (not the filename prefix) and drizzle-kit check passes. A follow-up PR will regenerate after rebasing on main so the file gets the correct 0048_ prefix.

Comment thread packages/api/test/db-schema-etl.test.ts Outdated
return result;
}

describe('Migration 0048 — ETL workflow columns', () => {
Collaborator Author

Fixed in the latest commit — describe label updated to 'Migration 0047 — ETL workflow columns'.

Comment on lines +104 to +116
const peeks = await Promise.all(
candidates.map(async ({ index, from, to }) => {
const obj = await r2.get(objectKey, { range: { offset: from, length: to - from } });
if (!obj) throw new Error(`R2 peek read returned null for ${objectKey} [${from},${to})`);
const text = await obj.text();
const lastNewlineIndex = text.lastIndexOf('\n');
if (lastNewlineIndex === -1) {
throw new ChunkBoundaryError(objectKey, { from, to });
}
// byteEnd is inclusive; it's the byte position of the newline itself,
// so the next chunk starts at that index + 1 (which begins the next row).
const byteEnd = from + lastNewlineIndex;
return { index, byteEnd };
Collaborator Author

Fixed — switched to new TextEncoder().encode(text.slice(0, lastNewlineIndex)).byteLength which gives the true byte offset rather than the character index. This handles non-ASCII CSV content (e.g. accented product names like 'Blåhaj') where char index ≠ byte offset. This was the audit's P1 #4/#5 fix.
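
The difference between the character index and the byte offset can be seen in a short sketch. `lastNewlineByteOffset` is a hypothetical helper name; the `TextEncoder` expression is the one quoted in the reply above.

```typescript
// Byte offset of the last "\n" in `text`, measured in UTF-8 bytes rather than
// UTF-16 code units. Returns -1 when the text contains no newline.
function lastNewlineByteOffset(text: string): number {
  const charIndex = text.lastIndexOf("\n");
  if (charIndex === -1) return -1;
  return new TextEncoder().encode(text.slice(0, charIndex)).byteLength;
}

// "Bl\u00e5haj" contains one two-byte character (U+00E5) before the newline.
const sampleRow = "Bl\u00e5haj,19.99\nTent,249.00";
const charIndex = sampleRow.lastIndexOf("\n"); // counts UTF-16 code units
const byteOffset = lastNewlineByteOffset(sampleRow); // counts UTF-8 bytes
```

Here `charIndex` is 12 and `byteOffset` is 13. Using the character index as an R2 range boundary would cut the chunk one byte short for every two-byte character in the peek window, and more for wider characters.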

Comment on lines +207 to +264
// One step per chunk. Each step is memoized by name within the instance,
// so a chunk that succeeds is never re-run on a downstream step failure.
// Retries are bounded to 3 with exponential backoff for transient R2/DB
// failures; a chunk that exhausts retries marks the entire instance errored.
const chunkResults: ChunkResult[] = [];
for (const chunk of chunks) {
const result = await step.do(
`chunk-${chunk.chunkIndex}`,
{
retries: { limit: 3, delay: '30 seconds', backoff: 'exponential' },
timeout: '5 minutes',
},
async () => processChunk({ jobId, chunk, env: this.env }),
);
chunkResults.push(result);
}

const totals = chunkResults.reduce(
(acc, r) => ({
rowsProcessed: acc.rowsProcessed + r.rowsProcessed,
rowsValid: acc.rowsValid + r.rowsValid,
rowsInvalid: acc.rowsInvalid + r.rowsInvalid,
}),
{ rowsProcessed: 0, rowsValid: 0, rowsInvalid: 0 },
);

// Aggregate step writes the canonical totals — any over-counts from chunk
// retries (the inner processValidItemsBatch increments are non-idempotent
// on retry) get overridden here. This is the authoritative count.
if (chunks.length === 0) {
throw new Error(`Workflow ${jobId} received empty chunks array`);
}
await step.do('aggregate', async () => {
const db = createDbClient(this.env);
await db
.update(etlJobs)
.set({
totalProcessed: totals.rowsProcessed,
totalValid: totals.rowsValid,
totalInvalid: totals.rowsInvalid,
})
.where(eq(etlJobs.id, jobId));
});

await step.do('finalize', async () => {
const db = createDbClient(this.env);
await db
.update(etlJobs)
.set({ status: 'completed', completedAt: new Date() })
.where(eq(etlJobs.id, jobId));
});

return {
jobId,
rowsProcessed: totals.rowsProcessed,
rowsValid: totals.rowsValid,
rowsInvalid: totals.rowsInvalid,
};
Collaborator Author

Fixed — the entire run() body is now wrapped in try/catch. The catch block does a best-effort UPDATE etl_jobs SET status='failed' before rethrowing, so step exhaustion no longer leaves the job row stuck as running.
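
The pattern described in this reply (best-effort status write, then rethrow) might be factored like this. `runWithFailureMarker` and `markFailed` are hypothetical names; in the workflow, `markFailed` would correspond to the `UPDATE etl_jobs SET status='failed'` write.

```typescript
// Run `body`; on failure, try to record the failure, then rethrow the original error.
async function runWithFailureMarker<T>(
  body: () => Promise<T>,
  markFailed: (error: unknown) => Promise<void>,
): Promise<T> {
  try {
    return await body();
  } catch (error) {
    try {
      await markFailed(error);
    } catch {
      // Best-effort: a failed status write must not mask the original error.
    }
    throw error;
  }
}
```

Rethrowing the original error keeps the Workflows instance in its errored state, while the status write prevents the job row from staying at `running`.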

Comment on lines 330 to 358
@@ -246,48 +336,36 @@ export const catalogRoutes = new Elysia({ prefix: '/catalog' })
filename,
scraperRevision,
startedAt: new Date(),
workflowInstanceId: instanceId,
sourceEtag: firstEtag,
sourceLastModified: firstLastModified,
});

// Split large files into 20 MB byte-range chunks so each Worker
// invocation stays within the CPU time budget (~30k rows / chunk).
const CHUNK_BYTES = 20 * 1024 * 1024;
const r2 = new R2BucketService({ env, bucketType: 'catalog' });
const queueChunks: Array<{ objectKey: string; byteStart?: number; byteEnd?: number }> = [];

for (const objectKey of chunks) {
const meta = await r2.head(objectKey);
if (!meta || meta.size <= CHUNK_BYTES) {
queueChunks.push({ objectKey });
} else {
const n = Math.ceil(meta.size / CHUNK_BYTES);
for (let i = 0; i < n; i++) {
queueChunks.push({
objectKey,
byteStart: i * CHUNK_BYTES,
byteEnd: Math.min((i + 1) * CHUNK_BYTES - 1, meta.size - 1),
});
}
}
}

await queueCatalogETL({
queue: env.ETL_QUEUE,
chunks: queueChunks,
const params: CatalogEtlWorkflowParams = {
jobId,
});
source,
scraperRevision,
chunks: indexedChunks,
};

await env.ETL_WORKFLOW.create({ id: instanceId, params });

return {
message: 'Catalog ETL job queued successfully',
message: 'Catalog ETL workflow triggered',
jobId,
queued: true,
engine: 'workflow' as const,
workflowInstanceId: instanceId,
};
Collaborator Author

Fixed — env.ETL_WORKFLOW.create() is now wrapped in try/catch. If workflow creation throws, the code updates etl_jobs to status: 'failed' before rethrowing, so the row is never left orphaned as running.

Comment on lines +65 to +89
for (let i = 0; i < maxIterations; i++) {
iterations++;

const selectExpired = db
.select({ id: invalidItemLogs.id })
.from(invalidItemLogs)
.where(lt(invalidItemLogs.createdAt, cutoff))
.limit(batchSize);

const removed = await db
.delete(invalidItemLogs)
.where(inArray(invalidItemLogs.id, selectExpired))
.returning();

const rowCount = removed.length;
deleted += rowCount;
if (rowCount === 0) break;
}

return {
deleted,
iterations,
capped: iterations >= maxIterations,
retentionDays,
};
Collaborator Author

Fixed — rowCount is now declared outside the loop and updated each iteration. capped: rowCount > 0 is correct: if the loop exits via break (last batch returned 0 rows), rowCount is 0 and capped is false; if it exits because i >= maxIterations with rows still remaining, rowCount holds the last non-zero batch count and capped is true.
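
The two definitions of `capped` differ only when the final permitted iteration returns an empty batch. A small replay makes that visible; `replayBatches` is a hypothetical helper that feeds pre-recorded batch sizes through the same loop shape.

```typescript
// Replay a sequence of per-iteration batch sizes and report both definitions.
function replayBatches(batchCounts: number[], maxIterations: number) {
  let iterations = 0;
  let rowCount = 0; // declared outside the loop so it is still in scope at the return
  for (let i = 0; i < maxIterations && i < batchCounts.length; i++) {
    iterations++;
    rowCount = batchCounts[i] ?? 0;
    if (rowCount === 0) break;
  }
  return {
    cappedByIterations: iterations >= maxIterations, // definition in the reviewed excerpt
    cappedByLastBatch: rowCount > 0, // definition described in the reply
  };
}
```

For batches `[10, 10, 0]` with a cap of 3, the iteration-based flag reports `true` even though the table was fully drained, while the last-batch flag reports `false`. For `[10, 10, 10]` both report `true`.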

Comment on lines +167 to +174
// Hostnames with non-ASCII characters are IDN homograph candidates.
// Native URL parsing already encodes them to punycode in parsed.hostname,
// so non-ASCII presence here means the hostname survived encoding (rare)
// OR the URL was malformed in a way `new URL()` accepted. Either way,
// reject as a defense-in-depth measure for catalog-rendered links.
for (const ch of parsed.hostname) {
if (ch.charCodeAt(0) > 127) {
return 'Product URL hostname contains non-ASCII characters (IDN homograph guard)';
Collaborator Author

Valid observation — URL constructor does normalize IDN to punycode so parsed.hostname will be ASCII-only in practice. The guard is intentional defense-in-depth for any edge case where a future runtime or non-standard URL handling doesn't normalize. Keeping it since it has no performance cost on the happy path and provides an explicit error message if ever triggered.
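
A quick check of the normalization mentioned here, assuming a runtime that implements the WHATWG URL standard (Node, Bun, Deno, and the Workers runtime are expected to). The example hostname is made up for illustration.

```typescript
// A hostname containing a non-ASCII character (U+00E4).
const parsedUrl = new URL("https://ex\u00e4mple.com/product/1");

// True when every character in the string is in the ASCII range.
const isAsciiOnly = (s: string): boolean => /^[\x00-\x7f]*$/.test(s);

const hostname = parsedUrl.hostname; // punycode form, beginning with "xn--"
const guardWouldTrigger = !isAsciiOnly(hostname);
```

In such runtimes `guardWouldTrigger` is `false`, which is why the guard is described as defense in depth rather than a check that is expected to fire.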

Comment thread docs/runbooks/etl-pipeline.md Outdated
Comment on lines +125 to +128
row reflects the retry. There is no automatic supersession link yet — when
the repair-from-scratch endpoint lands (follow-up PR), it will add
`superseded_by_job_id` to make the link explicit. For now operators
correlate by `(source, filename)` and timestamp.
Collaborator Author

Fixed in a prior commit — the runbook now documents the supersession operation (?mode=repair) under the 'Repair and retry' section, including the supersededByJobId field semantics and the ETag enforcement behavior.

Comment thread packages/db/src/schema.ts
Comment on lines +495 to +502
// Audit trail for repair-from-scratch / retry. supersededByJobId
// points at the ORIGINAL job (the new repair-job row carries the
// pointer); supersededAt is the time of supersession. CHECK
// constraint prevents self-reference.
supersededByJobId: text('superseded_by_job_id').references((): AnyPgColumn => etlJobs.id, {
onDelete: 'set null',
}),
supersededAt: timestamp('superseded_at'),
Collaborator Author

Valid naming concern — supersededByJobId on the NEW repair job points to the ORIGINAL job, so the semantically clearer name would be supersedesJobId ('this job supersedes that job'). The current name reads as 'this job was superseded by X', which is backwards. The logic and FK are correct; only the name is ambiguous. Adding a follow-up migration to rename the column (requires drizzle-kit delete + regenerate).

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 15

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/api/src/routes/catalog/index.ts (1)

239-352: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Move ETL orchestration out of the route handler into src/services/.

Chunk planning, R2 metadata reads, workflow parameter construction, and engine-specific dispatch are business
logic and should live in a service. Keep the route focused on validation/auth + delegating to a service method.

As per coding guidelines: "Validation schemas live in src/schemas/ (Zod). Business logic belongs in src/services/, not in route handlers."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/routes/catalog/index.ts` around lines 239 - 352, Extract the
ETL orchestration (R2 reads via R2BucketService, chunkCsvForR2 calls, chunk
indexing, job record insert into etlJobs, queueCatalogETL or
env.ETL_WORKFLOW.create dispatch) out of the route handler into a new service
function (e.g., orchestrateCatalogETL) in src/services/, and have the route
handler simply call that service after validation/auth; keep createDb(),
getEnv(), jobId generation, and dispatch logic inside the service, reference
chunkCsvForR2, R2BucketService, queueCatalogETL, and env.ETL_WORKFLOW.create
when moving logic, and update exports/tests to call the new service instead of
exercising the route internals.
packages/api/src/services/etl/processValidItemsBatch.ts (1)

28-81: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Scope embedding fallback to embedding failures only.

Line 55 catches all failures from Line 28 onward, so DB/progress errors are mislabeled as embedding degradation and
still increment totalEmbeddingFailures. That corrupts ETL failure telemetry and may double-run writes.

Proposed fix
-  try {
-    // Generate embeddings in batch
-    const embeddings = await generateManyEmbeddings({...});
-    const itemsWithEmbeddings = mergedItems.map((item, index) => ({
-      ...item,
-      embedding: embeddings[index],
-    }));
-
-    const upsertedItems = await catalogService.upsertCatalogItems(itemsWithEmbeddings);
-    await catalogService.trackEtlJob(upsertedItems, jobId);
-    await updateEtlJobProgress(env, { jobId, valid: items.length, processed: items.length });
-  } catch (error) {
+  let itemsToUpsert = mergedItems;
+  try {
+    const embeddings = await generateManyEmbeddings({...});
+    itemsToUpsert = mergedItems.map((item, index) => ({
+      ...item,
+      embedding: embeddings[index],
+    }));
+  } catch (error) {
     logger.warn('etl.embedding.fallback', { ... });
+    const db = createDbClient(env);
+    await db
+      .update(etlJobs)
+      .set({
+        totalEmbeddingFailures: sql`COALESCE(${etlJobs.totalEmbeddingFailures}, 0) + ${items.length}`,
+      })
+      .where(eq(etlJobs.id, jobId));
+  }
 
-    const upsertedItems = await catalogService.upsertCatalogItems(mergedItems);
-    await catalogService.trackEtlJob(upsertedItems, jobId);
-    await updateEtlJobProgress(env, { jobId, valid: items.length, processed: items.length });
-
-    const db = createDbClient(env);
-    await db.update(etlJobs)...
-  } finally {
+  try {
+    const upsertedItems = await catalogService.upsertCatalogItems(itemsToUpsert);
+    await catalogService.trackEtlJob(upsertedItems, jobId);
+    await updateEtlJobProgress(env, { jobId, valid: items.length, processed: items.length });
+  } finally {
     logger.info('etl.valid_items.batch_complete', { jobId, count: items.length });
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/services/etl/processValidItemsBatch.ts` around lines 28 -
81, The current try/catch around generateManyEmbeddings plus the subsequent
DB/upsert/progress calls causes any downstream failure (e.g.,
catalogService.upsertCatalogItems or updateEtlJobProgress) to be treated as an
embedding failure; scope the embedding-fallback to embedding generation only by
extracting the call to generateManyEmbeddings (and the creation of
itemsWithEmbeddings) into its own try/catch where you handle the fallback: on
embedding error run the logger.warn('etl.embedding.fallback', ...), set a flag
or leave embeddings undefined, and increment totalEmbeddingFailures only in that
embedding-catch (use createDbClient/update on etlJobs there); then allow the
outer flow to continue to call catalogService.upsertCatalogItems (using
mergedItems if embeddings missing, or itemsWithEmbeddings if present),
catalogService.trackEtlJob, and updateEtlJobProgress in a separate try block so
non-embedding errors bubble or are handled correctly (do not increment
totalEmbeddingFailures for those). Use the same symbols generateManyEmbeddings,
itemsWithEmbeddings, mergedItems, catalogService.upsertCatalogItems,
catalogService.trackEtlJob, updateEtlJobProgress, createDbClient and etlJobs to
locate and implement this change.
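
The scoping described in this finding can be shown as a self-contained sketch. It is an illustration under stated assumptions, not the code in processValidItemsBatch.ts: the `Deps` interface and its members (`embed`, `upsert`, `recordEmbeddingFailures`) are hypothetical stand-ins for generateManyEmbeddings, catalogService.upsertCatalogItems, and the totalEmbeddingFailures update.

```typescript
interface Item {
  sku: string;
  embedding?: number[];
}

// Hypothetical dependencies, injected so the sketch runs without a database
// or an embedding provider.
interface Deps {
  embed: (items: Item[]) => Promise<number[][]>;
  upsert: (items: Item[]) => Promise<void>;
  recordEmbeddingFailures: (count: number) => Promise<void>;
}

async function processBatch(items: Item[], deps: Deps): Promise<void> {
  let itemsToUpsert = items;

  // Only embedding generation is covered by the fallback.
  try {
    const embeddings = await deps.embed(items);
    itemsToUpsert = items.map((item, i) => ({ ...item, embedding: embeddings[i] }));
  } catch {
    // Degrade: keep the items without embeddings and count the failure.
    await deps.recordEmbeddingFailures(items.length);
  }

  // Upsert errors propagate to the caller and are never counted as
  // embedding failures.
  await deps.upsert(itemsToUpsert);
}
```

With this shape, a database error raised by `upsert` rejects the call without touching the embedding failure counter, and the upsert runs exactly once per batch.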
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md`:
- Around line 305-309: Add an additional verification step to the migration
checklist: after running `bun run --cwd packages/api db:migrate` and
generating/applying migrations, run `bunx drizzle-kit check` from the
packages/api/ directory to validate Drizzle snapshot chain consistency; update
the Verification section to include this command (run from packages/api/)
alongside `bun lint:custom` and `bun test:api:unit` so snapshot chain issues are
caught before merging.

In `@docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md`:
- Around line 333-336: Update the verification steps to run drizzle snapshot
consistency checks: after the `db:generate`/`db:migrate` step invoked via `bun
run --cwd packages/api db:migrate` add a step to run `bunx drizzle-kit check`
from `packages/api/` so snapshot-chain drift is detected; ensure the U2
verification list includes this new `bunx drizzle-kit check` invocation
alongside `bun lint:custom` and `bun test:api:unit` so the documented acceptance
criteria validate snapshot chain consistency.
- Around line 93-95: The plan text contradicts itself about superseded columns;
update the Key Technical Decisions and related U2/U5 text to consistently
include the superseded columns: ensure the etl_jobs schema explicitly lists
superseded_by_job_id (FK to etl_jobs.id ON DELETE SET NULL, CHECK to prevent
self-reference) and superseded_at timestamp (or remove both mentions entirely if
you intend no superseded_* columns), and make the Repair-from-scratch paragraph
(and U2/U5 migration descriptions) match that single source of truth so
implementers and migrations reference the same etl_jobs shape.
- Around line 95-96: The doc currently implies the U1 SQL migration will call
r2.head() to backfill source_etag/source_last_modified on etl_jobs; reword to
state that the U1 migration is DB-only and that backfilling the seven legacy
rows is a separate post-migration operational/app step: describe an explicit
operation that runs outside SQL which calls r2.head() for each legacy job and
updates the etl_jobs.source_etag and etl_jobs.source_last_modified columns (and
that it should be idempotent and run once), removing any wording that suggests
the SQL migration performs network calls.

In `@docs/runbooks/etl-pipeline.md`:
- Around line 239-241: Update the runbook wording and procedure to stop implying
`queues consumer remove` drains messages: instruct to first pause delivery with
`bunx wrangler queues pause-delivery packrat-etl-queue`, monitor backlog/queue
depth and wait for in-flight/leased messages to complete/ack, then run `bunx
wrangler queues consumer remove packrat-etl-queue packrat-api` to detach the
consumer; also add a note that deleting stored backlog is irreversible and uses
`wrangler queues purge` if that is the intended action.

In `@packages/api/src/routes/admin/analytics/catalog.ts`:
- Around line 657-687: The for-await loop over parser may throw and skip
awaiting writerPromise, leaving the background reader/parser to reject
unhandled; wrap the for-await loop in a try/finally (or try/catch/finally) and
always await writerPromise in the finally block (rethrow any error after
awaiting) so the async reader task (created as writerPromise) is guaranteed to
finish or be cleaned up; reference writerPromise, parser, obj.body.getReader(),
and the for await (const _record of parser) loop when locating where to add the
finally/await.

In `@packages/api/src/routes/catalog/index.ts`:
- Around line 251-258: The ETL job row is being inserted with status 'running'
before dispatch, which can leave orphaned running jobs if dispatch
(queueCatalogETL or env.ETL_WORKFLOW.create) throws; update the logic in the
block that calls db.insert(etlJobs).values({... status: 'running' ...}) so that
you either (A) perform the dispatch first and only insert after a successful
enqueue, or (B) wrap the dispatch in try/catch and on error update the inserted
row to status='failed' (include error details in failure fields) — apply this
change to the insert/dispatch sites referencing db.insert(etlJobs).values,
queueCatalogETL, env.ETL_WORKFLOW.create and jobId in this file (and the other
similar insert/dispatch occurrences in the same module).

In `@packages/api/src/services/retention/invalidLogRetention.ts`:
- Around line 74-79: The deletion currently calls returning() which pulls full
deleted rows (including heavy columns) into removed just to compute rowCount;
change the delete to only return a lightweight identifier (e.g.,
returning(invalidItemLogs.id) or returning('id')) or use a DB-counting API so
you don't fetch full rows — update the call around invalidItemLogs,
selectExpired and the removed variable so rowCount = removed.length still works
but only lightweight data is returned.
- Around line 65-88: The loop can set capped true even when the final iteration
deleted 0 rows; fix by tracking the last deletion count and only mark capped
true if we actually hit the iteration limit and the last deletion removed rows.
In invalidLogRetention.ts, add a variable (e.g., lastDeleted or lastRowCount)
initialized to 0, update it inside the loop to the current rowCount (from the
removed.length), and compute capped as (iterations >= maxIterations &&
lastDeleted > 0) instead of just iterations >= maxIterations.

In `@packages/api/src/utils/logger.ts`:
- Around line 72-110: The emit function can throw when calling
JSON.stringify(line) (e.g., circular refs or bigint in ctx); wrap the
serialization in a safe try/catch and fall back to a non-throwing representation
(for example use a replacer that converts bigints to strings and detects cycles,
or use util.inspect as a fallback) so that emit never throws; ensure you still
print a consolized string (and include minimal context like event and level) and
preserve the existing try/catch around forwardToSentry; update emit and the JSON
serialization logic (referencing emit, JSON.stringify(line), and
forwardToSentry) to guarantee logging never crashes the caller.

In `@packages/api/src/workflows/shared/chunkCsvForR2.ts`:
- Around line 73-85: The early-return branch in chunkCsvForR2 emits byteEnd: -1
when size === 0; add an explicit check for size === 0 before building ranges and
either return a single chunk with byteStart: 0 that avoids byteEnd: -1 (use
byteEnd: 0, or null/omit it, depending on consumers) or return chunks: [] or a
sentinel that downstream readers expect; update the function (chunkCsvForR2) to
handle the empty-object case explicitly so downstream chunk reads are not given
an invalid range.
- Around line 101-118: The current implementation in chunkCsvForR2 uses
Promise.all(candidates.map(...)) which fires all r2.get range reads at once
despite the comment capping concurrency at 16; replace this with a
bounded-concurrency approach (e.g., use p-limit with concurrency=16 or process
candidates in batches) so only up to 16 concurrent r2.get calls run; keep the
inner logic (throw ChunkBoundaryError on missing newline, compute byteEnd) the
same and return the same shape ({ index, byteEnd }) so peeks remains compatible.
Ensure you reference and wrap the same r2.get(objectKey, { range: ... }) call
and preserve error behavior for null obj and lastNewlineIndex === -1.
- Around line 108-116: chunkCsvForR2 is computing byteEnd by adding a JS string
index to a byte offset (from), and it also leaves concurrency unbounded and
mishandles empty objects; fix it by: (1) replacing text.lastIndexOf('\n') with a
UTF‑8 byte-aware search — encode the fetched text with TextEncoder and find the
last 0x0A byte index, then set byteEnd = from + lastNewlineByteIndex; (2) ensure
if no newline byte is found you throw ChunkBoundaryError(objectKey, { from, to
}) as before; (3) handle size === 0 explicitly (either throw a clear error or
short-circuit so byteEnd/byteStart aren’t computed as negative) to avoid
constructing invalid ranges; and (4) bound concurrency when resolving candidates
in chunkCsvForR2 (e.g., process candidates in batches of 16 or use a p-limit
style limiter) instead of Promise.all(candidates.map(...)) to match the claimed
cap and avoid R2 throttling.

In `@packages/api/test/db-schema-etl.test.ts`:
- Around line 1-4: This test (db-schema-etl.test.ts) is environment-coupled by
connecting to localhost:5434; change it to be deterministic by replacing real DB
metadata queries with mocks/stubs (e.g., mock the Postgres client or the helper
that introspects etl_jobs columns) or move the test into an integration-only
suite (e.g., tests/integration/) and gate it behind an explicit environment flag
so it doesn't run with unit tests; update the test's setup to inject a mocked
client (or use vi.mock for the module that executes the queries) and assert
against the mocked responses, or alternatively relocate the file and add an
integration-run guard to the test runner.

In `@packages/api/vitest.unit.config.ts`:
- Around line 61-64: The vitest config currently excludes
catalog-etl-workflow.ts which leaves the default ETL engine untested; restore
coverage by adding deterministic tests for the CatalogEtlWorkflow (unit tests
that mock the Cloudflare Workflows runtime) or add an integration test that wires a
test Postgres and workflow runtime, and then remove the exclusion in
vitest.unit.config.ts so the new tests run; reference the
CatalogEtlWorkflow/export in src/workflows/catalog-etl-workflow.ts to drive
behavior in tests and mock any workflow client/runner used by functions inside
that module (e.g., the workflow invocation helper) to assert ingest path logic
deterministically.
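
As an illustration of one item in the list above, the logger.ts finding about `JSON.stringify` throwing, here is a minimal sketch of a non-throwing serializer. `safeStringify` and the fallback event name are hypothetical and are not taken from the repository.

```typescript
// Serializes a log line without throwing: BigInt values become strings and
// any object reference seen earlier in the walk is replaced with a marker.
// Simplification: a shared (non-cyclic) reference is also marked.
function safeStringify(value: unknown): string {
  const seen = new WeakSet<object>();
  try {
    return JSON.stringify(value, (_key, v: unknown) => {
      if (typeof v === 'bigint') return v.toString();
      if (typeof v === 'object' && v !== null) {
        if (seen.has(v)) return '[Circular]';
        seen.add(v);
      }
      return v;
    });
  } catch {
    // Last resort so the caller is never interrupted by logging.
    return '{"event":"logger.serialize_failed"}';
  }
}
```

The outer try/catch stays in place because a `toJSON` method or getter on a logged object can still throw even when the replacer handles cycles and BigInt.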

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 76a19be3-2a86-4f4e-92cf-d8e397faa86d

📥 Commits

Reviewing files that changed from the base of the PR and between 12be343 and 5187b6d.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock, !bun.lock
📒 Files selected for processing (32)
  • CLAUDE.md
  • docs/audits/2026-05-16-etl-audit.md
  • docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md
  • docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
  • docs/runbooks/etl-pipeline.md
  • packages/api/drizzle.config.ts
  • packages/api/drizzle/0047_clear_monster_badoon.sql
  • packages/api/drizzle/meta/0047_snapshot.json
  • packages/api/drizzle/meta/_journal.json
  • packages/api/package.json
  • packages/api/src/__test-stubs__/cloudflare-workers.ts
  • packages/api/src/db/schema.ts
  • packages/api/src/index.ts
  • packages/api/src/routes/admin/analytics/catalog.ts
  • packages/api/src/routes/catalog/index.ts
  • packages/api/src/services/etl/CatalogItemValidator.ts
  • packages/api/src/services/etl/__tests__/CatalogItemValidator.test.ts
  • packages/api/src/services/etl/processLogsBatch.ts
  • packages/api/src/services/etl/processValidItemsBatch.ts
  • packages/api/src/services/retention/__tests__/invalidLogRetention.test.ts
  • packages/api/src/services/retention/invalidLogRetention.ts
  • packages/api/src/utils/__tests__/logger.test.ts
  • packages/api/src/utils/env-validation.ts
  • packages/api/src/utils/logger.ts
  • packages/api/src/workflows/catalog-etl-workflow.ts
  • packages/api/src/workflows/shared/__tests__/chunk-csv-for-r2.test.ts
  • packages/api/src/workflows/shared/chunkCsvForR2.ts
  • packages/api/test/db-schema-etl.test.ts
  • packages/api/vitest.unit.config.ts
  • packages/api/wrangler.jsonc
  • packages/db/src/schema.ts
  • packages/schemas/src/admin.ts

Comment thread docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md Outdated
Comment thread docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md Outdated
Comment thread docs/runbooks/etl-pipeline.md Outdated
Comment thread packages/api/src/workflows/shared/chunkCsvForR2.ts
Comment thread packages/api/src/workflows/shared/chunkCsvForR2.ts Outdated
Comment thread packages/api/src/workflows/shared/chunkCsvForR2.ts Outdated
Comment on lines +1 to +4
// Schema smoke test for the ETL Workflows columns on etl_jobs. Runs against
// the Docker Postgres wsproxy at localhost:5434 (docker-compose.test.yml).
// If the proxy is down the queries throw — intentional; the test would not
// silently skip schema drift.
Contributor

@coderabbitai coderabbitai Bot May 21, 2026

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

This test is environment-coupled and nondeterministic.

Hard dependency on a live localhost:5434 proxy means failures can come from environment state, not code
regression. Mock DB metadata queries for unit determinism, or move this into a clearly isolated integration
suite that does not run with deterministic unit checks.

As per coding guidelines: "Tests must be deterministic — mock all external services and clocks."

Also applies to: 20-39

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/test/db-schema-etl.test.ts` around lines 1 - 4, This test
(db-schema-etl.test.ts) is environment-coupled by connecting to localhost:5434;
change it to be deterministic by replacing real DB metadata queries with
mocks/stubs (e.g., mock the Postgres client or the helper that introspects
etl_jobs columns) or move the test into an integration-only suite (e.g.,
tests/integration/) and gate it behind an explicit environment flag so it
doesn't run with unit tests; update the test's setup to inject a mocked client
(or use vi.mock for the module that executes the queries) and assert against
the mocked responses, or alternatively relocate the file and add an
integration-run guard to the test runner.

Collaborator Author

Acknowledged — the test is intentionally integration-style; it requires a running Postgres (Docker wsproxy at localhost:5434 via docker-compose.test.yml). Mocking the DB would let the test pass while missing actual column drift, which defeats the purpose. The test lives in /test (not src/) and is excluded from bun test:api:unit to signal it requires external infrastructure.

Contributor

@andrew-bierman, that's a valid distinction — schema smoke tests need a real schema to be meaningful; mocking the introspection queries would confirm the test logic but not the migration. The separation (integration tests in /test, excluded from unit runs) is a reasonable guard.

One small suggestion: if it isn't already there, a comment or README in /test clarifying the infra prerequisite (Docker wsproxy at localhost:5434) and how to spin it up would help contributors avoid confusing CI failures. Would you like me to resolve this review comment?


✏️ Learnings added
Learnt from: andrew-bierman
Repo: PackRat-AI/PackRat PR: 2462
File: packages/api/test/db-schema-etl.test.ts:1-4
Timestamp: 2026-05-21T04:50:02.156Z
Learning: In the PackRat repo (`packages/api`), schema smoke tests in `packages/api/test/` (e.g., `db-schema-etl.test.ts`) are intentionally integration-style and require a live Postgres connection via Docker wsproxy at `localhost:5434` (`docker-compose.test.yml`). They are explicitly excluded from `bun test:api:unit` and should NOT be flagged for lacking DB mocks — mocking would defeat the purpose of detecting actual schema/column drift.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +61 to +64
// CatalogEtlWorkflow needs the CF Workflows runtime for end-to-end
// execution; covered by integration tests in /test once Docker Postgres
// is wired. Sibling chunker (src/workflows/shared/) IS unit-tested.
'src/workflows/catalog-etl-workflow.ts',
Contributor

@coderabbitai coderabbitai Bot May 21, 2026

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Default ETL engine is excluded from current automated unit coverage.

Since /catalog/etl now defaults to workflow execution, excluding catalog-etl-workflow.ts without active
replacement tests leaves the primary ingest path under-verified in this PR. Add deterministic tests now (unit
with mocked workflow runtime, or integration in the same change set).

As per coding guidelines: "Every new service or route handler should have corresponding tests."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/vitest.unit.config.ts` around lines 61 - 64, The vitest config
currently excludes catalog-etl-workflow.ts which leaves the default ETL engine
untested; restore coverage by adding deterministic tests for the
CatalogEtlWorkflow (unit tests that mock the Cloudflare Workflows runtime) or add an
integration test that wires a test Postgres and workflow runtime, and then
remove the exclusion in vitest.unit.config.ts so the new tests run; reference
the CatalogEtlWorkflow/export in src/workflows/catalog-etl-workflow.ts to drive
behavior in tests and mock any workflow client/runner used by functions inside
that module (e.g., the workflow invocation helper) to assert ingest path logic
deterministically.

Collaborator Author

Acknowledged — src/services/etl/** is excluded from unit coverage because the ETL engine code requires the full CF Workers + Postgres runtime. The comment in the config explains this. The integration tests in /test cover this path. The sibling chunker (src/workflows/shared/) IS unit-tested and has explicit coverage — that split is intentional.

Contributor

@andrew-bierman, understood — the split is clear and well-reasoned. CF Workflows runtime behavior can't be reliably shimmed in a Node.js unit environment, integration tests are the right home for end-to-end workflow execution, and the chunker being pure logic makes it the right boundary for unit coverage. The config comment documents this adequately.


✏️ Learnings added
Learnt from: andrew-bierman
Repo: PackRat-AI/PackRat PR: 2462
File: packages/api/vitest.unit.config.ts:61-64
Timestamp: 2026-05-21T04:50:13.224Z
Learning: In `packages/api/vitest.unit.config.ts`, `src/workflows/catalog-etl-workflow.ts` and `src/services/etl/**` are intentionally excluded from unit coverage because `CatalogEtlWorkflow` requires the full Cloudflare Workers + Postgres runtime and cannot be reliably unit-tested in Node.js. End-to-end coverage is provided by integration tests in `/test`. The sibling chunker at `src/workflows/shared/` contains pure logic and IS unit-tested — this boundary split is intentional.

andrew-bierman and others added 4 commits May 20, 2026 22:21
- Mark etl_jobs as failed when ETL_WORKFLOW.create() throws, preventing
  perpetually-running orphaned rows
- Wrap workflow run() in try/catch to update job status to failed on
  step exhaustion (runtime marks instance errored but DB row was stuck)
- Always await writerPromise in reconcile endpoint via try/finally so
  the promise cannot reject unhandled if the csv-parse loop throws
- Use byte scan (0x0A) instead of text.lastIndexOf for chunk boundaries
  in chunkCsvForR2 — char index != byte offset for non-ASCII CSV content
- Fix capped false positive in invalidLogRetention: capped only when last
  batch had rows remaining, not when loop exited via exhaustion
- Use returning({ id }) in retention sweep to avoid fetching full rows
- Guard JSON.stringify in logger emit against circular refs / BigInt

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
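
The first bullet of this commit, marking the job failed when dispatch throws, follows a pattern that can be sketched as below. This is an assumption-laden illustration: `JobStore` is a hypothetical in-memory stand-in for the etl_jobs table, and `dispatch` stands in for ETL_WORKFLOW.create().

```typescript
type JobStatus = 'running' | 'failed';

interface JobRow {
  status: JobStatus;
  error?: string;
}

// Hypothetical in-memory stand-in for the etl_jobs table.
class JobStore {
  private rows = new Map<string, JobRow>();

  insertRunning(id: string): void {
    this.rows.set(id, { status: 'running' });
  }

  markFailed(id: string, error: string): void {
    this.rows.set(id, { status: 'failed', error });
  }

  get(id: string): JobRow | undefined {
    return this.rows.get(id);
  }
}

async function startJob(
  store: JobStore,
  jobId: string,
  dispatch: (jobId: string) => Promise<void>,
): Promise<void> {
  store.insertRunning(jobId);
  try {
    await dispatch(jobId);
  } catch (err) {
    // Without this update the row would stay 'running' indefinitely.
    store.markFailed(jobId, err instanceof Error ? err.message : String(err));
    throw err; // still surface the failure to the caller
  }
}
```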

Plan doc: remove superseded_* from the "not in this plan" exclusion list
(those columns are included in the migration SQL), enumerate all 8 new
columns in the correct bullet, and rephrase the source_etag backfill from
"at migration time" (impossible in SQL) to a post-migration operational step.

Runbook: update the retry section to reflect that superseded_by_job_id is
set on every new retry/repair row (not a follow-up PR), and update the
accepted-limitations entry to accurately describe the ETag fail-closed
behavior that repair-from-scratch already implements.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

- chunkCsvForR2: use TextEncoder to convert char index → byte offset so
  non-ASCII product names don't produce mis-aligned chunk boundaries;
  arrayBuffer() approach broke the R2 mock in unit tests
- invalidLogRetention: revert .returning({id}) partial select — drizzle
  delete+subquery chain typing rejects the arg in this version; full
  .returning() still gives correct capped logic via rowCount sentinel

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
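
The char-index versus byte-offset mismatch that this commit addresses can be reproduced with a short sketch. `lastNewlineByteOffset` is a hypothetical helper, not the function in chunkCsvForR2.ts, and it assumes the peeked window decodes cleanly as UTF-8 (a window that starts or ends inside a multi-byte character would need a scan over the raw bytes instead).

```typescript
// Converts the position of the last '\n' in `text` from a UTF-16 string index
// to a UTF-8 byte offset. Returns -1 when the text has no newline.
function lastNewlineByteOffset(text: string): number {
  const charIndex = text.lastIndexOf('\n');
  if (charIndex === -1) return -1;
  // Number of UTF-8 bytes that precede the newline.
  return new TextEncoder().encode(text.slice(0, charIndex)).length;
}

// Two rows with one two-byte character each, followed by a partial row.
const sample = 'na\u00efve,1\ncaf\u00e9,2\npartial';
const charIndex = sample.lastIndexOf('\n'); // 14
const byteOffset = lastNewlineByteOffset(sample); // 16
```

Adding `charIndex` to a byte range start would place the chunk boundary two bytes early for this sample, which is the misalignment the commit message describes.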

- chunkCsvForR2: throw for empty R2 objects (size=0) instead of
  returning byteEnd=-1 which is an invalid range
- chunkCsvForR2: implement real concurrency cap at 16 parallel peek
  reads (was comment-only; now batched Promise.all loops)
- chunk-csv-for-r2.test: add empty-file error test
- db-schema-etl.test: fix describe label "Migration 0048" → "0047"
- plan doc: add drizzle-kit check to migration verification checklist
- runbook: warn that wrangler queues consumer remove does not drain
  in-flight messages — wait for queue depth 0 first

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
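
The "batched Promise.all loops" mentioned in this commit can be sketched generically. `mapInBatches` is a hypothetical helper written for illustration; the actual peek-read code in chunkCsvForR2.ts may be structured differently.

```typescript
// Runs `worker` over `inputs` in sequential batches so that at most `limit`
// calls are in flight at once. Results keep the input order.
async function mapInBatches<T, R>(
  inputs: readonly T[],
  limit: number,
  worker: (input: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (limit < 1) throw new RangeError('limit must be at least 1');

  const results: R[] = [];
  for (let start = 0; start < inputs.length; start += limit) {
    const batch = inputs.slice(start, start + limit);
    // Each batch must finish completely before the next one starts.
    const settled = await Promise.all(
      batch.map((input, i) => worker(input, start + i)),
    );
    results.push(...settled);
  }
  return results;
}
```

Batching is simpler than a sliding-window limiter but waits for the slowest call in each batch; a p-limit style limiter, as the review suggested, would keep all 16 slots busy at the cost of a dependency or more code.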
@andrew-bierman andrew-bierman merged commit 5b031e1 into main May 21, 2026
12 checks passed
@andrew-bierman andrew-bierman deleted the fix/etl-pipeline-workflows-migration branch May 21, 2026 05:03
Labels: api, database, dependencies, documentation

2 participants