diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 7c3ee865d4..e9a29529cb 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -130,9 +130,22 @@ export async function processCatalogETL({ console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`); - // Flush remaining items after the stream ends - const remainingItems = validItemsBatch.length + invalidItemsBatch.length; + // Flush remaining items BEFORE updating totalProcessed so that if a flush throws, + // totalProcessed isn't inflated while valid/invalid counts stay null. + const remainingValid = validItemsBatch.length; + const remainingInvalid = invalidItemsBatch.length; + + if (remainingValid > 0) { + console.log(`🔍 [TRACE] Processing valid items batch - size: ${remainingValid}`); + await processValidItemsBatch({ jobId, items: validItemsBatch, env }); + } + + if (remainingInvalid > 0) { + console.log(`🔍 [TRACE] Processing invalid items batch - size: ${remainingInvalid}`); + await processLogsBatch({ jobId, logs: invalidItemsBatch, env }); + } + const remainingItems = remainingValid + remainingInvalid; if (remainingItems > 0) { await db .update(etlJobs) @@ -140,32 +153,16 @@ export async function processCatalogETL({ .where(eq(etlJobs.id, jobId)); } - if (validItemsBatch.length > 0) { - console.log(`🔍 [TRACE] Processing valid items batch - size: ${validItemsBatch.length}`); - await processValidItemsBatch({ - jobId, - items: validItemsBatch, - env, - }); - } - - if (invalidItemsBatch.length > 0) { - console.log(`🔍 [TRACE] Processing invalid items batch - size: ${invalidItemsBatch.length}`); - await processLogsBatch({ - jobId, - logs: invalidItemsBatch, - env, - }); - } - const totalRows = rowIndex; - // Use raw SQL to avoid neon-http enum serialization issues with Drizzle ORM. + // Mark completed using Drizzle ORM (same as the failed path below) — avoids the + // silent failure that ::etl_job_status raw SQL casts produced in some Neon HTTP driver versions. // Isolated try-catch so a transient DB hiccup here doesn't cascade to status='failed'. try { - await db.execute( - sql`UPDATE etl_jobs SET status = 'completed'::etl_job_status, completed_at = NOW() WHERE id = ${jobId}`, - ); + await db + .update(etlJobs) + .set({ status: 'completed', completedAt: new Date() }) + .where(eq(etlJobs.id, jobId)); } catch (completionErr) { console.error( `[ETL] Failed to mark job ${jobId} completed — will be reset by stuck-job sweep:`,