fix(etl): raise cpu_ms limit to 400k (CF max) for large-file queue processing#2419
Conversation
…ocessing 50k-row files exhaust the 300k CPU budget (~6ms CPU/row for 42-field CSV). This extends the per-invocation limit to ~66k rows. Chunking in ScrapyD is the long-term fix for files beyond that threshold.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (1)
WalkthroughThis PR implements large-file chunking for catalog ETL processing. R2 objects larger than ~20MB are now split into byte-range chunks at the queue endpoint, each chunk is processed independently with header injection and boundary-row skipping in the worker, and parser backpressure is enforced to prevent out-of-memory conditions. The CPU time limit is increased to 400 seconds to support this larger scope. ChangesLarge-file catalog ETL chunking
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR updates the Cloudflare Worker execution limits for the API’s queue processing so large CSV ETL jobs have more CPU time per invocation and are less likely to be terminated mid-file.
Changes:
- Increased Cloudflare Worker
limits.cpu_msfrom 300,000ms (5 minutes) to 400,000ms (Cloudflare maximum).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
… time Instead of streaming the full file (up to 600 MB) in one Worker invocation, the notify endpoint now HEADs each object and splits files >20 MB into sequential byte-range messages. Each Worker invocation fetches only its slice via a Range request (~30k rows) — well within the 400k CPU budget. Non-first chunks fetch the header row via a separate 4 KB range request and skip the partial row at the chunk boundary. No ScrapyD changes needed.
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/api/src/services/etl/processCatalogEtl.ts (2)
205-209:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftDon't mark a chunked job completed from each chunk worker.
After this PR, one ETL job can fan out into multiple byte-range messages. The first chunk that reaches Lines 205-209 flips the whole job to
completedeven if sibling chunks are still running, so admin history can show a finished job with incomplete totals. Completion needs to move behind a chunk-level completion counter or an orchestrator step that runs after the last chunk.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/api/src/services/etl/processCatalogEtl.ts` around lines 205 - 209, The current chunk worker code in processCatalogEtl.ts flips the entire ETL job to status 'completed' (the db.update on etlJobs using jobId) from within each chunk handler, which prematurely completes the job; instead, stop marking the job completed here and implement a chunk-level completion mechanism: have the chunk worker mark its chunk record as finished and atomically increment a completedChunks counter (or insert/update a chunk status table) in the same transaction, then check totalChunks vs completedChunks and only when they match perform the etlJobs.update to set status: 'completed' and completedAt using jobId; alternatively, publish a "chunk-finished" message to an orchestrator service which performs the final completion update after verifying all chunks (reference symbols: etlJobs, jobId, the try block in processCatalogEtl.ts).
89-117:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAwait the stream pump to prevent consumer stall on read failures.
The async pump (lines 89–117) is fire-and-forget. If the R2 stream fails mid-read,
parser.end()never executes, and the consumer loop (for await (const record of parser)) hangs indefinitely waiting for more data—eventually timing out the Worker instead of failing cleanly. Capture the pump promise, await it after the consumer loop, and tear down the parser on error.Proposed direction
- (async () => { + const pump = (async () => { if (injectedHeader) { parser.write(`${injectedHeader}\n`); } let skipPartialRow = byteStart !== undefined && byteStart > 0; for await (const chunk of streamToText(r2Object.body)) { let text = chunk; // ... const ok = parser.write(text); if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve)); } parser.end(); - })(); + })(); - for await (const record of parser) { - // ... - } + try { + for await (const record of parser) { + // ... + } + await pump; + } catch (error) { + parser.destroy(error as Error); + await pump.catch(() => undefined); + throw error; + }This also aligns with the guideline to use async/await everywhere in the API package.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/api/src/services/etl/processCatalogEtl.ts` around lines 89 - 117, The stream pump launched as a fire-and-forget IIFE must be captured and awaited so read failures don't leave `parser` hanging; change the IIFE to assign its promise (e.g., const pumpPromise = (async () => { ... })()), wrap its body in try/catch that calls `parser.end()` (or `parser.destroy()` if available) on error and rethrows, and after the consumer loop (for await (const record of parser)) await pumpPromise so any pump errors propagate and the parser is torn down cleanly; reference `parser`, `streamToText(r2Object.body)`, `injectedHeader`, and `byteStart` when locating the pump logic to update.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/api/src/routes/admin/analytics/catalog.ts`:
- Line 435: The retry path is requeuing the entire object as a single chunk by
calling queueCatalogETL({ ..., chunks: [{ objectKey }], ... }); instead, call
the same chunk-planning service used by the main /catalog/etl route to split the
object into chunks before calling queueCatalogETL; locate the chunk-planning
helper used by the /catalog/etl handler (the function that computes/splits
chunks for an object) and replace the hard-coded [{ objectKey }] with its
returned chunk list, then pass that chunk list into queueCatalogETL along with
the same env.ETL_QUEUE and newJobId.
In `@packages/api/src/routes/catalog/index.ts`:
- Around line 187-189: The code currently treats a null result from
r2.head(objectKey) as a valid whole-file job; change the logic in the handler
that calls r2.head so that if meta is null you fail fast instead of pushing to
queueChunks: explicitly check if meta === null (or falsy but distinguish null vs
size) and return/throw an error (or mark the job as invalid) with a clear
message referencing objectKey so the caller/workflow does not create a running
job for a missing object; keep the existing branch that uses meta.size <=
CHUNK_BYTES to enqueue whole-file jobs only when meta is present.
- Around line 180-205: Extract the chunk planning logic (CHUNK_BYTES constant,
R2BucketService usage, loop building queueChunks) into a new ETL service
function (e.g., planCatalogChunks or buildChunkRanges) under the API services
namespace; the new function should accept (env, chunks) and return the array of
{ objectKey, byteStart?, byteEnd? } entries after performing r2.head and the
byte-range math, leaving the route handler to only call that service and then
call queueCatalogETL with its result. Ensure the route no longer instantiates
R2BucketService or contains the loop — instead import and await the new
planCatalogChunks(env, chunks) function, preserve existing semantics
(CHUNK_BYTES, Math.ceil, byteEnd calculation), and surface any errors from the
service so the handler can handle validation/persistence/dispatch only.
In `@packages/api/src/services/etl/processCatalogEtl.ts`:
- Around line 95-105: skipPartialRow currently treats every non-zero byteStart
as mid-row and can drop a full row that starts exactly at byteStart; update the
logic so we only discard a leading fragment when the chunk truly begins mid-row.
Either propagate a startsMidRow flag from the splitter into the consumer
(preferred) or, before setting skipPartialRow in processCatalogEtl,
fetch/inspect the single byte immediately before byteStart (byteStart-1) from
the R2 object and set skipPartialRow = byteStart > 0 && previousByte !== '\n'.
Apply this change around the streamToText(r2Object.body) loop where
skipPartialRow is used so the code only slices off a partial row when an actual
row fragment precedes the chunk.
In `@packages/api/src/services/etl/types.ts`:
- Around line 6-16: Define and export a single shared chunk union type (e.g.,
CatalogChunk = { objectKey: string } | { objectKey: string; byteStart: number;
byteEnd: number }) and replace the ad-hoc inline chunk shapes with that type:
update the existing `data` shape and the `QueueCatalogETLParams.chunks`
signature to use `CatalogChunk[]`, ensuring ranged chunks require both
`byteStart` and `byteEnd` (no optionals) while whole-object chunks have only
`objectKey`; export the new `CatalogChunk` type so route and queue layers can
import and reuse it to prevent contract drift.
---
Outside diff comments:
In `@packages/api/src/services/etl/processCatalogEtl.ts`:
- Around line 205-209: The current chunk worker code in processCatalogEtl.ts
flips the entire ETL job to status 'completed' (the db.update on etlJobs using
jobId) from within each chunk handler, which prematurely completes the job;
instead, stop marking the job completed here and implement a chunk-level
completion mechanism: have the chunk worker mark its chunk record as finished
and atomically increment a completedChunks counter (or insert/update a chunk
status table) in the same transaction, then check totalChunks vs completedChunks
and only when they match perform the etlJobs.update to set status: 'completed'
and completedAt using jobId; alternatively, publish a "chunk-finished" message
to an orchestrator service which performs the final completion update after
verifying all chunks (reference symbols: etlJobs, jobId, the try block in
processCatalogEtl.ts).
- Around line 89-117: The stream pump launched as a fire-and-forget IIFE must be
captured and awaited so read failures don't leave `parser` hanging; change the
IIFE to assign its promise (e.g., const pumpPromise = (async () => { ... })()),
wrap its body in try/catch that calls `parser.end()` (or `parser.destroy()` if
available) on error and rethrows, and after the consumer loop (for await (const
record of parser)) await pumpPromise so any pump errors propagate and the parser
is torn down cleanly; reference `parser`, `streamToText(r2Object.body)`,
`injectedHeader`, and `byteStart` when locating the pump logic to update.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 22f763d7-f274-45b6-846f-7a3ea6c252e4
📒 Files selected for processing (6)
packages/api/src/routes/admin/analytics/catalog.tspackages/api/src/routes/catalog/index.tspackages/api/src/services/etl/processCatalogEtl.tspackages/api/src/services/etl/queue.tspackages/api/src/services/etl/types.tspackages/api/wrangler.jsonc
| }); | ||
|
|
||
| await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId }); | ||
| await queueCatalogETL({ queue: env.ETL_QUEUE, chunks: [{ objectKey }], jobId: newJobId }); |
There was a problem hiding this comment.
Retry still requeues large files as a single chunk.
This matches the new queueCatalogETL shape, but it bypasses the chunk-splitting logic added on the main /catalog/etl path. Retrying a previously failed large object will enqueue the full file again and can reproduce the CPU-limit failure this PR is trying to avoid. Reuse the same chunk-planning service here before calling queueCatalogETL.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/api/src/routes/admin/analytics/catalog.ts` at line 435, The retry
path is requeuing the entire object as a single chunk by calling
queueCatalogETL({ ..., chunks: [{ objectKey }], ... }); instead, call the same
chunk-planning service used by the main /catalog/etl route to split the object
into chunks before calling queueCatalogETL; locate the chunk-planning helper
used by the /catalog/etl handler (the function that computes/splits chunks for
an object) and replace the hard-coded [{ objectKey }] with its returned chunk
list, then pass that chunk list into queueCatalogETL along with the same
env.ETL_QUEUE and newJobId.
| // Split large files into 20 MB byte-range chunks so each Worker | ||
| // invocation stays within the CPU time budget (~30k rows / chunk). | ||
| const CHUNK_BYTES = 20 * 1024 * 1024; | ||
| const r2 = new R2BucketService({ env, bucketType: 'catalog' }); | ||
| const queueChunks: Array<{ objectKey: string; byteStart?: number; byteEnd?: number }> = []; | ||
|
|
||
| for (const objectKey of chunks) { | ||
| const meta = await r2.head(objectKey); | ||
| if (!meta || meta.size <= CHUNK_BYTES) { | ||
| queueChunks.push({ objectKey }); | ||
| } else { | ||
| const n = Math.ceil(meta.size / CHUNK_BYTES); | ||
| for (let i = 0; i < n; i++) { | ||
| queueChunks.push({ | ||
| objectKey, | ||
| byteStart: i * CHUNK_BYTES, | ||
| byteEnd: Math.min((i + 1) * CHUNK_BYTES - 1, meta.size - 1), | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| await queueCatalogETL({ | ||
| queue: env.ETL_QUEUE, | ||
| objectKeys: chunks, | ||
| chunks: queueChunks, | ||
| jobId, |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Move chunk planning into an ETL service.
This block is ETL business logic, not route orchestration. Extract the R2 head + byte-range planning into src/services/etl/* and keep the handler focused on validation, persistence, and dispatch.
As per coding guidelines, "Keep business logic in src/services/ of the API package, not in route handlers".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/api/src/routes/catalog/index.ts` around lines 180 - 205, Extract the
chunk planning logic (CHUNK_BYTES constant, R2BucketService usage, loop building
queueChunks) into a new ETL service function (e.g., planCatalogChunks or
buildChunkRanges) under the API services namespace; the new function should
accept (env, chunks) and return the array of { objectKey, byteStart?, byteEnd? }
entries after performing r2.head and the byte-range math, leaving the route
handler to only call that service and then call queueCatalogETL with its result.
Ensure the route no longer instantiates R2BucketService or contains the loop —
instead import and await the new planCatalogChunks(env, chunks) function,
preserve existing semantics (CHUNK_BYTES, Math.ceil, byteEnd calculation), and
surface any errors from the service so the handler can handle
validation/persistence/dispatch only.
| const meta = await r2.head(objectKey); | ||
| if (!meta || meta.size <= CHUNK_BYTES) { | ||
| queueChunks.push({ objectKey }); |
There was a problem hiding this comment.
Fail fast when head() returns no metadata.
When r2.head(objectKey) returns null, this branch still enqueues the object as if it were a normal whole-file job. That turns a known validation failure into a worker failure later, and the job has already been created as running. Reject missing objects here instead of pushing them into the queue.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/api/src/routes/catalog/index.ts` around lines 187 - 189, The code
currently treats a null result from r2.head(objectKey) as a valid whole-file
job; change the logic in the handler that calls r2.head so that if meta is null
you fail fast instead of pushing to queueChunks: explicitly check if meta ===
null (or falsy but distinguish null vs size) and return/throw an error (or mark
the job as invalid) with a clear message referencing objectKey so the
caller/workflow does not create a running job for a missing object; keep the
existing branch that uses meta.size <= CHUNK_BYTES to enqueue whole-file jobs
only when meta is present.
| let skipPartialRow = byteStart !== undefined && byteStart > 0; | ||
|
|
||
| for await (const chunk of streamToText(r2Object.body)) { | ||
| let text = chunk; | ||
|
|
||
| if (skipPartialRow) { | ||
| // Discard bytes up to and including the first newline — those bytes are | ||
| // the tail of the row that the previous chunk already processed. | ||
| const nl = text.indexOf('\n'); | ||
| if (nl === -1) continue; // entire buffer is still the partial row tail | ||
| text = text.slice(nl + 1); |
There was a problem hiding this comment.
Guard the boundary skip with an actual row-fragment check.
skipPartialRow is enabled for every non-first chunk. If a slice begins exactly on the first byte of a real row, Lines 100-105 still discard everything through the next newline, so that row is silently lost. Carry a startsMidRow flag from the splitter or inspect the byte before byteStart before skipping.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/api/src/services/etl/processCatalogEtl.ts` around lines 95 - 105,
skipPartialRow currently treats every non-zero byteStart as mid-row and can drop
a full row that starts exactly at byteStart; update the logic so we only discard
a leading fragment when the chunk truly begins mid-row. Either propagate a
startsMidRow flag from the splitter into the consumer (preferred) or, before
setting skipPartialRow in processCatalogEtl, fetch/inspect the single byte
immediately before byteStart (byteStart-1) from the R2 object and set
skipPartialRow = byteStart > 0 && previousByte !== '\n'. Apply this change
around the streamToText(r2Object.body) loop where skipPartialRow is used so the
code only slices off a partial row when an actual row fragment precedes the
chunk.
| data: { | ||
| objectKey: string; | ||
| byteStart?: number; | ||
| byteEnd?: number; | ||
| }; | ||
| } | ||
|
|
||
| export interface QueueCatalogETLParams { | ||
| queue: Queue; | ||
| objectKey: string; | ||
| userId: string; | ||
| source: string; | ||
| scraperRevision: string; | ||
| chunks: Array<{ objectKey: string; byteStart?: number; byteEnd?: number }>; | ||
| jobId: string; |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Model chunk ranges as an all-or-nothing contract.
{ byteStart?: number; byteEnd?: number } allows invalid states like only one bound being present. This contract is safer as a union: either a whole object { objectKey } or a ranged chunk { objectKey, byteStart, byteEnd }. Export that shared chunk type here and reuse it across the queue and route layers so the contract cannot drift.
Proposed refactor
+export type CatalogETLChunk =
+ | { objectKey: string }
+ | { objectKey: string; byteStart: number; byteEnd: number };
+
export interface CatalogETLMessage {
timestamp: number;
id: string;
- data: {
- objectKey: string;
- byteStart?: number;
- byteEnd?: number;
- };
+ data: CatalogETLChunk;
}
export interface QueueCatalogETLParams {
queue: Queue;
- chunks: Array<{ objectKey: string; byteStart?: number; byteEnd?: number }>;
+ chunks: CatalogETLChunk[];
jobId: string;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| data: { | |
| objectKey: string; | |
| byteStart?: number; | |
| byteEnd?: number; | |
| }; | |
| } | |
| export interface QueueCatalogETLParams { | |
| queue: Queue; | |
| objectKey: string; | |
| userId: string; | |
| source: string; | |
| scraperRevision: string; | |
| chunks: Array<{ objectKey: string; byteStart?: number; byteEnd?: number }>; | |
| jobId: string; | |
| export type CatalogETLChunk = | |
| | { objectKey: string } | |
| | { objectKey: string; byteStart: number; byteEnd: number }; | |
| export interface CatalogETLMessage { | |
| timestamp: number; | |
| id: string; | |
| data: CatalogETLChunk; | |
| } | |
| export interface QueueCatalogETLParams { | |
| queue: Queue; | |
| chunks: CatalogETLChunk[]; | |
| jobId: string; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/api/src/services/etl/types.ts` around lines 6 - 16, Define and
export a single shared chunk union type (e.g., CatalogChunk = { objectKey:
string } | { objectKey: string; byteStart: number; byteEnd: number }) and
replace the ad-hoc inline chunk shapes with that type: update the existing
`data` shape and the `QueueCatalogETLParams.chunks` signature to use
`CatalogChunk[]`, ensuring ranged chunks require both `byteStart` and `byteEnd`
(no optionals) while whole-object chunks have only `objectKey`; export the new
`CatalogChunk` type so route and queue layers can import and reuse it to prevent
contract drift.
Deploying with
|
| Status | Name | Latest Commit | Preview URL | Updated (UTC) |
|---|---|---|---|---|
| ✅ Deployment successful! View logs |
packrat-admin | 9b744c1 | Commit Preview URL Branch Preview URL |
May 13 2026, 07:54 PM |
Coverage Report for Expo Unit Tests Coverage (./apps/expo)
File CoverageNo changed files found. |
Coverage Report for API Unit Tests Coverage (./packages/api)
File CoverageNo changed files found. |
Deploying packrat-landing with
|
| Latest commit: |
9b744c1
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://533fb489.packrat-landing.pages.dev |
| Branch Preview URL: | https://fix-etl-completion-ordering.packrat-landing.pages.dev |
Problem
The backpressure fix (PR #2418) solved Worker OOM. But large CSV files now hit a second limit: the 5-minute CPU time limit (
cpu_ms: 300000).Observed in wrangler tail:
Fix
Raise
cpu_msfrom 300,000ms to 400,000ms (the Cloudflare maximum).This extends the per-invocation budget to ~66k rows — enough for evo.
Limitations
400k is the CF hard cap. Backcountry (329 MB, likely 200k+ rows) will still hit it. The permanent solution is chunking large files in ScrapyD so each queue message processes ≤40k rows. That work is tracked separately.
Post-Deploy Validation
After deploy, re-queue evo and confirm it completes end-to-end.
🤖 Generated with Claude Code
Co-Authored-By: Claude noreply@anthropic.com
Summary by CodeRabbit
New Features
Bug Fixes
Chores