Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/esql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ tasks.named("test").configure {
if (System.getProperty("golden.overwrite") != null || project.hasProperty("golden.overwrite")) {
systemProperty "golden.overwrite", "true"
}

systemProperty "policy.directory", file("${projectDir}").absolutePath
systemProperty "java.security.policy", file("${projectDir}/test.policy").absolutePath
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public String nodeString(NodeStringFormat format) {
sb.append(estimatedRowSize);
sb.append(", reducer=[");
sb.append("], fragment=[<>\n");
sb.append(fragment.toString());
sb.append(fragment.toString(format));
sb.append("<>]]");
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ public static PhysicalPlan localPlan(
LocalPhysicalPlanOptimizer physicalOptimizer,
PlanTimeProfile planTimeProfile
) {
// TODO add a test assertion for the consistency checker (after https://github.com/elastic/elasticsearch/issues/141654, see
// https://github.com/elastic/elasticsearch/pull/141082/changes#r2745334028);
var isCoordPlan = new Holder<>(Boolean.TRUE);
Set<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ void runComputeOnRemoteCluster(
),
coordinatorPlan,
computeService.plannerSettings().get(),
LocalPhysicalOptimization.ENABLED,
configuration.profile() ? new PlanTimeProfile() : null,
computeListener.acquireCompute()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void execute(
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
ESQL_WORKER_THREAD_POOL_NAME,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
Expand Down Expand Up @@ -284,7 +284,15 @@ public void execute(
})
)
) {
runCompute(rootTask, computeContext, mainPlan, plannerSettings.get(), planTimeProfile, localListener.acquireCompute());
runCompute(
rootTask,
computeContext,
mainPlan,
plannerSettings.get(),
LocalPhysicalOptimization.ENABLED,
planTimeProfile,
localListener.acquireCompute()
);

for (int i = 0; i < subplans.size(); i++) {
var subplan = subplans.get(i);
Expand Down Expand Up @@ -392,6 +400,7 @@ public void executePlan(
computeContext,
coordinatorPlan,
plannerSettings.get(),
LocalPhysicalOptimization.ENABLED,
planTimeProfile,
computeListener.acquireCompute()
);
Expand Down Expand Up @@ -476,6 +485,7 @@ public void executePlan(
),
coordinatorPlan,
plannerSettings.get(),
LocalPhysicalOptimization.ENABLED,
planTimeProfile,
localListener.acquireCompute()
);
Expand Down Expand Up @@ -651,6 +661,7 @@ void runCompute(
ComputeContext context,
PhysicalPlan plan,
PlannerSettings plannerSettings,
LocalPhysicalOptimization localPhysicalOptimization,
PlanTimeProfile planTimeProfile,
ActionListener<DriverCompletionInfo> listener
) {
Expand Down Expand Up @@ -683,15 +694,18 @@ void runCompute(

List<SearchExecutionContext> localContexts = new ArrayList<>();
context.searchExecutionContexts().iterable().forEach(localContexts::add);
var localPlan = PlannerUtils.localPlan(
plannerSettings,
context.flags(),
localContexts,
context.configuration(),
context.foldCtx(),
plan,
planTimeProfile
);
var localPlan = switch (localPhysicalOptimization) {
case ENABLED -> PlannerUtils.localPlan(
plannerSettings,
context.flags(),
localContexts,
context.configuration(),
context.foldCtx(),
plan,
planTimeProfile
);
case DISABLED -> plan;
};
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Local plan for {}:\n{}", context.description(), localPlan);
}
Expand Down Expand Up @@ -767,7 +781,8 @@ ActionListener<Void> addCompletionInfo(
});
}

static ReductionPlan reductionPlan(
// public for testing
public static ReductionPlan reductionPlan(
PlannerSettings plannerSettings,
EsqlFlags flags,
Configuration configuration,
Expand All @@ -779,21 +794,22 @@ static ReductionPlan reductionPlan(
) {
long startTime = planTimeProfile == null ? 0 : System.nanoTime();
PhysicalPlan source = new ExchangeSourceExec(originalPlan.source(), originalPlan.output(), originalPlan.isIntermediateAgg());
ReductionPlan defaultResult = new ReductionPlan(originalPlan.replaceChild(source), originalPlan);
ReductionPlan defaultResult = new ReductionPlan(originalPlan.replaceChild(source), originalPlan, LocalPhysicalOptimization.ENABLED);
if (reduceNodeLateMaterialization == false && runNodeLevelReduction == false) {
return defaultResult;
}

Function<PhysicalPlan, ReductionPlan> placePlanBetweenExchanges = p -> new ReductionPlan(
originalPlan.replaceChild(p.replaceChildren(List.of(source))),
originalPlan
originalPlan,
LocalPhysicalOptimization.ENABLED
);
// The default plan is just the exchange source piped directly into the exchange sink.
ReductionPlan reductionPlan = switch (PlannerUtils.reductionPlan(originalPlan)) {
case PlannerUtils.TopNReduction topN when reduceNodeLateMaterialization ->
// In the case of TopN, the source output type is replaced since we're pulling the FieldExtractExec to the reduction node,
// so essential we are splitting the TopNExec into two parts, similar to other aggregations, but unlike other aggregations,
// we also need the original plan, since we add the project in the reduction node.
// so essentially we are splitting the TopNExec into two parts, similar to other aggregations, but unlike other
// aggregations, we also need the original plan, since we add the project in the reduction node.
LateMaterializationPlanner.planReduceDriverTopN(
stats -> new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, stats),
originalPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ public void onFailure(Exception e) {
computeContext,
request.plan(),
computeService.plannerSettings().get(),
LocalPhysicalOptimization.ENABLED,
planTimeProfile,
sub.acquireCompute()
);
Expand All @@ -374,6 +375,7 @@ public void onFailure(Exception e) {
computeContext,
request.plan(),
computeService.plannerSettings().get(),
LocalPhysicalOptimization.ENABLED,
planTimeProfile,
batchListener
);
Expand Down Expand Up @@ -480,6 +482,7 @@ private void runComputeOnDataNode(
CancellableTask task,
String externalId,
PhysicalPlan reducePlan,
LocalPhysicalOptimization localPhysicalOptimization,
DataNodeRequest request,
boolean failFastOnShardFailure,
AcquiredSearchContexts searchContexts,
Expand Down Expand Up @@ -543,6 +546,7 @@ private void runComputeOnDataNode(
),
reducePlan,
plannerSettings,
localPhysicalOptimization,
planTimeProfile,
ActionListener.wrap(resp -> {
// don't return until all pages are fetched
Expand Down Expand Up @@ -611,6 +615,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
(CancellableTask) task,
sessionId,
reductionPlan.nodeReducePlan(),
reductionPlan.localPhysicalOptimization(),
request.withPlan(reductionPlan.dataNodePlan()),
failFastOnShardFailures,
computeSearchContexts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public static Optional<ReductionPlan> planReduceDriverTopN(
}
var updatedFragment = new Project(Source.EMPTY, withAddedDocToRelation, expectedDataOutput);
FragmentExec updatedFragmentExec = fragmentExec.withFragment(updatedFragment);
// TODO This ignores the possible change in output, see #141654
ExchangeSinkExec updatedDataPlan = originalPlan.replaceChild(updatedFragmentExec);

// Replace the TopN child with the data driver as the source.
Expand All @@ -141,7 +142,9 @@ public static Optional<ReductionPlan> planReduceDriverTopN(
EstimatesRowSize.estimateRowSize(updatedFragmentExec.estimatedRowSize(), reductionPlan)
);

return Optional.of(new ReductionPlan(reductionPlanWithSize, updatedDataPlan));
// The TopN reduction plan should not be further optimized locally on the node reduce driver, since we took great pains to
// preplan in advance, including all the necessary field extractions!
return Optional.of(new ReductionPlan(reductionPlanWithSize, updatedDataPlan, LocalPhysicalOptimization.DISABLED));
}

private static PhysicalPlan toPhysical(LogicalPlan plan, LocalPhysicalOptimizerContext context) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

/** This class is {@code public} for testing. */
public enum LocalPhysicalOptimization {
ENABLED,
DISABLED
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;

/**
* This class is {@code public} for testing.
* @param nodeReducePlan The plan to be executed on the node_reduce driver. This should <i>not</i> contain a
* {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec}, but be a plan "sandwiched" between an {@link ExchangeSinkExec} and an
* {@link org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec}.
* @param dataNodePlan The plan to be executed on the data driver. This may contain a
* {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec}.
*/
record ReductionPlan(ExchangeSinkExec nodeReducePlan, ExchangeSinkExec dataNodePlan) {}
public record ReductionPlan(
ExchangeSinkExec nodeReducePlan,
ExchangeSinkExec dataNodePlan,
// TODO This should always be DISABLED; see https://github.com/elastic/elasticsearch/issues/142392.
LocalPhysicalOptimization localPhysicalOptimization
) {
public ReductionPlan withoutLocalPhysicalOptimization() {
return new ReductionPlan(nodeReducePlan, dataNodePlan, LocalPhysicalOptimization.DISABLED);
}
}
Loading