diff --git a/docs/changelog/143696.yaml b/docs/changelog/143696.yaml new file mode 100644 index 0000000000000..14363a7b43ede --- /dev/null +++ b/docs/changelog/143696.yaml @@ -0,0 +1,5 @@ +area: ES|QL +issues: [] +pr: 143696 +summary: Enable distributed pipeline breakers for external sources via `FragmentExec` +type: enhancement diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec index bce42a159aba8..ed866793c751a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec @@ -212,3 +212,34 @@ emp_no:integer | height:double | height_float_rounded:double | height.scaled_flo 10004 | 1.78 | 1.78 | 1.78 | 1.78 10005 | 2.05 | 2.05 | 2.05 | 2.05 ; + +// Pipeline breaker distribution tests +// These exercise distributed execution paths for pipeline breakers (STATS, TopN, LIMIT) +// across all distribution modes via ExternalDistributedSpecIT + +topNSortBySalaryDesc +required_capability: external_command +EXTERNAL "{{employees}}" +| KEEP emp_no, first_name, salary +| SORT salary DESC +| LIMIT 3; + +emp_no:integer | first_name:keyword | salary:integer +10029 | "Otmar" | 74999 +10045 | "Moss" | 74970 +10007 | "Tzvetan" | 74572 +; + +topNFilteredSortBySalary +required_capability: external_command +EXTERNAL "{{employees}}" +| WHERE gender == "F" +| KEEP emp_no, first_name, salary +| SORT salary DESC +| LIMIT 3; + +emp_no:integer | first_name:keyword | salary:integer +10007 | "Tzvetan" | 74572 +10027 | "Divier" | 73851 +10099 | "Valter" | 73578 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java index 44dd8ecc01fa5..cf3074339d405 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.plan.logical.CompoundOutputEval; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; @@ -47,7 +48,9 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule attrToConstant = new HashMap<>(); + plan.forEachUp(ExternalRelation.class, external -> externalFieldsBuilder.addAll(external.output())); plan.forEachUp(EsRelation.class, esRelation -> { // Looking for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index // is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD. @@ -77,14 +80,16 @@ else if (esRelation.indexMode() == IndexMode.STANDARD) { } }); AttributeSet lookupFields = lookupFieldsBuilder.build(); + AttributeSet externalFields = externalFieldsBuilder.build(); // Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead. - // Also retain fields from lookup indices because we do not have stats for these. + // Also retain fields from lookup indices and external sources because we do not have stats for these. Predicate shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField // The source (or doc) field is added to the relation output as a hack to enable late materialization in the reduce driver. || EsQueryExec.isDocAttribute(f) || localLogicalOptimizerContext.searchStats().exists(f.fieldName()) - || lookupFields.contains(f); + || lookupFields.contains(f) + || externalFields.contains(f); return plan.transformUp(p -> replaceWithNullOrConstant(p, shouldBeRetained, attrToConstant)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java index 3d7ad2913be71..a93b642406fc1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.InlineStats; @@ -88,6 +89,7 @@ public static List logical() { Enrich.ENTRY, EsRelation.ENTRY, Eval.ENTRY, + ExternalRelation.ENTRY, Filter.ENTRY, Grok.ENTRY, InlineJoin.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java index 87357a7f4c0f9..c600e72906925 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java @@ -6,29 +6,35 @@ */ package org.elasticsearch.xpack.esql.plan.logical; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.NodeUtils; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.datasources.FileSet; +import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; /** * Logical plan node for external data source relations (e.g., Iceberg table, Parquet file). - * This plan node is executed on the coordinator only (no dispatch to data nodes). *

- * Unlike EsRelation which wraps into FragmentExec for data node dispatch, - * ExternalRelation maps directly to physical source operators via LocalMapper, - * similar to how LocalRelation works. + * Like {@link EsRelation}, the Mapper wraps this into a {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec} + * so that pipeline breakers (Aggregate, Limit, TopN) above it are distributed to data nodes + * via ExchangeExec. On data nodes, {@code localPlan()} expands the FragmentExec through + * LocalMapper into {@link ExternalSourceExec}, enabling local optimizations such as + * filter pushdown via FilterPushdownRegistry. *

- * This class provides a source-agnostic logical plan node for external data sources. - * It can represent any external source (Iceberg, Parquet, CSV, etc.) without requiring - * source-specific subclasses in core ESQL code. + * The {@link ExecutesOn.Coordinator} marker is retained for logical plan validation + * (e.g., Enrich/Join hoist rules that inspect whether a relation executes on the coordinator). *

* The source-specific metadata is stored in the {@link SourceMetadata} interface, which * provides: @@ -45,6 +51,12 @@ */ public class ExternalRelation extends LeafPlan implements ExecutesOn.Coordinator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + LogicalPlan.class, + "ExternalRelation", + ExternalRelation::readFrom + ); + private final String sourcePath; private final List output; private final SourceMetadata metadata; @@ -71,14 +83,32 @@ public ExternalRelation(Source source, String sourcePath, SourceMetadata metadat this(source, sourcePath, metadata, output, FileSet.UNRESOLVED); } + private static ExternalRelation readFrom(StreamInput in) throws IOException { + var source = Source.readFrom((PlanStreamInput) in); + String sourcePath = in.readString(); + String sourceType = in.readString(); + var output = in.readNamedWriteableCollectionAsList(Attribute.class); + @SuppressWarnings("unchecked") + Map config = (Map) in.readGenericValue(); + @SuppressWarnings("unchecked") + Map sourceMetadata = (Map) in.readGenericValue(); + var metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(source, sourcePath, metadata, output, FileSet.UNRESOLVED); + } + @Override - public void writeTo(StreamOutput out) { - throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations"); + public void writeTo(StreamOutput out) throws IOException { + Source.EMPTY.writeTo(out); + out.writeString(sourcePath); + out.writeString(metadata.sourceType()); + out.writeNamedWriteableCollection(output); + out.writeGenericValue(metadata.config()); + out.writeGenericValue(metadata.sourceMetadata()); } @Override public String getWriteableName() { - throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations"); + return ENTRY.name; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java index 8b4da7ff60159..62fd51d568ae7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java @@ -19,7 +19,6 @@ import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.FormatReader; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import java.io.IOException; import java.util.List; @@ -29,7 +28,7 @@ /** * Generic physical plan node for reading from external data sources (e.g., Iceberg tables, Parquet files). *

- * This is the unified physical plan node for all external sources, replacing source-specific nodes + * This is the unified physical plan node for all external sources, replacing source-specific nodes. * It uses generic maps for configuration and metadata to avoid leaking * source-specific types (like S3Configuration) into core ESQL code. *

@@ -40,13 +39,13 @@ *

  • Opaque metadata: Source-specific data (native schema, etc.) is stored in * {@link #sourceMetadata()} and passed through without core understanding it
  • *
  • Opaque pushed filter: The {@link #pushedFilter()} is an opaque Object that only - * the source-specific operator factory interprets. It is NOT serialized because external - * sources execute on coordinator only ({@link ExecutesOn.Coordinator})
  • - *
  • Coordinator-only execution: External sources run entirely on the coordinator node, - * so no cross-node serialization of source-specific data is needed
  • + * the source-specific operator factory interprets. It is NOT serialized; it is created + * locally on each data node by the LocalPhysicalPlanOptimizer via FilterPushdownRegistry + *
  • Data node execution: Created on data nodes by LocalMapper from + * {@link org.elasticsearch.xpack.esql.plan.logical.ExternalRelation} inside FragmentExec
  • * */ -public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, ExecutesOn.Coordinator { +public class ExternalSourceExec extends LeafExec implements EstimatesRowSize { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( PhysicalPlan.class, @@ -61,10 +60,10 @@ public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, Ex private final List attributes; private final Map config; private final Map sourceMetadata; - private final Object pushedFilter; // Opaque filter - NOT serialized (coordinator only) - private final int pushedLimit; // NOT serialized (coordinator only) + private final Object pushedFilter; // Opaque filter - NOT serialized, created locally on data nodes + private final int pushedLimit; // NOT serialized, set locally on data nodes private final Integer estimatedRowSize; - private final FileSet fileSet; // NOT serialized - coordinator only + private final FileSet fileSet; // NOT serialized - resolved on coordinator, null on data nodes private final List splits; public ExternalSourceExec( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index 934c2f0ff02f8..f0fbf6fbf9730 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; @@ -71,8 +72,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { return new EsSourceExec(esRelation); } - // ExternalRelation is handled by MapperUtils.mapLeaf() - // via its toPhysicalExec() method, bypassing FragmentExec/ExchangeExec dispatch + if (leaf instanceof ExternalRelation external) { + return external.toPhysicalExec(); + } + return MapperUtils.mapLeaf(leaf); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 50da38255bf4c..f9435ad8693a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; @@ -87,8 +88,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { return new FragmentExec(esRelation); } - // ExternalRelation is handled by MapperUtils.mapLeaf() - // which calls toPhysicalExec() to create coordinator-only source operators + if (leaf instanceof ExternalRelation external) { + return new FragmentExec(external); + } + return MapperUtils.mapLeaf(leaf); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java index 209bdfc377ba7..e47f11b03beb6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; -import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; @@ -70,12 +69,6 @@ static PhysicalPlan mapLeaf(LeafPlan p) { return new LocalSourceExec(local.source(), local.output(), local.supplier()); } - // External data sources (Iceberg, Parquet, etc.) - // These are executed on the coordinator only, bypassing FragmentExec/ExchangeExec dispatch - if (p instanceof ExternalRelation external) { - return external.toPhysicalExec(); - } - // Commands if (p instanceof ShowInfo showInfo) { return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java index cb98150a7f15f..86fccf021b654 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java @@ -19,9 +19,9 @@ /** * Adaptive distribution strategy for external sources. *

    - * Distributes when the plan contains aggregations and there are multiple splits, - * or when the split count exceeds the number of eligible nodes. - * Stays on the coordinator for single splits or LIMIT-only plans. + * Distributes when the plan contains pipeline breakers (aggregations, TopN) + * and there are multiple splits, or when the split count exceeds the number + * of eligible nodes. Stays on the coordinator for single splits or LIMIT-only plans. */ public final class AdaptiveStrategy implements ExternalDistributionStrategy { @@ -56,10 +56,10 @@ public ExternalDistributionPlan planDistribution(ExternalDistributionContext con return ExternalDistributionPlan.LOCAL; } - boolean hasAggregation = plan.anyMatch(n -> n instanceof AggregateExec); + boolean hasPipelineBreaker = plan.anyMatch(n -> n instanceof AggregateExec || n instanceof TopNExec); boolean manySplits = splits.size() > nodes.size(); - if (hasAggregation || manySplits) { + if (hasPipelineBreaker || manySplits) { boolean allHaveSize = true; for (ExternalSplit split : splits) { if (split.estimatedSizeInBytes() <= 0) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 7151f2e49aa68..358786af310a4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -30,6 +30,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; @@ -66,6 +67,7 @@ import org.elasticsearch.xpack.esql.optimizer.PhysicalVerifier; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MetricsInfo; import org.elasticsearch.xpack.esql.plan.logical.TsInfo; @@ -76,6 +78,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerSettings; @@ -215,6 +218,10 @@ PlannerSettings.Holder plannerSettings() { return plannerSettings; } + FilterPushdownRegistry filterPushdownRegistry() { + return filterPushdownRegistry; + } + PhysicalPlan discoverSplits(PhysicalPlan plan) { if (operatorFactoryRegistry == null) { return plan; @@ -259,7 +266,7 @@ static ExternalDistributionStrategy resolveExternalDistributionStrategy(QueryPra ExternalDistributionResult applyExternalDistributionStrategy(PhysicalPlan plan, Configuration configuration) { List externalSplits = collectExternalSplits(plan); if (externalSplits.isEmpty()) { - return new ExternalDistributionResult(plan, null); + return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null, List.of()); } ExternalDistributionStrategy strategy = resolveExternalDistributionStrategy(configuration.pragmas()); @@ -278,31 +285,65 @@ ExternalDistributionResult applyExternalDistributionStrategy(PhysicalPlan plan, externalSplits.size(), distributionPlan.nodeAssignments().size() ); - return new ExternalDistributionResult(plan, distributionPlan); + return new ExternalDistributionResult(plan, distributionPlan, List.of()); } - return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null); + return new ExternalDistributionResult(collapseExternalSourceExchanges(plan), null, externalSplits); } - record ExternalDistributionResult(PhysicalPlan plan, ExternalDistributionPlan distributionPlan) { + record ExternalDistributionResult(PhysicalPlan plan, ExternalDistributionPlan distributionPlan, List coordinatorSplits) { boolean isDistributed() { return distributionPlan != null && distributionPlan.distributed(); } } - private static List collectExternalSplits(PhysicalPlan plan) { + private List collectExternalSplits(PhysicalPlan plan) { List splits = new ArrayList<>(); plan.forEachDown(ExternalSourceExec.class, exec -> splits.addAll(exec.splits())); + if (splits.isEmpty()) { + discoverSplitsFromFragments(plan, splits); + if (splits.size() > SplitCoalescer.COALESCING_THRESHOLD) { + List coalesced = SplitCoalescer.coalesce(splits); + if (coalesced != splits) { + splits.clear(); + splits.addAll(coalesced); + } + } + } return splits; } + private void discoverSplitsFromFragments(PhysicalPlan plan, List splits) { + if (operatorFactoryRegistry == null) { + return; + } + plan.forEachDown(FragmentExec.class, fragment -> { + fragment.fragment().forEachDown(ExternalRelation.class, external -> { + ExternalSourceExec tempExec = external.toPhysicalExec(); + PhysicalPlan discovered = SplitDiscoveryPhase.resolveExternalSplits(tempExec, operatorFactoryRegistry.sourceFactories()); + if (discovered instanceof ExternalSourceExec withSplits) { + splits.addAll(withSplits.splits()); + } + }); + }); + } + static PhysicalPlan collapseExternalSourceExchanges(PhysicalPlan plan) { - return plan.transformUp(ExchangeExec.class, exchange -> { + PhysicalPlan collapsed = plan.transformUp(ExchangeExec.class, exchange -> { if (exchange.child() instanceof ExternalSourceExec) { return exchange.child(); } + if (exchange.child() instanceof FragmentExec fragment && fragment.fragment().anyMatch(ExternalRelation.class::isInstance)) { + return exchange.child(); + } return exchange; }); + return collapsed.transformUp(TopNExec.class, topN -> { + if (topN.inputOrdering() != InputOrdering.NOT_SORTED && topN.child() instanceof FragmentExec) { + return topN.withNonSortedInput(); + } + return topN; + }); } public void execute( @@ -522,6 +563,7 @@ public void executePlan( coordinatorPlan, plannerSettings.get(), LocalPhysicalOptimization.ENABLED, + distributionResult.coordinatorSplits(), planTimeProfile, computeListener.acquireCompute() ); @@ -880,6 +922,19 @@ void runCompute( LocalPhysicalOptimization localPhysicalOptimization, PlanTimeProfile planTimeProfile, ActionListener listener + ) { + runCompute(task, context, plan, plannerSettings, localPhysicalOptimization, List.of(), planTimeProfile, listener); + } + + void runCompute( + CancellableTask task, + ComputeContext context, + PhysicalPlan plan, + PlannerSettings plannerSettings, + LocalPhysicalOptimization localPhysicalOptimization, + List coordinatorExternalSplits, + PlanTimeProfile planTimeProfile, + ActionListener listener ) { var shardContexts = context.searchContexts().map(ComputeSearchContext::shardContext); EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders( @@ -911,7 +966,10 @@ void runCompute( List localContexts = new ArrayList<>(); context.searchExecutionContexts().iterable().forEach(localContexts::add); - boolean hasExternalSource = plan.anyMatch(p -> p instanceof ExternalSourceExec); + boolean hasExternalSource = plan.anyMatch( + p -> p instanceof ExternalSourceExec + || (p instanceof FragmentExec f && f.fragment().anyMatch(ExternalRelation.class::isInstance)) + ); var localPlan = switch (localPhysicalOptimization) { case ENABLED -> hasExternalSource ? PlannerUtils.localPlan( @@ -935,6 +993,12 @@ void runCompute( ); case DISABLED -> plan; }; + if (coordinatorExternalSplits.isEmpty() == false) { + localPlan = localPlan.transformUp( + ExternalSourceExec.class, + exec -> exec.splits().isEmpty() ? exec.withSplits(coordinatorExternalSplits) : exec + ); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local plan for {}:\n{}", context.description(), localPlan); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index dc55f33e7504c..933ec4a8005ae 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.ArrayList; import java.util.HashMap; @@ -802,17 +803,26 @@ private void handleExternalSourceRequest( ExchangeSinkExec sinkExec = (ExchangeSinkExec) request.plan(); Configuration configuration = request.configuration(); final String sessionId = request.sessionId(); - - // Inject external splits into the ExternalSourceExec within the plan - PhysicalPlan planWithSplits = sinkExec.child() - .transformUp(ExternalSourceExec.class, exec -> exec.withSplits(request.externalSplits())); - ExchangeSinkExec updatedSinkExec = new ExchangeSinkExec( - sinkExec.source(), - sinkExec.output(), - sinkExec.isIntermediateAgg(), - planWithSplits + EsqlFlags flags = computeService.createFlags(); + PlannerSettings plannerSettings = computeService.plannerSettings().get(); + + // Run localPlan() to expand FragmentExec(ExternalRelation) -> ExternalSourceExec + // This runs LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer + // (including filter pushdown via FilterPushdownRegistry) + PhysicalPlan expandedPlan = PlannerUtils.localPlan( + plannerSettings, + flags, + configuration, + configuration.newFoldContext(), + sinkExec, + SearchStats.EMPTY, + computeService.filterPushdownRegistry(), + planTimeProfile ); + // Inject external splits into the ExternalSourceExec created by localPlan() + PhysicalPlan planWithSplits = expandedPlan.transformUp(ExternalSourceExec.class, exec -> exec.withSplits(request.externalSplits())); + try ( ComputeListener computeListener = new ComputeListener( threadPool, @@ -827,7 +837,6 @@ private void handleExternalSourceRequest( task.addListener( () -> { exchangeService.finishSinkHandler(sessionId, new TaskCancelledException(task.getReasonCancelled())); } ); - EsqlFlags flags = computeService.createFlags(); var computeContext = new ComputeContext( internalSessionId, @@ -843,9 +852,9 @@ private void handleExternalSourceRequest( computeService.runCompute( task, computeContext, - updatedSinkExec, - computeService.plannerSettings().get(), - LocalPhysicalOptimization.ENABLED, + planWithSplits, + plannerSettings, + LocalPhysicalOptimization.DISABLED, planTimeProfile, ActionListener.wrap(resp -> { externalSink.addCompletionListener(ActionListener.running(() -> { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java new file mode 100644 index 0000000000000..1166f424bf7c0 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelationSerializationTests.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ExternalRelationSerializationTests extends AbstractLogicalPlanSerializationTests { + + public static ExternalRelation randomExternalRelation() { + String sourcePath = "s3://bucket/" + randomAlphaOfLength(8) + ".parquet"; + String sourceType = randomFrom("parquet", "csv", "file", "iceberg"); + List output = randomFieldAttributes(1, 5, false); + Map config = randomBoolean() ? Map.of() : Map.of("endpoint", "https://s3.example.com"); + Map sourceMetadata = randomBoolean() ? Map.of() : Map.of("schema_version", 1); + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(randomSource(), sourcePath, metadata, output); + } + + @Override + protected ExternalRelation createTestInstance() { + return randomExternalRelation(); + } + + @Override + protected ExternalRelation mutateInstance(ExternalRelation instance) throws IOException { + String sourcePath = instance.sourcePath(); + List output = instance.output(); + String sourceType = instance.metadata().sourceType(); + Map config = instance.metadata().config(); + Map sourceMetadata = instance.metadata().sourceMetadata(); + + switch (between(0, 2)) { + case 0 -> sourcePath = randomValueOtherThan(sourcePath, () -> "s3://bucket/" + randomAlphaOfLength(8) + ".parquet"); + case 1 -> output = randomValueOtherThan(output, () -> randomFieldAttributes(1, 5, false)); + case 2 -> sourceType = randomValueOtherThan(sourceType, () -> randomFrom("parquet", "csv", "file", "iceberg")); + default -> throw new IllegalStateException(); + } + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config); + return new ExternalRelation(instance.source(), sourcePath, metadata, output); + } + + @Override + protected boolean alwaysEmptySource() { + return true; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java index c83ff22a50f41..24bb81fcc434a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategyTests.java @@ -17,10 +17,12 @@ import org.elasticsearch.xpack.esql.datasources.FileSplit; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; +import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import java.util.ArrayList; import java.util.List; @@ -93,6 +95,25 @@ public void testLimitOnlyReturnsCoordinator() { assertFalse(plan.distributed()); } + public void testTopNWithMultipleSplitsDistributes() { + PhysicalPlan source = createExternalSourceExec(); + Literal limitExpr = new Literal(Source.EMPTY, 10, DataType.INTEGER); + Order order = new Order(Source.EMPTY, limitExpr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + PhysicalPlan planWithTopN = new TopNExec(Source.EMPTY, source, List.of(order), limitExpr, null); + + ExternalDistributionContext context = new ExternalDistributionContext( + planWithTopN, + createSplits(4), + createNodes(2), + QueryPragmas.EMPTY + ); + + ExternalDistributionPlan plan = strategy.planDistribution(context); + + assertTrue(plan.distributed()); + assertEquals(2, plan.nodeAssignments().size()); + } + public void testManySplitsNoAggregationDistributes() { ExternalDistributionContext context = new ExternalDistributionContext( createExternalSourceExec(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java index c70d287633028..abe2aee476c46 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalDistributionTests.java @@ -10,7 +10,9 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.operator.topn.TopNOperator.InputOrdering; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -23,16 +25,30 @@ import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; +import org.elasticsearch.xpack.esql.expression.Order; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; +import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.session.Versioned; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.util.List; import java.util.Map; @@ -43,7 +59,21 @@ public class ExternalDistributionTests extends ESTestCase { // --- Mapper tests --- - public void testMapperCreatesSingleAggForExternalSource() { + public void testMapperWrapsExternalRelationInFragmentExec() { + ExternalRelation external = createExternalRelation(); + + Mapper mapper = new Mapper(); + PhysicalPlan physicalPlan = mapper.map(new Versioned<>(external, TransportVersion.current())); + + assertTrue("Expected FragmentExec, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof FragmentExec); + FragmentExec fragment = (FragmentExec) physicalPlan; + assertTrue( + "Expected ExternalRelation inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof ExternalRelation + ); + } + + public void testMapperCreatesDistributedAggForExternalSource() { ExternalRelation external = createExternalRelation(); List groupings = List.of(); List aggregates = List.of(); @@ -54,14 +84,24 @@ public void testMapperCreatesSingleAggForExternalSource() { assertTrue("Expected AggregateExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof AggregateExec); AggregateExec aggExec = (AggregateExec) physicalPlan; - assertEquals(AggregatorMode.SINGLE, aggExec.getMode()); + assertEquals(AggregatorMode.FINAL, aggExec.getMode()); assertTrue( - "Expected ExternalSourceExec child, got: " + aggExec.child().getClass().getSimpleName(), - aggExec.child() instanceof ExternalSourceExec + "Expected ExchangeExec child, got: " + aggExec.child().getClass().getSimpleName(), + aggExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) aggExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); + assertTrue( + "Expected Aggregate inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof Aggregate ); } - public void testMapperDoesNotInsertExchangeForLimitAboveExternalSource() { + public void testMapperInsertsExchangeForLimitAboveExternalSource() { ExternalRelation external = createExternalRelation(); Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); @@ -71,8 +111,45 @@ public void testMapperDoesNotInsertExchangeForLimitAboveExternalSource() { assertTrue("Expected LimitExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof LimitExec); LimitExec limitExec = (LimitExec) physicalPlan; assertTrue( - "Expected ExternalSourceExec child (no exchange), got: " + limitExec.child().getClass().getSimpleName(), - limitExec.child() instanceof ExternalSourceExec + "Expected ExchangeExec child, got: " + limitExec.child().getClass().getSimpleName(), + limitExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) limitExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); + assertTrue( + "Expected Limit inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof Limit + ); + } + + public void testMapperInsertsExchangeForTopNAboveExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Order order = new Order(SRC, nameAttr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + TopN topN = new TopN(SRC, external, List.of(order), new Literal(SRC, 10, DataType.INTEGER), false); + + Mapper mapper = new Mapper(); + PhysicalPlan physicalPlan = mapper.map(new Versioned<>(topN, TransportVersion.current())); + + assertTrue("Expected TopNExec at top, got: " + physicalPlan.getClass().getSimpleName(), physicalPlan instanceof TopNExec); + TopNExec topNExec = (TopNExec) physicalPlan; + assertTrue( + "Expected ExchangeExec child, got: " + topNExec.child().getClass().getSimpleName(), + topNExec.child() instanceof ExchangeExec + ); + ExchangeExec exchange = (ExchangeExec) topNExec.child(); + assertTrue( + "Expected FragmentExec child, got: " + exchange.child().getClass().getSimpleName(), + exchange.child() instanceof FragmentExec + ); + FragmentExec fragment = (FragmentExec) exchange.child(); + assertTrue( + "Expected TopN inside fragment, got: " + fragment.fragment().getClass().getSimpleName(), + fragment.fragment() instanceof TopN ); } @@ -166,8 +243,275 @@ public void testCollapseWithSplitsOnExternalSource() { assertEquals(2, collapsedSource.splits().size()); } + public void testCollapseFragmentExecExchangeRemovesExchange() { + ExternalRelation external = createExternalRelation(); + FragmentExec fragment = new FragmentExec(external); + ExchangeExec exchange = new ExchangeExec(SRC, fragment); + LimitExec limit = new LimitExec(SRC, exchange, new Literal(SRC, 10, DataType.INTEGER), null); + + PhysicalPlan collapsed = ComputeService.collapseExternalSourceExchanges(limit); + + assertTrue("Expected LimitExec at top", collapsed instanceof LimitExec); + LimitExec collapsedLimit = (LimitExec) collapsed; + assertTrue( + "Expected FragmentExec directly under LimitExec, got: " + collapsedLimit.child().getClass().getSimpleName(), + collapsedLimit.child() instanceof FragmentExec + ); + } + + // --- localPlan expansion tests --- + + public void testLocalPlanExpandsFragmentExecToExternalSourceExec() { + ExternalRelation external = createExternalRelation(); + FragmentExec fragment = new FragmentExec(external); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + assertTrue( + "Expected ExternalSourceExec after expansion, got: " + expandedSink.child().getClass().getSimpleName(), + expandedSink.child() instanceof ExternalSourceExec + ); + ExternalSourceExec sourceExec = (ExternalSourceExec) expandedSink.child(); + assertEquals("s3://bucket/data/*.parquet", sourceExec.sourcePath()); + assertEquals("parquet", sourceExec.sourceType()); + } + + public void testLocalPlanExpandsAggregateFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Aggregate aggregate = new Aggregate(SRC, external, List.of(), List.of()); + FragmentExec fragment = new FragmentExec(aggregate); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, aggregate.output(), true, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testLocalPlanExpandsLimitFragmentToLimitOverExternalSource() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + FragmentExec fragment = new FragmentExec(limit); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + PhysicalPlan child = expandedSink.child(); + assertTrue("Expected LimitExec after expansion, got: " + child.getClass().getSimpleName(), child instanceof LimitExec); + LimitExec limitExec = (LimitExec) child; + assertTrue( + "Expected ExternalSourceExec under LimitExec, got: " + limitExec.child().getClass().getSimpleName(), + limitExec.child() instanceof ExternalSourceExec + ); + } + + public void testLocalPlanExpandsTopNFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Order order = new Order(SRC, nameAttr, Order.OrderDirection.ASC, Order.NullsPosition.LAST); + TopN topN = new TopN(SRC, external, List.of(order), new Literal(SRC, 10, DataType.INTEGER), false); + FragmentExec fragment = new FragmentExec(topN); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded plan should contain ExternalSourceExec", hasExternalSource); + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testLocalPlanExpandsFilterFragmentContainsExternalSource() { + ExternalRelation external = createExternalRelation(); + Attribute nameAttr = external.output().get(0); + Expression filterCondition = new Literal(SRC, true, DataType.BOOLEAN); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded", hasNoFragmentExec); + } + + public void testEndToEndMapperThenLocalPlanExpandsFragmentExec() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + + Mapper mapper = new Mapper(); + PhysicalPlan coordPlan = mapper.map(new Versioned<>(limit, TransportVersion.current())); + + assertTrue("Expected LimitExec at top", coordPlan instanceof LimitExec); + LimitExec limitExec = (LimitExec) coordPlan; + assertTrue("Expected ExchangeExec child", limitExec.child() instanceof ExchangeExec); + ExchangeExec exchange = (ExchangeExec) limitExec.child(); + assertTrue("Expected FragmentExec child", exchange.child() instanceof FragmentExec); + FragmentExec fragment = (FragmentExec) exchange.child(); + + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, fragment.output(), false, fragment); + PhysicalPlan dataNodePlan = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", dataNodePlan instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) dataNodePlan; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded data node plan should contain ExternalSourceExec", hasExternalSource); + boolean hasNoFragmentExec = expandedSink.anyMatch(n -> n instanceof FragmentExec) == false; + assertTrue("FragmentExec should be fully expanded on data node", hasNoFragmentExec); + } + + // --- Coordinator-only split injection tests --- + + public void testCoordinatorOnlySplitInjectionAfterLocalPlanExpansion() { + ExternalRelation external = createExternalRelation(); + Limit limit = new Limit(SRC, new Literal(SRC, 10, DataType.INTEGER), external); + FragmentExec fragment = new FragmentExec(limit); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + assertTrue("Expected ExchangeSinkExec at top", expanded instanceof ExchangeSinkExec); + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("Expanded plan should contain ExternalSourceExec", hasExternalSource); + + List splits = List.of( + new FileSplit("parquet", StoragePath.of("s3://bucket/file1.parquet"), 0, 1024, ".parquet", Map.of(), Map.of()), + new FileSplit("parquet", StoragePath.of("s3://bucket/file2.parquet"), 0, 1024, ".parquet", Map.of(), Map.of()) + ); + PhysicalPlan withSplits = expanded.transformUp( + ExternalSourceExec.class, + exec -> exec.splits().isEmpty() ? exec.withSplits(splits) : exec + ); + + final List sources = new java.util.ArrayList<>(); + withSplits.forEachDown(ExternalSourceExec.class, sources::add); + assertFalse("Should have at least one ExternalSourceExec", sources.isEmpty()); + assertEquals("Splits should be injected", 2, sources.get(0).splits().size()); + } + + // --- Field retention through local optimization tests --- + + /** + * Verifies that external source fields survive local plan optimization with empty SearchStats. + * ReplaceFieldWithConstantOrNull must not replace external fields with null. + * Without the fix, the optimizer inserts Eval(field=null) + Project nodes above the source. + */ + public void testLocalPlanRetainsExternalSourceFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + Expression filterCondition = new GreaterThan(SRC, salaryAttr, new Literal(SRC, 50000, DataType.INTEGER), null); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + // The filter must survive -- without the fix, the field reference in the filter condition + // is replaced with null by ReplaceFieldWithConstantOrNull, causing PruneFilters to remove it + boolean hasFilter = expandedSink.anyMatch(n -> n instanceof FilterExec); + assertTrue("Filter on external field must survive local optimization with empty SearchStats", hasFilter); + } + + /** + * Verifies that a filter referencing external source fields survives local optimization. + * With empty SearchStats, the filter condition must not be replaced with null. + */ + public void testLocalPlanRetainsFilterOnExternalFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + Expression filterCondition = new GreaterThan(SRC, salaryAttr, new Literal(SRC, 50000, DataType.INTEGER), null); + Filter filter = new Filter(SRC, external, filterCondition); + FragmentExec fragment = new FragmentExec(filter); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, external.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasFilter = expandedSink.anyMatch(n -> n instanceof FilterExec); + assertTrue("Filter should survive optimization, not be pruned", hasFilter); + } + + /** + * Verifies that eval expressions referencing external source fields produce non-null results. + */ + public void testLocalPlanRetainsEvalOnExternalFields() { + ExternalRelation external = createMultiFieldExternalRelation(); + Attribute salaryAttr = external.output().get(1); + var alias = new org.elasticsearch.xpack.esql.core.expression.Alias(SRC, "abs_salary", new Abs(SRC, salaryAttr)); + Eval eval = new Eval(SRC, external, List.of(alias)); + FragmentExec fragment = new FragmentExec(eval); + ExchangeSinkExec sink = new ExchangeSinkExec(SRC, eval.output(), false, fragment); + + PhysicalPlan expanded = runLocalPlan(sink); + + ExchangeSinkExec expandedSink = (ExchangeSinkExec) expanded; + boolean hasEval = expandedSink.anyMatch(n -> n instanceof EvalExec); + assertTrue("Eval should survive optimization", hasEval); + boolean hasExternalSource = expandedSink.anyMatch(n -> n instanceof ExternalSourceExec); + assertTrue("ExternalSourceExec should be present", hasExternalSource); + } + + /** + * Verifies that collapseExternalSourceExchanges resets TopNExec InputOrdering + * when the exchange is removed and the TopN sits directly above a FragmentExec. + */ + public void testCollapseResetsTopNInputOrdering() { + ExternalRelation external = createExternalRelation(); + TopN topN = new TopN( + SRC, + external, + List.of(new Order(SRC, external.output().get(0), Order.OrderDirection.ASC, Order.NullsPosition.LAST)), + new Literal(SRC, 10, DataType.INTEGER), + false + ); + FragmentExec fragment = new FragmentExec(topN); + ExchangeExec exchange = new ExchangeExec(SRC, fragment); + TopNExec topNExec = new TopNExec( + SRC, + exchange, + List.of(new Order(SRC, external.output().get(0), Order.OrderDirection.ASC, Order.NullsPosition.LAST)), + new Literal(SRC, 10, DataType.INTEGER), + null + ).withSortedInput(); + + assertEquals("Precondition: TopNExec should have SORTED ordering", InputOrdering.SORTED, topNExec.inputOrdering()); + + PhysicalPlan collapsed = ComputeService.collapseExternalSourceExchanges(topNExec); + + assertTrue("Expected TopNExec at top", collapsed instanceof TopNExec); + TopNExec collapsedTopN = (TopNExec) collapsed; + assertEquals("InputOrdering should be reset to NOT_SORTED after collapse", InputOrdering.NOT_SORTED, collapsedTopN.inputOrdering()); + assertTrue("Expected FragmentExec child", collapsedTopN.child() instanceof FragmentExec); + } + // --- Helpers --- + private static PhysicalPlan runLocalPlan(PhysicalPlan plan) { + var config = EsqlTestUtils.TEST_CFG; + return PlannerUtils.localPlan( + PlannerSettings.DEFAULTS, + new EsqlFlags(false), + config, + config.newFoldContext(), + plan, + SearchStats.EMPTY, + null + ); + } + private static ExternalRelation createExternalRelation() { List output = List.of( new FieldAttribute(SRC, "name", new EsField("name", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) @@ -176,6 +520,16 @@ private static ExternalRelation createExternalRelation() { return new ExternalRelation(SRC, "s3://bucket/data/*.parquet", metadata, output, null); } + private static ExternalRelation createMultiFieldExternalRelation() { + List output = List.of( + new FieldAttribute(SRC, "name", new EsField("name", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE)), + new FieldAttribute(SRC, "salary", new EsField("salary", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)), + new FieldAttribute(SRC, "age", new EsField("age", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) + ); + SimpleSourceMetadata metadata = new SimpleSourceMetadata(output, "parquet", "s3://bucket/data/*.parquet"); + return new ExternalRelation(SRC, "s3://bucket/data/*.parquet", metadata, output, null); + } + private static ExternalSourceExec createExternalSourceExec() { return new ExternalSourceExec( SRC, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java index 82aa34472b544..fb17cc1a8ff09 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ExternalSourceDataNodeTests.java @@ -233,18 +233,18 @@ public void testDistributionResultIsDistributedWithRealPlan() { Map> assignments = Map.of("node-0", splits.subList(0, 2), "node-1", splits.subList(2, 4)); ExternalDistributionPlan distributionPlan = new ExternalDistributionPlan(assignments, true); - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), distributionPlan); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), distributionPlan, List.of()); assertTrue("Should be distributed when plan says distributed", result.isDistributed()); assertThat(result.distributionPlan().nodeAssignments().size(), equalTo(2)); } public void testDistributionResultNotDistributedWithNullPlan() { - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), null); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), null, List.of()); assertFalse("Should not be distributed with null plan", result.isDistributed()); } public void testDistributionResultNotDistributedWithLocalPlan() { - var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), ExternalDistributionPlan.LOCAL); + var result = new ComputeService.ExternalDistributionResult(createExternalSourceExec(), ExternalDistributionPlan.LOCAL, List.of()); assertFalse("Should not be distributed with LOCAL plan", result.isDistributed()); }