Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/expo/app/(app)/current-pack/[id].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export default function CurrentPackScreen() {
</Text>
<Text variant="subhead" className="mt-1 text-muted-foreground">
{t('packs.lastUpdated', {
time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t),
time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any),
})}
Comment on lines 156 to 159
</Text>
</View>
Expand Down
4 changes: 2 additions & 2 deletions apps/expo/app/(app)/recent-packs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function RecentPackCard({ pack }: { pack: Pack }) {
{pack.totalWeight ?? 0} g
</Text>
<Text variant="footnote" className="text-muted-foreground">
{getRelativeTime(pack.localCreatedAt ?? pack.createdAt, t)}
{getRelativeTime(pack.localCreatedAt ?? pack.createdAt, t as any)}
</Text>
</View>
</View>
Expand All @@ -45,7 +45,7 @@ function RecentPackCard({ pack }: { pack: Pack }) {
</View>
<Text variant="caption1" className="text-muted-foreground">
{t('packs.lastUpdated', {
time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t),
time: getRelativeTime(pack.localUpdatedAt ?? pack.updatedAt, t as any),
})}
Comment on lines 36 to 49
</Text>
</View>
Expand Down
7 changes: 7 additions & 0 deletions packages/api/scripts/reset-stuck-etl-jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Reset ETL jobs stuck in 'running' state for more than 3 hours.
-- 3h accounts for large first-time imports (~500K rows + embedding generation).
-- Run manually when zombie jobs are detected.
UPDATE etl_jobs
SET status = 'failed', completed_at = NOW()
WHERE status = 'running'
AND started_at < NOW() - INTERVAL '3 hours';
15 changes: 14 additions & 1 deletion packages/api/src/services/catalogService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,20 @@ export class CatalogService {
.onConflictDoUpdate({
target: catalogItems.sku,
set: Object.values(columns).reduce<Record<string, SQL>>((acc, col) => {
acc[col.name] = sql.raw(`COALESCE(catalog_items.${col.name}, excluded."${col.name}")`);
if (col.name === 'id' || col.name === 'created_at') {
// Never overwrite PK or original creation timestamp
acc[col.name] = sql`COALESCE(${col}, excluded.${sql.identifier(col.name)})`;
} else if (col.name === 'weight') {
// Keep old weight if new weight is missing or invalid (0 / negative)
acc[col.name] =
sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight')} ELSE COALESCE(${catalogItems.weight}, excluded.${sql.identifier('weight')}) END`;
} else if (col.name === 'weight_unit') {
// weight_unit stays in sync with weight validity
acc[col.name] =
sql`CASE WHEN excluded.${sql.identifier('weight')} IS NOT NULL AND excluded.${sql.identifier('weight')} > 0 THEN excluded.${sql.identifier('weight_unit')} ELSE COALESCE(${catalogItems.weightUnit}, excluded.${sql.identifier('weight_unit')}) END`;
Comment on lines +352 to +356
} else {
acc[col.name] = sql`excluded.${sql.identifier(col.name)}`;
}
return acc;
}, {}),
})
Expand Down
18 changes: 2 additions & 16 deletions packages/api/src/services/etl/CatalogItemValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,9 @@ export class CatalogItemValidator {
});
}

if (!item.weight || !isNumber(item.weight) || item.weight <= 0) {
errors.push({
field: 'weight',
reason: 'Weight is required and must be a positive number',
value: item.weight,
});
}

if (!item.weightUnit || !isString(item.weightUnit) || item.weightUnit.trim().length === 0) {
errors.push({
field: 'weightUnit',
reason: 'Weight unit is required and must be a non-empty string',
value: item.weightUnit,
});
}

// Additional validations
// Note: weight and weightUnit are intentionally not required — clothing/footwear brands often
// omit weight data. Items without weight are ingested but won't appear in weight comparisons.
Comment on lines 34 to +36
if (item.productUrl && !this.isValidUrl(item.productUrl)) {
errors.push({
field: 'productUrl',
Expand Down
34 changes: 28 additions & 6 deletions packages/api/src/services/etl/processCatalogEtl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,38 @@ export async function processCatalogETL({
}

rowIndex++;

// Flush valid batch to DB every BATCH_SIZE rows to avoid Worker OOM on large files
if (validItemsBatch.length >= BATCH_SIZE) {
await processValidItemsBatch({ jobId, items: [...validItemsBatch], env });
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
.where(eq(etlJobs.id, jobId));
validItemsBatch.length = 0;
}
// Flush invalid batch to DB every BATCH_SIZE rows
if (invalidItemsBatch.length >= BATCH_SIZE) {
await processLogsBatch({ jobId, logs: [...invalidItemsBatch], env });
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${BATCH_SIZE}` })
.where(eq(etlJobs.id, jobId));
invalidItemsBatch.length = 0;
}
}

console.log(`🔍 [TRACE] Streaming complete - processing batches`);
console.log(`🔍 [TRACE] Streaming complete - processing remaining batches`);

const itemsProcessed = validItemsBatch.length + invalidItemsBatch.length;
// Flush remaining items after the stream ends
const remainingItems = validItemsBatch.length + invalidItemsBatch.length;

await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${itemsProcessed}` })
.where(eq(etlJobs.id, jobId));
if (remainingItems > 0) {
await db
.update(etlJobs)
.set({ totalProcessed: sql`COALESCE(${etlJobs.totalProcessed}, 0) + ${remainingItems}` })
.where(eq(etlJobs.id, jobId));
}
Comment on lines +111 to +141

if (validItemsBatch.length > 0) {
console.log(`🔍 [TRACE] Processing valid items batch - size: ${validItemsBatch.length}`);
Expand Down
3 changes: 1 addition & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
{
"compilerOptions": {
"allowJs": true,
"baseUrl": ".",
"esModuleInterop": true,
"ignoreDeprecations": "6.0",
"ignoreDeprecations": "5.0",
"jsx": "react-native",
"lib": ["DOM", "ESNext"],
"module": "preserve",
Expand Down
Loading