Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 10 additions & 28 deletions packages/api/src/services/etl/processCatalogEtl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { etlJobs, type NewCatalogItem, type NewInvalidItemLog } from '@packrat/a
import type { Env } from '@packrat/api/types/env';
import { mapCsvRowToItem } from '@packrat/api/utils/csv-utils';
import { parse } from 'csv-parse';
import { eq, sql } from 'drizzle-orm';
import { eq } from 'drizzle-orm';
import { R2BucketService } from '../r2-bucket';
import { CatalogItemValidator } from './CatalogItemValidator';
import { processLogsBatch } from './processLogsBatch';
Expand Down Expand Up @@ -152,51 +152,33 @@ export async function processCatalogETL({

rowIndex++;

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files
// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files.
// totalProcessed is incremented atomically inside processValidItemsBatch via updateEtlJobProgress.
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
.where(eq(etlJobs.id, jobId));
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows
// Flush invalid batch to DB every BATCH_SIZE rows.
// totalProcessed is incremented atomically inside processLogsBatch via updateEtlJobProgress.
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
.where(eq(etlJobs.id, jobId));
invalidItemsBatch.length = 0;
}
}

console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`);

// Flush remaining items BEFORE updating totalProcessed so that if a flush throws,
// totalProcessed isn't inflated while valid/invalid counts stay null.
const remainingValid = validItemsBatch.length;
const remainingInvalid = invalidItemsBatch.length;

if (remainingValid > 0) {
console.log(`🔍 [TRACE] Processing valid items batch - size: ${remainingValid}`);
// Flush remaining items. totalProcessed is updated atomically inside each batch function.
if (validItemsBatch.length > 0) {
console.log(`🔍 [TRACE] Processing valid items batch - size: ${validItemsBatch.length}`);
await processValidItemsBatch({ jobId, items: validItemsBatch, env });
}

if (remainingInvalid > 0) {
console.log(`🔍 [TRACE] Processing invalid items batch - size: ${remainingInvalid}`);
if (invalidItemsBatch.length > 0) {
console.log(`🔍 [TRACE] Processing invalid items batch - size: ${invalidItemsBatch.length}`);
await processLogsBatch({ jobId, logs: invalidItemsBatch, env });
}

const remainingItems = remainingValid + remainingInvalid;
if (remainingItems > 0) {
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${remainingItems}` })
.where(eq(etlJobs.id, jobId));
}

const totalRows = rowIndex;

// Mark completed using Drizzle ORM (same as the failed path below) — avoids the
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/services/etl/processLogsBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export async function processLogsBatch({
await updateEtlJobProgress(env, {
jobId,
invalid: logs.length,
processed: logs.length,
});

console.log(`📝 Processed and wrote ${logs.length} invalid items for job ${jobId}`);
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/services/etl/processValidItemsBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ export async function processValidItemsBatch({
const upsertedItems = await catalogService.upsertCatalogItems(itemsWithEmbeddings);
// Track the ETL job that processed these items
await catalogService.trackEtlJob(upsertedItems, jobId);
// Update the ETL job progress
// Update the ETL job progress — processed is incremented atomically with valid to prevent
// totalValid > totalProcessed if the Worker dies between two separate DB updates.
await updateEtlJobProgress(env, {
jobId,
valid: items.length,
processed: items.length,
});
} catch (error) {
console.error(`Error generating embeddings for batch ${jobId}:`, error);
Expand All @@ -55,6 +57,7 @@ export async function processValidItemsBatch({
await updateEtlJobProgress(env, {
jobId,
valid: items.length,
processed: items.length,
});
} finally {
console.log(`📦 Batch ${jobId}: Processed ${items.length} valid items`);
Expand Down
14 changes: 3 additions & 11 deletions packages/api/src/services/etl/updateEtlJobProgress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,20 @@ import { eq, sql } from 'drizzle-orm';

export async function updateEtlJobProgress(
env: Env,
params: { jobId: string; valid?: number; invalid?: number },
params: { jobId: string; valid?: number; invalid?: number; processed?: number },
): Promise<void> {
const db = createDbClient(env);

const valid = params?.valid ?? 0;
const invalid = params?.invalid ?? 0;
const processed = params?.processed ?? 0;

await db
.update(etlJobs)
.set({
totalValid: sql`COALESCE(${etlJobs.totalValid}, 0) + ${valid}`,
totalInvalid: sql`COALESCE(${etlJobs.totalInvalid}, 0) + ${invalid}`,
status: sql`CASE
WHEN COALESCE(${etlJobs.totalProcessed}, 0) = COALESCE(${etlJobs.totalValid}, 0) + ${valid} + COALESCE(${etlJobs.totalInvalid}, 0) + ${invalid}
THEN 'completed'
ELSE ${etlJobs.status}
END`,
completedAt: sql`CASE
WHEN COALESCE(${etlJobs.totalProcessed}, 0) = COALESCE(${etlJobs.totalValid}, 0) + ${valid} + COALESCE(${etlJobs.totalInvalid}, 0) + ${invalid}
THEN CURRENT_TIMESTAMP
ELSE ${etlJobs.completedAt}
END`,
totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${processed}`,
})
.where(eq(etlJobs.id, params.jobId));
}
14 changes: 14 additions & 0 deletions packages/api/test/etl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ describe('processCatalogETL', () => {
expect(job?.totalProcessed).toBe(250);
});

it('totalProcessed never exceeds totalValid + totalInvalid after completion', async () => {
const jobId = crypto.randomUUID();
await insertJob(jobId);
mockR2WithCsv(makeCsv(10));

await processCatalogETL({ message: makeMessage(jobId) as any, env: TEST_ENV });

const job = await getJob(jobId);
const processed = job?.totalProcessed ?? 0;
const valid = job?.totalValid ?? 0;
const invalid = job?.totalInvalid ?? 0;
expect(processed).toBe(valid + invalid);
});

it('marks job as completed even when items have no weight', async () => {
const jobId = crypto.randomUUID();
await insertJob(jobId);
Expand Down
Loading