From b5a461ca2c0c50ecbaedcd4f9a840c3c68e305b5 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Tue, 12 May 2026 11:25:24 -0600 Subject: [PATCH] feat(admin,etl): align TypeBox schemas with route columns + add ETL admin ops - Expand AdminCatalogItemSchema to all 24 columns the route returns - Fix AdminUserItemSchema/AdminPackItemSchema to match SELECT outputs - Derive AdminUser/AdminPack/AdminCatalogItem via Static<> (drop hand-written interfaces) - Remove 3 `as unknown as PaginatedResponse` casts in admin API client - Fix edit-catalog-dialog.tsx for nullable weightUnit ripple ETL admin routes (catalog analytics): - GET /etl/failure-summary: jsonb unnest aggregation of top validation errors - GET /etl/:jobId/failures: per-job error breakdown + raw samples - POST /etl/reset-stuck: marks running jobs stuck >30min as failed - POST /etl/:jobId/retry: clones job record and re-queues file to Cloudflare Queue --- apps/admin/components/edit-catalog-dialog.tsx | 4 +- apps/admin/lib/api.ts | 62 +----- .../api/src/routes/admin/analytics/catalog.ts | 191 +++++++++++++++++- packages/api/src/schemas/admin.ts | 30 ++- 4 files changed, 224 insertions(+), 63 deletions(-) diff --git a/apps/admin/components/edit-catalog-dialog.tsx b/apps/admin/components/edit-catalog-dialog.tsx index e03ff86c5f..7019e07d03 100644 --- a/apps/admin/components/edit-catalog-dialog.tsx +++ b/apps/admin/components/edit-catalog-dialog.tsx @@ -49,7 +49,7 @@ export function EditCatalogDialog({ item }: EditCatalogDialogProps) { : null; const weightRaw = fd.get('weight')?.toString().trim(); const weight = weightRaw ? Number(weightRaw) : undefined; - const weightUnit = fd.get('weightUnit')?.toString().trim() || item.weightUnit; + const weightUnit = fd.get('weightUnit')?.toString().trim() || item.weightUnit || undefined; const priceRaw = fd.get('price')?.toString().trim(); const price = priceRaw ? Number(priceRaw) : null; @@ -108,7 +108,7 @@ export function EditCatalogDialog({ item }: EditCatalogDialogProps) { diff --git a/apps/admin/lib/api.ts b/apps/admin/lib/api.ts index 040075d7dd..ed81f6879c 100644 --- a/apps/admin/lib/api.ts +++ b/apps/admin/lib/api.ts @@ -3,6 +3,9 @@ import type { App } from '@packrat/api'; import type { ActiveUsersSchema, ActivityPointSchema, + AdminCatalogItemSchema, + AdminPackItemSchema, + AdminUserItemSchema, BrandRowSchema, BreakdownItemSchema, CatalogOverviewSchema, @@ -69,17 +72,7 @@ export async function getStats(): Promise { // ─── Users ──────────────────────────────────────────────────────────────────── -export interface AdminUser { - id: number; - email: string; - firstName: string | null; - lastName: string | null; - role: string | null; - emailVerified: boolean | null; - avatarUrl: string | null; - createdAt: string | null; - updatedAt: string | null; -} +export type AdminUser = Static; export interface PaginatedResponse { data: T[]; @@ -103,7 +96,7 @@ export async function getUsers({ query: { limit, offset, q, includeDeleted: includeDeleted ? 'true' : undefined }, }); if (error) throwOnError(error); - return unwrap(data, 'users') as unknown as PaginatedResponse; // safe-cast: Eden Treaty infers wide union; runtime shape matches PaginatedResponse + return unwrap(data, 'users'); } export async function deleteUser(id: number): Promise<{ success: boolean }> { @@ -129,19 +122,7 @@ export async function restoreUser(id: number): Promise<{ success: boolean }> { // ─── Packs ──────────────────────────────────────────────────────────────────── -export interface AdminPack { - id: string; - name: string; - description: string | null; - category: string; - isPublic: boolean | null; - isAIGenerated: boolean; - tags: string[] | null; - image: string | null; - createdAt: string | null; - updatedAt: string | null; - userEmail: string | null; -} +export type AdminPack = Static; export async function getPacks({ limit = 100, @@ -158,7 +139,7 @@ export async function getPacks({ query: { limit, offset, q, includeDeleted: includeDeleted ? 'true' : undefined }, }); if (error) throwOnError(error); - return unwrap(data, 'packs') as unknown as PaginatedResponse; // safe-cast: Eden Treaty infers wide union; runtime shape matches PaginatedResponse + return unwrap(data, 'packs'); } export async function deletePack(id: string): Promise<{ success: boolean }> { @@ -169,32 +150,7 @@ export async function deletePack(id: string): Promise<{ success: boolean }> { // ─── Catalog Items ──────────────────────────────────────────────────────────── -export interface AdminCatalogItem { - id: number; - name: string; - description: string | null; - categories: string[] | null; - brand: string | null; - model: string | null; - sku: string | null; - price: number | null; - currency: string | null; - weight: number | null; - weightUnit: string; - availability: string | null; - ratingValue: number | null; - reviewCount: number | null; - color: string | null; - size: string | null; - material: string | null; - seller: string | null; - productUrl: string | null; - images: string[] | null; - variants: Array<{ attribute: string; values: string[] }> | null; - techs: Record | null; - links: Array<{ title: string; url: string }> | null; - createdAt: string | null; -} +export type AdminCatalogItem = Static; export interface UpdateCatalogItemInput { name?: string; @@ -219,7 +175,7 @@ export async function getCatalogItems({ query: { limit, offset, q }, }); if (error) throwOnError(error); - return unwrap(data, 'catalog') as unknown as PaginatedResponse; // safe-cast: Eden Treaty infers wide union; runtime shape matches PaginatedResponse + return unwrap(data, 'catalog'); } export async function deleteCatalogItem(id: number): Promise<{ success: boolean }> { diff --git a/packages/api/src/routes/admin/analytics/catalog.ts b/packages/api/src/routes/admin/analytics/catalog.ts index 44f9bf71d5..936f5a3eae 100644 --- a/packages/api/src/routes/admin/analytics/catalog.ts +++ b/packages/api/src/routes/admin/analytics/catalog.ts @@ -1,5 +1,5 @@ import { createDb } from '@packrat/api/db'; -import { catalogItems, etlJobs } from '@packrat/api/db/schema'; +import { catalogItems, etlJobs, invalidItemLogs } from '@packrat/api/db/schema'; import { AdminErrorResponses, BrandRowSchema, @@ -7,7 +7,9 @@ import { EtlResponseSchema, PriceBucketSchema, } from '@packrat/api/schemas/admin'; -import { and, avg, count, desc, gt, isNotNull, max, min, sql } from 'drizzle-orm'; +import { queueCatalogETL } from '@packrat/api/services/etl/queue'; +import { getEnv } from '@packrat/api/utils/env-validation'; +import { and, avg, count, desc, eq, gt, isNotNull, lt, max, min, sql } from 'drizzle-orm'; import { Elysia, status, t } from 'elysia'; import { z } from 'zod'; @@ -257,4 +259,189 @@ export const catalogAnalyticsRoutes = new Elysia({ prefix: '/catalog' }) } }, { detail: { tags: ['Admin'], summary: 'Embedding coverage' } }, + ) + + // ─── ETL failure summary ────────────────────────────────────────────────────── + + .get( + '/etl/failure-summary', + async ({ query }) => { + const db = createDb(); + const { limit = 20 } = query; + + try { + const rows = await db.execute<{ field: string; reason: string; count: number }>( + sql` + SELECT + err->>'field' AS field, + err->>'reason' AS reason, + COUNT(*)::int AS count + FROM ${invalidItemLogs}, + jsonb_array_elements(${invalidItemLogs.errors}) AS err + GROUP BY err->>'field', err->>'reason' + ORDER BY count DESC + LIMIT ${limit} + `, + ); + + const [total] = await db.select({ n: count() }).from(invalidItemLogs); + + return { + topErrors: rows.rows.map((r) => ({ + field: r.field, + reason: r.reason, + count: r.count, + })), + totalInvalidItems: total?.n ?? 0, + }; + } catch (error) { + console.error('ETL failure summary error:', error); + return status(500, { + error: 'Failed to fetch failure summary', + code: 'ETL_FAILURE_SUMMARY_ERROR', + }); + } + }, + { + query: z.object({ + limit: z.coerce.number().int().min(1).max(100).optional().default(20), + }), + detail: { tags: ['Admin'], summary: 'Top ETL validation failure patterns' }, + }, + ) + + // ─── Per-job failure drill-down ─────────────────────────────────────────────── + + .get( + '/etl/:jobId/failures', + async ({ params, query }) => { + const db = createDb(); + const { limit = 50 } = query; + + try { + const samples = await db + .select() + .from(invalidItemLogs) + .where(eq(invalidItemLogs.jobId, params.jobId)) + .orderBy(invalidItemLogs.rowIndex) + .limit(limit); + + const breakdown = await db.execute<{ field: string; reason: string; count: number }>( + sql` + SELECT + err->>'field' AS field, + err->>'reason' AS reason, + COUNT(*)::int AS count + FROM ${invalidItemLogs}, + jsonb_array_elements(${invalidItemLogs.errors}) AS err + WHERE ${invalidItemLogs.jobId} = ${params.jobId} + GROUP BY err->>'field', err->>'reason' + ORDER BY count DESC + `, + ); + + return { + jobId: params.jobId, + errorBreakdown: breakdown.rows.map((r) => ({ + field: r.field, + reason: r.reason, + count: r.count, + })), + samples: samples.map((s) => ({ + rowIndex: s.rowIndex, + errors: s.errors, + rawData: s.rawData, + })), + totalShown: samples.length, + }; + } catch (error) { + console.error('ETL job failures error:', error); + return status(500, { + error: 'Failed to fetch job failures', + code: 'ETL_JOB_FAILURES_ERROR', + }); + } + }, + { + params: z.object({ jobId: z.string() }), + query: z.object({ + limit: z.coerce.number().int().min(1).max(200).optional().default(50), + }), + detail: { tags: ['Admin'], summary: 'Validation failures for a specific ETL job' }, + }, + ) + + // ─── Reset stuck jobs ───────────────────────────────────────────────────────── + + .post( + '/etl/reset-stuck', + async () => { + const db = createDb(); + + try { + // Jobs stuck in 'running' for more than 30 minutes are considered stalled + const stuckCutoff = new Date(Date.now() - 30 * 60 * 1000); + + const reset = await db + .update(etlJobs) + .set({ status: 'failed', completedAt: new Date() }) + .where(and(eq(etlJobs.status, 'running'), lt(etlJobs.startedAt, stuckCutoff))) + .returning(); + + return { reset: reset.length, ids: reset.map((r) => r.id) }; + } catch (error) { + console.error('ETL reset stuck error:', error); + return status(500, { error: 'Failed to reset stuck jobs', code: 'ETL_RESET_STUCK_ERROR' }); + } + }, + { detail: { tags: ['Admin'], summary: 'Mark stuck running ETL jobs as failed' } }, + ) + + // ─── Retry a failed job ─────────────────────────────────────────────────────── + + .post( + '/etl/:jobId/retry', + async ({ params }) => { + const db = createDb(); + + try { + const [original] = await db + .select() + .from(etlJobs) + .where(eq(etlJobs.id, params.jobId)) + .limit(1); + + if (!original) return status(404, { error: 'ETL job not found' }); + if (original.status === 'running') + return status(409, { + error: 'Job is still running — wait for it to complete or reset stuck jobs first', + }); + + const newJobId = crypto.randomUUID(); + const objectKey = `v2/${original.source}/${original.filename}`; + const env = getEnv(); + + if (!env.ETL_QUEUE) return status(400, { error: 'ETL_QUEUE is not configured' }); + + await db.insert(etlJobs).values({ + id: newJobId, + status: 'running', + source: original.source, + filename: original.filename, + scraperRevision: original.scraperRevision, + startedAt: new Date(), + }); + + await queueCatalogETL({ queue: env.ETL_QUEUE, objectKeys: [objectKey], jobId: newJobId }); + + return { success: true, newJobId, objectKey }; + } catch (error) { + console.error('ETL retry error:', error); + return status(500, { error: 'Failed to retry ETL job', code: 'ETL_RETRY_ERROR' }); + } + }, + { + params: z.object({ jobId: z.string() }), + detail: { tags: ['Admin'], summary: 'Retry a failed ETL job' }, + }, ); diff --git a/packages/api/src/schemas/admin.ts b/packages/api/src/schemas/admin.ts index 2107ba50d3..bdd1b61fd0 100644 --- a/packages/api/src/schemas/admin.ts +++ b/packages/api/src/schemas/admin.ts @@ -33,9 +33,9 @@ export const AdminUserItemSchema = t.Object({ lastName: t.Nullable(t.String()), role: t.Nullable(t.String()), emailVerified: t.Nullable(t.Boolean()), + avatarUrl: t.Nullable(t.String()), createdAt: t.Nullable(t.String()), - lastActiveAt: t.Nullable(t.String()), - deletedAt: t.Nullable(t.String()), + updatedAt: t.Nullable(t.String()), }); // ─── Packs ──────────────────────────────────────────────────────────────────── @@ -46,9 +46,11 @@ export const AdminPackItemSchema = t.Object({ description: t.Nullable(t.String()), category: t.String(), isPublic: t.Nullable(t.Boolean()), - deleted: t.Boolean(), - deletedAt: t.Nullable(t.String()), + isAIGenerated: t.Boolean(), + tags: t.Nullable(t.Array(t.String())), + image: t.Nullable(t.String()), createdAt: t.Nullable(t.String()), + updatedAt: t.Nullable(t.String()), userEmail: t.Nullable(t.String()), }); @@ -57,11 +59,27 @@ export const AdminPackItemSchema = t.Object({ export const AdminCatalogItemSchema = t.Object({ id: t.Number(), name: t.String(), + description: t.Nullable(t.String()), categories: t.Nullable(t.Array(t.String())), brand: t.Nullable(t.String()), + model: t.Nullable(t.String()), + sku: t.String(), price: t.Nullable(t.Number()), - weight: t.Number(), - weightUnit: t.String(), + currency: t.Nullable(t.String()), + weight: t.Nullable(t.Number()), + weightUnit: t.Nullable(t.String()), + availability: t.Nullable(t.String()), + ratingValue: t.Nullable(t.Number()), + reviewCount: t.Nullable(t.Number()), + color: t.Nullable(t.String()), + size: t.Nullable(t.String()), + material: t.Nullable(t.String()), + seller: t.Nullable(t.String()), + productUrl: t.String(), + images: t.Nullable(t.Array(t.String())), + variants: t.Nullable(t.Array(t.Object({ attribute: t.String(), values: t.Array(t.String()) }))), + techs: t.Nullable(t.Record(t.String(), t.String())), + links: t.Nullable(t.Array(t.Object({ title: t.String(), url: t.String() }))), createdAt: t.Nullable(t.String()), });