Skip to content

[ES|QL|DS] Wire parallel parsing into production for text formats#143997

Merged
costin merged 4 commits intoelastic:mainfrom
costin:esql/wire-parallel-parsing
Mar 11, 2026
Merged

[ES|QL|DS] Wire parallel parsing into production for text formats#143997
costin merged 4 commits intoelastic:mainfrom
costin:esql/wire-parallel-parsing

Conversation

@costin
Copy link
Copy Markdown
Member

@costin costin commented Mar 11, 2026

ParallelParsingCoordinator was fully built and tested but never called from production code — text files (CSV, NDJSON) were always read single-threaded per driver regardless of size. This wires SegmentableFormatReader implementations through parallelRead() so large files are split into byte-range segments and parsed concurrently.

Adds a parsing_parallelism QueryPragma (defaults to allocated processors, overridable per query) and propagates it through the operator factory. Parallel parsing is gated on file size and skipped when row limits are pushed down.

Developed using AI-assisted tooling

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341
@costin costin requested a review from bpintea March 11, 2026 09:43
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Mar 11, 2026
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Hi @costin, I've created a changelog YAML for you.

@costin costin enabled auto-merge (squash) March 11, 2026 09:44
costin added 2 commits March 11, 2026 14:55
The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
*/
public class AsyncExternalSourceOperatorFactory implements SourceOperator.SourceOperatorFactory {

private static final Logger logger = LogManager.getLogger(AsyncExternalSourceOperatorFactory.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: not used.

@costin costin merged commit ad604d4 into elastic:main Mar 11, 2026
36 checks passed
@costin costin deleted the esql/wire-parallel-parsing branch March 11, 2026 17:55
szybia added a commit to szybia/elasticsearch that referenced this pull request Mar 11, 2026
…elocations

* upstream/main: (54 commits)
  [ES|QL|DS] Wire parallel parsing into production for text formats (elastic#143997)
  ESQL: Allow EXTERNAL commands be run part of the CsvTests suite (elastic#143970)
  [ESQL] Push stats to external source via metadata (elastic#143940)
  Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:approximation.Approximate stats with stats where} elastic#144051
  Refactored SortedNumericDocValuesSyntheticFieldLoader into a Layer (elastic#143912)
  Enable extended doc_values params feature flag in RandomizedRollingUpgradeIT (elastic#143918)
  Mute org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT test {csv-spec:approximation.Approximate stats with sample} elastic#144022
  Ensure we use float values for rolling upgrade float vectors (elastic#144032)
  Remove sensitive info from reindex task description (elastic#143635)
  Fix HistogramUnionState.equals (elastic#143990)
  Use dedicated IndexRouting API in ShardSplittingQuery (elastic#143776)
  Engine/Store DistributedArchitectureGuide doc (elastic#143818)
  Mute org.elasticsearch.snapshots.ConcurrentSnapshotsIT testDeletesAreBatched elastic#144034
  Avoid serializing exceptions as JSON in remote write endpoint (elastic#143987)
  allow testLoadDocSequenceReturnsCorrectResultsText to circuit break, it happens in serverless occasionally (elastic#144023)
  [ESQL] Adds memory accounting to GroupedLimitOperator (elastic#143941)
  Adjust ESIntegTestCase.getLiveDocs method to account for pruned sequence numbers (elastic#143999)
  Support target bucket count in `TBUCKET` with explicit from/to date range (elastic#142747)
  TSDBDocValuesFormatSingleNodeTests with and without synthetic id (elastic#144002)
  Fix circuit breaker leak in BreakingTDigestHolder (elastic#143873)
  ...
jdconrad pushed a commit to jdconrad/elasticsearch that referenced this pull request Mar 11, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
jdconrad pushed a commit to jdconrad/elasticsearch that referenced this pull request Mar 11, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
jdconrad pushed a commit to jdconrad/elasticsearch that referenced this pull request Mar 11, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
jdconrad pushed a commit to jdconrad/elasticsearch that referenced this pull request Mar 11, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
jdconrad pushed a commit to jdconrad/elasticsearch that referenced this pull request Mar 11, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
michalborek pushed a commit to michalborek/elasticsearch that referenced this pull request Mar 23, 2026
…astic#143997)

* [ES|QL|DS] Wire parallel parsing into production

ParallelParsingCoordinator was fully built and tested but never
called from production code. Text files were always read
single-threaded per driver regardless of size.

Route SegmentableFormatReader implementations (CSV, NDJSON)
through ParallelParsingCoordinator.parallelRead() so large
files are split into byte-range segments and parsed concurrently.

Closes elastic/esql-planning#341

* Update docs/changelog/143997.yaml

* Fix parallel parsing header handling for CSV

The first segment of parallel parsing used readSplit() with
resolvedAttributes, which skips header parsing. For CSV files
this meant the header line was treated as data. Use read() for
the first segment so the format reader handles its own header
internally, and readSplit() only for subsequent segments.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >enhancement ES|QL|DS ES|QL datasources Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants