diff --git a/core/trino-main/src/main/java/io/trino/cost/CostCalculatorUsingExchanges.java b/core/trino-main/src/main/java/io/trino/cost/CostCalculatorUsingExchanges.java index 0b6a3cd23af1..c3e50fd0973f 100644 --- a/core/trino-main/src/main/java/io/trino/cost/CostCalculatorUsingExchanges.java +++ b/core/trino-main/src/main/java/io/trino/cost/CostCalculatorUsingExchanges.java @@ -47,6 +47,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cost.CostCalculatorWithEstimatedExchanges.adjustReplicatedJoinLocalExchangeCost; import static io.trino.cost.CostCalculatorWithEstimatedExchanges.calculateJoinInputCost; import static io.trino.cost.CostCalculatorWithEstimatedExchanges.calculateLocalRepartitionCost; import static io.trino.cost.CostCalculatorWithEstimatedExchanges.calculateRemoteGatherCost; @@ -192,15 +193,24 @@ public PlanCostEstimate visitJoin(JoinNode node, Void context) private LocalCostEstimate calculateJoinCost(PlanNode join, PlanNode probe, PlanNode build, boolean replicated) { + int estimatedSourceDistributedTaskCount = taskCountEstimator.estimateSourceDistributedTaskCount(session); LocalCostEstimate joinInputCost = calculateJoinInputCost( probe, build, stats, types, replicated, - taskCountEstimator.estimateSourceDistributedTaskCount(session)); + estimatedSourceDistributedTaskCount); + // TODO: Use traits (https://github.com/trinodb/trino/issues/4763) instead, to correctly estimate + // local exchange cost for replicated join in CostCalculatorUsingExchanges#visitExchange + LocalCostEstimate adjustedLocalExchangeCost = adjustReplicatedJoinLocalExchangeCost( + build, + stats, + types, + replicated, + estimatedSourceDistributedTaskCount); LocalCostEstimate joinOutputCost = calculateJoinOutputCost(join); - return addPartialComponents(joinInputCost, joinOutputCost); + return addPartialComponents(joinInputCost, adjustedLocalExchangeCost, joinOutputCost); } private LocalCostEstimate calculateJoinOutputCost(PlanNode join) diff --git a/core/trino-main/src/main/java/io/trino/cost/CostCalculatorWithEstimatedExchanges.java b/core/trino-main/src/main/java/io/trino/cost/CostCalculatorWithEstimatedExchanges.java index 265c9bd6137d..ffaec43424fa 100644 --- a/core/trino-main/src/main/java/io/trino/cost/CostCalculatorWithEstimatedExchanges.java +++ b/core/trino-main/src/main/java/io/trino/cost/CostCalculatorWithEstimatedExchanges.java @@ -214,6 +214,14 @@ public static LocalCostEstimate calculateJoinCostWithoutOutput( types, replicated, estimatedSourceDistributedTaskCount); + // TODO: Remove once traits (https://github.com/trinodb/trino/issues/4763) are used to correctly estimate + // local exchange cost for replicated join in CostCalculatorUsingExchanges#visitExchange + LocalCostEstimate adjustedLocalExchangeCost = adjustReplicatedJoinLocalExchangeCost( + build, + stats, + types, + replicated, + estimatedSourceDistributedTaskCount); LocalCostEstimate inputCost = calculateJoinInputCost( probe, build, @@ -221,7 +229,38 @@ public static LocalCostEstimate calculateJoinCostWithoutOutput( types, replicated, estimatedSourceDistributedTaskCount); - return addPartialComponents(exchangesCost, inputCost); + return addPartialComponents(exchangesCost, adjustedLocalExchangeCost, inputCost); + } + + public static LocalCostEstimate adjustReplicatedJoinLocalExchangeCost( + PlanNode build, + StatsProvider stats, + TypeProvider types, + boolean replicated, + int estimatedSourceDistributedTaskCount) + { + if (!replicated) { + return LocalCostEstimate.zero(); + } + + /* + * HACK! + * + * Stats model doesn't multiply the number of rows by the number of tasks for replicated + * exchange to avoid misestimation of the JOIN output. + * + * Thus the cost estimation for the operations that come after a replicated exchange is + * underestimated. And the cost of operations over the replicated copies must be explicitly + * added here. + */ + + // Add the cost of a local repartitioning of build side copies. + // Cost of the repartitioning of a single data copy has been already added in + // CostCalculatorWithEstimatedExchanges#calculateJoinExchangeCost or in CostCalculatorUsingExchanges#visitExchange + PlanNodeStatsEstimate buildStats = stats.getStats(build); + double buildSideSize = buildStats.getOutputSizeInBytes(build.getOutputSymbols(), types); + double cpuCost = buildSideSize * (estimatedSourceDistributedTaskCount - 1); + return LocalCostEstimate.of(cpuCost, 0, 0); } private static LocalCostEstimate calculateJoinExchangeCost( @@ -237,7 +276,7 @@ private static LocalCostEstimate calculateJoinExchangeCost( if (replicated) { // assuming the probe side of a replicated join is always source distributed LocalCostEstimate replicateCost = calculateRemoteReplicateCost(buildSizeInBytes, estimatedSourceDistributedTaskCount); - // cost of the copies repartitioning is added in CostCalculatorUsingExchanges#calculateJoinCost + // cost of the copies repartitioning is added in CostCalculatorWithEstimatedExchanges#adjustReplicatedJoinLocalExchangeCost LocalCostEstimate localRepartitionCost = calculateLocalRepartitionCost(buildSizeInBytes); return addPartialComponents(replicateCost, localRepartitionCost); } @@ -266,23 +305,6 @@ public static LocalCostEstimate calculateJoinInputCost( double probeSideSize = probeStats.getOutputSizeInBytes(probe.getOutputSymbols(), types); double cpuCost = probeSideSize + buildSideSize * buildSizeMultiplier; - - /* - * HACK! - * - * Stats model doesn't multiply the number of rows by the number of tasks for replicated - * exchange to avoid misestimation of the JOIN output. - * - * Thus the cost estimation for the operations that come after a replicated exchange is - * underestimated. And the cost of operations over the replicated copies must be explicitly - * added here. - */ - if (replicated) { - // add the cost of a local repartitioning of build side copies - // cost of the repartitioning of a single data copy has been already added in calculateExchangeCost - cpuCost += buildSideSize * (buildSizeMultiplier - 1); - } - double memoryCost = buildSideSize * buildSizeMultiplier; return LocalCostEstimate.of(cpuCost, memoryCost, 0);