Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public final class SystemSessionProperties
public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration";
public static final String COLUMNAR_FILTER_EVALUATION_ENABLED = "columnar_filter_evaluation_enabled";
public static final String SPOOLING_ENABLED = "spooling_enabled";
public static final String DEBUG_ADAPTIVE_PLANNER = "debug_adaptive_planner";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1134,6 +1135,11 @@ public SystemSessionProperties(
SPOOLING_ENABLED,
"Enable client spooling protocol",
true,
true),
booleanProperty(
DEBUG_ADAPTIVE_PLANNER,
"Enable debug information for the adaptive planner",
false,
true));
}

Expand Down Expand Up @@ -2033,4 +2039,9 @@ public static boolean isUnsafePushdownAllowed(Session session)
{
return session.getSystemProperty(ALLOW_UNSAFE_PUSHDOWN, Boolean.class);
}

public static boolean isDebugAdaptivePlannerEnabled(Session session)
{
return session.getSystemProperty(DEBUG_ADAPTIVE_PLANNER, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.graph.Traverser;
import io.airlift.log.Logger;
import io.trino.Session;
import io.trino.cost.CachingTableStatsProvider;
import io.trino.cost.RuntimeInfoProvider;
Expand All @@ -34,6 +35,7 @@
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;
import io.trino.sql.planner.planprinter.PlanPrinter;
import io.trino.sql.planner.sanity.PlanSanityChecker;

import java.util.HashMap;
Expand All @@ -48,6 +50,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.SystemSessionProperties.isDebugAdaptivePlannerEnabled;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith;
Expand Down Expand Up @@ -83,6 +86,8 @@
*/
public class AdaptivePlanner
{
private static final Logger log = Logger.get(AdaptivePlanner.class);

private final Session session;
private final PlannerContext plannerContext;
private final List<AdaptivePlanOptimizer> planOptimizers;
Expand Down Expand Up @@ -115,10 +120,14 @@ public AdaptivePlanner(

public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
{
boolean debugEnabled = isDebugAdaptivePlannerEnabled(session);
// No need to run optimizer since the root is already finished or its stats are almost accurate based on
// estimate by progress.
// TODO: We need add an ability to re-plan fragment whose stats are estimated by progress.
if (runtimeInfoProvider.getRuntimeOutputStats(root.getFragment().getId()).isAccurate()) {
if (debugEnabled) {
log.info("Skipping adaptive planning for %s as root stats are accurate", session.getQueryId());
}
return root;
}

Expand All @@ -133,10 +142,19 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
ReplaceRemoteSourcesWithExchanges rewriter = new ReplaceRemoteSourcesWithExchanges(runtimeInfoProvider);
PlanNode currentAdaptivePlan = rewriteWith(rewriter, root.getFragment().getRoot(), root.getChildren());

if (debugEnabled) {
log.info("Current adaptive plan for %s: %s", session.getQueryId(), planToString(currentAdaptivePlan));
}
// Remove the adaptive plan node and replace it with initial plan
PlanNode initialPlan = getInitialPlan(currentAdaptivePlan);
if (debugEnabled) {
log.info("Initial plan for %s: %s", session.getQueryId(), planToString(initialPlan));
}
// Remove the adaptive plan node and replace it with current plan
PlanNode currentPlan = getCurrentPlan(currentAdaptivePlan);
if (debugEnabled) {
log.info("Current plan for %s: %s", session.getQueryId(), planToString(currentPlan));
}

// Collect the sub plans for each remote exchange and remote source node. We will use this map during
// re-fragmentation as a cache for all unchanged sub plans.
Expand All @@ -148,12 +166,32 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(getMaxPlanId(currentPlan) + 1);
AdaptivePlanOptimizer.Result optimizationResult = optimizePlan(currentPlan, symbolAllocator, runtimeInfoProvider, idAllocator);

if (debugEnabled) {
log.info("Optimized plan for %s: %s; new changedNodes: %s", session.getQueryId(), planToString(optimizationResult.plan()), optimizationResult.changedPlanNodes());
}

// Check whether there are some changes in the plan after optimization
if (optimizationResult.changedPlanNodes().isEmpty()) {
return root;
}

this.cummulativeChangedPlanNodes.addAll(optimizationResult.changedPlanNodes());

if (debugEnabled) {
log.info("Cumulative changed nodes for %s: %s", session.getQueryId(), cummulativeChangedPlanNodes);
}

if (debugEnabled) {
try {
// try running addAdaptivePlanNode incorrectly passing non-cumulative changePlanNode ids as last parameter to verify customer issue which is hard to
// reproduce locally. Do not fail; just log
addAdaptivePlanNode(new PlanNodeIdAllocator(getMaxPlanId(optimizationResult.plan()) + 1), initialPlan, optimizationResult.plan(), optimizationResult.changedPlanNodes());
}
catch (Throwable exception) {
log.warn(exception, "Add adaptive plan node verify error for %s", session.getQueryId());
}
}

// Add the adaptive plan node recursively where initialPlan remain as it is and optimizedPlan as new currentPlan
PlanNode adaptivePlan = addAdaptivePlanNode(idAllocator, initialPlan, optimizationResult.plan(), cummulativeChangedPlanNodes);
// validate the adaptive plan
Expand All @@ -175,6 +213,18 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
getUnchangedSubPlans(adaptivePlan, optimizationResult.changedPlanNodes(), exchangeSourceIdToSubPlan));
}

private String planToString(PlanNode plan)
{
return PlanPrinter.textLogicalPlan(
plan,
plannerContext.getMetadata(),
plannerContext.getFunctionManager(),
StatsAndCosts.empty(),
session,
0,
false);
}

private AdaptivePlanOptimizer.Result optimizePlan(
PlanNode plan,
SymbolAllocator symbolAllocator,
Expand Down
Loading