Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c4e50cb
Add plan profiling
Nov 17, 2025
d47fa65
Add plan profiling
Nov 17, 2025
aaa044b
Add logical / physical planning info to main plan
Nov 18, 2025
bb67cb9
Add logical / physical planning info to main plan
Nov 25, 2025
93412cb
Add transport versions
Nov 25, 2025
d1a1ab5
Update docs/changelog/138564.yaml
carlosdelest Nov 25, 2025
eadcc23
[CI] Auto commit changes from spotless
Nov 25, 2025
3163150
Fix serialization
Nov 25, 2025
077e7a3
Add test
Nov 25, 2025
5e61597
Merge remote-tracking branch 'carlosdelest/enhancement/esql-profile-p…
Nov 25, 2025
2f8bb1c
Fix changelog
Nov 25, 2025
3ce645d
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
Nov 25, 2025
6f917f9
Remove plan time from profiling
carlosdelest Nov 28, 2025
8aa8c76
Add reduction plan
carlosdelest Nov 28, 2025
9cc0d6d
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
carlosdelest Nov 28, 2025
f809ad0
Fix transport version
carlosdelest Nov 28, 2025
656b605
[CI] Auto commit changes from spotless
Nov 28, 2025
87788f1
Merge branch 'main' into enhancement/esql-profile-planning-time
carlosdelest Dec 1, 2025
39eebe8
Merge branch 'main' into enhancement/esql-profile-planning-time
carlosdelest Dec 1, 2025
8ea6fa3
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
carlosdelest Dec 3, 2025
1eb1220
Fix transport version
carlosdelest Dec 3, 2025
779cee9
Fix merge
carlosdelest Dec 3, 2025
6943d54
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
carlosdelest Dec 4, 2025
6be487f
Fix profiling
carlosdelest Dec 4, 2025
b7ce206
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
carlosdelest Dec 9, 2025
42ef455
Add transport version after merge
carlosdelest Dec 9, 2025
72e3591
Add transport version after merge
carlosdelest Dec 9, 2025
9046c66
Fix serverless test
carlosdelest Dec 9, 2025
9bdeede
Merge remote-tracking branch 'origin/main' into enhancement/esql-prof…
carlosdelest Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138564.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138564
summary: ESQL - Add planning detailed timing to profile information
area: "ES|QL"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9236000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
inference_ccm_enablement_service,9235000
plan_profile_version,9236000
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public static DriverCompletionInfo includingProfiles(
String description,
String clusterName,
String nodeName,
String planTree
String planTree,
PlanTimeProfile planTimeProfile
) {
long documentsFound = 0;
long valuesLoaded = 0;
Expand All @@ -68,7 +69,7 @@ public static DriverCompletionInfo includingProfiles(
documentsFound,
valuesLoaded,
collectedProfiles,
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
List.of(new PlanProfile(description, clusterName, nodeName, planTree, planTimeProfile))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -15,10 +16,24 @@

import java.io.IOException;

public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {
public record PlanProfile(String description, String clusterName, String nodeName, String planTree, PlanTimeProfile planTimeProfile)
implements
Writeable,
ToXContentObject {

private static final TransportVersion PLAN_PROFILE_VERSION = TransportVersion.fromName("plan_profile_version");

public static PlanProfile readFrom(StreamInput in) throws IOException {
return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
String description = in.readString();
String clusterName = in.readString();
String nodeName = in.readString();
String planTree = in.readString();
PlanTimeProfile profile = null;
if (in.getTransportVersion().supports(PLAN_PROFILE_VERSION)) {
profile = in.readOptionalWriteable(PlanTimeProfile::new);
}

return new PlanProfile(description, clusterName, nodeName, planTree, profile);
}

@Override
Expand All @@ -27,15 +42,22 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterName);
out.writeString(nodeName);
out.writeString(planTree);
if (out.getTransportVersion().supports(PLAN_PROFILE_VERSION)) {
out.writeOptionalWriteable(planTimeProfile);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("description", description)
.field("cluster_name", clusterName)
.field("node_name", nodeName)
.field("plan", planTree)
.endObject();
builder.startObject();
builder.field("description", description);
builder.field("cluster_name", clusterName);
builder.field("node_name", nodeName);
builder.field("plan", planTree);
if (planTimeProfile != null) {
planTimeProfile.toXContent(builder, params);
}

return builder.endObject();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Profile information for plan optimization phases.
* Captures timing information for logical and physical optimization steps.
*
*/
public final class PlanTimeProfile implements Writeable, ToXContentObject {
private long reductionPlanNanos;
private long logicalOptimizationNanos;
private long physicalOptimizationNanos;

/**
* @param logicalOptimizationNanos Time spent on local logical plan optimization (in nanoseconds)
* @param physicalOptimizationNanos Time spent on local physical plan optimization (in nanoseconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to move my prior question here: #138400 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind clarifying planNanos as well?

It looks like it includes:

breakPlanBetweenCoordinatorAndDataNode
reductionPlan
plan.collect(LookupJoinExec.class::isInstance)...
All of them should be fairly cheap.

I'm looking at it now - as you say, it's fairly cheap and probably makes no sense as it addresses multiple intermediate operations for planning.

I will remove it as it adds some confusion. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a reduction_nanos field for taking into account the reduction plan timing on node_reduce. LMKWYT!

* @param reductionPlanNanos Time spent on reduction plan for node_reduce phase (in nanoseconds)
*/
public PlanTimeProfile(long logicalOptimizationNanos, long physicalOptimizationNanos, long reductionPlanNanos) {
this.logicalOptimizationNanos = logicalOptimizationNanos;
this.physicalOptimizationNanos = physicalOptimizationNanos;
this.reductionPlanNanos = reductionPlanNanos;
}

public PlanTimeProfile() {
this.logicalOptimizationNanos = 0L;
this.physicalOptimizationNanos = 0L;
this.reductionPlanNanos = 0L;
}

public PlanTimeProfile(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(logicalOptimizationNanos);
out.writeVLong(physicalOptimizationNanos);
out.writeVLong(reductionPlanNanos);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (logicalOptimizationNanos > 0) {
builder.field("logical_optimization_nanos", logicalOptimizationNanos);
}
if (physicalOptimizationNanos > 0) {
builder.field("physical_optimization_nanos", physicalOptimizationNanos);
}
if (reductionPlanNanos > 0) {
builder.field("reduction_nanos", physicalOptimizationNanos);
}
return builder;
}

public void addLogicalOptimizationPlanTime(long logicalOptimizationPlanTime) {
this.logicalOptimizationNanos = this.logicalOptimizationNanos + logicalOptimizationPlanTime;
}

public void addPhysicalOptimizationPlanTime(long physicalOptimizationPlanTime) {
this.physicalOptimizationNanos = this.physicalOptimizationNanos + physicalOptimizationPlanTime;
}

public void addReductionPlanNanos(long reductionPlanNanos) {
this.reductionPlanNanos = this.reductionPlanNanos + reductionPlanNanos;
}

@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (PlanTimeProfile) obj;
return this.logicalOptimizationNanos == that.logicalOptimizationNanos
&& this.physicalOptimizationNanos == that.physicalOptimizationNanos
&& this.reductionPlanNanos == that.reductionPlanNanos;
}

@Override
public int hashCode() {
return Objects.hash(logicalOptimizationNanos, physicalOptimizationNanos, reductionPlanNanos);
}

@Override
public String toString() {
return "PlanTimeProfile["
+ "logicalOptimizationNanos="
+ logicalOptimizationNanos
+ ", "
+ "physicalOptimizationNanos="
+ physicalOptimizationNanos
+ ", "
+ "reductionPlanNanos="
+ reductionPlanNanos
+ ']';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;
Expand Down Expand Up @@ -365,6 +366,29 @@ public void testProfile() throws IOException {
default -> throw new IllegalArgumentException("can't match " + description);
}
}
@SuppressWarnings("unchecked")
List<Map<String, Object>> plans = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("plans");
for (Map<String, Object> plan : plans) {
assertThat(plan.get("cluster_name"), equalTo("test-cluster"));
assertThat(plan.get("node_name"), notNullValue());
assertThat(plan.get("plan"), notNullValue());
String description = (String) plan.get("description");
assertTrue("Unexpected plan description " + description, Set.of("final", "node_reduce", "data").contains(description));
switch (description) {
case "final", "data" -> {
assertThat((int) plan.get("logical_optimization_nanos"), greaterThanOrEqualTo(0));
assertThat((int) plan.get("physical_optimization_nanos"), greaterThanOrEqualTo(0));
assertFalse(plan.containsKey("reduction_nanos"));
}
case "node_reduce" -> {
assertThat((int) plan.get("reduction_nanos"), greaterThanOrEqualTo(0));
assertFalse(plan.containsKey("logical_optimization_nanos"));
assertFalse(plan.containsKey("physical_optimization_nanos"));
}
default -> {
}
}
}
}

private final String PROCESS_NAME = "process_name";
Expand Down Expand Up @@ -703,7 +727,7 @@ public void testForceSleepsProfile() throws IOException {
String operators = p.get("operators").toString();
MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty")
.entry("sleep_millis", greaterThan(0L))
.entry("thread_name", Matchers.containsString("[esql_worker]")) // NB: this doesn't run in the test thread
.entry("thread_name", containsString("[esql_worker]")) // NB: this doesn't run in the test thread
.entry("wake_millis", greaterThan(0L));
String description = p.get("description").toString();
switch (description) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.operator.PlanTimeProfile;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -179,9 +180,10 @@ public static PhysicalPlan localPlan(
List<SearchExecutionContext> searchContexts,
Configuration configuration,
FoldContext foldCtx,
PhysicalPlan plan
PhysicalPlan plan,
PlanTimeProfile planTimeProfile
) {
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts), planTimeProfile);
}

public static PhysicalPlan localPlan(
Expand All @@ -190,14 +192,15 @@ public static PhysicalPlan localPlan(
Configuration configuration,
FoldContext foldCtx,
PhysicalPlan plan,
SearchStats searchStats
SearchStats searchStats,
PlanTimeProfile planTimeProfile
) {
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
var physicalOptimizer = new LocalPhysicalPlanOptimizer(
new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats)
);

return localPlan(plan, logicalOptimizer, physicalOptimizer);
return localPlan(plan, logicalOptimizer, physicalOptimizer, planTimeProfile);
}

public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nullable QueryBuilder esFilter) {
Expand All @@ -215,7 +218,8 @@ public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nul
public static PhysicalPlan localPlan(
PhysicalPlan plan,
LocalLogicalPlanOptimizer logicalOptimizer,
LocalPhysicalPlanOptimizer physicalOptimizer
LocalPhysicalPlanOptimizer physicalOptimizer,
PlanTimeProfile planTimeProfile
) {
var isCoordPlan = new Holder<>(Boolean.TRUE);
Set<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
Expand All @@ -231,19 +235,36 @@ public static PhysicalPlan localPlan(
return f;
}
isCoordPlan.set(Boolean.FALSE);

// Logical optimization
boolean profilingEnabled = planTimeProfile != null;
long logicalStartNanos = profilingEnabled ? System.nanoTime() : 0;
LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
PhysicalPlan physicalFragment = LocalMapper.INSTANCE.map(optimizedFragment);
if (profilingEnabled) {
planTimeProfile.addLogicalOptimizationPlanTime(System.nanoTime() - logicalStartNanos);
}
QueryBuilder filter = f.esFilter();
if (filter != null) {
physicalFragment = physicalFragment.transformUp(
EsSourceExec.class,
query -> new EsSourceExec(Source.EMPTY, query.indexPattern(), query.indexMode(), query.output(), filter)
);
}

// Physical optimization
long physicalStartNanos = profilingEnabled ? System.nanoTime() : 0;
var localOptimized = physicalOptimizer.localOptimize(physicalFragment);
if (profilingEnabled) {
planTimeProfile.addPhysicalOptimizationPlanTime(System.nanoTime() - physicalStartNanos);
}

return EstimatesRowSize.estimateRowSize(f.estimatedRowSize(), localOptimized);
});
return isCoordPlan.get() ? plan : localPhysicalPlan;

PhysicalPlan resultPlan = isCoordPlan.get() ? plan : localPhysicalPlan;

return resultPlan;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.lucene.EmptyIndexedByShardId;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.PlanTimeProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -267,7 +268,8 @@ void runComputeOnRemoteCluster(
configuration.newFoldContext(),
plan,
true,
false
false,
configuration.profile() ? new PlanTimeProfile() : null
);
PhysicalPlan coordinatorPlan = reductionPlan.nodeReducePlan();
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
Expand Down Expand Up @@ -299,6 +301,7 @@ void runComputeOnRemoteCluster(
() -> exchangeSink.createExchangeSink(() -> {})
),
coordinatorPlan,
configuration.profile() ? new PlanTimeProfile() : null,
computeListener.acquireCompute()
);
dataNodeComputeHandler.startComputeOnDataNodes(
Expand Down
Loading