-
Notifications
You must be signed in to change notification settings - Fork 38
fix(admin,etl): CORS preflight fix + ETL retry/failure ops + schema alignment #2409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,15 @@ | ||
| import { createDb } from '@packrat/api/db'; | ||
| import { catalogItems, etlJobs } from '@packrat/api/db/schema'; | ||
| import { catalogItems, etlJobs, invalidItemLogs } from '@packrat/api/db/schema'; | ||
| import { | ||
| AdminErrorResponses, | ||
| BrandRowSchema, | ||
| CatalogOverviewSchema, | ||
| EtlResponseSchema, | ||
| PriceBucketSchema, | ||
| } from '@packrat/api/schemas/admin'; | ||
| import { and, avg, count, desc, gt, isNotNull, max, min, sql } from 'drizzle-orm'; | ||
| import { queueCatalogETL } from '@packrat/api/services/etl/queue'; | ||
| import { getEnv } from '@packrat/api/utils/env-validation'; | ||
| import { and, avg, count, desc, eq, gt, isNotNull, lt, max, min, sql } from 'drizzle-orm'; | ||
| import { Elysia, status, t } from 'elysia'; | ||
| import { z } from 'zod'; | ||
|
|
||
|
|
@@ -257,4 +259,189 @@ export const catalogAnalyticsRoutes = new Elysia({ prefix: '/catalog' }) | |
| } | ||
| }, | ||
| { detail: { tags: ['Admin'], summary: 'Embedding coverage' } }, | ||
| ) | ||
|
|
||
| // ─── ETL failure summary ────────────────────────────────────────────────────── | ||
|
|
||
| .get( | ||
| '/etl/failure-summary', | ||
| async ({ query }) => { | ||
| const db = createDb(); | ||
| const { limit = 20 } = query; | ||
|
|
||
| try { | ||
| const rows = await db.execute<{ field: string; reason: string; count: number }>( | ||
| sql` | ||
| SELECT | ||
| err->>'field' AS field, | ||
| err->>'reason' AS reason, | ||
| COUNT(*)::int AS count | ||
| FROM ${invalidItemLogs}, | ||
| jsonb_array_elements(${invalidItemLogs.errors}) AS err | ||
| GROUP BY err->>'field', err->>'reason' | ||
| ORDER BY count DESC | ||
| LIMIT ${limit} | ||
| `, | ||
| ); | ||
|
|
||
| const [total] = await db.select({ n: count() }).from(invalidItemLogs); | ||
|
|
||
| return { | ||
| topErrors: rows.rows.map((r) => ({ | ||
| field: r.field, | ||
| reason: r.reason, | ||
| count: r.count, | ||
| })), | ||
| totalInvalidItems: total?.n ?? 0, | ||
| }; | ||
| } catch (error) { | ||
| console.error('ETL failure summary error:', error); | ||
| return status(500, { | ||
| error: 'Failed to fetch failure summary', | ||
| code: 'ETL_FAILURE_SUMMARY_ERROR', | ||
| }); | ||
| } | ||
| }, | ||
| { | ||
| query: z.object({ | ||
| limit: z.coerce.number().int().min(1).max(100).optional().default(20), | ||
| }), | ||
| detail: { tags: ['Admin'], summary: 'Top ETL validation failure patterns' }, | ||
| }, | ||
| ) | ||
|
|
||
| // ─── Per-job failure drill-down ─────────────────────────────────────────────── | ||
|
|
||
| .get( | ||
| '/etl/:jobId/failures', | ||
| async ({ params, query }) => { | ||
| const db = createDb(); | ||
| const { limit = 50 } = query; | ||
|
|
||
| try { | ||
| const samples = await db | ||
| .select() | ||
| .from(invalidItemLogs) | ||
| .where(eq(invalidItemLogs.jobId, params.jobId)) | ||
| .orderBy(invalidItemLogs.rowIndex) | ||
| .limit(limit); | ||
|
Comment on lines
+322
to
+327
|
||
|
|
||
| const breakdown = await db.execute<{ field: string; reason: string; count: number }>( | ||
| sql` | ||
| SELECT | ||
| err->>'field' AS field, | ||
| err->>'reason' AS reason, | ||
| COUNT(*)::int AS count | ||
| FROM ${invalidItemLogs}, | ||
| jsonb_array_elements(${invalidItemLogs.errors}) AS err | ||
| WHERE ${invalidItemLogs.jobId} = ${params.jobId} | ||
| GROUP BY err->>'field', err->>'reason' | ||
| ORDER BY count DESC | ||
| `, | ||
| ); | ||
|
Comment on lines
+322
to
+341
Contributor
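There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win Parallelize the two job-scoped queries. Same shape as the previous endpoint — the `samples` select and the `breakdown` aggregation are independent of each other but are awaited one after the other; run them together with `Promise.all`. 🤖 Prompt for AI Agents |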
||
|
|
||
| return { | ||
| jobId: params.jobId, | ||
| errorBreakdown: breakdown.rows.map((r) => ({ | ||
| field: r.field, | ||
| reason: r.reason, | ||
| count: r.count, | ||
| })), | ||
| samples: samples.map((s) => ({ | ||
| rowIndex: s.rowIndex, | ||
| errors: s.errors, | ||
| rawData: s.rawData, | ||
| })), | ||
| totalShown: samples.length, | ||
| }; | ||
| } catch (error) { | ||
| console.error('ETL job failures error:', error); | ||
| return status(500, { | ||
| error: 'Failed to fetch job failures', | ||
| code: 'ETL_JOB_FAILURES_ERROR', | ||
| }); | ||
| } | ||
| }, | ||
| { | ||
| params: z.object({ jobId: z.string() }), | ||
| query: z.object({ | ||
| limit: z.coerce.number().int().min(1).max(200).optional().default(50), | ||
| }), | ||
| detail: { tags: ['Admin'], summary: 'Validation failures for a specific ETL job' }, | ||
| }, | ||
| ) | ||
|
|
||
| // ─── Reset stuck jobs ───────────────────────────────────────────────────────── | ||
|
|
||
| .post( | ||
| '/etl/reset-stuck', | ||
| async () => { | ||
| const db = createDb(); | ||
|
|
||
| try { | ||
| // Jobs stuck in 'running' for more than 30 minutes are considered stalled | ||
| const stuckCutoff = new Date(Date.now() - 30 * 60 * 1000); | ||
|
|
||
| const reset = await db | ||
| .update(etlJobs) | ||
| .set({ status: 'failed', completedAt: new Date() }) | ||
| .where(and(eq(etlJobs.status, 'running'), lt(etlJobs.startedAt, stuckCutoff))) | ||
| .returning(); | ||
|
|
||
| return { reset: reset.length, ids: reset.map((r) => r.id) }; | ||
| } catch (error) { | ||
| console.error('ETL reset stuck error:', error); | ||
| return status(500, { error: 'Failed to reset stuck jobs', code: 'ETL_RESET_STUCK_ERROR' }); | ||
| } | ||
| }, | ||
| { detail: { tags: ['Admin'], summary: 'Mark stuck running ETL jobs as failed' } }, | ||
| ) | ||
|
Comment on lines
+376
to
+398
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Find the ETL queue consumer and check how it updates etlJobs status.
fd -t f -e ts -e js | xargs rg -nP -C5 '\betlJobs\b' -g '!**/admin/**' 2>/dev/null
echo '---'
rg -nP -C3 "queueCatalogETL|ETL_QUEUE" --type=tsRepository: PackRat-AI/PackRat Length of output: 34809 🏁 Script executed: # Check processCatalogEtl function entry and queue batch processor for status validation
rg -A10 'async function processCatalogETL' packages/api/src/services/etl/processCatalogEtl.ts
rg -A15 'export async function processQueueBatch' packages/api/src/services/etl/queue.tsRepository: PackRat-AI/PackRat Length of output: 710 The ETL worker does not validate job status before processing or during writes — creating TOCTOU and race-condition risks. The worker (
Additionally, the 30-minute stuck threshold is hardcoded in the endpoint; make it a query parameter (with a sane minimum) or environment variable so operators can adjust detection without redeploying. 🤖 Prompt for AI Agents |
||
|
|
||
| // ─── Retry a failed job ─────────────────────────────────────────────────────── | ||
|
|
||
| .post( | ||
| '/etl/:jobId/retry', | ||
| async ({ params }) => { | ||
| const db = createDb(); | ||
|
|
||
| try { | ||
| const [original] = await db | ||
| .select() | ||
| .from(etlJobs) | ||
| .where(eq(etlJobs.id, params.jobId)) | ||
| .limit(1); | ||
|
|
||
| if (!original) return status(404, { error: 'ETL job not found' }); | ||
| if (original.status === 'running') | ||
| return status(409, { | ||
| error: 'Job is still running — wait for it to complete or reset stuck jobs first', | ||
| }); | ||
|
|
||
| const newJobId = crypto.randomUUID(); | ||
| const objectKey = `v2/${original.source}/${original.filename}`; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Confirm whether etlJobs schema already stores an object key, and where the original v2/<source>/<filename> convention is established.
fd -t f -e ts | xargs rg -nP -C3 'etlJobs\s*=|object[Kk]ey|v2/\$\{' 2>/dev/null | head -200Repository: PackRat-AI/PackRat Length of output: 9305 🏁 Script executed: sed -n '497,530p' packages/api/src/db/schema.tsRepository: PackRat-AI/PackRat Length of output: 1139 🏁 Script executed: rg -n 'etlJobs.*insert|db\.insert.*etlJobs' packages/api/src -A 5 -B 2Repository: PackRat-AI/PackRat Length of output: 1365 🏁 Script executed: rg -n 'objectKey.*=|fileName.*=' packages/api/src/routes/upload.ts -B 5 -A 2Repository: PackRat-AI/PackRat Length of output: 362 🏁 Script executed: sed -n '150,190p' packages/api/src/routes/catalog/index.tsRepository: PackRat-AI/PackRat Length of output: 959 🏁 Script executed: sed -n '19,100p' packages/api/src/routes/upload.tsRepository: PackRat-AI/PackRat Length of output: 1978 🏁 Script executed: rg -n 'chunks.*=|objectKey.*=' packages/admin/lib -A 2 -B 2Repository: PackRat-AI/PackRat Length of output: 121 🏁 Script executed: sed -n '140,200p' packages/api/src/routes/catalog/index.tsRepository: PackRat-AI/PackRat Length of output: 1573 🏁 Script executed: rg -n 'v2/' packages/api --type ts | head -20Repository: PackRat-AI/PackRat Length of output: 287 🏁 Script executed: rg -n 'catalogETLSchema' packages/api/src -A 10Repository: PackRat-AI/PackRat Length of output: 1694 🏁 Script executed: rg -n 'queueCatalogETL|CatalogETL' packages/api/src/routes -B 3 -A 3 | head -50Repository: PackRat-AI/PackRat Length of output: 2577 Add The retry endpoint reconstructs the key as Either: (a) add an 🤖 Prompt for AI Agents |
||
| const env = getEnv(); | ||
|
|
||
| if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' }); | ||
|
|
||
| await db.insert(etlJobs).values({ | ||
| id: newJobId, | ||
| status: 'running', | ||
| source: original.source, | ||
| filename: original.filename, | ||
| scraperRevision: original.scraperRevision, | ||
| startedAt: new Date(), | ||
| }); | ||
|
Comment on lines
+408
to
+433
Contributor
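There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Retry accepts any non-running status and can orphan a `running` row. Two correctness problems in this handler: (1) the guard only rejects jobs whose status is `running`, so a job in any other status can be retried even though the route is documented as retrying a failed job; (2) the new row is inserted with status `running` before `queueCatalogETL` is called, so if enqueueing throws, the handler returns a 500 but that row is never marked `failed`.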
Also worth noting: there's no idempotency on retries — clicking the button twice creates two parallel runs of the same file. A 🛡️ Proposed fix- if (!original) return status(404, { error: 'ETL job not found' });
- if (original.status === 'running')
- return status(409, {
- error: 'Job is still running — wait for it to complete or reset stuck jobs first',
- });
+ if (!original) return status(404, { error: 'ETL job not found' });
+ if (original.status !== 'failed') {
+ return status(409, {
+ error: `Only failed jobs can be retried (current status: ${original.status})`,
+ });
+ }
const newJobId = crypto.randomUUID();
const objectKey = `v2/${original.source}/${original.filename}`;
const env = getEnv();
if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' });
await db.insert(etlJobs).values({
id: newJobId,
status: 'running',
source: original.source,
filename: original.filename,
scraperRevision: original.scraperRevision,
startedAt: new Date(),
});
-
- await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId });
+
+ try {
+ await queueCatalogETL({
+ queue: env.ETL_QUEUE,
+ objectKeys: [objectKey],
+ jobId: newJobId,
+ });
+ } catch (enqueueErr) {
+ await db
+ .update(etlJobs)
+ .set({ status: 'failed', completedAt: new Date() })
+ .where(eq(etlJobs.id, newJobId));
+ throw enqueueErr;
+ }🤖 Prompt for AI Agents |
||
|
|
||
| await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId }); | ||
|
|
||
|
Comment on lines
+420
to
+436
|
||
| return { success: true, newJobId, objectKey }; | ||
| } catch (error) { | ||
| console.error('ETL retry error:', error); | ||
| return status(500, { error: 'Failed to retry ETL job', code: 'ETL_RETRY_ERROR' }); | ||
| } | ||
| }, | ||
| { | ||
| params: z.object({ jobId: z.string() }), | ||
| detail: { tags: ['Admin'], summary: 'Retry a failed ETL job' }, | ||
| }, | ||
|
Comment on lines
+264
to
+446
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win No `response` schema on the new routes. Every other route in this file declares a `response` schema; the routes added here ship without one. 🤖 Prompt for AI Agents |
||
| ); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Run aggregation and count in parallel.
The `db.execute` and `db.select` calls are independent and currently run sequentially, doubling latency for what's essentially a dashboard read. Wrap them in `Promise.all`. Also note the rest of this file consistently attaches a `response` TypeBox schema (see `/overview`, `/brands`, `/etl`) — this endpoint and the three below ship without one, so OpenAPI/Eden Treaty consumers get `unknown` payloads.
♻️ Proposed parallelization
🤖 Prompt for AI Agents