fix(admin,etl): CORS preflight fix + ETL retry/failure ops + schema alignment #2409
…dmin ops
- Expand AdminCatalogItemSchema to all 24 columns the route returns
- Fix AdminUserItemSchema/AdminPackItemSchema to match SELECT outputs
- Derive AdminUser/AdminPack/AdminCatalogItem via Static<> (drop hand-written interfaces)
- Remove 3 `as unknown as PaginatedResponse<T>` casts in admin API client
- Fix edit-catalog-dialog.tsx for nullable weightUnit ripple

ETL admin routes (catalog analytics):
- GET /etl/failure-summary: jsonb unnest aggregation of top validation errors
- GET /etl/:jobId/failures: per-job error breakdown + raw samples
- POST /etl/reset-stuck: marks running jobs stuck >30min as failed
- POST /etl/:jobId/retry: clones job record and re-queues file to Cloudflare Queue
Walkthrough
Admin schemas expand to include richer metadata fields; TypeScript types now derive from those schemas using TypeBox Static. A small weight unit input field is adjusted to handle nullability. Four new ETL administrative endpoints are added to inspect validation failures and control stuck or failed job retries.
Changes
Admin Schema, Types, and ETL Management
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Coverage Report for API Unit Tests Coverage (./packages/api)
File Coverage: No changed files found.
Coverage Report for Expo Unit Tests Coverage (./apps/expo)
File Coverage: No changed files found.
Deploying with
| Status | Name | Latest Commit | Preview URL | Updated (UTC) |
|---|---|---|---|---|
| ✅ Deployment successful! View logs | packrat-admin | b5a461c | Commit Preview URL, Branch Preview URL | May 12 2026, 05:27 PM |
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/api/src/routes/admin/analytics/catalog.ts`:
- Around line 322-341: Run the two independent DB calls (the select that assigns
samples and the sql execute that assigns breakdown) in parallel using
Promise.all to avoid serial round-trips: kick off
db.select().from(invalidItemLogs).where(eq(invalidItemLogs.jobId,
params.jobId)).orderBy(...).limit(limit) and db.execute(...) together and await
Promise.all to assign samples and breakdown from the results; also tighten the
jobId validation to z.string().uuid() so params.jobId is validated as a UUID
before any DB interpolation (affecting the code paths that call
samples/breakdown and the schema that currently accepts a plain string).
- Around line 264-446: The four new routes ('/etl/failure-summary',
'/etl/:jobId/failures', '/etl/reset-stuck', '/etl/:jobId/retry') are missing
response schemas so the admin client sees their payloads as unknown; add
appropriate response TypeBox schemas in packages/api/src/schemas/admin (mirror
EtlResponseSchema structure) for each route's success shape (e.g., Top error
list + total for failure-summary, jobId/errorBreakdown/samples for per-job
failures, reset count/ids for reset-stuck, and { success, newJobId, objectKey }
for retry), export them (e.g., EtlFailureSummaryResponseSchema,
EtlJobFailuresResponseSchema, EtlResetStuckResponseSchema,
EtlRetryResponseSchema) and wire them into each route's options under response:
{ 200: <schema>, ...AdminErrorResponses } so OpenAPI and the Admin Static<>
types pick up the correct types; use the route-local symbols like
invalidItemLogs, etlJobs, queueCatalogETL, createDb to locate the handlers to
update.
- Around line 273-287: The two independent DB calls (db.execute<{ field: string;
reason: string; count: number }>(...) and db.select({ n: count()
}).from(invalidItemLogs)) should be executed in parallel to reduce latency: run
them with Promise.all and destructure the results into rows and total; keep the
same query text and types (referencing db.execute, db.select and
invalidItemLogs). Additionally, add the missing response TypeBox schema used
elsewhere to this route (and the three subsequent endpoints) so OpenAPI/Eden
Treaty consumers get a typed payload (match the pattern used in /overview,
/brands, /etl and attach the same response schema variable to this handler).
- Around line 376-398: The endpoint that marks ETL jobs as failed and the worker
lack status guards and a configurable cutoff: update the '/etl/reset-stuck'
handler to accept a query parameter (or read an environment variable) for the
stuck threshold with a sensible minimum, and use that value instead of the
hardcoded 30 minutes; in the DB update (currently using etlJobs.update in that
handler) ensure the WHERE includes status = 'running' to avoid changing
non-running rows; in the worker (processCatalogETL) add a pre-processing
validation that reloads the job row and aborts if status !== 'running', and
change the worker's error/cleanup write logic to include a conditional WHERE
(e.g., WHERE id = jobId AND status = 'running') so it never overwrites a reset
or retry decision; these changes together prevent TOCTOU and
concurrent-processor races.
- Around line 408-433: Change the retry handler to only allow retries when
original.status === 'failed' (replace the current check that only rejects
'running'), and make the enqueue + DB insert atomic: call
queueCatalogETL(objectKey, newJobId, ...) before committing the new row or wrap
the insert (db.insert(etlJobs).values(...)) in a try/catch that rolls back the
insert or updates the newly inserted row back to 'failed' if queueCatalogETL
throws; ensure you still validate env.ETL_QUEUE before attempting enqueue.
Additionally, harden against duplicate parallel retries by checking for an
existing recent/pending job for the same (original.source, original.filename)
(e.g., a failed_at uniqueness check or short-lived advisory lock) before
creating newJobId.
- Line 421: Persist the original R2 key instead of reconstructing it: add an
objectKey column to the etlJobs table and, in the code path that creates/queues
the initial ETL job (where original and chunks are accepted and etlJobs rows are
inserted), store the full object key (use the existing original or chunks value)
into etlJobs.objectKey; then update the retry logic (which currently builds
`v2/${source}/${filename}`) to read and re-enqueue using etlJobs.objectKey.
Alternatively, if you prefer not to alter schema, change the retry handler to
call R2 to validate the reconstructed key before enqueueing and return 404 if
the R2 object does not exist; ensure the variable objectKey and the etlJobs
lookup are updated accordingly so retries use the persisted key or validated
key.
📒 Files selected for processing (4)
- apps/admin/components/edit-catalog-dialog.tsx
- apps/admin/lib/api.ts
- packages/api/src/routes/admin/analytics/catalog.ts
- packages/api/src/schemas/admin.ts
  // ─── ETL failure summary ──────────────────────────────────────────────────────

  .get(
    '/etl/failure-summary',
    async ({ query }) => {
      const db = createDb();
      const { limit = 20 } = query;

      try {
        const rows = await db.execute<{ field: string; reason: string; count: number }>(
          sql`
            SELECT
              err->>'field' AS field,
              err->>'reason' AS reason,
              COUNT(*)::int AS count
            FROM ${invalidItemLogs},
              jsonb_array_elements(${invalidItemLogs.errors}) AS err
            GROUP BY err->>'field', err->>'reason'
            ORDER BY count DESC
            LIMIT ${limit}
          `,
        );

        const [total] = await db.select({ n: count() }).from(invalidItemLogs);

        return {
          topErrors: rows.rows.map((r) => ({
            field: r.field,
            reason: r.reason,
            count: r.count,
          })),
          totalInvalidItems: total?.n ?? 0,
        };
      } catch (error) {
        console.error('ETL failure summary error:', error);
        return status(500, {
          error: 'Failed to fetch failure summary',
          code: 'ETL_FAILURE_SUMMARY_ERROR',
        });
      }
    },
    {
      query: z.object({
        limit: z.coerce.number().int().min(1).max(100).optional().default(20),
      }),
      detail: { tags: ['Admin'], summary: 'Top ETL validation failure patterns' },
    },
  )

  // ─── Per-job failure drill-down ───────────────────────────────────────────────

  .get(
    '/etl/:jobId/failures',
    async ({ params, query }) => {
      const db = createDb();
      const { limit = 50 } = query;

      try {
        const samples = await db
          .select()
          .from(invalidItemLogs)
          .where(eq(invalidItemLogs.jobId, params.jobId))
          .orderBy(invalidItemLogs.rowIndex)
          .limit(limit);

        const breakdown = await db.execute<{ field: string; reason: string; count: number }>(
          sql`
            SELECT
              err->>'field' AS field,
              err->>'reason' AS reason,
              COUNT(*)::int AS count
            FROM ${invalidItemLogs},
              jsonb_array_elements(${invalidItemLogs.errors}) AS err
            WHERE ${invalidItemLogs.jobId} = ${params.jobId}
            GROUP BY err->>'field', err->>'reason'
            ORDER BY count DESC
          `,
        );

        return {
          jobId: params.jobId,
          errorBreakdown: breakdown.rows.map((r) => ({
            field: r.field,
            reason: r.reason,
            count: r.count,
          })),
          samples: samples.map((s) => ({
            rowIndex: s.rowIndex,
            errors: s.errors,
            rawData: s.rawData,
          })),
          totalShown: samples.length,
        };
      } catch (error) {
        console.error('ETL job failures error:', error);
        return status(500, {
          error: 'Failed to fetch job failures',
          code: 'ETL_JOB_FAILURES_ERROR',
        });
      }
    },
    {
      params: z.object({ jobId: z.string() }),
      query: z.object({
        limit: z.coerce.number().int().min(1).max(200).optional().default(50),
      }),
      detail: { tags: ['Admin'], summary: 'Validation failures for a specific ETL job' },
    },
  )

  // ─── Reset stuck jobs ─────────────────────────────────────────────────────────

  .post(
    '/etl/reset-stuck',
    async () => {
      const db = createDb();

      try {
        // Jobs stuck in 'running' for more than 30 minutes are considered stalled
        const stuckCutoff = new Date(Date.now() - 30 * 60 * 1000);

        const reset = await db
          .update(etlJobs)
          .set({ status: 'failed', completedAt: new Date() })
          .where(and(eq(etlJobs.status, 'running'), lt(etlJobs.startedAt, stuckCutoff)))
          .returning();

        return { reset: reset.length, ids: reset.map((r) => r.id) };
      } catch (error) {
        console.error('ETL reset stuck error:', error);
        return status(500, { error: 'Failed to reset stuck jobs', code: 'ETL_RESET_STUCK_ERROR' });
      }
    },
    { detail: { tags: ['Admin'], summary: 'Mark stuck running ETL jobs as failed' } },
  )

  // ─── Retry a failed job ───────────────────────────────────────────────────────

  .post(
    '/etl/:jobId/retry',
    async ({ params }) => {
      const db = createDb();

      try {
        const [original] = await db
          .select()
          .from(etlJobs)
          .where(eq(etlJobs.id, params.jobId))
          .limit(1);

        if (!original) return status(404, { error: 'ETL job not found' });
        if (original.status === 'running')
          return status(409, {
            error: 'Job is still running — wait for it to complete or reset stuck jobs first',
          });

        const newJobId = crypto.randomUUID();
        const objectKey = `v2/${original.source}/${original.filename}`;
        const env = getEnv();

        if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' });

        await db.insert(etlJobs).values({
          id: newJobId,
          status: 'running',
          source: original.source,
          filename: original.filename,
          scraperRevision: original.scraperRevision,
          startedAt: new Date(),
        });

        await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId });

        return { success: true, newJobId, objectKey };
      } catch (error) {
        console.error('ETL retry error:', error);
        return status(500, { error: 'Failed to retry ETL job', code: 'ETL_RETRY_ERROR' });
      }
    },
    {
      params: z.object({ jobId: z.string() }),
      detail: { tags: ['Admin'], summary: 'Retry a failed ETL job' },
    },
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
No response TypeBox schemas on any of the four new routes.
Every other route in this file declares response: { 200: ..., ...AdminErrorResponses }, which is what feeds OpenAPI and the Eden Treaty types consumed by apps/admin/lib/api.ts (the very thing this PR is tightening with Static<> derivations). Without response schemas here, the admin client will see these payloads as unknown and you'll lose the type-safety win for the new endpoints. Add schemas to packages/api/src/schemas/admin and wire them in, mirroring EtlResponseSchema.
        const rows = await db.execute<{ field: string; reason: string; count: number }>(
          sql`
            SELECT
              err->>'field' AS field,
              err->>'reason' AS reason,
              COUNT(*)::int AS count
            FROM ${invalidItemLogs},
              jsonb_array_elements(${invalidItemLogs.errors}) AS err
            GROUP BY err->>'field', err->>'reason'
            ORDER BY count DESC
            LIMIT ${limit}
          `,
        );

        const [total] = await db.select({ n: count() }).from(invalidItemLogs);
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Run aggregation and count in parallel.
The db.execute and db.select are independent and currently run sequentially, doubling latency for what's essentially a dashboard read. Wrap them in Promise.all. Also note the rest of this file consistently attaches a response TypeBox schema (see /overview, /brands, /etl) — this endpoint and the three below ship without one, so OpenAPI/Eden Treaty consumers get unknown payloads.
♻️ Proposed parallelization
-        const rows = await db.execute<{ field: string; reason: string; count: number }>(
-          sql`
-            SELECT
-              err->>'field' AS field,
-              err->>'reason' AS reason,
-              COUNT(*)::int AS count
-            FROM ${invalidItemLogs},
-              jsonb_array_elements(${invalidItemLogs.errors}) AS err
-            GROUP BY err->>'field', err->>'reason'
-            ORDER BY count DESC
-            LIMIT ${limit}
-          `,
-        );
-
-        const [total] = await db.select({ n: count() }).from(invalidItemLogs);
+        const [rows, totals] = await Promise.all([
+          db.execute<{ field: string; reason: string; count: number }>(sql`
+            SELECT
+              err->>'field' AS field,
+              err->>'reason' AS reason,
+              COUNT(*)::int AS count
+            FROM ${invalidItemLogs},
+              jsonb_array_elements(${invalidItemLogs.errors}) AS err
+            GROUP BY err->>'field', err->>'reason'
+            ORDER BY count DESC
+            LIMIT ${limit}
+          `),
+          db.select({ n: count() }).from(invalidItemLogs),
+        ]);
+        const total = totals[0];
        const samples = await db
          .select()
          .from(invalidItemLogs)
          .where(eq(invalidItemLogs.jobId, params.jobId))
          .orderBy(invalidItemLogs.rowIndex)
          .limit(limit);

        const breakdown = await db.execute<{ field: string; reason: string; count: number }>(
          sql`
            SELECT
              err->>'field' AS field,
              err->>'reason' AS reason,
              COUNT(*)::int AS count
            FROM ${invalidItemLogs},
              jsonb_array_elements(${invalidItemLogs.errors}) AS err
            WHERE ${invalidItemLogs.jobId} = ${params.jobId}
            GROUP BY err->>'field', err->>'reason'
            ORDER BY count DESC
          `,
        );
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Parallelize the two job-scoped queries.
Same shape as the previous endpoint — samples and breakdown are independent and should run concurrently via Promise.all. Also consider tightening jobId validation to z.string().uuid() on Line 366 so malformed IDs short-circuit before hitting the DB (especially relevant because the raw sql query interpolates ${params.jobId} — parameterized via Drizzle, so not an injection issue, but a wasted round-trip).
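A minimal sketch of the concurrency pattern suggested here, assuming two independent async reads. `fetchSamples` and `fetchBreakdown` are hypothetical stand-ins for the Drizzle `select` and `execute` calls in the handler, not the project's actual functions; the `log` array exists only to make the start order observable.

```typescript
// Hypothetical stand-ins for the two job-scoped queries; only the concurrency pattern matters.
type Sample = { jobId: string; rowIndex: number };
type BreakdownRow = { field: string; reason: string; count: number };

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function fetchSamples(jobId: string, log: string[]): Promise<Sample[]> {
  log.push('samples:start');
  await delay(20); // simulated DB round-trip
  log.push('samples:end');
  return [{ jobId, rowIndex: 0 }];
}

async function fetchBreakdown(jobId: string, log: string[]): Promise<BreakdownRow[]> {
  log.push(`breakdown:start`);
  await delay(20); // simulated DB round-trip
  log.push('breakdown:end');
  return [{ field: 'weight', reason: `required (job ${jobId})`, count: 1 }];
}

async function loadJobFailures(jobId: string, log: string[] = []) {
  // Both promises are created before either is awaited, so the round-trips overlap.
  const [samples, breakdown] = await Promise.all([
    fetchSamples(jobId, log),
    fetchBreakdown(jobId, log),
  ]);
  return { jobId, samples, breakdown, log };
}
```

Note that `Promise.all` rejects as soon as either query fails, which matches the handler's existing single `try`/`catch`.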
  .post(
    '/etl/reset-stuck',
    async () => {
      const db = createDb();

      try {
        // Jobs stuck in 'running' for more than 30 minutes are considered stalled
        const stuckCutoff = new Date(Date.now() - 30 * 60 * 1000);

        const reset = await db
          .update(etlJobs)
          .set({ status: 'failed', completedAt: new Date() })
          .where(and(eq(etlJobs.status, 'running'), lt(etlJobs.startedAt, stuckCutoff)))
          .returning();

        return { reset: reset.length, ids: reset.map((r) => r.id) };
      } catch (error) {
        console.error('ETL reset stuck error:', error);
        return status(500, { error: 'Failed to reset stuck jobs', code: 'ETL_RESET_STUCK_ERROR' });
      }
    },
    { detail: { tags: ['Admin'], summary: 'Mark stuck running ETL jobs as failed' } },
  )
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the ETL queue consumer and check how it updates etlJobs status.
fd -t f -e ts -e js | xargs rg -nP -C5 '\betlJobs\b' -g '!**/admin/**' 2>/dev/null
echo '---'
rg -nP -C3 "queueCatalogETL|ETL_QUEUE" --type=ts
Repository: PackRat-AI/PackRat
Length of output: 34809
🏁 Script executed:
# Check processCatalogEtl function entry and queue batch processor for status validation
rg -A10 'async function processCatalogETL' packages/api/src/services/etl/processCatalogEtl.ts
rg -A15 'export async function processQueueBatch' packages/api/src/services/etl/queue.ts
Repository: PackRat-AI/PackRat
Length of output: 710
The ETL worker does not validate job status before processing or during writes — creating TOCTOU and race-condition risks.
The worker (processCatalogETL) starts processing without checking the job's current status, and its error handler unconditionally updates the row with no status guard. This creates two real failure modes:
- TOCTOU race on job row: If `reset-stuck` marks a job `failed`, the worker continues processing and can still write its own status/completedAt updates, overwriting the reset. At minimum, the error handler should include a `WHERE status = 'running'` guard, or the worker should validate the job is still `'running'` before committing writes.
- Concurrent processors from race: An operator triggering `retry` while `reset-stuck` is executing can spawn two workers processing the same object key, resulting in duplicate catalog rows.
Additionally, the 30-minute stuck threshold is hardcoded in the endpoint; make it a query parameter (with a sane minimum) or environment variable so operators can adjust detection without redeploying.
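The two suggestions above (a status guard on every terminal write, and a configurable threshold with a floor) can be sketched in memory as follows. This is an illustration under stated assumptions, not the project's code: the `EtlJob` shape, the 5-minute floor, and all function names are hypothetical, and the array stands in for the jobs table, with `finishIfStillRunning` playing the role of an `UPDATE ... WHERE id = ? AND status = 'running'`.

```typescript
type JobStatus = 'running' | 'completed' | 'failed';
interface EtlJob {
  id: string;
  status: JobStatus;
  startedAt: Date;
  completedAt?: Date;
}

const DEFAULT_STUCK_MINUTES = 30; // the value hardcoded in the PR
const MIN_STUCK_MINUTES = 5; // assumed floor; pick whatever is safe for real job durations

// Clamp an operator-supplied threshold so it can never drop below the floor.
function resolveStuckMinutes(requested?: number): number {
  if (requested === undefined || !Number.isFinite(requested)) return DEFAULT_STUCK_MINUTES;
  return Math.max(MIN_STUCK_MINUTES, Math.floor(requested));
}

// Conditional write: succeeds only while the job is still 'running'.
function finishIfStillRunning(
  jobs: EtlJob[],
  id: string,
  next: 'completed' | 'failed',
  now: Date = new Date(),
): boolean {
  const job = jobs.find((j) => j.id === id);
  if (!job || job.status !== 'running') return false; // someone else already decided
  job.status = next;
  job.completedAt = now;
  return true;
}

// Admin-side reset: uses the same guarded write, so it cannot clobber finished jobs.
function resetStuck(jobs: EtlJob[], thresholdMinutes?: number, now: Date = new Date()): string[] {
  const cutoff = now.getTime() - resolveStuckMinutes(thresholdMinutes) * 60 * 1000;
  return jobs
    .filter((j) => j.status === 'running' && j.startedAt.getTime() < cutoff)
    .filter((j) => finishIfStillRunning(jobs, j.id, 'failed', now))
    .map((j) => j.id);
}
```

If the worker routes its own completion and error writes through the same guarded update, a late write from a job that was already reset becomes a no-op instead of overwriting the operator's decision.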
        const [original] = await db
          .select()
          .from(etlJobs)
          .where(eq(etlJobs.id, params.jobId))
          .limit(1);

        if (!original) return status(404, { error: 'ETL job not found' });
        if (original.status === 'running')
          return status(409, {
            error: 'Job is still running — wait for it to complete or reset stuck jobs first',
          });

        const newJobId = crypto.randomUUID();
        const objectKey = `v2/${original.source}/${original.filename}`;
        const env = getEnv();

        if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' });

        await db.insert(etlJobs).values({
          id: newJobId,
          status: 'running',
          source: original.source,
          filename: original.filename,
          scraperRevision: original.scraperRevision,
          startedAt: new Date(),
        });
Retry accepts any non-running status and can orphan a running row on enqueue failure.
Two correctness problems in this handler:
- Status not constrained to `failed`. Line 415 only rejects `running`, so a `completed` job can be "retried"; re-queuing the same object key will re-ingest every row and likely duplicate catalog items (or churn embeddings). The PR description explicitly scopes this to failed jobs; enforce it.
- Non-atomic insert-then-enqueue. The new `etlJobs` row is committed at Line 426 before `queueCatalogETL` at Line 435. If the enqueue throws (queue unavailable, network blip), you've left a brand-new `running` row that will never progress, exactly the kind of stuck job `/etl/reset-stuck` was added to clean up. Enqueue first (or wrap in try/catch that rolls back the insert / marks the new row `failed`).
Also worth noting: there's no idempotency on retries — clicking the button twice creates two parallel runs of the same file. A failed_at-based uniqueness check or a short-lived advisory lock on (source, filename) would harden this.
🛡️ Proposed fix
-        if (!original) return status(404, { error: 'ETL job not found' });
-        if (original.status === 'running')
-          return status(409, {
-            error: 'Job is still running — wait for it to complete or reset stuck jobs first',
-          });
+        if (!original) return status(404, { error: 'ETL job not found' });
+        if (original.status !== 'failed') {
+          return status(409, {
+            error: `Only failed jobs can be retried (current status: ${original.status})`,
+          });
+        }
         const newJobId = crypto.randomUUID();
         const objectKey = `v2/${original.source}/${original.filename}`;
         const env = getEnv();
         if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' });
         await db.insert(etlJobs).values({
           id: newJobId,
           status: 'running',
           source: original.source,
           filename: original.filename,
           scraperRevision: original.scraperRevision,
           startedAt: new Date(),
         });
-
-        await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId });
+
+        try {
+          await queueCatalogETL({
+            queue: env.ETL_QUEUE,
+            objectKeys: [objectKey],
+            jobId: newJobId,
+          });
+        } catch (enqueueErr) {
+          await db
+            .update(etlJobs)
+            .set({ status: 'failed', completedAt: new Date() })
+            .where(eq(etlJobs.id, newJobId));
+          throw enqueueErr;
+        }
          });

        const newJobId = crypto.randomUUID();
        const objectKey = `v2/${original.source}/${original.filename}`;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm whether etlJobs schema already stores an object key, and where the original v2/<source>/<filename> convention is established.
fd -t f -e ts | xargs rg -nP -C3 'etlJobs\s*=|object[Kk]ey|v2/\$\{' 2>/dev/null | head -200
Repository: PackRat-AI/PackRat
Length of output: 9305
🏁 Script executed:
sed -n '497,530p' packages/api/src/db/schema.ts
Repository: PackRat-AI/PackRat
Length of output: 1139
🏁 Script executed:
rg -n 'etlJobs.*insert|db\.insert.*etlJobs' packages/api/src -A 5 -B 2
Repository: PackRat-AI/PackRat
Length of output: 1365
🏁 Script executed:
rg -n 'objectKey.*=|fileName.*=' packages/api/src/routes/upload.ts -B 5 -A 2
Repository: PackRat-AI/PackRat
Length of output: 362
🏁 Script executed:
sed -n '150,190p' packages/api/src/routes/catalog/index.ts
Repository: PackRat-AI/PackRat
Length of output: 959
🏁 Script executed:
sed -n '19,100p' packages/api/src/routes/upload.ts
Repository: PackRat-AI/PackRat
Length of output: 1978
🏁 Script executed:
rg -n 'chunks.*=|objectKey.*=' packages/admin/lib -A 2 -B 2
Repository: PackRat-AI/PackRat
Length of output: 121
🏁 Script executed:
sed -n '140,200p' packages/api/src/routes/catalog/index.ts
Repository: PackRat-AI/PackRat
Length of output: 1573
🏁 Script executed:
rg -n 'v2/' packages/api --type ts | head -20
Repository: PackRat-AI/PackRat
Length of output: 287
🏁 Script executed:
rg -n 'catalogETLSchema' packages/api/src -A 10
Repository: PackRat-AI/PackRat
Length of output: 1694
🏁 Script executed:
rg -n 'queueCatalogETL|CatalogETL' packages/api/src/routes -B 3 -A 3 | head -50
Repository: PackRat-AI/PackRat
Length of output: 2577
Add objectKey field to etlJobs table to persist the original R2 key on first queue.
The retry endpoint reconstructs the key as v2/${source}/${filename}, but that assumes all uploads follow this prefix shape. Since the initial ETL queueing accepts arbitrary chunks (object keys) from the client and never stores them, the retry has no way to know what the original keys were. If an upload used a different naming convention or path structure, the retry silently queues non-existent keys and returns 200—masking the failure.
Either: (a) add an objectKey column to etlJobs and persist the full keys during initial queue, then retrieve and re-enqueue them during retry, or (b) validate the reconstructed key exists in R2 before enqueueing and return 404 if missing.
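Both options can be combined in one small helper, sketched below under assumptions: prefer a persisted key when the job row has one, otherwise fall back to the reconstructed `v2/<source>/<filename>` key, and in either case confirm the object exists before enqueueing. `ObjectStore` is a hypothetical structural type covering only `head(key)`, modeled on the Cloudflare R2 bucket binding, whose `head` resolves to `null` for a missing object; `resolveRetryKey`, `RetryKeyResult`, and the optional `objectKey` field are likewise illustrative names.

```typescript
// Minimal structural type: only the one method this check needs.
// Assumption: the real R2 binding satisfies it (head resolves to null when the key is absent).
interface ObjectStore {
  head(key: string): Promise<object | null>;
}

type RetryKeyResult =
  | { ok: true; objectKey: string }
  | { ok: false; status: 404; error: string };

async function resolveRetryKey(
  store: ObjectStore,
  job: { source: string; filename: string; objectKey?: string | null },
): Promise<RetryKeyResult> {
  // Option (a): use the persisted key if the (hypothetical) column is populated.
  // Option (b): otherwise reconstruct it, as the current retry handler does.
  const objectKey = job.objectKey ?? `v2/${job.source}/${job.filename}`;

  // Validate before enqueueing so a bad key surfaces as 404 instead of a silent 200.
  const found = await store.head(objectKey);
  if (found === null) {
    return { ok: false, status: 404, error: `Object not found: ${objectKey}` };
  }
  return { ok: true, objectKey };
}
```

In the handler, a result with `ok: false` would map to `status(404, ...)` before any `etlJobs` row is inserted, so a failed lookup leaves no orphaned `running` row behind.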
Pull request overview
This PR improves the admin experience and operability by aligning admin TypeBox schemas with actual API payloads, replacing hand-written admin client interfaces with Static<>-derived types, and adding new admin endpoints to inspect and remediate ETL failures.
Changes:
- Updated admin list item schemas (users, packs, catalog) to match returned columns and nullability.
- Added ETL admin ops endpoints for failure aggregation, per-job drilldowns, stuck-job resets, and job retries.
- Updated the admin app API client types to derive from shared schemas, and adjusted catalog edit form handling for nullable weightUnit.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| packages/api/src/schemas/admin.ts | Expands/adjusts admin list item schemas for users/packs/catalog. |
| packages/api/src/routes/admin/analytics/catalog.ts | Adds ETL failure summary/drilldown endpoints plus reset-stuck and retry operations. |
| apps/admin/lib/api.ts | Replaces local interfaces/casts with Static<typeof ...Schema> derived types and removes unsafe casts. |
| apps/admin/components/edit-catalog-dialog.tsx | Handles nullable weightUnit more safely in form submit/defaults. |
export const AdminUserItemSchema = t.Object({
  id: t.Number(),
  email: t.String(),
  firstName: t.Nullable(t.String()),
  lastName: t.Nullable(t.String()),

        const samples = await db
          .select()
          .from(invalidItemLogs)
          .where(eq(invalidItemLogs.jobId, params.jobId))
          .orderBy(invalidItemLogs.rowIndex)
          .limit(limit);

        const newJobId = crypto.randomUUID();
        const objectKey = `v2/${original.source}/${original.filename}`;
        const env = getEnv();

        if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' });

        await db.insert(etlJobs).values({
          id: newJobId,
          status: 'running',
          source: original.source,
          filename: original.filename,
          scraperRevision: original.scraperRevision,
          startedAt: new Date(),
        });

        await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId });

Summary
- CORS fix: admin scoped CORS was silently dropping `Access-Control-Allow-Origin` on preflight because two stacked CORS plugins conflicted (root uses `credentials:false`/`*`, admin uses `credentials:true`/specific-origin). Switched to origin-function so Elysia reflects the exact origin back. Also bypasses auth guard for `OPTIONS` preflights. Fixes `https://admin.packratai.com` CORS errors.
- TypeBox schema alignment: expanded all 3 admin list schemas (`AdminCatalogItemSchema`, `AdminUserItemSchema`, `AdminPackItemSchema`) to match the exact columns each route returns. All hand-written TypeScript interfaces in `apps/admin/lib/api.ts` replaced with `Static<>` derived types; removes 3 `as unknown as` casts.
- ETL admin operations (4 new routes on `/api/admin/analytics/catalog`):
  - `GET /etl/failure-summary`: global top validation errors via jsonb unnest aggregation
  - `GET /etl/:jobId/failures`: per-job error breakdown + raw sample rows
  - `POST /etl/reset-stuck`: marks running jobs stuck >30 min as failed
  - `POST /etl/:jobId/retry`: clones job record and re-queues original file to Cloudflare Queue
- Migration 0047: regenerated nullable `weight`/`weight_unit` migration; main merged 0037–0046 after this branch was cut.

Testing
- `Access-Control-Allow-Origin` header on preflight from `admin.packratai.com`
- `Static<>` adoption

Post-Deploy Monitoring & Validation
- `401 Unauthorized` on admin preflight requests after deploy: indicates OPTIONS bypass not applied
- `wrangler tail packrat-api --format json | jq 'select(.message | test("admin.*CORS|OPTIONS"))'`
- `OPTIONS https://packrat-api.orange-frost-d665.workers.dev/api/admin/stats` from `admin.packratai.com` origin: expect `Access-Control-Allow-Origin: https://admin.packratai.com` in response

🤖 Generated with Claude Code
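The origin-function approach described in the Summary can be illustrated with a framework-independent sketch. Everything here is an assumption for illustration rather than the PR's actual implementation: the allow-list shape and the names `resolveAllowedOrigin`, `preflightHeaders`, and `shouldBypassAuth` are hypothetical, and the real change is wired through Elysia's CORS plugin. The underlying constraint is standard CORS behavior: a credentialed response cannot use `*`, so the server has to echo the specific requesting origin.

```typescript
// Assumed allow-list; the PR names only https://admin.packratai.com.
const ADMIN_ALLOWED_ORIGINS: readonly string[] = ['https://admin.packratai.com'];

// Reflect the exact request origin when it is allowed, otherwise return null.
function resolveAllowedOrigin(
  requestOrigin: string | null,
  allowed: readonly string[] = ADMIN_ALLOWED_ORIGINS,
): string | null {
  if (!requestOrigin) return null;
  return allowed.indexOf(requestOrigin) !== -1 ? requestOrigin : null;
}

// Headers a credentialed preflight response needs; empty for disallowed origins.
function preflightHeaders(requestOrigin: string | null): Record<string, string> {
  const origin = resolveAllowedOrigin(requestOrigin);
  if (origin === null) return {};
  return {
    'Access-Control-Allow-Origin': origin, // never '*' when credentials are allowed
    'Access-Control-Allow-Credentials': 'true',
    Vary: 'Origin', // the response differs per origin, so caches must key on it
  };
}

// Preflights carry no credentials, so the auth guard must let OPTIONS through.
function shouldBypassAuth(method: string): boolean {
  return method.toUpperCase() === 'OPTIONS';
}
```

This mirrors the post-deploy check above: an `OPTIONS` request from the admin origin should come back with that same origin in `Access-Control-Allow-Origin` and should not hit a `401` from the auth guard.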