Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build-tools-internal/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ cuvs_java = 25.12.0
ldapsdk = 7.0.3

antlr4 = 4.13.1
iceberg = 1.10.1
# bouncy castle version for non-fips. fips jars use a different version
bouncycastle=1.79
# used by security and idp (need to be in sync due to cross-dependency in testing)
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/142707.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 142707
summary: Reapply "Introduce pluggable external datasource framework"
type: feature
305 changes: 305 additions & 0 deletions gradle/verification-metadata.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public void handle(final HttpExchange exchange) throws IOException {
if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
// HEAD response must include Content-Length header for S3 clients (AWS SDK) that read file size
exchange.getResponseHeaders().add("Content-Length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
} else if (request.isListMultipartUploadsRequest()) {
Expand Down Expand Up @@ -181,6 +184,9 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
var range = parsePartRange(exchange);
if (range.end() == null) {
throw new AssertionError("Copy-part range must specify an end: " + range);
}
int start = Math.toIntExact(range.start());
int len = Math.toIntExact(range.end() - range.start() + 1);
var part = sourceBlob.slice(start, len);
Expand Down Expand Up @@ -379,16 +385,15 @@ public void handle(final HttpExchange exchange) throws IOException {
return;
}

// S3 supports https://www.rfc-editor.org/rfc/rfc9110.html#name-range. The AWS SDK v1.x seems to always generate range
// requests with a header value like "Range: bytes=start-end" where both {@code start} and {@code end} are always defined
// (sometimes to very high value for {@code end}). It would be too tedious to fully support the RFC so S3HttpHandler only
// supports when both {@code start} and {@code end} are defined to match the SDK behavior.
// S3 supports https://www.rfc-editor.org/rfc/rfc9110.html#name-range
// This handler supports both bounded ranges (bytes=0-100) and open-ended ranges (bytes=100-)
final HttpHeaderParser.Range range = parseRangeHeader(rangeHeader);
if (range == null) {
throw new AssertionError("Bytes range does not match expected pattern: " + rangeHeader);
}
long start = range.start();
long end = range.end();
// For open-ended ranges (bytes=N-), end is null, meaning "to end of file"
long end = range.end() != null ? range.end() : blob.length() - 1;
if (end < start) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
public enum HttpHeaderParser {
;

private static final Pattern RANGE_HEADER_PATTERN = Pattern.compile("bytes=([0-9]+)-([0-9]+)");
// Pattern supports both bounded ranges (bytes=0-100) and open-ended ranges (bytes=100-)
private static final Pattern RANGE_HEADER_PATTERN = Pattern.compile("bytes=([0-9]+)-([0-9]*)");
private static final Pattern CONTENT_RANGE_HEADER_PATTERN = Pattern.compile("bytes (?:(\\d+)-(\\d+)|\\*)/(?:(\\d+)|\\*)");

/**
* Parse a "Range" header
*
* Note: only a single bounded range is supported (e.g. <code>Range: bytes={range_start}-{range_end}</code>)
* Supports both bounded and open-ended ranges:
* <ul>
* <li>Bounded: <code>Range: bytes={range_start}-{range_end}</code></li>
* <li>Open-ended: <code>Range: bytes={range_start}-</code> (end is null, meaning "to end of file")</li>
* </ul>
*
* @see <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range">MDN: Range header</a>
* @param rangeHeaderValue The header value as a string
Expand All @@ -31,18 +36,38 @@ public static Range parseRangeHeader(String rangeHeaderValue) {
final Matcher matcher = RANGE_HEADER_PATTERN.matcher(rangeHeaderValue);
if (matcher.matches()) {
try {
return new Range(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)));
long start = Long.parseLong(matcher.group(1));
String endGroup = matcher.group(2);
Long end = (endGroup == null || endGroup.isEmpty()) ? null : Long.parseLong(endGroup);
return new Range(start, end);
} catch (NumberFormatException e) {
return null;
}
}
return null;
}

/**
 * A HTTP "Range" from a Range header.
 *
 * @param start The start of the range (always present)
 * @param end The end of the range, or null for open-ended ranges (meaning "to end of file")
 */
public record Range(long start, Long end) {

    /**
     * Convenience constructor for bounded ranges. Kept for source compatibility with
     * callers that pass two primitive longs; delegates to the canonical constructor.
     */
    public Range(long start, long end) {
        this(start, (Long) end);
    }

    /**
     * Returns true if this is an open-ended range (no end specified).
     */
    public boolean isOpenEnded() {
        return end == null;
    }

    /**
     * Renders this range back into "Range" header syntax,
     * e.g. {@code bytes=0-5} (bounded) or {@code bytes=10-} (open-ended).
     */
    public String headerString() {
        return end != null ? "bytes=" + start + "-" + end : "bytes=" + start + "-";
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public void testParseRangeHeaderMultipleRangesNotMatched() {
);
}

public void testParseRangeHeaderEndlessRangeNotMatched() {
assertNull(HttpHeaderParser.parseRangeHeader(Strings.format("bytes=%d-", randomLongBetween(0, Long.MAX_VALUE))));
public void testParseRangeHeaderEndlessRange() {
var bytes = randomLongBetween(0, Long.MAX_VALUE);
assertEquals(new HttpHeaderParser.Range(bytes, null), HttpHeaderParser.parseRangeHeader(Strings.format("bytes=%d-", bytes)));
}

public void testParseRangeHeaderSuffixLengthNotMatched() {
Expand Down
39 changes: 39 additions & 0 deletions x-pack/plugin/esql-datasource-csv/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.publish'

// Plugin descriptor: declared as an extension of the x-pack-esql plugin so that
// ESQL's SPI can discover this datasource implementation at runtime.
esplugin {
  name = 'esql-datasource-csv'
  description = 'CSV format support for ESQL external data sources'
  classname = 'org.elasticsearch.xpack.esql.datasource.csv.CsvDataSourcePlugin'
  extendedPlugins = ['x-pack-esql']
}

base {
  archivesName = 'esql-datasource-csv'
}

dependencies {
  // SPI interfaces from ESQL core — compileOnly because the extended plugin
  // provides these classes at runtime.
  compileOnly project(path: xpackModule('esql'))
  compileOnly project(path: xpackModule('esql-core'))
  compileOnly project(path: xpackModule('core'))
  compileOnly project(':server')
  compileOnly project(xpackModule('esql:compute'))

  // Jackson CSV for CSV format reader — bundled with the plugin (implementation scope)
  implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${versions.jackson}"

  testImplementation project(':test:framework')
  testImplementation(testArtifact(project(xpackModule('core'))))
}

// Map all jackson-* artifacts to the single shared 'jackson' license/notice files
tasks.named("dependencyLicenses").configure {
  mapping from: /jackson-.*/, to: 'jackson'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
This copy of Jackson JSON processor streaming parser/generator is licensed under the
Apache (Software) License, version 2.0 ("the License").
See the License for details about distribution rights, and the
specific rights regarding derivate works.

You may obtain a copy of the License at:

http://www.apache.org/licenses/LICENSE-2.0
20 changes: 20 additions & 0 deletions x-pack/plugin/esql-datasource-csv/licenses/jackson-NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Jackson JSON processor

Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.

## Licensing

Jackson core and extension components may be licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).

## Credits

A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
71 changes: 71 additions & 0 deletions x-pack/plugin/esql-datasource-csv/qa/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

apply plugin: 'elasticsearch.internal-java-rest-test'
// Precommit plugins applied individually (rather than via a parent convention plugin)
apply plugin: org.elasticsearch.gradle.internal.precommit.CheckstylePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenApisPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenPatternsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.FilePermissionsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.LoggerUsagePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.TestingConventionsPrecommitPlugin

dependencies {
  // Test fixtures and spec reader infrastructure
  javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
  javaRestTestImplementation project(xpackModule('esql:qa:server'))
  javaRestTestImplementation project(xpackModule('esql'))
  javaRestTestImplementation(project(path: xpackModule('esql'), configuration: 'testRuntimeElements'))

  // S3 fixture infrastructure for mocking S3 operations
  javaRestTestImplementation project(':test:fixtures:s3-fixture')
  javaRestTestImplementation project(':test:fixtures:aws-fixture-utils')

  // GCS fixture infrastructure for mocking GCS operations
  javaRestTestImplementation project(':test:fixtures:gcs-fixture')

  // Repository S3 module for cluster
  clusterModules project(':modules:repository-s3')
  // Repository GCS module for cluster
  clusterModules project(':modules:repository-gcs')
  clusterPlugins project(':plugins:mapper-size')
  clusterPlugins project(':plugins:mapper-murmur3')

  // The CSV datasource plugin under test
  clusterPlugins project(xpackModule('esql-datasource-csv'))
  clusterPlugins project(xpackModule('esql-datasource-http'))
  clusterPlugins project(xpackModule('esql-datasource-s3'))
  // GCS datasource plugin for gs:// storage backend
  clusterPlugins project(xpackModule('esql-datasource-gcs'))
}

// The CSV fixtures (employees.csv and csv-basic.csv-spec) are included
// directly in this module's javaRestTest/resources directory

tasks.named('javaRestTest') {
  usesDefaultDistribution("to be triaged")
  // NOTE(review): forks are serialized — presumably to avoid fixture port/state
  // contention between suites; confirm before parallelizing.
  maxParallelForks = 1

  // Increase timeouts for S3 operations which may take longer than standard queries
  systemProperty 'tests.rest.client_timeout', '60'
  systemProperty 'tests.rest.socket_timeout', '60'

  // Enable more verbose logging for debugging
  testLogging {
    events = ["passed", "skipped", "failed"]
    exceptionFormat = "full"
    showStandardStreams = false
  }
}

restResources {
  restApi {
    include '_common', 'bulk', 'get', 'indices', 'esql', 'xpack', 'cluster', 'capabilities', 'index'
  }
  restTests {
    includeXpack 'esql'
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.csv;

import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;

import java.net.URISyntaxException;
import java.net.URL;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.ACCESS_KEY;
import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.SECRET_KEY;

/**
 * Cluster configuration for CSV integration tests.
 *
 * Builds a shared local test cluster wired up with the repository modules and
 * client settings the external-datasource fixtures need (S3 fixture endpoint and
 * credentials, local fixture path for the LOCAL storage backend).
 */
public class Clusters {

    /**
     * Creates the shared test cluster.
     *
     * @param s3EndpointSupplier supplies the S3 fixture endpoint lazily — the fixture's port
     *                           is only known once the fixture has started
     * @param configProvider     additional per-suite cluster configuration applied on top of the defaults
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier, LocalClusterConfigProvider configProvider) {
        return ElasticsearchCluster.local()
            .distribution(DistributionType.DEFAULT)
            .shared(true)
            // Enable S3 repository plugin for S3 access
            .module("repository-s3")
            // Enable GCS repository module for GCS access
            .module("repository-gcs")
            // Basic cluster settings
            .setting("xpack.security.enabled", "false")
            .setting("xpack.license.self_generated.type", "trial")
            // Disable ML to avoid native code loading issues in some environments
            .setting("xpack.ml.enabled", "false")
            // Allow the LOCAL storage backend to read fixture files from the test resources directory.
            // The esql-datasource-http plugin's entitlement policy uses shared_repo for file read access.
            .setting("path.repo", fixturesPath())
            // S3 client configuration for accessing the S3HttpFixture
            .setting("s3.client.default.endpoint", s3EndpointSupplier)
            // S3 credentials must be stored in keystore, not as regular settings
            .keystore("s3.client.default.access_key", ACCESS_KEY)
            .keystore("s3.client.default.secret_key", SECRET_KEY)
            // Disable SSL for HTTP fixture
            .setting("s3.client.default.protocol", "http")
            // Disable AWS SDK profile file loading by pointing to non-existent files
            // This prevents the SDK from trying to read ~/.aws/credentials and ~/.aws/config
            // which would violate Elasticsearch entitlements
            .environment("AWS_CONFIG_FILE", "/dev/null/aws/config")
            .environment("AWS_SHARED_CREDENTIALS_FILE", "/dev/null/aws/credentials")
            // Apply any additional configuration (wrapped in a supplier; the provider is resolved lazily)
            .apply(() -> configProvider)
            .build();
    }

    /**
     * Convenience overload that applies no extra per-suite configuration.
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier) {
        return testCluster(s3EndpointSupplier, config -> {});
    }

    /**
     * Resolves the on-disk path of the bundled fixture resources for use as {@code path.repo}.
     *
     * NOTE(review): resolves the "/iceberg-fixtures" resource directory although this class
     * documents CSV tests — confirm the resource directory name matches what the suite ships.
     */
    private static String fixturesPath() {
        // Only resolvable when resources are on the filesystem (not inside a jar)
        URL resourceUrl = Clusters.class.getResource("/iceberg-fixtures");
        if (resourceUrl != null && resourceUrl.getProtocol().equals("file")) {
            try {
                return PathUtils.get(resourceUrl.toURI()).toAbsolutePath().toString();
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Failed to resolve fixtures path", e);
            }
        }
        // Fall back to a safe default; LOCAL tests will fail gracefully
        return "/tmp";
    }
}
Loading