Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,30 @@ object JoinReorderDP extends PredicateHelper with Logging {
}
}

/**
 * Decides whether this plan has a smaller computational cost than `other`.
 *
 * To identify the plan with the smaller cost, we use the weighted geometric mean of the
 * ratio of rows and the ratio of sizes in bytes. This plan is considered better when
 *
 *   (this.card / other.card) ^ weight * (this.size / other.size) ^ (1 - weight) < 1
 *
 * where `weight` is `conf.joinReorderCardWeight`.
 *
 * There are other ways to combine these values into a cost comparison function.
 * Some of these, which we have experimented with but which gave worse results
 * than the current one:
 * 1) Weighted arithmetic mean of these two ratios - adding up fractions puts
 *    less emphasis on ratios between 0 and 1. Ratios 10 and 0.1 should be considered
 *    to be equally strong evidence in opposite directions. The arithmetic mean of these
 *    would be heavily biased towards the 10.
 * 2) Absolute cost (cost = weight * rowCount + (1 - weight) * size) - when adding up
 *    two numeric measurements that have different units we can easily end up with one
 *    overwhelming the other.
 *
 * @param other the plan this plan is compared against
 * @param conf configuration supplying `joinReorderCardWeight`, the weight of the row ratio
 * @return `false` when `other` has a cardinality or size of zero (no ratio can be formed),
 *         otherwise whether the weighted geometric mean of the two ratios is below 1
 */
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
  if (other.planCost.card == 0 || other.planCost.size == 0) {
    // A zero denominator would make the ratios undefined (BigDecimal division throws),
    // so a plan is never reported as better than a zero-cost plan.
    false
  } else {
    val cardWeight = conf.joinReorderCardWeight
    val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
    val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
    // Ratios are converted to Double because Math.pow operates on doubles.
    Math.pow(relativeRows.doubleValue, cardWeight) *
      Math.pow(relativeSize.doubleValue, 1 - cardWeight) < 1
  }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,8 +1664,10 @@ object SQLConf {
val JOIN_REORDER_CARD_WEIGHT =
buildConf("spark.sql.cbo.joinReorder.card.weight")
.internal()
.doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
"rows * weight + size * (1 - weight).")
.doc("The weight of the ratio of cardinalities (number of rows) " +
"in the cost comparison function. The ratio of sizes in bytes has weight " +
"1 - this value. The weighted geometric mean of these ratios is used to decide " +
"which of the candidate plans will be chosen by the CBO.")
.version("2.2.0")
.doubleConf
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,6 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val plan1 = JoinPlan(null, null, null, Cost(300, 80))
val plan2 = JoinPlan(null, null, null, Cost(500, 30))

// Assuming the cardinality weight is 0.7, plan1 relative to plan2 is
// (300/500)^0.7 * (80/30)^0.3 ~= 0.70 * 1.34 ~= 0.94 < 1, so plan1 is the better plan.

assert(!plan1.betterThan(plan1, conf))
assert(!plan2.betterThan(plan2, conf))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
Some(nameToAttr("d1_c2") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
.join(f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
.select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)

assertEqualPlans(query, expected)
Expand Down