Skip to content

feat(etl): add JSONL/NDJSON support alongside CSV#2471

Merged
andrew-bierman merged 10 commits into
mainfrom
feat/jsonl-etl-support
May 21, 2026
Merged

feat(etl): add JSONL/NDJSON support alongside CSV#2471
andrew-bierman merged 10 commits into
mainfrom
feat/jsonl-etl-support

Conversation

@andrew-bierman
Copy link
Copy Markdown
Collaborator

@andrew-bierman andrew-bierman commented May 21, 2026

Summary

  • Auto-detects format from R2 object key extension (.jsonl/.ndjson → JSONL path, otherwise CSV)
  • Both queue and workflow paths updated — scrapers can now upload either format
  • New json-utils.ts with isJsonlFile + mapJsonRowToItem — mirrors all CSV field mappings with JSON-native types (arrays pass through, numbers pass through, no string deserialization)
  • CSV path unchanged for both paths — zero regression risk
  • Workflow path also gets relax_quotes/skip_records_with_error/on_skip from the prior PR (was missing from workflow path)

Why JSONL over CSV

  • No quoting ambiguity (the zpacks "Quote Not Closed" class of bugs can't happen)
  • Nested fields (variants, images, faqs) are native arrays — no serialized JSON-in-CSV
  • Simpler parser: JSON.parse(line) per row, no library dependency

Test plan

  • Upload a .jsonl file via the ETL endpoint and verify items load
  • Confirm existing .csv upload still works

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added support for JSONL/NDJSON catalog uploads with streaming parsing, batching, and robust mapping/validation of records
    • Automatic format detection with unified handling for JSONL and CSV inputs
  • Bug Fixes

    • Improved CSV parsing resilience and error capture for malformed records
  • Tests

    • Added comprehensive tests for JSONL detection and JSON→catalog mapping across many data shapes and edge cases

Review Change Stack

Create json-utils.ts with isJsonlFile() and mapJsonRowToItem() helpers,
then branch both the queue-path (processCatalogEtl) and workflow-path
(catalog-etl-workflow) processors to stream JSONL when the object key
ends in .jsonl/.ndjson. CSV path is unchanged. Also backports
relax_quotes + on_skip to the workflow CSV parser to match the queue path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 21, 2026 15:58
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 21, 2026

Walkthrough

The PR adds JSONL/NDJSON support to the catalog ETL pipeline. New utilities detect file format by extension and map JSON rows to catalog items. The service and workflow layers branch on format to stream JSONL line-by-line or parse CSV, with tests validating mapping and edge cases.

Changes

JSONL/NDJSON Catalog ETL

Layer / File(s) Summary
JSONL detection and row-to-item mapping utilities
packages/api/src/utils/json-utils.ts
isJsonlFile identifies JSONL/NDJSON by extension; mapJsonRowToItem converts parsed JSON objects to partial catalog items with trimming, numeric coercion, array normalization, weight parsing, techs-derived fallback, and availability validation.
Mapper utility test coverage
packages/api/src/utils/__tests__/json-utils.test.ts
Vitest suite validates extension detection and mapJsonRowToItem behavior across scalar fields, structured fields, categories/images parsing, weight/techs handling, availability validation, and edge cases including empty input and invalid types.
Service-layer JSONL streaming and CSV else-branch
packages/api/src/services/etl/processCatalogEtl.ts
processCatalogEtl detects format via isJsonlFile, adds a JSONL streaming path that buffers text chunks, splits/parses lines, maps/validates rows, batches valid items and parse/validation logs, and retains CSV logic in an else branch.
Workflow branching and CSV parser relaxation
packages/api/src/workflows/catalog-etl-workflow.ts
processChunk adds useJsonl routing, implements JSONL line buffering with final-buffer flush and per-line processing, and updates CSV parser options (relax_*, skip_records_with_error) plus on_skip logging while keeping header injection/backpressure coordination.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • PackRat-AI/PackRat#2418: Overlaps on processCatalogEtl backpressure/streaming changes and incremental chunk feeding into the parser.
  • PackRat-AI/PackRat#2465: Related changes that adjust csv-parse leniency and log skipped/malformed CSV rows.
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately summarizes the main change: adding JSONL/NDJSON format support alongside existing CSV support in the ETL pipeline.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/jsonl-etl-support

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added the api label May 21, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 21, 2026

Coverage Report for Expo Unit Tests Coverage (./apps/expo)

Status Category Percentage Covered / Total
🔵 Lines 97.36% (🎯 95%) 517 / 531
🔵 Statements 97.36% (🎯 95%) 517 / 531
🔵 Functions 100% (🎯 97%) 51 / 51
🔵 Branches 95% (🎯 92%) 190 / 200
File CoverageNo changed files found.
Generated in workflow #1429 for commit 3af10be by the Vitest Coverage Report Action

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 21, 2026

Coverage Report for API Unit Tests Coverage (./packages/api)

Status Category Percentage Covered / Total
🔵 Lines 95.28% (🎯 95%) 1031 / 1082
🔵 Statements 95.28% (🎯 95%) 1031 / 1082
🔵 Functions 100% (🎯 97%) 53 / 53
🔵 Branches 92.05% (🎯 92%) 394 / 428
File Coverage
File Stmts Branches Functions Lines Uncovered Lines
Changed Files
packages/api/src/utils/json-utils.ts 93.28% 81.81% 100% 93.28% 108-109, 164-169, 180-181
Generated in workflow #1429 for commit 3af10be by the Vitest Coverage Report Action

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends the catalog ETL ingestion pipeline to support JSONL/NDJSON inputs in addition to CSV, auto-detecting the format via the R2 object key extension and updating both the legacy queue-based ETL path and the Cloudflare Workflows-based ETL path.

Changes:

  • Add format detection (.jsonl / .ndjson) and a streaming JSONL parsing path in both ETL engines.
  • Introduce json-utils.ts to map JSONL row objects into NewCatalogItem-shaped partials (mirroring CSV mappings but with JSON-native types).
  • Align the workflow CSV parsing options with the queue path (relax_quotes, skip_records_with_error, on_skip).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
packages/api/src/workflows/catalog-etl-workflow.ts Adds JSONL streaming support and updates CSV parser options in the workflow chunk processor.
packages/api/src/utils/json-utils.ts New utilities to detect JSONL/NDJSON keys and map parsed JSON objects into catalog items.
packages/api/src/services/etl/processCatalogEtl.ts Adds JSONL streaming support to the legacy queue-based ETL processor.
Comments suppressed due to low confidence (1)

packages/api/src/workflows/catalog-etl-workflow.ts:232

  • csv-parse parse errors handled by on_skip are added to invalidItemsBatch, but they do not increment the chunk’s processed counter (rowIndex). Since the workflow later writes totalProcessed from rowsProcessed: rowIndex, any skipped/malformed CSV rows will be excluded from totalProcessed, making the final aggregate totals inconsistent (often totalValid + totalInvalid > totalProcessed).

Suggested fix: track rowsProcessed separately from rowIndex and increment it inside on_skip (or increment rowIndex on skip), then return that value as rowsProcessed.

    const parser = parse({
      relax_column_count: true,
      relax_quotes: true,
      skip_empty_lines: true,
      skip_records_with_error: true,
      on_skip: (err: Error) => {
        const parserLine = (err as { lines?: number }).lines ?? rowIndex;
        invalidItemsBatch.push({
          jobId,
          errors: [{ field: 'csv_parse', reason: err.message }],

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +117 to +118
const skipPartialLine = isNonFirstChunk;
let firstLineSkipped = !skipPartialLine;
Comment on lines +85 to +103
// --- JSONL streaming path ---
// No csv-parse, no header injection. Each line is a JSON object.
let buffer = '';
const skipPartialLine = byteStart !== undefined && byteStart > 0;
let firstLineSkipped = !skipPartialLine;

for await (const chunk of streamToText(r2Object.body)) {
let text = chunk;

if (skipPartialRow) {
// Discard bytes up to and including the first newline — those bytes are
// the tail of the row that the previous chunk already processed.
const nl = text.indexOf('\n');
if (nl === -1) continue; // entire buffer is still the partial row tail
text = text.slice(nl + 1);
skipPartialRow = false;
if (!text) continue;
}
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';

// Respect backpressure: if the parser buffer is full, wait for drain before
// pushing more data. Without this, R2 fills the parser buffer for the entire
// file (up to 600 MB) before the main loop processes any rows → Worker OOM.
const ok = parser.write(text);
if (!ok) await new Promise<void>((resolve) => parser.once('drain', resolve));
}
parser.end();
})();

for await (const record of parser) {
if (rowIndex % 100 === 0) await new Promise((resolve) => setTimeout(resolve, 1)); // Yield every 100 rows for GC; per-row yield hits the CF Worker wall-clock limit on large files
const row = record as string[];
if (!isHeaderProcessed) {
fieldMap = row.reduce<Record<string, number>>((acc, header, idx) => {
acc[header.trim()] = idx;
return acc;
}, {});
isHeaderProcessed = true;
console.log(
`🔍 [TRACE] Header processed - fields: ${Object.keys(fieldMap).length}, mapping:`,
Object.keys(fieldMap),
);
continue;
}
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

const item = mapCsvRowToItem({ values: row, fieldMap });
if (!firstLineSkipped) {
firstLineSkipped = true;
continue; // discard partial row at chunk boundary
}
Comment on lines +16 to +24
/**
* Maps a parsed JSON object (one line from a JSONL file) to a partial catalog item.
* Uses `unknown` with proper type narrowing — no `any`.
*/
export function mapJsonRowToItem(obj: Record<string, unknown>): Partial<NewCatalogItem> | null {
const item: Partial<NewCatalogItem> = {};

// --- String scalar fields ---
const rawName = obj.name;
andrew-bierman and others added 2 commits May 21, 2026 10:46
- Replace raw typeof checks with isString/isNumber/isObject from @packrat/guards
- Fixes custom lint rule violation (no-raw-typeof CI check)
- Add json-utils.test.ts with 30+ tests covering all branches
- Brings json-utils.ts line/statement coverage above 95% threshold

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Use toRecord() for JSON.parse results (catalog-etl-workflow, processCatalogEtl)
- Use toStringRecord() for techs narrowing (json-utils)
- Fix availability test value (in_stock not InStock)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/api/src/services/etl/processCatalogEtl.ts`:
- Around line 194-205: The on_skip handler currently only pushes parse errors
into invalidItemsBatch (in processCatalogEtl.ts) and doesn't trigger a flush,
which can let invalidItemsBatch grow unbounded when many rows are malformed;
also the CSV writer launched via the streamToText -> parser.write(...) ->
parser.end() path runs in an unawaited async IIFE so its failures can become
unhandled rejections. Fix by making on_skip an async function that (a)
increments/uses the same batching logic as the main consumer (the for await
(const record of parser) flush threshold) and calls the same flush function when
invalidItemsBatch reaches the batch size, and (b) awaits the shared CSV writer
promise instead of letting its IIFE run detached (capture the promise returned
by the writer task created around streamToText/parser.write/parser.end and await
it before completing the outer try/catch), ensuring any writer errors propagate
and the batch is flushed timely from on_skip.
- Around line 209-237: The CSV pump is started as an un-awaited async IIFE which
can reject out of the outer try/catch and leave the parser hanging; change the
IIFE to a captured promise (e.g., const pump = (async () => { ... })()), wrap
the pump body in try/catch/finally so parser.end() is always called in finally,
and on any pump error call parser.destroy(err) (or parser.destroy()) before
rethrowing so the parser loop won't hang; after the for-await-of parser loop,
await pump to surface any pump errors to the caller. Use the existing
identifiers parser, streamToText(r2Object.body), injectedHeader and byteStart to
locate and modify the code.

In `@packages/api/src/utils/__tests__/json-utils.test.ts`:
- Line 2: The test imports helpers via a relative path; change the import to use
the repo TypeScript path alias instead (so the test imports isJsonlFile and
mapJsonRowToItem from the `@packrat/api` alias rather than ../json-utils). Locate
the import line that currently reads "import { isJsonlFile, mapJsonRowToItem }
from '../json-utils';" in the json-utils.test.ts and replace it with the
corresponding aliased module import (e.g., import { isJsonlFile,
mapJsonRowToItem } from '`@packrat/api/`...') so the test follows the project's
`@packrat/api/`* import convention.

In `@packages/api/src/utils/json-utils.ts`:
- Line 4: The import in json-utils.ts currently uses a relative path
('./csv-utils') for functions like parseFaqs, parsePrice, parseWeight, and
safeJsonParse; update that import to use the repository path alias (replace
'./csv-utils' with the corresponding '`@packrat/api/`...' alias) so it conforms to
the root tsconfig path aliases and keeps imports consistent across the repo.
- Around line 98-109: The JSON-parse branch assigns JSON.parse(val) directly to
item.categories which can introduce non-string entries (e.g., numbers); update
the block that handles val.startsWith('[') to validate and normalize the parsed
value to an array of non-empty strings (like the native-array branch): call
JSON.parse(val), ensure the result is an array, map each element to String(...)
or filter by typeof === 'string' and .trim(), then filter(Boolean) before
assigning to item.categories; keep the existing catch fallback to [val]
unchanged.
- Around line 175-180: The branch that handles stringified techs currently
assigns parsed output directly to item.techs, which can allow primitives,
arrays, or non-string values; instead, after safeJsonParse(readable) check that
parsed is a plain object (typeof parsed === 'object' && parsed !== null &&
!Array.isArray(parsed)), then pass it through toStringRecord(parsed) and assign
that result to item.techs; if parsed is an array or a primitive fall back to {};
keep the existing catch to set item.techs = {} on parse errors.

In `@packages/api/src/workflows/catalog-etl-workflow.ts`:
- Around line 240-249: In writerPromise, when isNonFirstChunk is true you must
discard bytes up to and including the first newline before feeding the chunk to
parser to avoid prepending the tail of the previous row; implement a one-time
strip on the first yielded text from streamToText(obj.body) (e.g., track a local
boolean like didStrip) and if isNonFirstChunk && !didStrip remove everything
through the first '\n' from that text chunk (then set didStrip) before calling
parser.write (and still honor parser.once('drain')); update logic around
injectedHeader, parser.write and streamToText(obj.body) handling to perform this
strip only for the first chunk of non-initial payloads.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 4bc811a8-f8ac-47b5-b622-dcaea37faeef

📥 Commits

Reviewing files that changed from the base of the PR and between 36f1317 and 916732b.

📒 Files selected for processing (4)
  • packages/api/src/services/etl/processCatalogEtl.ts
  • packages/api/src/utils/__tests__/json-utils.test.ts
  • packages/api/src/utils/json-utils.ts
  • packages/api/src/workflows/catalog-etl-workflow.ts

Comment on lines +194 to +205
on_skip: (err: Error) => {
const parserLine = (err as { lines?: number }).lines ?? rowIndex;
const parseErrorLog: NewInvalidItemLog = {
jobId,
errors: [{ field: 'csv_parse', reason: err.message }],
rawData: { parseError: err.message },
rowIndex: parserLine,
};
invalidItemsBatch.push(parseErrorLog);
console.warn(
`[ETL] Skipped malformed CSV row at parser line ${parserLine}: ${err.message}`,
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="packages/api/src/services/etl/processCatalogEtl.ts"
echo "== File =="
ls -l "$file"

echo "== Snippet around on_skip (lines ~170-240) =="
nl -ba "$file" | sed -n '150,260p'

echo "== Snippet around invalidItemsBatch flush points (search) =="
rg -n "invalidItemsBatch|push\\(parseErrorLog\\)|flush|EOF|for await \\(const record of parser\\)" "$file" || true

echo "== Find unawaited async IIFE / writer task =="
rg -n "IIFE|\\(async \\(\\) =>|void \\(async|async \\(\\) =>|new Promise|setImmediate|writer|append|writeStream|createWrite|stream|pipeline|then\\(" "$file" || true

echo "== Snippet around parser loop (around for-await) =="
lnum=$(rg -n "for await \\(const record of parser\\)" "$file" | head -n1 | cut -d: -f1 || true)
if [ -n "${lnum:-}" ]; then
  start=$((lnum-60)); end=$((lnum+80))
  nl -ba "$file" | sed -n "${start},${end}p"
fi

Repository: PackRat-AI/PackRat

Length of output: 281


🏁 Script executed:

#!/bin/bash
set -euo pipefail

file="packages/api/src/services/etl/processCatalogEtl.ts"

echo "== rg locations =="
rg -n "on_skip" "$file" || true
rg -n "invalidItemsBatch" "$file" || true
rg -n "for await \\(const record of parser\\)" "$file" || true
rg -n "IIFE|void \\(async|\\(async \\(\\) =>|new Promise|then\\(|append\\(|writeStream|createWrite|pipeline|stream\\.write|writer" "$file" || true
rg -n "flush|EOF|end-of|close\\(|finish\\(" "$file" || true

echo "== Show on_skip block (±40 lines) =="
on_skip_line=$(rg -n "on_skip" "$file" | head -n1 | cut -d: -f1)
if [ -n "${on_skip_line:-}" ]; then
  start=$((on_skip_line-60)); end=$((on_skip_line+60))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

echo "== Show invalidItemsBatch section (±80 lines around first occurrence) =="
inv_line=$(rg -n "invalidItemsBatch" "$file" | head -n1 | cut -d: -f1)
if [ -n "${inv_line:-}" ]; then
  start=$((inv_line-80)); end=$((inv_line+160))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

echo "== Show parser for-await loop section (±80 lines) =="
loop_line=$(rg -n "for await \\(const record of parser\\)" "$file" | head -n1 | cut -d: -f1 || true)
if [ -n "${loop_line:-}" ]; then
  start=$((loop_line-80)); end=$((loop_line+140))
  awk -v s="$start" -v e="$end" 'NR>=s && NR<=e {printf "%5d\t%s\n", NR, $0}' "$file"
fi

Repository: PackRat-AI/PackRat

Length of output: 24727


Bound on_skip buffering and await the CSV writer task.

  • on_skip only pushes into invalidItemsBatch and never flushes; flushing happens only while consuming for await (const record of parser) and at the final post-loop flush, so a chunk/file with mostly or entirely malformed rows can grow invalidItemsBatch to total-row scale (risking OOM).
  • The CSV writer runs in an unawaited async IIFE (the streamToText(...)->parser.write(...)->parser.end() path), so failures there can bypass the outer try/catch and surface as unhandled promise rejections.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/services/etl/processCatalogEtl.ts` around lines 194 - 205,
The on_skip handler currently only pushes parse errors into invalidItemsBatch
(in processCatalogEtl.ts) and doesn't trigger a flush, which can let
invalidItemsBatch grow unbounded when many rows are malformed; also the CSV
writer launched via the streamToText -> parser.write(...) -> parser.end() path
runs in an unawaited async IIFE so its failures can become unhandled rejections.
Fix by making on_skip an async function that (a) increments/uses the same
batching logic as the main consumer (the for await (const record of parser)
flush threshold) and calls the same flush function when invalidItemsBatch
reaches the batch size, and (b) awaits the shared CSV writer promise instead of
letting its IIFE run detached (capture the promise returned by the writer task
created around streamToText/parser.write/parser.end and await it before
completing the outer try/catch), ensuring any writer errors propagate and the
batch is flushed timely from on_skip.

Comment thread packages/api/src/services/etl/processCatalogEtl.ts Outdated
@@ -0,0 +1,218 @@
import { describe, expect, it } from 'vitest';
import { isJsonlFile, mapJsonRowToItem } from '../json-utils';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Use the API path alias in the test import.

Please import this helper through @packrat/api/* instead of a relative path to match the repo rule for TypeScript imports.

As per coding guidelines, "Use path aliases defined in root tsconfig.json: @packrat/api/*, @packrat/ui/*, expo-app/*, guides-app/*, landing-app/*, and nativewindui/*."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/utils/__tests__/json-utils.test.ts` at line 2, The test
imports helpers via a relative path; change the import to use the repo
TypeScript path alias instead (so the test imports isJsonlFile and
mapJsonRowToItem from the `@packrat/api` alias rather than ../json-utils). Locate
the import line that currently reads "import { isJsonlFile, mapJsonRowToItem }
from '../json-utils';" in the json-utils.test.ts and replace it with the
corresponding aliased module import (e.g., import { isJsonlFile,
mapJsonRowToItem } from '`@packrat/api/`...') so the test follows the project's
`@packrat/api/`* import convention.

Comment thread packages/api/src/utils/json-utils.ts Outdated
Comment thread packages/api/src/utils/json-utils.ts
Comment thread packages/api/src/utils/json-utils.ts
Comment thread packages/api/src/workflows/catalog-etl-workflow.ts
andrew-bierman and others added 7 commits May 21, 2026 11:13
- Remove incorrect JSONL partial-line skip (chunker guarantees clean boundaries;
  skipPartialLine was dropping the first valid record per non-first chunk)
- Fix @packrat/guards import order for Biome organizeImports (after @packrat/db)
- Use @packrat/api/utils/csv-utils alias instead of relative path in json-utils
- Filter non-strings from JSON-parsed categories array (parity with native branch)
- Apply toStringRecord to safeJsonParse techs result (parity with native branch)
- Add test for JSON array categories non-string filtering

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@packrat/api/* → @packrat/db → @packrat/guards → @packrat/schemas/*

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…olution

csv-parse infers the correct CsvError type; explicit Error annotation caused
the Options overload to be rejected, resolving to Callback<string[]> instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
csv-parse types on_skip as (err: CsvError | undefined, ...) => void

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Avoids TS18048 — `err` is possibly undefined at line 206; the `message`
variable is already safely computed with optional chaining and a fallback.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The string fits within the 100-char line width; collapsing it removes
the only Biome format error in the checks job.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/api/src/workflows/catalog-etl-workflow.ts (1)

225-239: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bound invalidItemsBatch inside on_skip.

Skipped CSV rows bypass the for await (const record of parser) flush path. On a malformed chunk this can accumulate one log per bad row until EOF and blow the worker's memory.

Suggested fix
+    let invalidLogsFlush = Promise.resolve();
+    const flushSkippedLogsIfNeeded = () => {
+      if (invalidItemsBatch.length < BATCH_SIZE) return;
+      const batch = invalidItemsBatch.splice(0, invalidItemsBatch.length);
+      invalidLogsFlush = invalidLogsFlush.then(() =>
+        processLogsBatch({ jobId, logs: batch, env }),
+      );
+    };
+
     const parser = parse({
       relax_column_count: true,
       relax_quotes: true,
       skip_empty_lines: true,
       skip_records_with_error: true,
       on_skip: (err) => {
         const parserLine = (err as { lines?: number } | undefined)?.lines ?? rowIndex;
         const message = err?.message ?? 'unknown parse error';
         invalidItemsBatch.push({
           jobId,
           errors: [{ field: 'csv_parse', reason: message }],
           rawData: { parseError: message },
           rowIndex: parserLine,
         });
+        flushSkippedLogsIfNeeded();
       },
     });
...
     await writerPromise;
+    await invalidLogsFlush;

Also applies to: 302-302

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/workflows/catalog-etl-workflow.ts` around lines 225 - 239,
The on_skip parser callback currently pushes skipped-row entries directly into
the in-memory invalidItemsBatch, which can grow without bound because skipped
rows never hit the downstream for-await (const record of parser) flush path; fix
by replacing the direct push with a helper that appends the invalid item and
triggers an immediate flush when the buffer exceeds a small threshold (e.g.,
50–100) or always persists immediately for skips: implement a function like
addInvalidItemAndMaybeFlush(invalidItemsBatch, item) that pushes the item and
calls the existing flushInvalidItemsBatch()/persistInvalidItems() routine (or
creates one) to persist and clear the buffer, and call that helper from parser's
on_skip (and at the other skip site around line ~302) so skipped rows do not
accumulate in memory.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@packages/api/src/workflows/catalog-etl-workflow.ts`:
- Around line 225-239: The on_skip parser callback currently pushes skipped-row
entries directly into the in-memory invalidItemsBatch, which can grow without
bound because skipped rows never hit the downstream for-await (const record of
parser) flush path; fix by replacing the direct push with a helper that appends
the invalid item and triggers an immediate flush when the buffer exceeds a small
threshold (e.g., 50–100) or always persists immediately for skips: implement a
function like addInvalidItemAndMaybeFlush(invalidItemsBatch, item) that pushes
the item and calls the existing flushInvalidItemsBatch()/persistInvalidItems()
routine (or creates one) to persist and clear the buffer, and call that helper
from parser's on_skip (and at the other skip site around line ~302) so skipped
rows do not accumulate in memory.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 129b2939-bf6c-4269-b043-73b291d04546

📥 Commits

Reviewing files that changed from the base of the PR and between 916732b and 3af10be.

📒 Files selected for processing (4)
  • packages/api/src/services/etl/processCatalogEtl.ts
  • packages/api/src/utils/__tests__/json-utils.test.ts
  • packages/api/src/utils/json-utils.ts
  • packages/api/src/workflows/catalog-etl-workflow.ts

@andrew-bierman andrew-bierman merged commit 3ce56fc into main May 21, 2026
10 checks passed
@andrew-bierman andrew-bierman deleted the feat/jsonl-etl-support branch May 21, 2026 18:45
@andrew-bierman andrew-bierman restored the feat/jsonl-etl-support branch May 22, 2026 07:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants