From c45b9e7d80f536317d45ec3a9e444beb7f242240 Mon Sep 17 00:00:00 2001 From: Andrew Bierman Date: Tue, 12 May 2026 23:53:48 -0600 Subject: [PATCH] fix(etl): flush remaining items before updating totalProcessed + use ORM for completion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs caused phantom failures and inflated totalProcessed counts: 1. Remaining-items ordering: totalProcessed was incremented *before* the final valid/invalid flushes. If processValidItemsBatch or processLogsBatch threw, totalProcessed showed an inflated count while totalValid/totalInvalid stayed null — making job records misleading and harder to diagnose on retry. Fixed by flushing remaining items first, then updating totalProcessed. 2. Silent completion failure: the final status='completed' update used raw SQL with an ::etl_job_status cast that silently failed in some Neon HTTP driver versions. The inner try-catch swallowed the error, leaving the job in 'running' state so the reset-stuck sweep would mark it 'failed' even though all items were successfully ingested. Fixed by using the same db.update().set({ status: 'completed' }) pattern that already works reliably for the 'failed' path in the outer catch. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../api/src/services/etl/processCatalogEtl.ts | 45 +++++++++---------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/packages/api/src/services/etl/processCatalogEtl.ts b/packages/api/src/services/etl/processCatalogEtl.ts index 7c3ee865d4..e9a29529cb 100644 --- a/packages/api/src/services/etl/processCatalogEtl.ts +++ b/packages/api/src/services/etl/processCatalogEtl.ts @@ -130,9 +130,22 @@ export async function processCatalogETL({ console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`); - // Flush remaining items after the stream ends - const remainingItems = validItemsBatch.length + invalidItemsBatch.length; + // Flush remaining items BEFORE updating totalProcessed so that if a flush throws, + // totalProcessed isn't inflated while valid/invalid counts stay null. + const remainingValid = validItemsBatch.length; + const remainingInvalid = invalidItemsBatch.length; + + if (remainingValid > 0) { + console.log(`🔍 [TRACE] Processing valid items batch - size: ${remainingValid}`); + await processValidItemsBatch({ jobId, items: validItemsBatch, env }); + } + + if (remainingInvalid > 0) { + console.log(`🔍 [TRACE] Processing invalid items batch - size: ${remainingInvalid}`); + await processLogsBatch({ jobId, logs: invalidItemsBatch, env }); + } + const remainingItems = remainingValid + remainingInvalid; if (remainingItems > 0) { await db .update(etlJobs) @@ -140,32 +153,16 @@ export async function processCatalogETL({ .where(eq(etlJobs.id, jobId)); } - if (validItemsBatch.length > 0) { - console.log(`🔍 [TRACE] Processing valid items batch - size: ${validItemsBatch.length}`); - await processValidItemsBatch({ - jobId, - items: validItemsBatch, - env, - }); - } - - if (invalidItemsBatch.length > 0) { - console.log(`🔍 [TRACE] Processing invalid items batch - size: ${invalidItemsBatch.length}`); - await processLogsBatch({ - jobId, - logs: invalidItemsBatch, - env, - }); - } - const totalRows = rowIndex; - // Use raw SQL to avoid neon-http enum serialization issues with Drizzle ORM. + // Mark completed using Drizzle ORM (same as the failed path below) — avoids the + // silent failure that ::etl_job_status raw SQL casts produced in some Neon HTTP driver versions. // Isolated try-catch so a transient DB hiccup here doesn't cascade to status='failed'. try { - await db.execute( - sql`UPDATE etl_jobs SET status = 'completed'::etl_job_status, completed_at = NOW() WHERE id = ${jobId}`, - ); + await db + .update(etlJobs) + .set({ status: 'completed', completedAt: new Date() }) + .where(eq(etlJobs.id, jobId)); } catch (completionErr) { console.error( `[ETL] Failed to mark job ${jobId} completed — will be reset by stuck-job sweep:`,