From 66857c18f9ab196bf1ad5deac182de1eb117a898 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Mar 2026 08:45:26 +0200 Subject: [PATCH 1/6] Fix flaky EmployeeFlightServerTests shutdown race Use FlightServer.close() for robust gRPC shutdown and add a brief sleep to let Netty event loop callbacks drain before the test ends. Closes #143636 --- .../esql/datasource/grpc/EmployeeFlightServer.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql-datasource-grpc/src/test/java/org/elasticsearch/xpack/esql/datasource/grpc/EmployeeFlightServer.java b/x-pack/plugin/esql-datasource-grpc/src/test/java/org/elasticsearch/xpack/esql/datasource/grpc/EmployeeFlightServer.java index ea500c3c60882..3f2e2a96cb4c4 100644 --- a/x-pack/plugin/esql-datasource-grpc/src/test/java/org/elasticsearch/xpack/esql/datasource/grpc/EmployeeFlightServer.java +++ b/x-pack/plugin/esql-datasource-grpc/src/test/java/org/elasticsearch/xpack/esql/datasource/grpc/EmployeeFlightServer.java @@ -40,7 +40,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; /** * In-memory Arrow Flight server that serves employee data from the employees.csv test fixture. @@ -249,8 +248,15 @@ public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightP @Override public void close() throws IOException { try { - server.shutdown(); - server.awaitTermination(5, TimeUnit.SECONDS); + // Use FlightServer.close() which provides a robust shutdown sequence: + // graceful shutdown → awaitTermination → shutdownNow fallback. + // After close() returns, the gRPC server is terminated but Netty's event loop may still + // have pending ChannelFutureListener callbacks (e.g. NettyServerHandler.closeStreamWhenDone) + // that can throw if the HTTP/2 stream was already closed. Netty's DefaultPromise logs + // these as WARN which ESTestCase.checkStaticState() treats as a test failure. + // A brief sleep lets those callbacks drain on the event loop thread before the test ends. + server.close(); + Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted while closing FlightServer", e); From 82b3bfcd9cc3daa6c1c1a08a19a016e9ec916daf Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Mar 2026 08:59:17 +0200 Subject: [PATCH 2/6] Unmute EmployeeFlightServerTests testMultiEndpointReturnsCorrectEndpointCount --- muted-tests.yml | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index d48c1a771323c..50a599f0ac2ee 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -387,9 +387,30 @@ tests: - class: org.elasticsearch.index.query.PrefixQueryBuilderTests method: testPrefixCircuitBreakerTripsWithLowLimit issue: https://github.com/elastic/elasticsearch/issues/143548 -- class: org.elasticsearch.xpack.esql.datasource.grpc.EmployeeFlightServerTests - method: testMultiEndpointReturnsCorrectEndpointCount - issue: https://github.com/elastic/elasticsearch/issues/143636 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-irate.Irate_of_ratio_promql} + issue: https://github.com/elastic/elasticsearch/issues/143616 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-irate.Irate_of_ratio} + issue: https://github.com/elastic/elasticsearch/issues/143617 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-irate.Irate_of_double_no_grouping_promql} + issue: https://github.com/elastic/elasticsearch/issues/143624 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-irate.Irate_of_double_no_grouping} + issue: https://github.com/elastic/elasticsearch/issues/143625 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-avg-over-time.Avg_over_time_of_aggregate_metric_double} + issue: https://github.com/elastic/elasticsearch/issues/143631 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-delta.Delta_of_double_no_grouping} + issue: https://github.com/elastic/elasticsearch/issues/143632 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-avg-over-time.Avg_over_time_of_aggregate_metric_double_grouping} + issue: https://github.com/elastic/elasticsearch/issues/143633 +- class: org.elasticsearch.xpack.esql.CsvIT + method: test {csv-spec:k8s-timeseries-delta.Delta_of_double_no_grouping_promql} + issue: https://github.com/elastic/elasticsearch/issues/143634 - class: org.elasticsearch.search.SearchLoggingIT method: testSearchLogShardInfoPartialFailure issue: https://github.com/elastic/elasticsearch/issues/143638 From 79deb27a281f8f4953f630e41b56ff3b96a8d294 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Mar 2026 18:34:45 +0200 Subject: [PATCH 3/6] [ESQL] Enable distributed pipeline breakers for external sources via FragmentExec Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec. On data nodes, localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations such as filter pushdown via FilterPushdownRegistry. Key changes: - Make ExternalRelation serializable for transport inside FragmentExec - Mapper wraps ExternalRelation in FragmentExec; LocalMapper expands it - Split discovery peeks inside FragmentExec for ExternalRelation - Coordinator-only path injects splits after localPlan expansion - AdaptiveStrategy recognizes TopN as a pipeline breaker --- .../main/resources/external-basic.csv-spec | 31 +++ .../xpack/esql/plan/PlanWritables.java | 2 + .../esql/plan/logical/ExternalRelation.java | 50 +++- .../plan/physical/ExternalSourceExec.java | 19 +- .../esql/planner/mapper/LocalMapper.java | 7 +- .../xpack/esql/planner/mapper/Mapper.java | 7 +- .../esql/planner/mapper/MapperUtils.java | 7 - .../xpack/esql/plugin/AdaptiveStrategy.java | 10 +- .../xpack/esql/plugin/ComputeService.java | 68 ++++- .../esql/plugin/DataNodeComputeHandler.java | 35 ++- .../ExternalRelationSerializationTests.java | 56 ++++ .../esql/plugin/AdaptiveStrategyTests.java | 21 ++ .../plugin/ExternalDistributionTests.java | 256 +++++++++++++++++- .../plugin/ExternalSourceDataNodeTests.java | 6 +- 14 files changed, 510 insertions(+), 65 deletions(-) create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec index bce42a159aba8..ed866793c751a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec @@ -212,3 +212,34 @@ emp_no:integer | height:double | height_float_rounded:double | height.scaled_flo 10004 | 1.78 | 1.78 | 1.78 | 1.78 10005 | 2.05 | 2.05 | 2.05 | 2.05 ; + +// Pipeline breaker distribution tests +// These exercise distributed execution paths for pipeline breakers (STATS, TopN, LIMIT) +// across all distribution modes via ExternalDistributedSpecIT + +topNSortBySalaryDesc +required_capability: external_command +EXTERNAL "{{employees}}" +| KEEP emp_no, first_name, salary +| SORT salary DESC +| LIMIT 3; + +emp_no:integer | first_name:keyword | salary:integer +10029 | "Otmar" | 74999 +10045 | "Moss" | 74970 +10007 | "Tzvetan" | 74572 +; + +topNFilteredSortBySalary +required_capability: external_command +EXTERNAL "{{employees}}" +| WHERE gender == "F" +| KEEP emp_no, first_name, salary +| SORT salary DESC +| LIMIT 3; + +emp_no:integer | first_name:keyword | salary:integer +10007 | "Tzvetan" | 74572 +10027 | "Divier" | 73851 +10099 | "Valter" | 73578 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java index 3d7ad2913be71..a93b642406fc1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.InlineStats; @@ -88,6 +89,7 @@ public static List logical() { Enrich.ENTRY, EsRelation.ENTRY, Eval.ENTRY, + ExternalRelation.ENTRY, Filter.ENTRY, Grok.ENTRY, InlineJoin.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java index 87357a7f4c0f9..c600e72906925 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java @@ -6,29 +6,35 @@ */ package org.elasticsearch.xpack.esql.plan.logical; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.NodeUtils; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.datasources.FileSet; +import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; /** * Logical plan node for external data source relations (e.g., Iceberg table, Parquet file). - * This plan node is executed on the coordinator only (no dispatch to data nodes). *

- * Unlike EsRelation which wraps into FragmentExec for data node dispatch, - * ExternalRelation maps directly to physical source operators via LocalMapper, - * similar to how LocalRelation works. + * Like {@link EsRelation}, the Mapper wraps this into a {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec} + * so that pipeline breakers (Aggregate, Limit, TopN) above it are distributed to data nodes + * via ExchangeExec. On data nodes, {@code localPlan()} expands the FragmentExec through + * LocalMapper into {@link ExternalSourceExec}, enabling local optimizations such as + * filter pushdown via FilterPushdownRegistry. *

- * This class provides a source-agnostic logical plan node for external data sources. - * It can represent any external source (Iceberg, Parquet, CSV, etc.) without requiring - * source-specific subclasses in core ESQL code. + * The {@link ExecutesOn.Coordinator} marker is retained for logical plan validation + * (e.g., Enrich/Join hoist rules that inspect whether a relation executes on the coordinator). *

* The source-specific metadata is stored in the {@link SourceMetadata} interface, which * provides: @@ -45,6 +51,12 @@ */ public class ExternalRelation extends LeafPlan implements ExecutesOn.Coordinator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + LogicalPlan.class, + "ExternalRelation", + ExternalRelation::readFrom + ); + private final String sourcePath; private final List output; private final SourceMetadata metadata; @@ -71,14 +83,32 @@ public ExternalRelation(Source source, String sourcePath, SourceMetadata metadat this(source, sourcePath, metadata, output, FileSet.UNRESOLVED); } + private static ExternalRelation readFrom(StreamInput in) throws IOException { + var source = Source.readFrom((PlanStreamInput) in); + String sourcePath = in.readString(); + String sourceType = in.readString(); + var output = in.readNamedWriteableCollectionAsList(Attribute.class); + @SuppressWarnings("unchecked") + Map config = (Map) in.readGenericValue(); + @SuppressWarnings("unchecked") + Map sourceMetadata = (Map) in.readGenericValue(); + var metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(source, sourcePath, metadata, output, FileSet.UNRESOLVED); + } + @Override - public void writeTo(StreamOutput out) { - throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations"); + public void writeTo(StreamOutput out) throws IOException { + Source.EMPTY.writeTo(out); + out.writeString(sourcePath); + out.writeString(metadata.sourceType()); + out.writeNamedWriteableCollection(output); + out.writeGenericValue(metadata.config()); + out.writeGenericValue(metadata.sourceMetadata()); } @Override public String getWriteableName() { - throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations"); + return ENTRY.name; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java index 8b4da7ff60159..62fd51d568ae7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java @@ -19,7 +19,6 @@ import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.FormatReader; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import java.io.IOException; import java.util.List; @@ -29,7 +28,7 @@ /** * Generic physical plan node for reading from external data sources (e.g., Iceberg tables, Parquet files). *

- * This is the unified physical plan node for all external sources, replacing source-specific nodes + * This is the unified physical plan node for all external sources, replacing source-specific nodes. * It uses generic maps for configuration and metadata to avoid leaking * source-specific types (like S3Configuration) into core ESQL code. *

@@ -40,13 +39,13 @@ *

  • Opaque metadata: Source-specific data (native schema, etc.) is stored in * {@link #sourceMetadata()} and passed through without core understanding it
  • *
  • Opaque pushed filter: The {@link #pushedFilter()} is an opaque Object that only - * the source-specific operator factory interprets. It is NOT serialized because external - * sources execute on coordinator only ({@link ExecutesOn.Coordinator})
  • - *
  • Coordinator-only execution: External sources run entirely on the coordinator node, - * so no cross-node serialization of source-specific data is needed
  • + * the source-specific operator factory interprets. It is NOT serialized; it is created + * locally on each data node by the LocalPhysicalPlanOptimizer via FilterPushdownRegistry + *
  • Data node execution: Created on data nodes by LocalMapper from + * {@link org.elasticsearch.xpack.esql.plan.logical.ExternalRelation} inside FragmentExec
  • * */ -public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, ExecutesOn.Coordinator { +public class ExternalSourceExec extends LeafExec implements EstimatesRowSize { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, @@ -61,10 +60,10 @@ public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, Ex private final List attributes; private final Map config; private final Map sourceMetadata; - private final Object pushedFilter; // Opaque filter - NOT serialized (coordinator only) - private final int pushedLimit; // NOT serialized (coordinator only) + private final Object pushedFilter; // Opaque filter - NOT serialized, created locally on data nodes + private final int pushedLimit; // NOT serialized, set locally on data nodes private final Integer estimatedRowSize; - private final FileSet fileSet; // NOT serialized - coordinator only + private final FileSet fileSet; // NOT serialized - resolved on coordinator, null on data nodes private final List splits; public ExternalSourceExec( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index 934c2f0ff02f8..f0fbf6fbf9730 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -71,8 +72,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { return new EsSourceExec(esRelation); } - // ExternalRelation is handled by MapperUtils.mapLeaf() - // via its toPhysicalExec() method, bypassing FragmentExec/ExchangeExec dispatch + if (leaf instanceof ExternalRelation external) { + return external.toPhysicalExec(); + } + return MapperUtils.mapLeaf(leaf); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 50da38255bf4c..f9435ad8693a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; @@ -87,8 +88,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { return new FragmentExec(esRelation); } - // ExternalRelation is handled by MapperUtils.mapLeaf() - // which calls toPhysicalExec() to create coordinator-only source operators + if (leaf instanceof ExternalRelation external) { + return new FragmentExec(external); + } + return MapperUtils.mapLeaf(leaf); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java index 209bdfc377ba7..e47f11b03beb6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; -import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; @@ -70,12 +69,6 @@ static PhysicalPlan mapLeaf(LeafPlan p) { return new LocalSourceExec(local.source(), local.output(), local.supplier()); } - // External data sources (Iceberg, Parquet, etc.) - // These are executed on the coordinator only, bypassing FragmentExec/ExchangeExec dispatch - if (p instanceof ExternalRelation external) { - return external.toPhysicalExec(); - } - // Commands if (p instanceof ShowInfo showInfo) { return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java index cb98150a7f15f..86fccf021b654 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java @@ -19,9 +19,9 @@ /** * Adaptive distribution strategy for external sources. *

    - * Distributes when the plan contains aggregations and there are multiple splits, - * or when the split count exceeds the number of eligible nodes. - * Stays on the coordinator for single splits or LIMIT-only plans. + * Distributes when the plan contains pipeline breakers (aggregations, TopN) + * and there are multiple splits, or when the split count exceeds the number + * of eligible nodes. Stays on the coordinator for single splits or LIMIT-only plans. */ public final class AdaptiveStrategy implements ExternalDistributionStrategy { @@ -56,10 +56,10 @@ public ExternalDistributionPlan planDistribution(ExternalDistributionContext con return ExternalDistributionPlan.LOCAL; } - boolean hasAggregation = plan.anyMatch(n -> n instanceof AggregateExec); + boolean hasPipelineBreaker = plan.anyMatch(n -> n instanceof AggregateExec || n instanceof TopNExec); boolean manySplits = splits.size() > nodes.size(); - if (hasAggregation || manySplits) { + if (hasPipelineBreaker || manySplits) { boolean allHaveSize = true; for (ExternalSplit split : splits) { if (split.estimatedSizeInBytes() <= 0) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 7151f2e49aa68..64e1331a90a63 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -66,6 +66,7 @@ import org.elasticsearch.xpack.esql.optimizer.PhysicalVerifier; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MetricsInfo; import org.elasticsearch.xpack.esql.plan.logical.TsInfo; @@ -215,6 +216,10 @@ PlannerSettings.Holder plannerSettings() { return plannerSettings; } + FilterPushdownRegistry filterPushdownRegistry() { + return filterPushdownRegistry; + } + PhysicalPlan discoverSplits(PhysicalPlan plan) { if (operatorFactoryRegistry == null) { return plan; @@ -259,7 +264,7 @@ static ExternalDistributionStrategy resolveExternalDistributionStrategy(QueryPra ExternalDistributionResult applyExternalDistributionStrategy(PhysicalPlan plan, Configuration configuration) { List externalSplits = collectExternalSplits(plan); if (externalSplits.isEmpty()) { - return new ExternalDistributionResult(plan, null); + return new ExternalDistributionResult(plan, null, List.of()); } ExternalDistributionStrategy strategy = resolveExternalDistributionStrategy(configuration.pragmas()); @@ -278,29 +283,57 @@ ExternalDistributionResult applyExternalDistributionStrategy(PhysicalPlan plan, externalSplits.size(), distributionPlan.nodeAssignments().size() ); - return new ExternalDistributionResult(plan, distributionPlan); + return new ExternalDistributionResult(plan, distributionPlan, List.of()); } - return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null); + return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null, externalSplits); } - record ExternalDistributionResult(PhysicalPlan plan, ExternalDistributionPlan distributionPlan) { + record ExternalDistributionResult(PhysicalPlan plan, ExternalDistributionPlan distributionPlan, List coordinatorSplits) { boolean isDistributed() { return distributionPlan != null && distributionPlan.distributed(); } } - private static List collectExternalSplits(PhysicalPlan plan) { + private List collectExternalSplits(PhysicalPlan plan) { List splits = new ArrayList<>(); plan.forEachDown(ExternalSourceExec.class, exec -> splits.addAll(exec.splits())); + if (splits.isEmpty()) { + discoverSplitsFromFragments(plan, splits); + if (splits.size() > SplitCoalescer.COALESCING_THRESHOLD) { + List coalesced = SplitCoalescer.coalesce(splits); + if (coalesced != splits) { + splits.clear(); + splits.addAll(coalesced); + } + } + } return splits; } + private void discoverSplitsFromFragments(PhysicalPlan plan, List splits) { + if (operatorFactoryRegistry == null) { + return; + } + plan.forEachDown(FragmentExec.class, fragment -> { + fragment.fragment().forEachDown(ExternalRelation.class, external -> { + ExternalSourceExec tempExec = external.toPhysicalExec(); + PhysicalPlan discovered = SplitDiscoveryPhase.resolveExternalSplits(tempExec, operatorFactoryRegistry.sourceFactories()); + if (discovered instanceof ExternalSourceExec withSplits) { + splits.addAll(withSplits.splits()); + } + }); + }); + } + static PhysicalPlan collapseExternalSourceExchanges(PhysicalPlan plan) { return plan.transformUp(ExchangeExec.class, exchange -> { if (exchange.child() instanceof ExternalSourceExec) { return exchange.child(); } + if (exchange.child() instanceof FragmentExec fragment && fragment.fragment().anyMatch(ExternalRelation.class::isInstance)) { + return exchange.child(); + } return exchange; }); } @@ -522,6 +555,7 @@ public void executePlan( coordinatorPlan, plannerSettings.get(), LocalPhysicalOptimization.ENABLED, + distributionResult.coordinatorSplits(), planTimeProfile, computeListener.acquireCompute() ); @@ -880,6 +914,19 @@ void runCompute( LocalPhysicalOptimization localPhysicalOptimization, PlanTimeProfile planTimeProfile, ActionListener listener + ) { + runCompute(task, context, plan, plannerSettings, localPhysicalOptimization, List.of(), planTimeProfile, listener); + } + + void runCompute( + CancellableTask task, + ComputeContext context, + PhysicalPlan plan, + PlannerSettings plannerSettings, + LocalPhysicalOptimization localPhysicalOptimization, + List coordinatorExternalSplits, + PlanTimeProfile planTimeProfile, + ActionListener listener ) { var shardContexts = context.searchContexts().map(ComputeSearchContext::shardContext); EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders( @@ -911,7 +958,10 @@ void runCompute( List localContexts = new ArrayList<>(); context.searchExecutionContexts().iterable().forEach(localContexts::add); - boolean hasExternalSource = plan.anyMatch(p -> p instanceof ExternalSourceExec); + boolean hasExternalSource = plan.anyMatch( + p -> p instanceof ExternalSourceExec + || (p instanceof FragmentExec f && f.fragment().anyMatch(ExternalRelation.class::isInstance)) + ); var localPlan = switch (localPhysicalOptimization) { case ENABLED -> hasExternalSource ? PlannerUtils.localPlan( @@ -935,6 +985,12 @@ void runCompute( ); case DISABLED -> plan; }; + if (coordinatorExternalSplits.isEmpty() == false) { + localPlan = localPlan.transformUp( + ExternalSourceExec.class, + exec -> exec.splits().isEmpty() ? exec.withSplits(coordinatorExternalSplits) : exec + ); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local plan for {}:\n{}", context.description(), localPlan); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index dc55f33e7504c..933ec4a8005ae 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.ArrayList; import java.util.HashMap; @@ -802,17 +803,26 @@ private void handleExternalSourceRequest( ExchangeSinkExec sinkExec = (ExchangeSinkExec) request.plan(); Configuration configuration = request.configuration(); final String sessionId = request.sessionId(); - - // Inject external splits into the ExternalSourceExec within the plan - PhysicalPlan planWithSplits = sinkExec.child() - .transformUp(ExternalSourceExec.class, exec -> exec.withSplits(request.externalSplits())); - ExchangeSinkExec updatedSinkExec = new ExchangeSinkExec( - sinkExec.source(), - sinkExec.output(), - sinkExec.isIntermediateAgg(), - planWithSplits + EsqlFlags flags = computeService.createFlags(); + PlannerSettings plannerSettings = computeService.plannerSettings().get(); + + // Run localPlan() to expand FragmentExec(ExternalRelation) -> ExternalSourceExec + // This runs LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer + // (including filter pushdown via FilterPushdownRegistry) + PhysicalPlan expandedPlan = PlannerUtils.localPlan( + plannerSettings, + flags, + configuration, + configuration.newFoldContext(), + sinkExec, + SearchStats.EMPTY, + computeService.filterPushdownRegistry(), + planTimeProfile ); + // Inject external splits into the ExternalSourceExec created by localPlan() + PhysicalPlan planWithSplits = expandedPlan.transformUp(ExternalSourceExec.class, exec -> exec.withSplits(request.externalSplits())); + try ( ComputeListener computeListener = new ComputeListener( threadPool, @@ -827,7 +837,6 @@ private void handleExternalSourceRequest( task.addListener( () -> { exchangeService.finishSinkHandler(sessionId, new TaskCancelledException(task.getReasonCancelled())); } ); - EsqlFlags flags = computeService.createFlags(); var computeContext = new ComputeContext( internalSessionId, @@ -843,9 +852,9 @@ private void handleExternalSourceRequest( computeService.runCompute( task, computeContext, - updatedSinkExec, - computeService.plannerSettings().get(), - LocalPhysicalOptimization.ENABLED, + planWithSplits, + plannerSettings, + LocalPhysicalOptimization.DISABLED, planTimeProfile, ActionListener.wrap(resp -> { externalSink.addCompletionListener(ActionListener.running(() -> { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java new file mode 100644 index 0000000000000..1166f424bf7c0 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ExternalRelationSerializationTests extends AbstractLogicalPlanSerializationTests { + + public static ExternalRelation randomExternalRelation() { + String sourcePath = "s3://bucket/" + randomAlphaOfLength(8) + ".parquet"; + String sourceType = randomFrom("parquet", "csv", "file", "iceberg"); + List output = randomFieldAttributes(1, 5, false); + Map config = randomBoolean() ? Map.of() : Map.of("endpoint", "https://s3.example.com"); + Map sourceMetadata = randomBoolean() ? Map.of() : Map.of("schema_version", 1); + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(randomSource(), sourcePath, metadata, output); + } + + @Override + protected ExternalRelation createTestInstance() { + return randomExternalRelation(); + } + + @Override + protected ExternalRelation mutateInstance(ExternalRelation instance) throws IOException { + String sourcePath = instance.sourcePath(); + List output = instance.output(); + String sourceType = instance.metadata().sourceType(); + Map config = instance.metadata().config(); + Map sourceMetadata = instance.metadata().sourceMetadata(); + + switch (between(0, 2)) { + case 0 -> sourcePath = randomValueOtherThan(sourcePath, () -> "s3://bucket/" + randomAlphaOfLength(8) + ".parquet"); + case 1 -> output = randomValueOtherThan(output, () -> randomFieldAttributes(1, 5, false)); + case 2 -> sourceType = randomValueOtherThan(sourceType, () -> randomFrom("parquet", "csv", "file", "iceberg")); + default -> throw new IllegalStateException(); + } + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(instance.source(), sourcePath, metadata, output); + } + + @Override + protected boolean alwaysEmptySource() { + return true; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java index c83ff22a50f41..24bb81fcc434a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java @@ -17,10 +17,12 @@ import org.elasticsearch.xpack.esql.datasources.FileSplit; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; +import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import java.util.ArrayList; import java.util.List; @@ -93,6 +95,25 @@ public void testLimitOnlyReturnsCoordinator() { assertFalse(plan.distributed()); } + public void testTopNWithMultipleSplitsDistributes() { + PhysicalPlan source = createExternalSourceExec(); + Literal limitExpr = new Literal(Source.EMPTY, 10, DataType.INTEGER); + Order order = new Order(Source.EMPTY, limitExpr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + PhysicalPlan planWithTopN = new TopNExec(Source.EMPTY, source, List.of(order), limitExpr, null); + + ExternalDistributionContext context = new ExternalDistributionContext( + planWithTopN, + createSplits(4), + createNodes(2), + QueryPragmas.EMPTY + ); + + ExternalDistributionPlan plan = strategy.planDistribution(context); + + assertTrue(plan.distributed()); + assertEquals(2, plan.nodeAssignments().size()); + } + public void testManySplitsNoAggregationDistributes() { ExternalDistributionContext context = new ExternalDistributionContext( createExternalSourceExec(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java index c70d287633028..910dc05c17a8b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -23,16 +24,25 @@ import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; +import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; +import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.session.Versioned; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.List; import java.util.Map; @@ -43,7 +53,21 @@ public class ExternalDistributionTests extends ESTestCase { // --- Mapper tests --- - public void testMapperCreatesSingleAggForExternalSource() { + public void testMapperWrapsExternalRelationInFragmentExec() { + ExternalRelation external = createExternalRelation(); + + Mapper mapper = new Mapper(); + PhysicalPlan physicalPlan = mapper.map(new Versioned<>(external, TransportVersion.current())); + + assertTrue("Expected FragmentExec, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof FragmentExec); + FragmentExec fragment = (FragmentExec) physicalPlan; + assertTrue( + "Expected ExternalRelation inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof ExternalRelation + ); + } + + public void testMapperCreatesDistributedAggForExternalSource() { ExternalRelation external = createExternalRelation(); List groupings = List.of(); List aggregates = List.of(); @@ -54,14 +78,24 @@ public void testMapperCreatesSingleAggForExternalSource() { assertTrue("Expected AggregateExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof AggregateExec); AggregateExec aggExec = (AggregateExec) physicalPlan; - assertEquals(AggregatorMode.SINGLE, aggExec.getMode()); + assertEquals(AggregatorMode.FINAL, aggExec.getMode()); + assertTrue( + "Expected ExchangeExec child, got: " + aggExec.child().getClass().getSimpleName(), + aggExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) aggExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); assertTrue( - "Expected ExternalSourceExec child, got: " + aggExec.child().getClass().getSimpleName(), - aggExec.child() instanceof ExternalSourceExec + "Expected Aggregate inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof Aggregate ); } - public void testMapperDoesNotInsertExchangeForLimitAboveExternalSource() { + public void testMapperInsertsExchangeForLimitAboveExternalSource() { ExternalRelation external = createExternalRelation(); Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); @@ -71,8 +105,45 @@ public void testMapperDoesNotInsertExchangeForLimitAboveExternalSource() { assertTrue("Expected LimitExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof LimitExec); LimitExec limitExec = (LimitExec) physicalPlan; assertTrue( - "Expected ExternalSourceExec child (no exchange), got: " + limitExec.child().getClass().getSimpleName(), - limitExec.child() instanceof ExternalSourceExec + "Expected ExchangeExec child, got: " + limitExec.child().getClass().getSimpleName(), + limitExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) limitExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); + assertTrue( + "Expected Limit inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof Limit + ); + } + + public void testMapperInsertsExchangeForTopNAboveExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Order order = new Order(SRC, nameAttr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + TopN topN = new TopN(SRC, external, List.of(order), new Literal(SRC, 10, DataType.INTEGER), false); + + Mapper mapper = new Mapper(); + PhysicalPlan physicalPlan = mapper.map(new Versioned<>(topN, TransportVersion.current())); + + assertTrue("Expected TopNExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof TopNExec); + TopNExec topNExec = (TopNExec) physicalPlan; + assertTrue( + "Expected ExchangeExec child, got: " + topNExec.child().getClass().getSimpleName(), + topNExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) topNExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); + assertTrue( + "Expected TopN inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof TopN ); } @@ -166,8 +237,179 @@ public void testCollapseWithSplitsOnExternalSource() { assertEquals(2, collapsedSource.splits().size()); } + public void testCollapseFragmentExecExchangeRemovesExchange() { + ExternalRelation external = createExternalRelation(); + FragmentExec fragment = new FragmentExec(external); + ExchangeExec exchange = new ExchangeExec(SRC, fragment); + LimitExec limit = new LimitExec(SRC, exchange, new Literal(SRC, 10, DataType.INTEGER), null); + + PhysicalPlan collapsed = ComputeService.collapseExternalSourceExchanges(limit); + + assertTrue("Expected LimitExec at top", collapsed instanceof LimitExec); + LimitExec collapsedLimit = (LimitExec) collapsed; + assertTrue( + "Expected FragmentExec directly under LimitExec, got: " + collapsedLimit.child().getClass().getSimpleName(), + collapsedLimit.child() instanceof FragmentExec + ); + } + + // --- localPlan expansion tests --- + + public void testLocalPlanExpandsFragmentExecToExternalSourceExec() { + ExternalRelation external = createExternalRelation(); + FragmentExec fragment = new FragmentExec(external); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + assertTrue( + "Expected ExternalSourceExec after expansion, got: " + expandedSink.child().getClass().getSimpleName(), + expandedSink.child() instanceof ExternalSourceExec + ); + ExternalSourceExec sourceExec = (ExternalSourceExec) expandedSink.child(); + assertEquals("s3://bucket/data/*.parquet", sourceExec.sourcePath()); + assertEquals("parquet", sourceExec.sourceType()); + } + + public void testLocalPlanExpandsAggregateFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Aggregate aggregate = new Aggregate(SRC, external, List.of(), List.of()); + FragmentExec fragment = new FragmentExec(aggregate); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, aggregate.output(), true, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testLocalPlanExpandsLimitFragmentToLimitOverExternalSource() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + FragmentExec fragment = new FragmentExec(limit); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + PhysicalPlan child = expandedSink.child(); + assertTrue("Expected LimitExec after expansion, got: " + child.getClass().getSimpleName(), child instanceof LimitExec); + LimitExec limitExec = (LimitExec) child; + assertTrue( + "Expected ExternalSourceExec under LimitExec, got: " + limitExec.child().getClass().getSimpleName(), + limitExec.child() instanceof ExternalSourceExec + ); + } + + public void testLocalPlanExpandsTopNFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Order order = new Order(SRC, nameAttr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + TopN topN = new TopN(SRC, external, List.of(order), new Literal(SRC, 10, DataType.INTEGER), false); + FragmentExec fragment = new FragmentExec(topN); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded plan should contain ExternalSourceExec", hasExternalSource); + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testLocalPlanExpandsFilterFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Expression filterCondition = new Literal(SRC, true, DataType.BOOLEAN); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testEndToEndMapperThenLocalPlanExpandsFragmentExec() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + + Mapper mapper = new Mapper(); + PhysicalPlan coordPlan = mapper.map(new Versioned<>(limit, TransportVersion.current())); + + assertTrue("Expected LimitExec at top", coordPlan instanceof LimitExec); + LimitExec limitExec = (LimitExec) coordPlan; + assertTrue("Expected ExchangeExec child", limitExec.child() instanceof ExchangeExec); + ExchangeExec exchange = (ExchangeExec) limitExec.child(); + assertTrue("Expected FragmentExec child", exchange.child() instanceof FragmentExec); + FragmentExec fragment = (FragmentExec) exchange.child(); + + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, fragment.output(), false, fragment); + PhysicalPlan dataNodePlan = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", dataNodePlan instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) dataNodePlan; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded data node plan should contain ExternalSourceExec", hasExternalSource); + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded on data node", hasNoFragmentExec); + } + + // --- Coordinator-only split injection tests --- + + public void testCoordinatorOnlySplitInjectionAfterLocalPlanExpansion() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + FragmentExec fragment = new FragmentExec(limit); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded plan should contain ExternalSourceExec", hasExternalSource); + + List splits = List.of( + new FileSplit("parquet", StoragePath.of("s3://bucket/file1.parquet"), 0, 1024, ".parquet", Map.of(), Map.of()), + new FileSplit("parquet", StoragePath.of("s3://bucket/file2.parquet"), 0, 1024, ".parquet", Map.of(), Map.of()) + ); + PhysicalPlan withSplits = expanded.transformUp( + ExternalSourceExec.class, + exec -> exec.splits().isEmpty() ? exec.withSplits(splits) : exec + ); + + final List sources = new java.util.ArrayList<>(); + withSplits.forEachDown(ExternalSourceExec.class, sources::add); + assertFalse("Should have at least one ExternalSourceExec", sources.isEmpty()); + assertEquals("Splits should be injected", 2, sources.get(0).splits().size()); + } + // --- Helpers --- + private static PhysicalPlan runLocalPlan(PhysicalPlan plan) { + var config = EsqlTestUtils.TEST_CFG; + return PlannerUtils.localPlan( + PlannerSettings.DEFAULTS, + new EsqlFlags(false), + config, + config.newFoldContext(), + plan, + SearchStats.EMPTY, + null + ); + } + private static ExternalRelation createExternalRelation() { List output = List.of( new FieldAttribute(SRC, "name", new EsField("name", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java index 82aa34472b544..fb17cc1a8ff09 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java @@ -233,18 +233,18 @@ public void testDistributionResultIsDistributedWithRealPlan() { Map> assignments = Map.of("node-0", splits.subList(0, 2), "node-1", splits.subList(2, 4)); ExternalDistributionPlan distributionPlan = new ExternalDistributionPlan(assignments, true); - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), distributionPlan); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), distributionPlan, List.of()); assertTrue("Should be distributed when plan says distributed", result.isDistributed()); assertThat(result.distributionPlan().nodeAssignments().size(), equalTo(2)); } public void testDistributionResultNotDistributedWithNullPlan() { - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), null); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), null, List.of()); assertFalse("Should not be distributed with null plan", result.isDistributed()); } public void testDistributionResultNotDistributedWithLocalPlan() { - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), ExternalDistributionPlan.LOCAL); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), ExternalDistributionPlan.LOCAL, List.of()); assertFalse("Should not be distributed with LOCAL plan", result.isDistributed()); } From 99f6973a8069699567ba8d9eb0452017f7fe23b2 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Mar 2026 18:36:01 +0200 Subject: [PATCH 4/6] Update docs/changelog/143696.yaml --- docs/changelog/143696.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/143696.yaml diff --git a/docs/changelog/143696.yaml b/docs/changelog/143696.yaml new file mode 100644 index 0000000000000..14363a7b43ede --- /dev/null +++ b/docs/changelog/143696.yaml @@ -0,0 +1,5 @@ +area: ES|QL +issues: [] +pr: 143696 +summary: Enable distributed pipeline breakers for external sources via `FragmentExec` +type: enhancement From 0300fc74952b0b704bfdea07626325229db0a880 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Mar 2026 23:23:08 +0200 Subject: [PATCH 5/6] Fix coordinator-only execution for external sources ReplaceFieldWithConstantOrNull uses SearchStats.exists() to determine which fields to retain. On the coordinator-only path for external sources, SearchStats is empty (no ES search contexts), so exists() returns false for all fields, causing the optimizer to replace all external source fields with null. Retain fields from ExternalRelation output in the shouldBeRetained predicate, following the same pattern used for lookup index fields. This preserves all local optimizations (constant folding, filter pushdown, column pruning, etc.) while preventing incorrect field pruning. Also collapse ExchangeExec and reset TopNExec InputOrdering when external source plans fall back to coordinator-only execution. --- .../local/ReplaceFieldWithConstantOrNull.java | 9 +- .../xpack/esql/plugin/ComputeService.java | 12 +- .../plugin/ExternalDistributionTests.java | 116 ++++++++++++++++++ 3 files changed, 133 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java index 44dd8ecc01fa5..cf3074339d405 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.plan.logical.CompoundOutputEval; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; @@ -47,7 +48,9 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule attrToConstant = new HashMap<>(); + plan.forEachUp(ExternalRelation.class, external -> externalFieldsBuilder.addAll(external.output())); plan.forEachUp(EsRelation.class, esRelation -> { // Looking for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index // is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD. @@ -77,14 +80,16 @@ else if (esRelation.indexMode() == IndexMode.STANDARD) { } }); AttributeSet lookupFields = lookupFieldsBuilder.build(); + AttributeSet externalFields = externalFieldsBuilder.build(); // Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead. - // Also retain fields from lookup indices because we do not have stats for these. + // Also retain fields from lookup indices and external sources because we do not have stats for these. Predicate shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField // The source (or doc) field is added to the relation output as a hack to enable late materialization in the reduce driver. || EsQueryExec.isDocAttribute(f) || localLogicalOptimizerContext.searchStats().exists(f.fieldName()) - || lookupFields.contains(f); + || lookupFields.contains(f) + || externalFields.contains(f); return plan.transformUp(p -> replaceWithNullOrConstant(p, shouldBeRetained, attrToConstant)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 64e1331a90a63..6c5a1d7c04923 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -77,6 +77,8 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerSettings; @@ -264,7 +266,7 @@ static ExternalDistributionStrategy resolveExternalDistributionStrategy(QueryPra ExternalDistributionResult applyExternalDistributionStrategy(PhysicalPlan plan, Configuration configuration) { List externalSplits = collectExternalSplits(plan); if (externalSplits.isEmpty()) { - return new ExternalDistributionResult(plan, null, List.of()); + return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null, List.of()); } ExternalDistributionStrategy strategy = resolveExternalDistributionStrategy(configuration.pragmas()); @@ -327,7 +329,7 @@ private void discoverSplitsFromFragments(PhysicalPlan plan, List } static PhysicalPlan collapseExternalSourceExchanges(PhysicalPlan plan) { - return plan.transformUp(ExchangeExec.class, exchange -> { + PhysicalPlan collapsed = plan.transformUp(ExchangeExec.class, exchange -> { if (exchange.child() instanceof ExternalSourceExec) { return exchange.child(); } @@ -336,6 +338,12 @@ static PhysicalPlan collapseExternalSourceExchanges(PhysicalPlan plan) { } return exchange; }); + return collapsed.transformUp(TopNExec.class, topN -> { + if (topN.inputOrdering() != InputOrdering.NOT_SORTED && topN.child() instanceof FragmentExec) { + return topN.withNonSortedInput(); + } + return topN; + }); } public void execute( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java index 910dc05c17a8b..1c9b0f287df56 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -20,6 +21,8 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.datasources.FileSplit; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; @@ -30,10 +33,13 @@ import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.TopN; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; @@ -395,6 +401,106 @@ public void testCoordinatorOnlySplitInjectionAfterLocalPlanExpansion() { assertEquals("Splits should be injected", 2, sources.get(0).splits().size()); } + // --- Field retention through local optimization tests --- + + /** + * Verifies that external source fields survive local plan optimization with empty SearchStats. + * ReplaceFieldWithConstantOrNull must not replace external fields with null. + * Without the fix, the optimizer inserts Eval(field=null) + Project nodes above the source. + */ + public void testLocalPlanRetainsExternalSourceFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + Expression filterCondition = new GreaterThan(SRC, salaryAttr, new Literal(SRC, 50000, DataType.INTEGER), null); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + // The filter must survive -- without the fix, the field reference in the filter condition + // is replaced with null by ReplaceFieldWithConstantOrNull, causing PruneFilters to remove it + boolean hasFilter = expandedSink.anyMatch(n -> n instanceof FilterExec); + assertTrue("Filter on external field must survive local optimization with empty SearchStats", hasFilter); + } + + /** + * Verifies that a filter referencing external source fields survives local optimization. + * With empty SearchStats, the filter condition must not be replaced with null. + */ + public void testLocalPlanRetainsFilterOnExternalFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + Expression filterCondition = new GreaterThan(SRC, salaryAttr, new Literal(SRC, 50000, DataType.INTEGER), null); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasFilter = expandedSink.anyMatch(n -> n instanceof FilterExec); + assertTrue("Filter should survive optimization, not be pruned", hasFilter); + } + + /** + * Verifies that eval expressions referencing external source fields produce non-null results. + */ + public void testLocalPlanRetainsEvalOnExternalFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + var alias = new org.elasticsearch.xpack.esql.core.expression.Alias(SRC, "abs_salary", new Abs(SRC, salaryAttr)); + Eval eval = new Eval(SRC, external, List.of(alias)); + FragmentExec fragment = new FragmentExec(eval); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, eval.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasEval = expandedSink.anyMatch(n -> n instanceof EvalExec); + assertTrue("Eval should survive optimization", hasEval); + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("ExternalSourceExec should be present", hasExternalSource); + } + + /** + * Verifies that collapseExternalSourceExchanges resets TopNExec InputOrdering + * when the exchange is removed and the TopN sits directly above a FragmentExec. + */ + public void testCollapseResetsTopNInputOrdering() { + ExternalRelation external = createExternalRelation(); + TopN topN = new TopN( + SRC, + external, + List.of(new Order(SRC, external.output().get(0), Order.OrderDirection.ASC, Order.NullsPosition.LAST)), + new Literal(SRC, 10, DataType.INTEGER), + false + ); + FragmentExec fragment = new FragmentExec(topN); + ExchangeExec exchange = new ExchangeExec(SRC, fragment); + TopNExec topNExec = new TopNExec( + SRC, + exchange, + List.of(new Order(SRC, external.output().get(0), Order.OrderDirection.ASC, Order.NullsPosition.LAST)), + new Literal(SRC, 10, DataType.INTEGER), + null + ).withSortedInput(); + + assertEquals("Precondition: TopNExec should have SORTED ordering", InputOrdering.SORTED, topNExec.inputOrdering()); + + PhysicalPlan collapsed = ComputeService.collapseExternalSourceExchanges(topNExec); + + assertTrue("Expected TopNExec at top", collapsed instanceof TopNExec); + TopNExec collapsedTopN = (TopNExec) collapsed; + assertEquals( + "InputOrdering should be reset to NOT_SORTED after collapse", + InputOrdering.NOT_SORTED, + collapsedTopN.inputOrdering() + ); + assertTrue("Expected FragmentExec child", collapsedTopN.child() instanceof FragmentExec); + } + // --- Helpers --- private static PhysicalPlan runLocalPlan(PhysicalPlan plan) { @@ -418,6 +524,16 @@ private static ExternalRelation createExternalRelation() { return new ExternalRelation(SRC, "s3://bucket/data/*.parquet", metadata, output, null); } + private static ExternalRelation createMultiFieldExternalRelation() { + List output = List.of( + new FieldAttribute(SRC, "name", new EsField("name", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE)), + new FieldAttribute(SRC, "salary", new EsField("salary", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)), + new FieldAttribute(SRC, "age", new EsField("age", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) + ); + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, "parquet", "s3://bucket/data/*.parquet"); + return new ExternalRelation(SRC, "s3://bucket/data/*.parquet", metadata, output, null); + } + private static ExternalSourceExec createExternalSourceExec() { return new ExternalSourceExec( SRC, From 84dca73033f09aa0bd9eebc767bccc0996670738 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 5 Mar 2026 21:48:19 +0000 Subject: [PATCH 6/6] [CI] Auto commit changes from spotless --- .../xpack/esql/plugin/ComputeService.java | 2 +- .../esql/plugin/ExternalDistributionTests.java | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 6c5a1d7c04923..358786af310a4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -30,6 +30,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; @@ -78,7 +79,6 @@ import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; -import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerSettings; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java index 1c9b0f287df56..abe2aee476c46 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java @@ -10,9 +10,9 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; -import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -21,19 +21,19 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; -import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.datasources.FileSplit; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; import org.elasticsearch.xpack.esql.expression.Order; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.TopN; -import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; @@ -493,11 +493,7 @@ public void testCollapseResetsTopNInputOrdering() { assertTrue("Expected TopNExec at top", collapsed instanceof TopNExec); TopNExec collapsedTopN = (TopNExec) collapsed; - assertEquals( - "InputOrdering should be reset to NOT_SORTED after collapse", - InputOrdering.NOT_SORTED, - collapsedTopN.inputOrdering() - ); + assertEquals("InputOrdering should be reset to NOT_SORTED after collapse", InputOrdering.NOT_SORTED, collapsedTopN.inputOrdering()); assertTrue("Expected FragmentExec child", collapsedTopN.child() instanceof FragmentExec); }