Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions x-pack/plugin/esql-datasource-ndjson/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# ESQL NDJSON Data Source Plugin

Provides NDJSON (newline-delimited JSON) format support for ESQL external data sources.

## Features

- Schema inference from the first 100 non-empty lines
- Type conflict resolution to KEYWORD
- Ignores blank lines; logs warnings for malformed lines (does not fail the file)
- Supports `.ndjson` and `.jsonl` extensions

## Usage

Once the plugin is installed, ESQL will use it for files ending in `.ndjson` or `.jsonl`:

```sql
FROM "https://example.com/data/events.ndjson"
| WHERE status = "ok"
| LIMIT 100
```

## Limitations / TODO

Pages only contain blocks, not attributes, so there is no way to know what
these blocks represent (name, type, nullability, etc.).
`SourceMetadata` should be provided to `FormatReader.read()` in order to:

1. Avoid rediscovering the schema
2. Ensure data is interpreted as expected by the schema
3. Ensure blocks are laid out in the page in the order they appear in the metadata

## Misc

- `employees.ndjson` was created by running `CsvTestsDataLoader`, extracting the documents with
`curl 'http://localhost:9200/employees/_search?size=1000' | jq -c '.hits.hits[] | ._source'`,
and manually converting "string booleans" to actual booleans.
35 changes: 35 additions & 0 deletions x-pack/plugin/esql-datasource-ndjson/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

// Build script for the esql-datasource-ndjson plugin: applies the internal Elasticsearch
// plugin and publishing conventions, then declares the plugin descriptor and its dependencies.
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.publish'

// Plugin descriptor: name, description, entry-point class, and the plugin this one extends.
esplugin {
name = 'esql-datasource-ndjson'
description = 'NDJSON (newline-delimited JSON) format support for ESQL external data sources'
classname = 'org.elasticsearch.xpack.esql.datasource.ndjson.NdJsonDataSourcePlugin'
extendedPlugins = ['x-pack-esql']
}

// Base name of the produced archive; kept identical to the plugin name above.
base {
archivesName = 'esql-datasource-ndjson'
}

dependencies {
// SPI interfaces from ESQL core. These are compileOnly, i.e. on the compile classpath only.
// NOTE(review): presumably supplied at runtime by the extended x-pack-esql plugin - confirm.
compileOnly project(path: xpackModule('esql'))
compileOnly project(path: xpackModule('esql-core'))
compileOnly project(path: xpackModule('core'))
compileOnly project(':server')
compileOnly project(xpackModule('esql:compute'))

// The only `implementation` dependency of this plugin; version comes from versions.jackson.
// NOTE(review): presumably used to parse the NDJSON lines - confirm against the reader sources.
implementation "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"

// Test-only dependencies.
testImplementation project(':test:framework')
testImplementation('commons-io:commons-io:2.15.1')
testImplementation(testArtifact(project(xpackModule('core'))))
}
62 changes: 62 additions & 0 deletions x-pack/plugin/esql-datasource-ndjson/qa/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

// Build script for the NDJSON datasource QA module: a Java REST test project plus an
// explicitly applied set of precommit checks.
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: org.elasticsearch.gradle.internal.precommit.CheckstylePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenApisPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenPatternsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.FilePermissionsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.LoggerUsagePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.TestingConventionsPrecommitPlugin

dependencies {
// Test fixtures and spec reader infrastructure
javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
javaRestTestImplementation project(xpackModule('esql:qa:server'))
javaRestTestImplementation project(xpackModule('esql'))
javaRestTestImplementation(project(path: xpackModule('esql'), configuration: 'testRuntimeElements'))

// S3 fixture infrastructure for mocking S3 operations
javaRestTestImplementation project(':test:fixtures:s3-fixture')
javaRestTestImplementation project(':test:fixtures:aws-fixture-utils')

// Repository S3 module for cluster
clusterModules project(':modules:repository-s3')

// Datasource plugins for the test cluster: the NDJSON format plugin under test, plus the
// HTTP and S3 datasource plugins.
// NOTE(review): the latter two presumably provide the storage backends the specs run against - confirm.
clusterPlugins project(xpackModule('esql-datasource-ndjson'))
clusterPlugins project(xpackModule('esql-datasource-http'))
clusterPlugins project(xpackModule('esql-datasource-s3'))
}

// The test fixtures are included directly in this module's javaRestTest/resources directory,
// so no extra fixture-copying task is declared here.

tasks.named('javaRestTest') {
// NOTE(review): the string argument looks like a placeholder justification - confirm and replace once triaged.
usesDefaultDistribution("to be triaged")
// Never run more than one test process in parallel.
maxParallelForks = 1

// Increase timeouts for S3 operations which may take longer than standard queries
// NOTE(review): unit of '60' is presumably seconds - confirm against the REST test client.
systemProperty 'tests.rest.client_timeout', '60'
systemProperty 'tests.rest.socket_timeout', '60'

// Log every passed/skipped/failed test event with full exception output for debugging;
// standard output/error of the tests is not shown.
testLogging {
events = ["passed", "skipped", "failed"]
exceptionFormat = "full"
showStandardStreams = false
}
}

// REST API specs and x-pack REST tests made available to this module.
restResources {
restApi {
include '_common', 'bulk', 'get', 'indices', 'esql', 'xpack', 'cluster', 'capabilities', 'index'
}
restTests {
includeXpack 'esql'
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.ndjson;

import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;

import java.net.URISyntaxException;
import java.net.URL;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.ACCESS_KEY;
import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.SECRET_KEY;

/**
 * Factory for the local Elasticsearch test cluster used by the NDJSON integration tests.
 */
public class Clusters {

    /**
     * Builds the test cluster without any additional configuration.
     *
     * @param s3EndpointSupplier supplies the value of the {@code s3.client.default.endpoint} setting
     * @return the configured cluster
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier) {
        return testCluster(s3EndpointSupplier, ignored -> {});
    }

    /**
     * Builds the test cluster and applies {@code configProvider} on top of the base configuration.
     *
     * @param s3EndpointSupplier supplies the value of the {@code s3.client.default.endpoint} setting
     * @param configProvider     extra configuration applied after all base settings
     * @return the configured cluster
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier, LocalClusterConfigProvider configProvider) {
        return ElasticsearchCluster.local()
            .distribution(DistributionType.DEFAULT)
            .shared(true)
            // S3 access goes through the repository-s3 module
            .module("repository-s3")
            // Baseline: no security, trial license
            .setting("xpack.security.enabled", "false")
            .setting("xpack.license.self_generated.type", "trial")
            // ML stays off so that no native code has to be loaded
            .setting("xpack.ml.enabled", "false")
            // The LOCAL storage backend reads fixture files from the test resources directory;
            // the esql-datasource-http plugin's entitlement policy uses shared_repo for file read access.
            .setting("path.repo", fixturesPath())
            // Point the default S3 client at the S3HttpFixture
            .setting("s3.client.default.endpoint", s3EndpointSupplier)
            // Credentials have to live in the keystore rather than in plain settings
            .keystore("s3.client.default.access_key", ACCESS_KEY)
            .keystore("s3.client.default.secret_key", SECRET_KEY)
            // The fixture speaks plain HTTP, so no SSL
            .setting("s3.client.default.protocol", "http")
            // Point the AWS SDK profile files at paths that cannot exist, so the SDK does not try to
            // read ~/.aws/credentials and ~/.aws/config, which would violate Elasticsearch entitlements.
            .environment("AWS_CONFIG_FILE", "/dev/null/aws/config")
            .environment("AWS_SHARED_CREDENTIALS_FILE", "/dev/null/aws/credentials")
            // Caller-supplied configuration goes last
            .apply(() -> configProvider)
            .build();
    }

    /**
     * Resolves the absolute path of the {@code /iceberg-fixtures} classpath resource.
     *
     * @return the absolute fixtures path, or {@code "/tmp"} when the resource is missing or is not a plain file URL
     * @throws IllegalStateException if the resource URL cannot be converted to a URI
     */
    private static String fixturesPath() {
        URL fixtures = Clusters.class.getResource("/iceberg-fixtures");
        boolean onLocalDisk = fixtures != null && "file".equals(fixtures.getProtocol());
        if (onLocalDisk == false) {
            // Safe default; LOCAL tests will fail gracefully
            return "/tmp";
        }
        try {
            return PathUtils.get(fixtures.toURI()).toAbsolutePath().toString();
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Failed to resolve fixtures path", e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.ndjson;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase;
import org.elasticsearch.xpack.esql.qa.rest.AbstractExternalSourceSpecTestCase;
import org.junit.ClassRule;

import java.util.List;

/**
 * Parameterized integration tests for standalone NDJSON files.
 * Each csv-spec test is run against every configured storage backend (S3, HTTP, LOCAL).
 * <p>
 * The test cases are read from {@code /external-ndjson.csv-spec}; the format name passed to the
 * base class is {@code "ndjson"}.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class NdJsonFormatSpecIT extends AbstractExternalSourceSpecTestCase {

/**
 * Cluster shared by all tests of this class. The S3 endpoint is passed as a lambda so that
 * {@code s3Fixture.getAddress()} is only evaluated when the supplier is invoked, not when this field is initialized.
 * NOTE(review): {@code s3Fixture} is not declared in this class - presumably inherited from the base class; confirm.
 */
@ClassRule
public static ElasticsearchCluster cluster = Clusters.testCluster(() -> s3Fixture.getAddress());

/**
 * Forwards the per-test parameters to the base class unchanged and appends the format name {@code "ndjson"}.
 * The parameter order must match the argument arrays produced by {@link #readScriptSpec()}.
 */
public NdJsonFormatSpecIT(
String fileName,
String groupName,
String testName,
Integer lineNumber,
CsvTestCase testCase,
String instructions,
StorageBackend storageBackend
) {
super(fileName, groupName, testName, lineNumber, testCase, instructions, storageBackend, "ndjson");
}

/**
 * @return the HTTP addresses of {@link #cluster}
 */
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

/**
 * Supplies the test parameters read from {@code /external-ndjson.csv-spec}.
 * The display name uses constructor arguments 2, 3 and 7 (group name, test name, storage backend).
 *
 * @return one argument array per test case, as returned by {@code readExternalSpecTests}
 * @throws Exception propagated from {@code readExternalSpecTests}
 */
@ParametersFactory(argumentFormatting = "csv-spec:%2$s.%3$s [%7$s]")
public static List<Object[]> readScriptSpec() throws Exception {
return readExternalSpecTests("/external-ndjson.csv-spec");
}
}
Loading