[ESQL] Introduce pluggable external datasource framework #141678
costin merged 33 commits into elastic:main
Conversation

Pinging @elastic/es-analytical-engine (Team:Analytics)
@@ -749,6 +697,7 @@ private void resolveIndicesAndAnalyze(
            return r;
        })
        .<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
        .<PreAnalysisResult>andThen((l, r) -> preAnalyzeExternalSources(parsed.plan(), preAnalysis, r, l))
I wonder if we should resolve external sources as part (or even inside) of indexResolver?
The current approach indirectly implies external sources are separate paths, presumably only in FROM. If we can make them part of IndexPattern (or just the pattern), we could eventually support joins and subqueries with external sources, as well as selecting from a mix of indices and external sources at once.
I worked on minimal changes to this area as my goal for the PR is end-to-end testing.
That said, as the discovery paths are separate, I would keep the indexResolver, which is fairly tied to FieldCaps & co, in place and work on generifying the PreAnalyzer/Analysis phase.
 * Implementations handle specific protocols (HTTP, S3, GCS, local, etc.).
 * This is a read-only interface focused on ESQL's needs for querying external data.
 */
public interface StorageProvider extends Closeable {
I wonder if we should rely on org.elasticsearch.common.blobstore.BlobStore instead?
It is used by snapshot and already implemented for s3, gcs, azure, fs and probably others.
This will allow us to leverage existing infrastructure for accessing external sources rather than creating new infrastructure.
It is part of the server package so it should not create any extra unwanted dependencies (unless eventually we want to abstract that as well).
I started with BlobStore; however, it adds both read and write methods, so I ended up with the above interface, which is read-only focused. Definitely something to look into closer, to see whether the two interfaces could be unified (as well as the code behind them).
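A minimal sketch of the read-only shape discussed above, with a tiny in-memory implementation to illustrate the contract. All names and signatures here are hypothetical, not the PR's actual `StorageProvider`/`StorageObject` SPI; range reads are included because columnar formats fetch footers and column chunks selectively.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical read-only storage SPI sketch (not the PR's actual interfaces).
interface StorageObject {
    String path();                 // object identifier within the provider
    long length() throws IOException;
    // Range read, so columnar formats can fetch footers/column chunks.
    InputStream readRange(long offset, long length) throws IOException;
}

interface StorageProvider extends Closeable {
    String scheme();               // e.g. "s3", "http", "file"
    StorageObject getObject(String path) throws IOException;
    List<StorageObject> listObjects(String prefix, boolean recursive) throws IOException;
}

// Trivial in-memory implementation, for illustration only.
class InMemoryStorageProvider implements StorageProvider {
    private final Map<String, byte[]> objects = new TreeMap<>();

    void put(String path, byte[] data) { objects.put(path, data); }

    @Override public String scheme() { return "mem"; }

    @Override public StorageObject getObject(String path) {
        byte[] data = objects.get(path);   // NPE on missing path; real code would validate
        return new StorageObject() {
            @Override public String path() { return path; }
            @Override public long length() { return data.length; }
            @Override public InputStream readRange(long offset, long length) {
                return new ByteArrayInputStream(data, (int) offset, (int) length);
            }
        };
    }

    @Override public List<StorageObject> listObjects(String prefix, boolean recursive) {
        return objects.keySet().stream().filter(p -> p.startsWith(prefix)).map(this::getObject).toList();
    }

    @Override public void close() {}
}
```

Compared to `BlobStore`/`BlobContainer`, the surface is deliberately smaller: no write, delete, or atomic-register operations, just enumeration and ranged reads.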
String formatName();

/** Returns file extensions this reader handles (e.g., [".parquet", ".parq"]). */
List<String> fileExtensions();
Do we plan to use it only to detect a format for the object?
I think we need to keep in mind structures that span multiple files with multiple extensions (such as Lucene indices).
Also, for more flexibility:

- List<String> fileExtensions();
+ Collection<String> fileExtensions();
In this PR I started simple, with one format per provider. This can be extended to have mixed formats through a hierarchical provider.
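To make the extension-based detection concrete, here is a sketch of a registry that maps `fileExtensions()` values to a format name. The `FormatRegistry` class is illustrative, not from the PR; only the idea of per-reader extension lists comes from the SPI above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry: maps file extensions to a format name,
// using the kind of extension lists a FormatReader would declare.
class FormatRegistry {
    private final Map<String, String> byExtension = new HashMap<>();

    void register(String formatName, List<String> extensions) {
        for (String ext : extensions) {
            byExtension.put(ext.toLowerCase(Locale.ROOT), formatName);
        }
    }

    // Detect the format from the last path extension, case-insensitively.
    Optional<String> detect(String path) {
        int dot = path.lastIndexOf('.');
        if (dot < 0) return Optional.empty();
        return Optional.ofNullable(byExtension.get(path.substring(dot).toLowerCase(Locale.ROOT)));
    }
}
```

A last-extension lookup like this is exactly what breaks down for multi-file structures such as Lucene indices, which is the limitation raised above.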
/**
 * @return The type of external source (e.g., "iceberg", "parquet")
 */
String sourceType();
I think it is worth having ExternalSource object. For now it could only have a name but later it could be used to describe what kind of optimizations (such as pushdown) it supports.
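A sketch of what such an ExternalSource object could look like: a name now, plus a capability set that could later drive optimization decisions like pushdown. The record and capability names are hypothetical, not part of the PR.

```java
import java.util.Set;

// Hypothetical descriptor replacing the bare sourceType() string:
// carries a name today, optimization capabilities tomorrow.
record ExternalSource(String name, Set<Capability> capabilities) {
    enum Capability { FILTER_PUSHDOWN, PROJECTION_PUSHDOWN, LIMIT_PUSHDOWN }

    boolean supports(Capability c) { return capabilities.contains(c); }
}
```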
/**
 * @return The path or identifier of the external source (e.g., S3 path)
 */
String tablePath();
- String tablePath();
+ String dataPath();

Also, do we expect it to be spread across multiple files (again thinking about Lucene-index-like data structures, with multiple files serving different functions backing the actual data)?
I believe you're touching on two topics here:
- logical storage - which is composed of multiple files that work together as a unit
- heterogeneous data - different file formats and providers that can be seen as a unified table
Neither is covered in this PR, which focuses on the SPI primitive where logical storage = physical storage. Once settled, we should be able to assemble the logical storage from this primitive.
Point 2 is conceptually similar to our IndexResolver, as you already pointed out; however, there's a lot more work on the planner side to efficiently do data loading and processing based on the overall plan and cost analysis, which is way outside the current iteration's scope.
Force-pushed cfe8bcb to 615d00b
// Create ExternalSourceResolver for Iceberg/Parquet resolution
// Use the same executor as for searches to avoid blocking
final ExternalSourceResolver externalSourceResolver = new ExternalSourceResolver(
    services.transportService().getThreadPool().executor(org.elasticsearch.threadpool.ThreadPool.Names.SEARCH),
We use dedicated thread pools for snapshot read operations:
SNAPSHOT for actual lucene files and SNAPSHOT_META for fairly small files describing the snapshot structure.
I believe we should revisit overall ESQL thread pool usage and split operations into compute/memory-heavy (actual transformations and aggregations) and read-heavy. With this change, I believe we should split read-heavy further into reading local Lucene files and reading remote datasource files.
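The split suggested above can be sketched with plain executors: one pool per workload class, analogous to the SNAPSHOT / SNAPSHOT_META separation. Pool names and sizes here are hypothetical, not Elasticsearch's actual thread pool configuration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: dedicated executors per workload class, so slow
// remote reads cannot starve compute or local-read work.
class EsqlExecutors implements AutoCloseable {
    final ExecutorService compute = Executors.newFixedThreadPool(4);     // transformations, aggregations
    final ExecutorService localRead = Executors.newFixedThreadPool(4);   // local Lucene files
    final ExecutorService remoteRead = Executors.newFixedThreadPool(16); // remote datasource files (I/O bound, larger pool)

    @Override public void close() throws InterruptedException {
        for (ExecutorService e : new ExecutorService[] { compute, localRead, remoteRead }) {
            e.shutdown();
            e.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

The sizing intuition: remote reads spend most of their time blocked on the network, so they tolerate (and benefit from) more threads than CPU-bound compute work.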
Move data lake types from base/ into connector.lakehouse (including LakehouseConnector, LakehousePlan, and 17 SPI types from PR #141678). Move SQL types from base/ into connector.sql (SqlConnector, SqlPlan, JdbcConnector, JdbcPlan). Remove the now-empty base/ and example/ packages. Update all cross-references in javadoc and CONNECTOR_SPI.md.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a modular framework for querying external data sources from ESQL via
the EXTERNAL command. The goal is to enable ESQL to query data residing
outside Elasticsearch indices, including datalakes (Iceberg), cloud
storage (S3), HTTP endpoints, and local files, using various formats.
The architecture consists of two pluggable layers:
- Storage layer: protocol-agnostic access to storage systems
- StorageProvider/StorageObject SPIs with HTTP, S3, and local
filesystem implementations
- Range-based reads for efficient columnar format access
- Format layer: data parsing into ESQL Page batches
- FormatReader SPI with CSV and Parquet implementations
- Schema inference and column projection support
Additional components:
- Iceberg datalake adapter with catalog integration and filter pushdown
- ExternalSourceOperatorFactory combining storage and format layers
- Parser and analyzer extensions for EXTERNAL relation syntax
- Physical plan mapping and LocalExecutionPlanner integration
- Comprehensive test suite including S3 fixture-based integration tests
Developed using AI-assisted tooling.
Refactor datasources to use an SPI-based plugin system where each external data source lives in its own module with isolated dependencies.

Core changes:
- DataSourcePlugin interface for storage/format/catalog contributions
- DataSourceModule for plugin discovery and registry population
- BuiltInDataSourcePlugin provides HTTP, file, CSV (always available)
- Extracted modules: esql-datasource-s3, esql-datasource-parquet, esql-datasource-iceberg

Plan simplification:
- Replaced IcebergRelation/IcebergSourceExec with generic ExternalRelation and ExternalSourceExec that work with any registered source type
- FilterPushdownRegistry enables per-source predicate optimization

This separation allows datasource modules to be developed, tested, and released independently while keeping the ESQL core lightweight.

Developed using AI-assisted tooling

[CI] Auto commit changes from spotless

Fix failures
Extract the HTTP/local storage providers and CSV format reader from the core ESQL plugin into esql-datasource-http and esql-datasource-csv, loaded via the existing DataSourcePlugin SPI. This removes hardcoded scheme routing, isolates third-party dependencies (Jackson CSV, Apache HttpClient), and makes all storage backends and format readers pluggable.

- StorageProviderRegistry simplified: holds concrete providers, supports per-query provider creation for WITH-clause config, drops unnecessary synchronized blocks (registration is init-only).
- StorageManager closes per-query providers, fixing a resource leak.
- OperatorFactoryRegistry and ExternalSourceResolver propagate per-query config end-to-end, making execution fully scheme-agnostic.
- Test infrastructure unified: AbstractExternalSourceSpecTestCase cross-products csv-spec tests with all backends (S3, HTTP, LOCAL). Removed BuiltInDataSourcePlugin and per-format test base classes.
- CsvFormatReader: datetime parsing uses looksNumeric() pre-check instead of exception-based branching; method javadocs removed per project conventions.

Developed using AI-assisted tooling
Force-pushed b05225d to 2b7bf3b
Enable wildcard-based file discovery so that external source queries can match multiple files (e.g. FROM "file:///data/*.parquet").

- StoragePath, StorageProvider: added listObjects with recursive flag
- GlobExpander, GlobMatcher, FileSet: new glob expansion pipeline
- LocalStorageProvider: recursive file walking for glob resolution
- ExternalSourceResolver, ExternalRelation: multi-file resolution
- AsyncExternalSourceOperatorFactory: iterate over expanded file sets
- AnalyzerTests, ExternalSourceResolverTests, GlobDiscoveryLocalTests: new test coverage for glob patterns

Developed using AI-assisted tooling
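The glob expansion above can be sketched as a glob-to-regex translation plus a filter over listed paths. This is a hypothetical stand-in for the PR's GlobExpander/GlobMatcher pipeline, assuming conventional semantics: `*` matches within a path segment, `**` crosses segment boundaries, `?` matches a single character.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical glob matcher (not the PR's actual code): translates a
// glob pattern into a regex and filters candidate object paths.
class Glob {
    static Pattern compile(String glob) {
        StringBuilder re = new StringBuilder();
        for (int i = 0; i < glob.length(); i++) {
            char c = glob.charAt(i);
            if (c == '*') {
                if (i + 1 < glob.length() && glob.charAt(i + 1) == '*') {
                    re.append(".*");    // '**' crosses '/' boundaries
                    i++;
                } else {
                    re.append("[^/]*"); // '*' stays within one segment
                }
            } else if (c == '?') {
                re.append("[^/]");      // exactly one non-separator character
            } else {
                re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.toString());
    }

    // In the real pipeline the candidates would come from
    // StorageProvider.listObjects(prefix, recursive).
    static List<String> expand(List<String> paths, String glob) {
        Pattern p = compile(glob);
        return paths.stream().filter(path -> p.matcher(path).matches()).toList();
    }
}
```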
The iceberg and parquet datasource plugins were missing license files, dependency mappings, and thirdPartyAudit configurations needed to pass CI precommit checks.

- esql-datasource-iceberg: added license files (arrow, aws-sdk-2, caffeine, joda-time, reactive-streams), dependency mappings, and thirdPartyAudit ignoreMissingClasses/ignoreViolations
- esql-datasource-parquet: added thirdPartyAudit configuration
- IcebergSourceOperatorFactory: refactored for updated API
- Minor test fixes across analyzer and operator factory tests

Developed using AI-assisted tooling
Force-pushed 2b7bf3b to af0e510
…stic#141678)"

This reverts commit 6acdb30. It's causing HeapAttackIT to fail in serverless. On some machines. But not others! How exciting. For now, we'll revert and then try and get it back more slowly.
…1678) (#142663)

* Revert "[ESQL] Introduce pluggable external datasource framework (#141678)"

This reverts commit 6acdb30. It's causing HeapAttackIT to fail in serverless. On some machines. But not others! How exciting. For now, we'll revert and then try and get it back more slowly.

Co-authored-by: Nik Everett <nik9000@gmail.com>
…astic#141678) (elastic#142663)

This reverts commit 8f68c0b.
…on-sliced-reindex

* upstream/main: (120 commits)
  [Fleet] Add OpAMP field mappings to fleet-agents (elastic#142550)
  Clarify `expectedSize` behaviour of `ReleasableBytesStreamOutput` (elastic#142451)
  Refactor KnnIndexTester to tidy up some options (elastic#142651)
  Fixed with elastic#142638 already (elastic#142655)
  Change *OverTimeTests to extend AbstractAggregationTestCase (elastic#142659)
  Fix byteRefBlockHashSize for release mode (elastic#142668)
  Mute org.elasticsearch.xpack.esql.tree.EsqlNodeSubclassTests testTransform {class org.elasticsearch.xpack.esql.plan.logical.MMR} elastic#142674
  Fix PAUSED_FOR_NODE_REMOVAL shard blocking QUEUED promotion (elastic#142637)
  Mute org.elasticsearch.xpack.logsdb.RandomizedRollingUpgradeIT testIndexingStandardSource elastic#142670
  Revert "[ESQL] Introduce pluggable external datasource framework (elastic#141678) (elastic#142663)
  Mute org.elasticsearch.xpack.esql.spatial.SpatialPushDownGeoShapeIT testQuantizedXY elastic#141234
  PromQL: infer start/end from query DSL filters (elastic#142580)
  Add GPU vector indexing monitoring to _xpack/usage (elastic#141932)
  Fix testTrackerClearShutdown: use non-zero startTimeMillis for DONE status (elastic#142646)
  Mute org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT test elastic#142426
  ESQL_ Move time_zone to GA (elastic#142287)
  Mute org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT test elastic#142426
  DOCS: Convert Painless diagrams to mermaid (elastic#141851)
  ES|QL: fix validation in generative tests (elastic#142638)
  Unmute tests that do not reproduce failures (elastic#141712)
  ...
The datasource plugins are now excluded from serverless (separate PR), similar to other plugins. Thus the data sources functionality (elastic#141678) can be added back. This reverts commit 8f68c0b and restores the functionality into main.