[esql] NDJSON datasource#142560

Merged
swallez merged 19 commits into elastic:main from swallez:esql/datasource-ndjson
Feb 23, 2026
Contributor

@swallez swallez commented Feb 16, 2026

Add an NDJSON datasource, building on the abstractions added in PR #141678.

@swallez swallez changed the title from "Esql/datasource ndjson" to "[esql] NDJSON datasource" Feb 16, 2026
@costin costin requested a review from bpintea February 17, 2026 16:01
Member

costin commented Feb 17, 2026

Overall structure looks good and follows the CSV/Parquet patterns well.
A few things to address:

  • QA fixture path typo
    Resources are under iceberg-features/ but should be under iceberg-fixtures/, to match the convention used by CSV/Parquet and the path Clusters.java resolves.
  • Resource leaks
    NullBlockBuilder.close() doesn't release the block it allocates, and NdJsonPageIterator.close() doesn't guard against partial failures (it needs a try-finally). There might be others.
  • Type mapping mismatches
    The schema inferrer maps all integers to LONG and doesn't detect dates, but the csv-spec tests expect INTEGER and DATE columns. These tests will fail as-is.
  • Boolean inconsistency between the two test fixtures
    Unit test data uses native JSON booleans, while QA data uses string booleans. The decoder only handles native booleans, so the QA fixture will behave differently.
  • readAllEmployees test hardcodes a URL instead of using the {{employees}} template
    This will cause issues when the test runs.
  • Schema inferred twice
    You already have the FIXME for this.
  • Stale comments and missing dependencyLicenses mapping in build.gradle
    Copied from CSV without updating.

Contributor Author

swallez commented Feb 17, 2026

@costin I addressed your comments.

Schema inference now handles ISO-formatted dates and distinguishes integer and long.

I ended up creating a separate csv-spec file: ndjson is the first format where the schema is inferred, and this doesn't fit with tests that involve variants like height.float or height.scaled_float.
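
To make the inference change concrete, here is a minimal sketch of per-value type inference that distinguishes INTEGER from LONG and recognizes ISO-formatted date strings. It is not the code from this PR: the class `TypeInference`, the `Type` enum, the choice of `DateTimeFormatter` patterns, and above all the `merge` widening rule (mixed numerics widen, anything else falls back to KEYWORD) are assumptions made for illustration.

```java
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;

// Hypothetical helper, not part of the PR.
public class TypeInference {
    public enum Type { INTEGER, LONG, DOUBLE, BOOLEAN, DATETIME, KEYWORD }

    // Assumption: these two JDK formatters stand in for "ISO format".
    private static final List<DateTimeFormatter> ISO_FORMATS =
        List.of(DateTimeFormatter.ISO_DATE_TIME, DateTimeFormatter.ISO_LOCAL_DATE);

    /** Infers a type for one non-null scalar value (nulls are assumed to be skipped by the caller). */
    public static Type infer(Object value) {
        if (value instanceof Boolean) {
            return Type.BOOLEAN;
        }
        if (value instanceof Integer || value instanceof Long || value instanceof Short || value instanceof Byte) {
            long l = ((Number) value).longValue();
            return (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) ? Type.INTEGER : Type.LONG;
        }
        if (value instanceof Number) {
            return Type.DOUBLE; // any other numeric representation is treated as floating point
        }
        if (value instanceof String && isIsoDate((String) value)) {
            return Type.DATETIME;
        }
        return Type.KEYWORD;
    }

    /** Combines the types seen for one field across lines. The widening rule is an assumption. */
    public static Type merge(Type a, Type b) {
        if (a == b) {
            return a;
        }
        if (isNumeric(a) && isNumeric(b)) {
            return (a == Type.DOUBLE || b == Type.DOUBLE) ? Type.DOUBLE : Type.LONG;
        }
        return Type.KEYWORD;
    }

    private static boolean isNumeric(Type t) {
        return t == Type.INTEGER || t == Type.LONG || t == Type.DOUBLE;
    }

    private static boolean isIsoDate(String s) {
        for (DateTimeFormatter format : ISO_FORMATS) {
            try {
                format.parse(s);
                return true;
            } catch (DateTimeParseException e) {
                // not this format, try the next one
            }
        }
        return false;
    }
}
```

Under these assumptions, a field that holds 42 on one line and 5000000000 on another would end up as LONG, which is why inferred schemas don't line up with tests written against explicitly typed variants of a column.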

Contributor

@bpintea bpintea left a comment

Looking good, left a few questions.

private final NdJsonPageDecoder pageDecoder;
private boolean endOfFile = false;
private Page nextPage;
private InputStream inputStream;
Contributor

May be final.

Contributor Author

Removed it, pageDecoder now owns it.


@Override
public void close() {
for (var blockBuilder : blockBuilders) {
Contributor

You may use Releasables#close for this.

Contributor Author

Done.


Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blockBuilders.length; i++) {
blocks[i] = blockBuilders[i].build();
Contributor

Should the building and the closing be decoupled? Maybe have the closing done in a finally clause, using the close() method? The caller of this method doesn't call close() on exception either. I see the caller rewraps the IOException, but .build() can raise a CircuitBreakingException.

Contributor Author

Good catch. I've refactored this method in 6d22cab so that block builders are in a local variable and are always closed. We don't need to keep them longer than that.

Also adjusted the code so that created blocks are closed even if an error happens when building one. And also moved ownership of the InputStream to this class, as we don't need it in higher-level classes.
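
The ownership pattern discussed in this thread (builders always closed, already-built blocks closed if a later build() fails) can be sketched as follows. This is not the code from commit 6d22cab: `Releasable`, `Builder`, `closeAll` and `buildAll` are hypothetical stand-ins defined here so the example is self-contained, and they are not the Elasticsearch types of similar names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained illustration of "build all, release on failure".
public class BuildAndRelease {
    /** Stand-in for a resource that must be released exactly once. */
    public interface Releasable {
        void close();
    }

    /** Stand-in for a block builder: produces a resource and is itself releasable. */
    public interface Builder<T extends Releasable> extends Releasable {
        T build();
    }

    /** Closes every non-null resource in order. Assumes close() itself does not throw. */
    public static void closeAll(List<? extends Releasable> resources) {
        for (Releasable resource : resources) {
            if (resource != null) {
                resource.close();
            }
        }
    }

    /**
     * Builds one result per builder. The builders are closed in every case;
     * results built so far are closed only if a later build() throws.
     */
    public static <T extends Releasable> List<T> buildAll(List<? extends Builder<T>> builders) {
        List<T> built = new ArrayList<>(builders.size());
        boolean success = false;
        try {
            for (Builder<T> builder : builders) {
                built.add(builder.build()); // may throw, e.g. when a memory limit is hit
            }
            success = true;
            return built;
        } finally {
            if (success == false) {
                closeAll(built); // nothing is handed to the caller, so release partial results
            }
            closeAll(builders); // builders are not needed after this call either way
        }
    }
}
```

Keeping the builders local to one call, as described above, means the caller never has to remember to close them on an exception path.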

// Builders setup independently as we need to create new ones for each page.
void setupBuilders() {
if (attribute != null) {
blockBuilder = switch (attribute.dataType()) {
Contributor

Can we not use ConstantNullBlock.Builder?

Contributor Author

Done. That class wasn't public, which is why I didn't see it. I changed it to public as there's no reason to keep it private.

}
}

private void decodeValue(JsonParser parser, int position) throws IOException {
Contributor

token is unused.

// Unknown field, skip it
parser.skipChildren();
} else {
childDecoder.decodeValue(parser, position);
Contributor

position is unused.

}

// ---------------------------------------------------------------------------------------------
// A tree of decoders. Avoids path reconstruction when traversing nested objects.
Contributor

Can the ones in the c'tor arguments be final?

Contributor

(I left the review notes in IntelliJ and I think you pushed an update to the file after my checkout, so all comments below apply to the code 4 lines below.)

Contributor Author

There's no constructor, all fields are set while traversing the list of Attribute 😅


int c;
while ((c = input.read()) != -1) {
if (c == '\n' || c == '\r') {
Contributor

We should add tests with \n\r or \r\n trailing sequences.
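
To illustrate what such tests could exercise, here is a small sketch of skipping to the end of a line while treating "\n", "\r" and "\r\n" each as one terminator. It is not the PR's implementation: `LineSkipper`, `skipToEndOfLine` and `remaining` are hypothetical names, and the use of `PushbackInputStream` to peek at the byte after a '\r' is just one possible approach.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper used to illustrate line-terminator handling.
public class LineSkipper {
    /**
     * Consumes bytes up to and including the next line terminator.
     * Returns false if the stream ended before a terminator was found.
     */
    public static boolean skipToEndOfLine(PushbackInputStream input) throws IOException {
        int c;
        while ((c = input.read()) != -1) {
            if (c == '\n') {
                return true;
            }
            if (c == '\r') {
                int next = input.read();
                if (next != '\n' && next != -1) {
                    input.unread(next); // a lone '\r': give back the first byte of the next line
                }
                return true;
            }
        }
        return false;
    }

    /** Test helper: skips one line of the given text and returns whatever is left. */
    public static String remaining(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try (PushbackInputStream input = new PushbackInputStream(new ByteArrayInputStream(bytes))) {
            skipToEndOfLine(input);
            return new String(input.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this sketch, `remaining("bad\r\n{}")` leaves `{}`, while `remaining("bad\n\r{}")` leaves `\r{}`: the "\n\r" sequence is read as a terminator followed by an empty line, so a reader built this way also needs to tolerate empty lines.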

List<Attribute> attributes = new ArrayList<>();
buildSchema(root, null, attributes);
return attributes;
}
Contributor

Method's not used.

Contributor Author

Removed, a leftover from previous iterations.


/**
* Infers schema from an NDJSON input stream, reading up to maxLines.
*/
Contributor

Is the name member ever used in FieldInfo?

Contributor Author

Indeed it's not. The key in the parent's children map is what is used.

Contributor Author

@swallez swallez left a comment

@bpintea thanks for the thorough review! I have addressed all your comments.


Contributor

@bpintea bpintea left a comment

LGTM

@swallez swallez merged commit 605c05b into elastic:main Feb 23, 2026
35 checks passed
@swallez swallez deleted the esql/datasource-ndjson branch February 23, 2026 08:52
Contributor Author

swallez commented Feb 23, 2026

@romseygeek looking at it. Thanks for the heads up!

breskeby added a commit that referenced this pull request Feb 23, 2026
swallez added a commit to swallez/elasticsearch that referenced this pull request Feb 23, 2026
Adds an NDJSON ESQL datasource. The schema is inferred from the first 100 lines.
Supported types are INTEGER, DOUBLE, LONG, BOOLEAN, DATETIME (strings in ISO format) and KEYWORD.

Multi-values are supported, nested objects are flattened by joining property names with a '.'

In case of parsing failure, the stream is consumed until the next end of line to recover.
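
As an illustration of the flattening rule in the commit message above (nested property names joined with a '.'), here is a minimal sketch operating on already-parsed maps. It is not the PR's decoder, which works on a streaming parser: `Flattener` and `flatten` are hypothetical names, and using `java.util.Map` as the parsed representation is an assumption made to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating dotted-path flattening of nested objects.
public class Flattener {
    /** Returns a flat map whose keys are the dotted paths of all leaf values. */
    public static Map<String, Object> flatten(Map<String, ?> object) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(null, object, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<?, ?> object, Map<String, Object> out) {
        for (Map.Entry<?, ?> entry : object.entrySet()) {
            String path = prefix == null ? String.valueOf(entry.getKey()) : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Nested object: recurse, extending the path with this property name.
                flattenInto(path, (Map<?, ?>) entry.getValue(), out);
            } else {
                // Leaf value (scalar or multi-value list): record it under its full path.
                out.put(path, entry.getValue());
            }
        }
    }
}
```

For example, a line such as `{"name":"Ada","address":{"city":"Paris"}}` would yield the columns `name` and `address.city` under this scheme.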
sidosera pushed a commit to sidosera/elasticsearch that referenced this pull request Feb 24, 2026
@tylerperk tylerperk added the ES|QL|DS ES|QL datasources label Mar 18, 2026