Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/143696.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 143696
summary: Enable distributed pipeline breakers for external sources via `FragmentExec`
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,34 @@ emp_no:integer | height:double | height_float_rounded:double | height.scaled_flo
10004 | 1.78 | 1.78 | 1.78 | 1.78
10005 | 2.05 | 2.05 | 2.05 | 2.05
;

// Pipeline breaker distribution tests
// These exercise distributed execution paths for pipeline breakers (STATS, TopN, LIMIT)
// across all distribution modes via ExternalDistributedSpecIT

topNSortBySalaryDesc
required_capability: external_command
EXTERNAL "{{employees}}"
| KEEP emp_no, first_name, salary
| SORT salary DESC
| LIMIT 3;

emp_no:integer | first_name:keyword | salary:integer
10029 | "Otmar" | 74999
10045 | "Moss" | 74970
10007 | "Tzvetan" | 74572
;

topNFilteredSortBySalary
required_capability: external_command
EXTERNAL "{{employees}}"
| WHERE gender == "F"
| KEEP emp_no, first_name, salary
| SORT salary DESC
| LIMIT 3;

emp_no:integer | first_name:keyword | salary:integer
10007 | "Tzvetan" | 74572
10027 | "Divier" | 73851
10099 | "Valter" | 73578
;
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.plan.logical.CompoundOutputEval;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
Expand All @@ -47,7 +48,9 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule<LogicalPla
@Override
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
var lookupFieldsBuilder = AttributeSet.builder();
var externalFieldsBuilder = AttributeSet.builder();
Map<Attribute, Expression> attrToConstant = new HashMap<>();
plan.forEachUp(ExternalRelation.class, external -> externalFieldsBuilder.addAll(external.output()));
plan.forEachUp(EsRelation.class, esRelation -> {
// Looking for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
Expand Down Expand Up @@ -77,14 +80,16 @@ else if (esRelation.indexMode() == IndexMode.STANDARD) {
}
});
AttributeSet lookupFields = lookupFieldsBuilder.build();
AttributeSet externalFields = externalFieldsBuilder.build();

// Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
// Also retain fields from lookup indices because we do not have stats for these.
// Also retain fields from lookup indices and external sources because we do not have stats for these.
Predicate<FieldAttribute> shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField
// The source (or doc) field is added to the relation output as a hack to enable late materialization in the reduce driver.
|| EsQueryExec.isDocAttribute(f)
|| localLogicalOptimizerContext.searchStats().exists(f.fieldName())
|| lookupFields.contains(f);
|| lookupFields.contains(f)
|| externalFields.contains(f);

return plan.transformUp(p -> replaceWithNullOrConstant(p, shouldBeRetained, attrToConstant));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Grok;
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
Expand Down Expand Up @@ -88,6 +89,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
Enrich.ENTRY,
EsRelation.ENTRY,
Eval.ENTRY,
ExternalRelation.ENTRY,
Filter.ENTRY,
Grok.ENTRY,
InlineJoin.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,35 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.datasources.FileSet;
import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Logical plan node for external data source relations (e.g., Iceberg table, Parquet file).
* This plan node is executed on the coordinator only (no dispatch to data nodes).
* <p>
* Unlike EsRelation which wraps into FragmentExec for data node dispatch,
* ExternalRelation maps directly to physical source operators via LocalMapper,
* similar to how LocalRelation works.
* Like {@link EsRelation}, the Mapper wraps this into a {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec}
* so that pipeline breakers (Aggregate, Limit, TopN) above it are distributed to data nodes
* via ExchangeExec. On data nodes, {@code localPlan()} expands the FragmentExec through
* LocalMapper into {@link ExternalSourceExec}, enabling local optimizations such as
* filter pushdown via FilterPushdownRegistry.
* <p>
* This class provides a source-agnostic logical plan node for external data sources.
* It can represent any external source (Iceberg, Parquet, CSV, etc.) without requiring
* source-specific subclasses in core ESQL code.
* The {@link ExecutesOn.Coordinator} marker is retained for logical plan validation
* (e.g., Enrich/Join hoist rules that inspect whether a relation executes on the coordinator).
* <p>
* The source-specific metadata is stored in the {@link SourceMetadata} interface, which
* provides:
Expand All @@ -45,6 +51,12 @@
*/
public class ExternalRelation extends LeafPlan implements ExecutesOn.Coordinator {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"ExternalRelation",
ExternalRelation::readFrom
);

private final String sourcePath;
private final List<Attribute> output;
private final SourceMetadata metadata;
Expand All @@ -71,14 +83,32 @@ public ExternalRelation(Source source, String sourcePath, SourceMetadata metadat
this(source, sourcePath, metadata, output, FileSet.UNRESOLVED);
}

// Deserializes an ExternalRelation previously written by writeTo().
// Read order must stay in lock-step with writeTo: Source, sourcePath, sourceType,
// output attributes, config map, source-specific metadata map.
private static ExternalRelation readFrom(StreamInput in) throws IOException {
// Source carries query-text position info; writeTo always emits Source.EMPTY,
// so the cast to PlanStreamInput is only needed for the shared read helper.
var source = Source.readFrom((PlanStreamInput) in);
String sourcePath = in.readString();
String sourceType = in.readString();
var output = in.readNamedWriteableCollectionAsList(Attribute.class);
// Generic values round-trip as Map<String, Object>; the casts are unchecked
// because readGenericValue is untyped, hence the suppressions.
@SuppressWarnings("unchecked")
Map<String, Object> config = (Map<String, Object>) in.readGenericValue();
@SuppressWarnings("unchecked")
Map<String, Object> sourceMetadata = (Map<String, Object>) in.readGenericValue();
// Rebuild metadata from the wire fields. The two null arguments are fields
// that writeTo does not serialize — presumably not needed on the receiving
// (data) node; TODO confirm against SimpleSourceMetadata's constructor contract.
var metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config);
// FileSet is not serialized (resolved on the coordinator only), so the
// deserialized relation always starts out UNRESOLVED on the data node.
return new ExternalRelation(source, sourcePath, metadata, output, FileSet.UNRESOLVED);
}

@Override
public void writeTo(StreamOutput out) {
throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations");
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeString(sourcePath);
out.writeString(metadata.sourceType());
out.writeNamedWriteableCollection(output);
out.writeGenericValue(metadata.config());
out.writeGenericValue(metadata.sourceMetadata());
}

@Override
public String getWriteableName() {
throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations");
return ENTRY.name;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;

import java.io.IOException;
import java.util.List;
Expand All @@ -29,7 +28,7 @@
/**
* Generic physical plan node for reading from external data sources (e.g., Iceberg tables, Parquet files).
* <p>
* This is the unified physical plan node for all external sources, replacing source-specific nodes
* This is the unified physical plan node for all external sources, replacing source-specific nodes.
* It uses generic maps for configuration and metadata to avoid leaking
* source-specific types (like S3Configuration) into core ESQL code.
* <p>
Expand All @@ -40,13 +39,13 @@
* <li><b>Opaque metadata</b>: Source-specific data (native schema, etc.) is stored in
* {@link #sourceMetadata()} and passed through without core understanding it</li>
* <li><b>Opaque pushed filter</b>: The {@link #pushedFilter()} is an opaque Object that only
* the source-specific operator factory interprets. It is NOT serialized because external
* sources execute on coordinator only ({@link ExecutesOn.Coordinator})</li>
* <li><b>Coordinator-only execution</b>: External sources run entirely on the coordinator node,
* so no cross-node serialization of source-specific data is needed</li>
* the source-specific operator factory interprets. It is NOT serialized; it is created
* locally on each data node by the LocalPhysicalPlanOptimizer via FilterPushdownRegistry</li>
* <li><b>Data node execution</b>: Created on data nodes by LocalMapper from
* {@link org.elasticsearch.xpack.esql.plan.logical.ExternalRelation} inside FragmentExec</li>
* </ul>
*/
public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, ExecutesOn.Coordinator {
public class ExternalSourceExec extends LeafExec implements EstimatesRowSize {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
Expand All @@ -61,10 +60,10 @@ public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, Ex
private final List<Attribute> attributes;
private final Map<String, Object> config;
private final Map<String, Object> sourceMetadata;
private final Object pushedFilter; // Opaque filter - NOT serialized (coordinator only)
private final int pushedLimit; // NOT serialized (coordinator only)
private final Object pushedFilter; // Opaque filter - NOT serialized, created locally on data nodes
private final int pushedLimit; // NOT serialized, set locally on data nodes
private final Integer estimatedRowSize;
private final FileSet fileSet; // NOT serialized - coordinator only
private final FileSet fileSet; // NOT serialized - resolved on coordinator, null on data nodes
private final List<ExternalSplit> splits;

public ExternalSourceExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
Expand Down Expand Up @@ -71,8 +72,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) {
return new EsSourceExec(esRelation);
}

// ExternalRelation is handled by MapperUtils.mapLeaf()
// via its toPhysicalExec() method, bypassing FragmentExec/ExchangeExec dispatch
if (leaf instanceof ExternalRelation external) {
return external.toPhysicalExec();
}

return MapperUtils.mapLeaf(leaf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
Expand Down Expand Up @@ -87,8 +88,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) {
return new FragmentExec(esRelation);
}

// ExternalRelation is handled by MapperUtils.mapLeaf()
// which calls toPhysicalExec() to create coordinator-only source operators
if (leaf instanceof ExternalRelation external) {
return new FragmentExec(external);
}

return MapperUtils.mapLeaf(leaf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Grok;
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
Expand Down Expand Up @@ -70,12 +69,6 @@ static PhysicalPlan mapLeaf(LeafPlan p) {
return new LocalSourceExec(local.source(), local.output(), local.supplier());
}

// External data sources (Iceberg, Parquet, etc.)
// These are executed on the coordinator only, bypassing FragmentExec/ExchangeExec dispatch
if (p instanceof ExternalRelation external) {
return external.toPhysicalExec();
}

// Commands
if (p instanceof ShowInfo showInfo) {
return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
/**
* Adaptive distribution strategy for external sources.
* <p>
* Distributes when the plan contains aggregations and there are multiple splits,
* or when the split count exceeds the number of eligible nodes.
* Stays on the coordinator for single splits or LIMIT-only plans.
* Distributes when the plan contains pipeline breakers (aggregations, TopN)
* and there are multiple splits, or when the split count exceeds the number
* of eligible nodes. Stays on the coordinator for single splits or LIMIT-only plans.
*/
public final class AdaptiveStrategy implements ExternalDistributionStrategy {

Expand Down Expand Up @@ -56,10 +56,10 @@ public ExternalDistributionPlan planDistribution(ExternalDistributionContext con
return ExternalDistributionPlan.LOCAL;
}

boolean hasAggregation = plan.anyMatch(n -> n instanceof AggregateExec);
boolean hasPipelineBreaker = plan.anyMatch(n -> n instanceof AggregateExec || n instanceof TopNExec);
boolean manySplits = splits.size() > nodes.size();

if (hasAggregation || manySplits) {
if (hasPipelineBreaker || manySplits) {
boolean allHaveSize = true;
for (ExternalSplit split : splits) {
if (split.estimatedSizeInBytes() <= 0) {
Expand Down
Loading