diff --git a/docs/changelog/138564.yaml b/docs/changelog/138564.yaml new file mode 100644 index 0000000000000..d60104c1950af --- /dev/null +++ b/docs/changelog/138564.yaml @@ -0,0 +1,5 @@ +pr: 138564 +summary: ESQL - Add planning detailed timing to profile information +area: "ES|QL" +type: enhancement +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/plan_profile_version.csv b/server/src/main/resources/transport/definitions/referable/plan_profile_version.csv new file mode 100644 index 0000000000000..6a0019b688ef0 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/plan_profile_version.csv @@ -0,0 +1 @@ +9236000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index e75bb12605445..bbc3e4edba1e8 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -inference_ccm_enablement_service,9235000 +plan_profile_version,9236000 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java index a1a3ac2070ac0..b0f163a261872 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java @@ -51,7 +51,8 @@ public static DriverCompletionInfo includingProfiles( String description, String clusterName, String nodeName, - String planTree + String planTree, + PlanTimeProfile planTimeProfile ) { long documentsFound = 0; long valuesLoaded = 0; @@ -68,7 +69,7 @@ public static DriverCompletionInfo includingProfiles( documentsFound, valuesLoaded, collectedProfiles, - List.of(new PlanProfile(description, clusterName, nodeName, planTree)) + List.of(new PlanProfile(description, clusterName, nodeName, planTree, planTimeProfile)) ); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java index 502fa024d31ea..d8c3fa75f31a3 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -15,10 +16,24 @@ import java.io.IOException; -public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject { +public record PlanProfile(String description, String clusterName, String nodeName, String planTree, PlanTimeProfile planTimeProfile) + implements + Writeable, + ToXContentObject { + + private static final TransportVersion PLAN_PROFILE_VERSION = TransportVersion.fromName("plan_profile_version"); public static PlanProfile readFrom(StreamInput in) throws IOException { - return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString()); + String description = in.readString(); + String clusterName = in.readString(); + String nodeName = in.readString(); + String planTree = in.readString(); + PlanTimeProfile profile = null; + if (in.getTransportVersion().supports(PLAN_PROFILE_VERSION)) { + profile = in.readOptionalWriteable(PlanTimeProfile::new); + } + + return new PlanProfile(description, clusterName, nodeName, planTree, profile); } @Override @@ -27,15 +42,22 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterName); out.writeString(nodeName); out.writeString(planTree); + if (out.getTransportVersion().supports(PLAN_PROFILE_VERSION)) { + out.writeOptionalWriteable(planTimeProfile); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() - .field("description", description) - .field("cluster_name", clusterName) - .field("node_name", nodeName) - .field("plan", planTree) - .endObject(); + builder.startObject(); + builder.field("description", description); + builder.field("cluster_name", clusterName); + builder.field("node_name", nodeName); + builder.field("plan", planTree); + if (planTimeProfile != null) { + planTimeProfile.toXContent(builder, params); + } + + return builder.endObject(); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanTimeProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanTimeProfile.java new file mode 100644 index 0000000000000..01eecf84feef9 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanTimeProfile.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Profile information for plan optimization phases. + * Captures timing information for logical and physical optimization steps. + * + */ +public final class PlanTimeProfile implements Writeable, ToXContentObject { + private long reductionPlanNanos; + private long logicalOptimizationNanos; + private long physicalOptimizationNanos; + + /** + * @param logicalOptimizationNanos Time spent on local logical plan optimization (in nanoseconds) + * @param physicalOptimizationNanos Time spent on local physical plan optimization (in nanoseconds) + * @param reductionPlanNanos Time spent on reduction plan for node_reduce phase (in nanoseconds) + */ + public PlanTimeProfile(long logicalOptimizationNanos, long physicalOptimizationNanos, long reductionPlanNanos) { + this.logicalOptimizationNanos = logicalOptimizationNanos; + this.physicalOptimizationNanos = physicalOptimizationNanos; + this.reductionPlanNanos = reductionPlanNanos; + } + + public PlanTimeProfile() { + this.logicalOptimizationNanos = 0L; + this.physicalOptimizationNanos = 0L; + this.reductionPlanNanos = 0L; + } + + public PlanTimeProfile(StreamInput in) throws IOException { + this(in.readVLong(), in.readVLong(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(logicalOptimizationNanos); + out.writeVLong(physicalOptimizationNanos); + out.writeVLong(reductionPlanNanos); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (logicalOptimizationNanos > 0) { + builder.field("logical_optimization_nanos", logicalOptimizationNanos); + } + if (physicalOptimizationNanos > 0) { + builder.field("physical_optimization_nanos", physicalOptimizationNanos); + } + if (reductionPlanNanos > 0) { + builder.field("reduction_nanos", physicalOptimizationNanos); + } + return builder; + } + + public void addLogicalOptimizationPlanTime(long logicalOptimizationPlanTime) { + this.logicalOptimizationNanos = this.logicalOptimizationNanos + logicalOptimizationPlanTime; + } + + public void addPhysicalOptimizationPlanTime(long physicalOptimizationPlanTime) { + this.physicalOptimizationNanos = this.physicalOptimizationNanos + physicalOptimizationPlanTime; + } + + public void addReductionPlanNanos(long reductionPlanNanos) { + this.reductionPlanNanos = this.reductionPlanNanos + reductionPlanNanos; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (PlanTimeProfile) obj; + return this.logicalOptimizationNanos == that.logicalOptimizationNanos + && this.physicalOptimizationNanos == that.physicalOptimizationNanos + && this.reductionPlanNanos == that.reductionPlanNanos; + } + + @Override + public int hashCode() { + return Objects.hash(logicalOptimizationNanos, physicalOptimizationNanos, reductionPlanNanos); + } + + @Override + public String toString() { + return "PlanTimeProfile[" + + "logicalOptimizationNanos=" + + logicalOptimizationNanos + + ", " + + "physicalOptimizationNanos=" + + physicalOptimizationNanos + + ", " + + "reductionPlanNanos=" + + reductionPlanNanos + + ']'; + } + +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 5cb10aa89dddf..209fc0202fc46 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -73,6 +73,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; @@ -365,6 +366,29 @@ public void testProfile() throws IOException { default -> throw new IllegalArgumentException("can't match " + description); } } + @SuppressWarnings("unchecked") + List> plans = (List>) ((Map) result.get("profile")).get("plans"); + for (Map plan : plans) { + assertThat(plan.get("cluster_name"), equalTo("test-cluster")); + assertThat(plan.get("node_name"), notNullValue()); + assertThat(plan.get("plan"), notNullValue()); + String description = (String) plan.get("description"); + assertTrue("Unexpected plan description " + description, Set.of("final", "node_reduce", "data").contains(description)); + switch (description) { + case "final", "data" -> { + assertThat((int) plan.get("logical_optimization_nanos"), greaterThanOrEqualTo(0)); + assertThat((int) plan.get("physical_optimization_nanos"), greaterThanOrEqualTo(0)); + assertFalse(plan.containsKey("reduction_nanos")); + } + case "node_reduce" -> { + assertThat((int) plan.get("reduction_nanos"), greaterThanOrEqualTo(0)); + assertFalse(plan.containsKey("logical_optimization_nanos")); + assertFalse(plan.containsKey("physical_optimization_nanos")); + } + default -> { + } + } + } } private final String PROCESS_NAME = "process_name"; @@ -703,7 +727,7 @@ public void testForceSleepsProfile() throws IOException { String operators = p.get("operators").toString(); MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty") .entry("sleep_millis", greaterThan(0L)) - .entry("thread_name", Matchers.containsString("[esql_worker]")) // NB: this doesn't run in the test thread + .entry("thread_name", containsString("[esql_worker]")) // NB: this doesn't run in the test thread .entry("wake_millis", greaterThan(0L)); String description = p.get("description").toString(); switch (description) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 8ff62c9b4d76d..60bd97d166193 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -13,6 +13,7 @@ import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; @@ -179,9 +180,10 @@ public static PhysicalPlan localPlan( List searchContexts, Configuration configuration, FoldContext foldCtx, - PhysicalPlan plan + PhysicalPlan plan, + PlanTimeProfile planTimeProfile ) { - return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); + return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts), planTimeProfile); } public static PhysicalPlan localPlan( @@ -190,14 +192,15 @@ public static PhysicalPlan localPlan( Configuration configuration, FoldContext foldCtx, PhysicalPlan plan, - SearchStats searchStats + SearchStats searchStats, + PlanTimeProfile planTimeProfile ) { final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); var physicalOptimizer = new LocalPhysicalPlanOptimizer( new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats) ); - return localPlan(plan, logicalOptimizer, physicalOptimizer); + return localPlan(plan, logicalOptimizer, physicalOptimizer, planTimeProfile); } public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nullable QueryBuilder esFilter) { @@ -215,7 +218,8 @@ public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nul public static PhysicalPlan localPlan( PhysicalPlan plan, LocalLogicalPlanOptimizer logicalOptimizer, - LocalPhysicalPlanOptimizer physicalOptimizer + LocalPhysicalPlanOptimizer physicalOptimizer, + PlanTimeProfile planTimeProfile ) { var isCoordPlan = new Holder<>(Boolean.TRUE); Set lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance) @@ -231,8 +235,15 @@ public static PhysicalPlan localPlan( return f; } isCoordPlan.set(Boolean.FALSE); + + // Logical optimization + boolean profilingEnabled = planTimeProfile != null; + long logicalStartNanos = profilingEnabled ? System.nanoTime() : 0; LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment()); PhysicalPlan physicalFragment = LocalMapper.INSTANCE.map(optimizedFragment); + if (profilingEnabled) { + planTimeProfile.addLogicalOptimizationPlanTime(System.nanoTime() - logicalStartNanos); + } QueryBuilder filter = f.esFilter(); if (filter != null) { physicalFragment = physicalFragment.transformUp( @@ -240,10 +251,20 @@ public static PhysicalPlan localPlan( query -> new EsSourceExec(Source.EMPTY, query.indexPattern(), query.indexMode(), query.output(), filter) ); } + + // Physical optimization + long physicalStartNanos = profilingEnabled ? System.nanoTime() : 0; var localOptimized = physicalOptimizer.localOptimize(physicalFragment); + if (profilingEnabled) { + planTimeProfile.addPhysicalOptimizationPlanTime(System.nanoTime() - physicalStartNanos); + } + return EstimatesRowSize.estimateRowSize(f.estimatedRowSize(), localOptimized); }); - return isCoordPlan.get() ? plan : localPhysicalPlan; + + PhysicalPlan resultPlan = isCoordPlan.get() ? plan : localPhysicalPlan; + + return resultPlan; } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 4131ee0d4582e..b0f1a6297c58e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.compute.lucene.EmptyIndexedByShardId; import org.elasticsearch.compute.operator.DriverCompletionInfo; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -267,7 +268,8 @@ void runComputeOnRemoteCluster( configuration.newFoldContext(), plan, true, - false + false, + configuration.profile() ? new PlanTimeProfile() : null ); PhysicalPlan coordinatorPlan = reductionPlan.nodeReducePlan(); final AtomicReference finalResponse = new AtomicReference<>(); @@ -299,6 +301,7 @@ void runComputeOnRemoteCluster( () -> exchangeSink.createExchangeSink(() -> {}) ), coordinatorPlan, + configuration.profile() ? new PlanTimeProfile() : null, computeListener.acquireCompute() ); dataNodeComputeHandler.startComputeOnDataNodes( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 80a0a0076ff5e..e35205d661ebf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -26,6 +26,7 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.FailureCollector; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; @@ -201,6 +202,7 @@ public void execute( Configuration configuration, FoldContext foldContext, EsqlExecutionInfo execInfo, + PlanTimeProfile planTimeProfile, ActionListener listener ) { assert ThreadPool.assertCurrentThreadPool( @@ -233,7 +235,8 @@ public void execute( null, listener, null, - initialClusterStatuses + initialClusterStatuses, + planTimeProfile ); return; } @@ -281,7 +284,7 @@ public void execute( }) ) ) { - runCompute(rootTask, computeContext, mainPlan, localListener.acquireCompute()); + runCompute(rootTask, computeContext, mainPlan, planTimeProfile, localListener.acquireCompute()); for (int i = 0; i < subplans.size(); i++) { var subplan = subplans.get(i); @@ -310,7 +313,8 @@ public void execute( subPlanListener.onFailure(e); }), () -> exchangeSink.createExchangeSink(() -> {}), - initialClusterStatuses + initialClusterStatuses, + configuration.profile() ? new PlanTimeProfile() : null ); } } @@ -328,7 +332,8 @@ public void executePlan( String profileQualifier, ActionListener listener, Supplier exchangeSinkSupplier, - Map initialClusterStatuses + Map initialClusterStatuses, + PlanTimeProfile planTimeProfile ) { Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( physicalPlan, @@ -382,7 +387,7 @@ public void executePlan( }) ) ) { - runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute()); + runCompute(rootTask, computeContext, coordinatorPlan, planTimeProfile, computeListener.acquireCompute()); return; } } else { @@ -463,6 +468,7 @@ public void executePlan( exchangeSinkSupplier ), coordinatorPlan, + planTimeProfile, localListener.acquireCompute() ); // starts computes on data nodes on the main cluster @@ -631,7 +637,13 @@ static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List finalRe ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure()); } - void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { + void runCompute( + CancellableTask task, + ComputeContext context, + PhysicalPlan plan, + PlanTimeProfile planTimeProfile, + ActionListener listener + ) { var shardContexts = context.searchContexts().map(ComputeSearchContext::shardContext); EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders( context.foldCtx(), @@ -665,7 +677,8 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, new ArrayList<>(context.searchExecutionContexts().collection()), context.configuration(), context.foldCtx(), - plan + plan, + planTimeProfile ); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local plan for {}:\n{}", context.description(), localPlan); @@ -690,7 +703,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, throw new IllegalStateException("no drivers created"); } LOGGER.debug("using {} drivers", drivers.size()); - ActionListener driverListener = addCompletionInfo(listener, drivers, context, localPlan); + ActionListener driverListener = addCompletionInfo(listener, drivers, context, localPlan, planTimeProfile); driverRunner.executeDrivers( task, drivers, @@ -708,7 +721,8 @@ ActionListener addCompletionInfo( ActionListener listener, List drivers, ComputeContext context, - PhysicalPlan localPlan + PhysicalPlan localPlan, + PlanTimeProfile planTimeProfile ) { /* * We *really* don't want to close over the localPlan because it can @@ -717,34 +731,27 @@ ActionListener addCompletionInfo( boolean needPlanString = LOGGER.isDebugEnabled() || context.configuration().profile(); String planString = needPlanString ? localPlan.toString() : null; return listener.map(ignored -> { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "finished {}", - DriverCompletionInfo.includingProfiles( - drivers, - context.description(), - clusterService.getClusterName().value(), - transportService.getLocalNode().getName(), - planString - ) - ); - /* - * planString *might* be null if we *just* set DEBUG to *after* - * we built the listener but before we got here. That's something - * we can live with. - */ - } - if (context.configuration().profile()) { - return DriverCompletionInfo.includingProfiles( + if (LOGGER.isDebugEnabled() || context.configuration().profile()) { + DriverCompletionInfo driverCompletionInfo = DriverCompletionInfo.includingProfiles( drivers, context.description(), clusterService.getClusterName().value(), transportService.getLocalNode().getName(), - planString + planString, + planTimeProfile ); - } else { - return DriverCompletionInfo.excludingProfiles(drivers); + LOGGER.debug("finished {}", driverCompletionInfo); + if (context.configuration().profile()) { + /* + * planString *might* be null if we *just* set DEBUG to *after* + * we built the listener but before we got here. That's something + * we can live with. + */ + return driverCompletionInfo; + } } + + return DriverCompletionInfo.excludingProfiles(drivers); }); } @@ -755,8 +762,10 @@ static ReductionPlan reductionPlan( FoldContext foldCtx, ExchangeSinkExec originalPlan, boolean runNodeLevelReduction, - boolean reduceNodeLateMaterialization + boolean reduceNodeLateMaterialization, + PlanTimeProfile planTimeProfile ) { + long startTime = planTimeProfile == null ? 0 : System.nanoTime(); PhysicalPlan source = new ExchangeSourceExec(originalPlan.source(), originalPlan.output(), originalPlan.isIntermediateAgg()); ReductionPlan defaultResult = new ReductionPlan(originalPlan.replaceChild(source), originalPlan); if (reduceNodeLateMaterialization == false && runNodeLevelReduction == false) { @@ -768,7 +777,7 @@ static ReductionPlan reductionPlan( originalPlan ); // The default plan is just the exchange source piped directly into the exchange sink. - return switch (PlannerUtils.reductionPlan(originalPlan)) { + ReductionPlan reductionPlan = switch (PlannerUtils.reductionPlan(originalPlan)) { case PlannerUtils.TopNReduction topN when reduceNodeLateMaterialization -> // In the case of TopN, the source output type is replaced since we're pulling the FieldExtractExec to the reduction node, // so essential we are splitting the TopNExec into two parts, similar to other aggregations, but unlike other aggregations, @@ -783,6 +792,10 @@ static ReductionPlan reductionPlan( case PlannerUtils.ReducedPlan rp when runNodeLevelReduction -> placePlanBetweenExchanges.apply(rp.plan()); default -> defaultResult; }; + if (planTimeProfile != null) { + planTimeProfile.addReductionPlanNanos(System.nanoTime() - startTime); + } + return reductionPlan; } String newChildSession(String session) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 1b9bb37d2a391..ab2273744dfb9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -22,6 +22,7 @@ import org.elasticsearch.compute.lucene.IndexedByShardId; import org.elasticsearch.compute.lucene.IndexedByShardIdFromSingleton; import org.elasticsearch.compute.operator.DriverCompletionInfo; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; @@ -251,6 +252,7 @@ private class DataNodeRequestExecutor { private final boolean failFastOnShardFailure; private final Map shardLevelFailures; private final ComputeSearchContextByShardId searchContexts; + private final PlanTimeProfile planTimeProfile; DataNodeRequestExecutor( EsqlFlags flags, @@ -275,6 +277,7 @@ private class DataNodeRequestExecutor { this.singleShardPipeline = singleShardPipeline; this.blockingSink = exchangeSink.createExchangeSink(() -> {}); this.searchContexts = searchContexts; + this.planTimeProfile = new PlanTimeProfile(); } void start() { @@ -342,7 +345,13 @@ public void onFailure(Exception e) { null, () -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet) ); - computeService.runCompute(parentTask, computeContext, request.plan(), sub.acquireCompute()); + computeService.runCompute( + parentTask, + computeContext, + request.plan(), + planTimeProfile, + sub.acquireCompute() + ); } } } else { @@ -357,7 +366,7 @@ public void onFailure(Exception e) { null, () -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet) ); - computeService.runCompute(parentTask, computeContext, request.plan(), batchListener); + computeService.runCompute(parentTask, computeContext, request.plan(), planTimeProfile, batchListener); } }, batchListener::onFailure) ); @@ -472,6 +481,7 @@ private void runComputeOnDataNode( DataNodeRequest request, boolean failFastOnShardFailure, ComputeSearchContextByShardId searchContexts, + PlanTimeProfile planTimeProfile, ActionListener listener ) { final Map shardLevelFailures = new HashMap<>(); @@ -529,6 +539,7 @@ private void runComputeOnDataNode( () -> externalSink.createExchangeSink(() -> {}) ), reducePlan, + planTimeProfile, ActionListener.wrap(resp -> { // don't return until all pages are fetched externalSink.addCompletionListener(ActionListener.running(() -> { @@ -557,6 +568,10 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T Configuration configuration = request.configuration(); // We can avoid synchronization (for the most part) since the array elements are never modified, and the array is only added to, // with its size being known before we start the computation. + PlanTimeProfile planTimeProfile = null; + if (configuration.profile()) { + planTimeProfile = new PlanTimeProfile(); + } if (request.plan() instanceof ExchangeSinkExec plan) { reductionPlan = ComputeService.reductionPlan( computeService.plannerSettings(), @@ -565,7 +580,8 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T configuration.newFoldContext(), plan, request.runNodeLevelReduction(), - request.reductionLateMaterialization() + request.reductionLateMaterialization(), + planTimeProfile ); } else { listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan())); @@ -594,6 +610,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T request.withPlan(reductionPlan.dataNodePlan()), failFastOnShardFailures, computeSearchContexts, + planTimeProfile, ActionListener.releaseAfter(listener, computeSearchContexts) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 875d2ea4391ef..b94acc7316067 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -247,7 +247,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener computeService.execute( + PlanRunner planRunner = (plan, configuration, foldCtx, planTimeProfile, resultListener) -> computeService.execute( sessionId, (CancellableTask) task, flags, @@ -255,6 +255,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener listener); + void run( + PhysicalPlan plan, + Configuration configuration, + FoldContext foldContext, + PlanTimeProfile planTimeProfile, + ActionListener listener + ); } private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion.fromName("lookup_join_ccs"); @@ -190,6 +197,7 @@ public void execute( assert executionInfo != null : "Null EsqlExecutionInfo"; LOGGER.debug("ESQL query:\n{}", request.query()); EsqlStatement statement = parse(request); + PlanTimeProfile planTimeProfile = request.profile() ? new PlanTimeProfile() : null; Configuration configuration = new Configuration( request.timeZone() == null ? statement.setting(QuerySettings.TIME_ZONE) @@ -242,9 +250,12 @@ public void onResponse(Versioned analyzedPlan) { new LogicalOptimizerContext(configuration, foldContext, minimumVersion) ); - SubscribableListener.newForked(l -> preOptimizedPlan(plan, logicalPlanPreOptimizer, l)) + SubscribableListener.newForked(l -> preOptimizedPlan(plan, logicalPlanPreOptimizer, planTimeProfile, l)) .andThen( - (l, p) -> preMapper.preMapper(new Versioned<>(optimizedPlan(p, logicalPlanOptimizer), minimumVersion), l) + (l, p) -> preMapper.preMapper( + new Versioned<>(optimizedPlan(p, logicalPlanOptimizer, planTimeProfile), minimumVersion), + l + ) ) .andThen( (l, p) -> executeOptimizedPlan( @@ -255,6 +266,7 @@ public void onResponse(Versioned analyzedPlan) { configuration, foldContext, minimumVersion, + planTimeProfile, l ) ) @@ -289,6 +301,7 @@ public void executeOptimizedPlan( Configuration configuration, FoldContext foldContext, TransportVersion minimumVersion, + PlanTimeProfile planTimeProfile, ActionListener listener ) { assert ThreadPool.assertCurrentThreadPool( @@ -299,7 +312,7 @@ public void executeOptimizedPlan( var physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration, minimumVersion)); if (explainMode) {// TODO: INLINE STATS come back to the explain mode branch and reevaluate - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request, physicalPlanOptimizer); + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request, physicalPlanOptimizer, planTimeProfile); String physicalPlanString = physicalPlan.toString(); List fields = List.of( new ReferenceAttribute(EMPTY, null, "role", DataType.KEYWORD), @@ -312,12 +325,22 @@ public void executeOptimizedPlan( values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString)); var blocks = BlockUtils.fromList(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values); physicalPlan = new LocalSourceExec(Source.EMPTY, fields, LocalSupplier.of(new Page(blocks))); - planRunner.run(physicalPlan, configuration, foldContext, listener); + planRunner.run(physicalPlan, configuration, foldContext, planTimeProfile, listener); } else { // TODO: this could be snuck into the underlying listener EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); // execute any potential subplans - executeSubPlans(optimizedPlan, configuration, foldContext, planRunner, executionInfo, request, physicalPlanOptimizer, listener); + executeSubPlans( + optimizedPlan, + configuration, + foldContext, + planRunner, + executionInfo, + request, + physicalPlanOptimizer, + planTimeProfile, + listener + ); } } @@ -329,6 +352,7 @@ private void executeSubPlans( EsqlExecutionInfo executionInfo, EsqlQueryRequest request, PhysicalPlanOptimizer physicalPlanOptimizer, + PlanTimeProfile planTimeProfile, ActionListener listener ) { var subPlansResults = new HashSet(); @@ -348,13 +372,14 @@ private void executeSubPlans( request, subPlansResults, physicalPlanOptimizer, + planTimeProfile, // Ensure we don't have subplan flag stuck in there on failure ActionListener.runAfter(listener, executionInfo::finishSubPlans) ); } else { - PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request, physicalPlanOptimizer); + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request, physicalPlanOptimizer, planTimeProfile); // execute main plan - runner.run(physicalPlan, configuration, foldContext, listener); + runner.run(physicalPlan, configuration, foldContext, planTimeProfile, listener); } } @@ -369,15 +394,16 @@ private void executeSubPlan( EsqlQueryRequest request, Set subPlansResults, PhysicalPlanOptimizer physicalPlanOptimizer, + PlanTimeProfile planTimeProfile, ActionListener listener ) { LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan()); // Create a physical plan out of the logical sub-plan - var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request, physicalPlanOptimizer); + var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request, physicalPlanOptimizer, planTimeProfile); executionInfo.startSubPlans(); - runner.run(physicalSubPlan, configuration, foldContext, listener.delegateFailureAndWrap((next, result) -> { + runner.run(physicalSubPlan, configuration, foldContext, planTimeProfile, listener.delegateFailureAndWrap((next, result) -> { AtomicReference localRelationPage = new AtomicReference<>(); try { // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation @@ -396,11 +422,12 @@ private void executeSubPlan( if (newSubPlan == null) {// run the final "main" plan executionInfo.finishSubPlans(); LOGGER.debug("Executing final plan:\n{}", newMainPlan); - var newPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request, physicalPlanOptimizer); + var newPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request, physicalPlanOptimizer, planTimeProfile); runner.run( newPhysicalPlan, configuration, foldContext, + planTimeProfile, releasingNext.delegateFailureAndWrap((finalListener, finalResult) -> { completionInfoAccumulator.accumulate(finalResult.completionInfo()); finalListener.onResponse( @@ -420,6 +447,7 @@ private void executeSubPlan( request, subPlansResults, physicalPlanOptimizer, + planTimeProfile, releasingNext ); } @@ -1054,9 +1082,10 @@ private void analyzeWithRetry( private PhysicalPlan logicalPlanToPhysicalPlan( LogicalPlan optimizedPlan, EsqlQueryRequest request, - PhysicalPlanOptimizer physicalPlanOptimizer + PhysicalPlanOptimizer physicalPlanOptimizer, + PlanTimeProfile planTimeProfile ) { - PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan, physicalPlanOptimizer); + PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan, physicalPlanOptimizer, planTimeProfile); physicalPlan = PlannerUtils.integrateEsFilterIntoFragment(physicalPlan, request.filter()); return EstimatesRowSize.estimateRowSize(0, physicalPlan); } @@ -1070,11 +1099,15 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, Configuration configuration return plan; } - public LogicalPlan optimizedPlan(LogicalPlan logicalPlan, LogicalPlanOptimizer logicalPlanOptimizer) { + public LogicalPlan optimizedPlan(LogicalPlan logicalPlan, LogicalPlanOptimizer logicalPlanOptimizer, PlanTimeProfile planTimeProfile) { if (logicalPlan.preOptimized() == false) { throw new IllegalStateException("Expected pre-optimized plan"); } + long start = planTimeProfile == null ? 0L : System.nanoTime(); var plan = logicalPlanOptimizer.optimize(logicalPlan); + if (planTimeProfile != null) { + planTimeProfile.addLogicalOptimizationPlanTime(System.nanoTime() - start); + } LOGGER.debug("Optimized logicalPlan plan:\n{}", plan); return plan; } @@ -1082,9 +1115,16 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan, LogicalPlanOptimizer l public void preOptimizedPlan( LogicalPlan logicalPlan, LogicalPlanPreOptimizer logicalPlanPreOptimizer, + PlanTimeProfile planTimeProfile, ActionListener listener ) { - logicalPlanPreOptimizer.preOptimize(logicalPlan, listener); + long start = planTimeProfile == null ? 0L : System.nanoTime(); + logicalPlanPreOptimizer.preOptimize(logicalPlan, listener.delegateResponse((l, e) -> { l.onFailure(e); }).map(plan -> { + if (planTimeProfile != null) { + planTimeProfile.addLogicalOptimizationPlanTime(System.nanoTime() - start); + } + return plan; + })); } private PhysicalPlan physicalPlan(Versioned optimizedPlan) { @@ -1098,10 +1138,18 @@ private PhysicalPlan physicalPlan(Versioned optimizedPlan) { return plan; } - private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan, PhysicalPlanOptimizer physicalPlanOptimizer) { + private PhysicalPlan optimizedPhysicalPlan( + LogicalPlan optimizedPlan, + PhysicalPlanOptimizer physicalPlanOptimizer, + PlanTimeProfile planTimeProfile + ) { + long start = planTimeProfile == null ? 0L : System.nanoTime(); var plan = physicalPlanOptimizer.optimize( physicalPlan(new Versioned<>(optimizedPlan, physicalPlanOptimizer.context().minimumVersion())) ); + if (planTimeProfile != null) { + planTimeProfile.addPhysicalOptimizationPlanTime(System.nanoTime() - start); + } LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 6a94ca2d392b3..423f9f11de398 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.querydsl.query.SingleValueMatchQuery; @@ -698,15 +699,17 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { new LogicalPreOptimizerContext(foldCtx, mock(InferenceService.class), minimumVersion) ); var logicalPlanOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx, minimumVersion)); - session.preOptimizedPlan(analyzed, logicalPlanPreOptimizer, listener.delegateFailureAndWrap((l, preOptimized) -> { + PlanTimeProfile planTimeProfile = configuration.profile() ? new PlanTimeProfile() : null; + session.preOptimizedPlan(analyzed, logicalPlanPreOptimizer, planTimeProfile, listener.delegateFailureAndWrap((l, preOptimized) -> { session.executeOptimizedPlan( new EsqlQueryRequest(), new EsqlExecutionInfo(randomBoolean()), planRunner(bigArrays, physicalOperationProviders), - session.optimizedPlan(preOptimized, logicalPlanOptimizer), + session.optimizedPlan(preOptimized, logicalPlanOptimizer, planTimeProfile), configuration, foldCtx, minimumVersion, + planTimeProfile, listener.delegateFailureAndWrap( // Wrap so we can capture the warnings in the calling thread (next, result) -> next.onResponse( @@ -767,7 +770,7 @@ private void assertWarnings(List warnings, Object context) { } PlanRunner planRunner(BigArrays bigArrays, TestPhysicalOperationProviders physicalOperationProviders) { - return (physicalPlan, configuration, foldContext, listener) -> executeSubPlan( + return (physicalPlan, configuration, foldContext, planTimeProfile, listener) -> executeSubPlan( bigArrays, foldContext, physicalOperationProviders, @@ -841,7 +844,7 @@ void executeSubPlan( new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, flags, configuration, foldCtx, searchStats) ); - var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer); + var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer, null); exchangeSource.addRemoteSink( exchangeSink::fetchPageAsync, Randomness.get().nextBoolean(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index 3983679a5291f..d7e1057f726ae 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.compute.operator.DriverSleeps; import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.PlanProfile; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; @@ -71,10 +72,20 @@ private static List randomDriverProfiles() { private static List randomPlanProfiles() { return randomList( 10, - () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024)) + () -> new PlanProfile( + randomIdentifier(), + randomIdentifier(), + randomIdentifier(), + randomAlphanumericOfLength(1024), + randomPlanTimeProfile() + ) ); } + private static PlanTimeProfile randomPlanTimeProfile() { + return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + private static OperatorStatus randomOperatorStatus() { return new OperatorStatus( randomAlphaOfLength(4), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index d220c55eac2ce..ef98b9b422025 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -1036,7 +1036,7 @@ public void testProfileXContent() { DriverSleeps.empty() ) ), - List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")), + List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree", null)), minimumVersion ), false, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 8c04500ee68a9..7ee44b53f02bd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -8179,7 +8179,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config); plan = useDataNodePlan ? plans.v2() : plans.v1(); var flags = new EsqlFlags(true); - plan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS); + plan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS, null); ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10); LocalExecutionPlanner planner = new LocalExecutionPlanner( "test", @@ -8558,7 +8558,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, TestDataSource data, Searc var l = p.transformUp(FragmentExec.class, fragment -> { var flags = new EsqlFlags(true); - var localPlan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), fragment, stats); + var localPlan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), fragment, stats, null); return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 8445df5b2277c..5a3a421660683 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -90,7 +90,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, esqlFlags, config, FoldContext.small(), searchStats), true ); - var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer); + var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer, null); // handle local reduction alignment l = PhysicalPlanOptimizerTests.localRelationshipAlignment(l); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 88d12b163cf01..33633e9ff8a3c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; import org.elasticsearch.compute.operator.PlanProfile; +import org.elasticsearch.compute.operator.PlanTimeProfile; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; @@ -82,11 +83,21 @@ private DriverCompletionInfo randomCompletionInfo() { randomList( 0, 2, - () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphaOfLengthBetween(1, 1024)) + () -> new PlanProfile( + randomIdentifier(), + randomIdentifier(), + randomIdentifier(), + randomAlphaOfLengthBetween(1, 1024), + randomPlanTimeProfile() + ) ) ); } + private PlanTimeProfile randomPlanTimeProfile() { + return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + public void testEmpty() { PlainActionFuture results = new PlainActionFuture<>(); try (var ignored = new ComputeListener(threadPool, () -> {}, results)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index 59337e345aed0..6c8291f8b8ae3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -160,7 +160,7 @@ public void testFailedMetric() { // test a failed query: xyz field doesn't exist request.query("from test | stats m = max(xyz)"); request.allowPartialResults(false); - EsqlSession.PlanRunner runPhase = (p, configuration, foldContext, r) -> fail("this shouldn't happen"); + EsqlSession.PlanRunner runPhase = (p, configuration, foldContext, planTimeProfile, r) -> fail("this shouldn't happen"); IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions, returnLocalAll) -> Map.of( "", new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT) @@ -196,7 +196,7 @@ public void onFailure(Exception e) { // fix the failing query: foo field does exist request.query("from test | stats m = max(foo)"); - runPhase = (p, configuration, foldContext, r) -> r.onResponse(null); + runPhase = (p, configuration, foldContext, planTimeProfile, r) -> r.onResponse(null); planExecutor.esql( request, randomAlphaOfLength(10),