diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index ee81ad437515..f77bb636e4bf 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -220,6 +220,7 @@ public final class SystemSessionProperties public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration"; public static final String COLUMNAR_FILTER_EVALUATION_ENABLED = "columnar_filter_evaluation_enabled"; public static final String SPOOLING_ENABLED = "spooling_enabled"; + public static final String DEBUG_ADAPTIVE_PLANNER = "debug_adaptive_planner"; private final List> sessionProperties; @@ -1134,6 +1135,11 @@ public SystemSessionProperties( SPOOLING_ENABLED, "Enable client spooling protocol", true, + true), + booleanProperty( + DEBUG_ADAPTIVE_PLANNER, + "Enable debug information for the adaptive planner", + false, true)); } @@ -2033,4 +2039,9 @@ public static boolean isUnsafePushdownAllowed(Session session) { return session.getSystemProperty(ALLOW_UNSAFE_PUSHDOWN, Boolean.class); } + + public static boolean isDebugAdaptivePlannerEnabled(Session session) + { + return session.getSystemProperty(DEBUG_ADAPTIVE_PLANNER, Boolean.class); + } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/AdaptivePlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/AdaptivePlanner.java index cac3a0df10c4..516cd62ac19d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/AdaptivePlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/AdaptivePlanner.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.graph.Traverser; +import io.airlift.log.Logger; import io.trino.Session; import io.trino.cost.CachingTableStatsProvider; import io.trino.cost.RuntimeInfoProvider; @@ -34,6 +35,7 @@ import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.SimplePlanRewriter; +import io.trino.sql.planner.planprinter.PlanPrinter; import io.trino.sql.planner.sanity.PlanSanityChecker; import java.util.HashMap; @@ -48,6 +50,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.SystemSessionProperties.isDebugAdaptivePlannerEnabled; import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE; import static io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith; @@ -83,6 +86,8 @@ */ public class AdaptivePlanner { + private static final Logger log = Logger.get(AdaptivePlanner.class); + private final Session session; private final PlannerContext plannerContext; private final List planOptimizers; @@ -115,10 +120,14 @@ public AdaptivePlanner( public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider) { + boolean debugEnabled = isDebugAdaptivePlannerEnabled(session); // No need to run optimizer since the root is already finished or its stats are almost accurate based on // estimate by progress. // TODO: We need add an ability to re-plan fragment whose stats are estimated by progress. if (runtimeInfoProvider.getRuntimeOutputStats(root.getFragment().getId()).isAccurate()) { + if (debugEnabled) { + log.info("Skipping adaptive planning for %s as root stats are accurate", session.getQueryId()); + } return root; } @@ -133,10 +142,19 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider) ReplaceRemoteSourcesWithExchanges rewriter = new ReplaceRemoteSourcesWithExchanges(runtimeInfoProvider); PlanNode currentAdaptivePlan = rewriteWith(rewriter, root.getFragment().getRoot(), root.getChildren()); + if (debugEnabled) { + log.info("Current adaptive plan for %s: %s", session.getQueryId(), planToString(currentAdaptivePlan)); + } // Remove the adaptive plan node and replace it with initial plan PlanNode initialPlan = getInitialPlan(currentAdaptivePlan); + if (debugEnabled) { + log.info("Initial plan for %s: %s", session.getQueryId(), planToString(initialPlan)); + } // Remove the adaptive plan node and replace it with current plan PlanNode currentPlan = getCurrentPlan(currentAdaptivePlan); + if (debugEnabled) { + log.info("Current plan for %s: %s", session.getQueryId(), planToString(currentPlan)); + } // Collect the sub plans for each remote exchange and remote source node. We will use this map during // re-fragmentation as a cache for all unchanged sub plans. @@ -148,12 +166,32 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider) PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(getMaxPlanId(currentPlan) + 1); AdaptivePlanOptimizer.Result optimizationResult = optimizePlan(currentPlan, symbolAllocator, runtimeInfoProvider, idAllocator); + if (debugEnabled) { + log.info("Optimized plan for %s: %s; new changedNodes: %s", session.getQueryId(), planToString(optimizationResult.plan()), optimizationResult.changedPlanNodes()); + } + // Check whether there are some changes in the plan after optimization if (optimizationResult.changedPlanNodes().isEmpty()) { return root; } this.cummulativeChangedPlanNodes.addAll(optimizationResult.changedPlanNodes()); + + if (debugEnabled) { + log.info("Cumulative changed nodes for %s: %s", session.getQueryId(), cummulativeChangedPlanNodes); + } + + if (debugEnabled) { + try { + // try running addAdaptivePlanNode incorrectly passing non-cumulative changePlanNode ids as last parameter to verify customer issue which is hard to + // reproduce locally. Do not fail; just log + addAdaptivePlanNode(new PlanNodeIdAllocator(getMaxPlanId(optimizationResult.plan()) + 1), initialPlan, optimizationResult.plan(), optimizationResult.changedPlanNodes()); + } + catch (Throwable exception) { + log.warn(exception, "Add adaptive plan node verify error for %s", session.getQueryId()); + } + } + // Add the adaptive plan node recursively where initialPlan remain as it is and optimizedPlan as new currentPlan PlanNode adaptivePlan = addAdaptivePlanNode(idAllocator, initialPlan, optimizationResult.plan(), cummulativeChangedPlanNodes); // validate the adaptive plan @@ -175,6 +213,18 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider) getUnchangedSubPlans(adaptivePlan, optimizationResult.changedPlanNodes(), exchangeSourceIdToSubPlan)); } + private String planToString(PlanNode plan) + { + return PlanPrinter.textLogicalPlan( + plan, + plannerContext.getMetadata(), + plannerContext.getFunctionManager(), + StatsAndCosts.empty(), + session, + 0, + false); + } + private AdaptivePlanOptimizer.Result optimizePlan( PlanNode plan, SymbolAllocator symbolAllocator,