Skip to content
Closed
233 changes: 231 additions & 2 deletions .../scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
case IsNotNull(ar: Attribute) if plan.child.isInstanceOf[LeafNode] =>
evaluateNullCheck(ar, isNull = false, update)

case op @ Equality(attrLeft: Attribute, attrRight: Attribute) =>
evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update)

case op @ LessThan(attrLeft: Attribute, attrRight: Attribute) =>
evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update)

case op @ LessThanOrEqual(attrLeft: Attribute, attrRight: Attribute) =>
evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update)

case op @ GreaterThan(attrLeft: Attribute, attrRight: Attribute) =>
evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update)

case op @ GreaterThanOrEqual(attrLeft: Attribute, attrRight: Attribute) =>
evaluateBinaryForTwoColumns(op, attrLeft, attrRight, update)

case _ =>
// TODO: it's difficult to support string operators without advanced statistics.
// Hence, these string operators Like(_, _) | Contains(_, _) | StartsWith(_, _)
Expand Down Expand Up @@ -257,7 +272,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
/**
* Returns a percentage of rows meeting a binary comparison expression.
*
* @param op a binary comparison operator uch as =, <, <=, >, >=
* @param op a binary comparison operator such as =, <, <=, >, >=
* @param attr an Attribute (or a column)
* @param literal a literal value (or constant)
* @param update a boolean flag to specify if we need to update ColumnStat of a given column
Expand Down Expand Up @@ -448,7 +463,7 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
* Returns a percentage of rows meeting a binary comparison expression.
* This method evaluate expression for Numeric/Date/Timestamp/Boolean columns.
*
* @param op a binary comparison operator uch as =, <, <=, >, >=
* @param op a binary comparison operator such as =, <, <=, >, >=
* @param attr an Attribute (or a column)
* @param literal a literal value (or constant)
* @param update a boolean flag to specify if we need to update ColumnStat of a given column
Expand Down Expand Up @@ -550,6 +565,220 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
Some(percent.toDouble)
}

/**
* Returns a percentage of rows meeting a binary comparison expression containing two columns.
* In SQL queries, we also see predicate expressions involving two columns
* such as "column-1 (op) column-2" where column-1 and column-2 belong to same table.
* Note that, if column-1 and column-2 belong to different tables, then it is a join
* operator's work, NOT a filter operator's work.
*
* @param op a binary comparison operator, including =, <=>, <, <=, >, >=
* @param attrLeft the left Attribute (or a column)
* @param attrRight the right Attribute (or a column)
* @param update a boolean flag to specify if we need to update ColumnStat of the given columns
* for subsequent conditions
* @return an optional double value to show the percentage of rows meeting a given condition
*/
def evaluateBinaryForTwoColumns(
op: BinaryComparison,
attrLeft: Attribute,
attrRight: Attribute,
update: Boolean): Option[Double] = {

if (!colStatsMap.contains(attrLeft)) {
logDebug("[CBO] No statistics for " + attrLeft)
return None
}
if (!colStatsMap.contains(attrRight)) {
logDebug("[CBO] No statistics for " + attrRight)
return None
}

attrLeft.dataType match {
case StringType | BinaryType =>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use white list here? It is also easy for us to see which data types are assumed to support in the implementation.

I am afraid we might easily forget updating this if we support new data type in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code is written in such a way that we do not have too deep indentation. Some engineers do not like deep indentation as they often put screen monitor vertically.
Let's handle it when the need occurs. I think, with good test case coverage, we will be able to catch anything we miss.

// TODO: It is difficult to support other binary comparisons for String/Binary
// type without min/max and advanced statistics like histogram.
logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft)
return None
case _ =>
}

val colStatLeft = colStatsMap(attrLeft)
val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
.asInstanceOf[NumericRange]
val maxLeft = BigDecimal(statsRangeLeft.max)
val minLeft = BigDecimal(statsRangeLeft.min)

val colStatRight = colStatsMap(attrRight)
val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
.asInstanceOf[NumericRange]
val maxRight = BigDecimal(statsRangeRight.max)
val minRight = BigDecimal(statsRangeRight.min)

// determine the overlapping degree between predicate range and column's range
val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
// Left < Right or Left <= Right
// - no overlap:
// minRight maxRight minLeft maxLeft
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// minLeft maxLeft minRight maxRight
// --------+------------------+------------+-------------+------->
case _: LessThan =>
(minLeft >= maxRight, (maxLeft < minRight) && allNotNull)
case _: LessThanOrEqual =>
(minLeft > maxRight, (maxLeft <= minRight) && allNotNull)

// Left > Right or Left >= Right
// - no overlap:
// minLeft maxLeft minRight maxRight
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// minRight maxRight minLeft maxLeft
// --------+------------------+------------+-------------+------->
case _: GreaterThan =>
(maxLeft <= minRight, (minLeft > maxRight) && allNotNull)
case _: GreaterThanOrEqual =>
(maxLeft < minRight, (minLeft >= maxRight) && allNotNull)

// Left = Right or Left <=> Right
// - no overlap:
// minLeft maxLeft minRight maxRight
// --------+------------------+------------+-------------+------->
// minRight maxRight minLeft maxLeft
// --------+------------------+------------+-------------+------->
// - complete overlap:
// minLeft maxLeft
// minRight maxRight
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to update this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments below.

// --------+------------------+------->
case _: EqualTo =>
((maxLeft < minRight) || (maxRight < minLeft),
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also compare the NDV?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We changed condition to:
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
&& (colStatLeft.distinctCount == colStatRight.distinctCount)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that the when this condition is true, it is a complete overlapping between two columns.

The complete equality between the values of two columns also depends on the order. E.g., when left values are (1, 2, 3, 4), right values are (4, 3, 2, 1), the condition is true, but no values can pass the filter predicate left_col = right_col.

Am I missing something?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is empirical. Without more statistics, it's really hard to do it mathematically.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. We prefer over estimation to under estimation in order to avoid out-of-memory error.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. LGTM.

&& (colStatLeft.distinctCount == colStatRight.distinctCount)
)
case _: EqualNullSafe =>
// For null-safe equality, we use a very restrictive condition to evaluate its overlap.
// If null values exists, we set it to partial overlap.
(((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull,
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&& (colStatLeft.distinctCount == colStatRight.distinctCount)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

&& (colStatLeft.distinctCount == colStatRight.distinctCount)
)
}

var percent = BigDecimal(1.0)
if (noOverlap) {
percent = 0.0
} else if (completeOverlap) {
percent = 1.0
} else {
// For partial overlap, we use an empirical value 1/3 as suggested by the book
// "Database Systems, the complete book".
percent = 1.0 / 3.0

if (update) {
// Need to adjust new min/max after the filter condition is applied

val ndvLeft = BigDecimal(colStatLeft.distinctCount)
var newNdvLeft = (ndvLeft * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
if (newNdvLeft < 1) newNdvLeft = 1
val ndvRight = BigDecimal(colStatRight.distinctCount)
var newNdvRight = (ndvRight * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
if (newNdvRight < 1) newNdvRight = 1

var newMaxLeft = colStatLeft.max
var newMinLeft = colStatLeft.min
var newMaxRight = colStatRight.max
var newMinRight = colStatRight.min

op match {
case _: LessThan | _: LessThanOrEqual =>
// the left side should be less than the right side.
// If not, we need to adjust it to narrow the range.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            // Left < Right or Left <= Right
            //      minRight     <     minLeft
            // 0 ------+******************+------->
            //              filtered      ^
            //                            |
            //                        newMinRight
            //
            //      maxRight     <     maxLeft
            // 0 ------+******************+------->
            //         ^    filtered
            //         |
            //     newMaxLeft

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

// Left < Right or Left <= Right
// minRight < minLeft
// --------+******************+------->
// filtered ^
// |
// newMinRight
//
// maxRight < maxLeft
// --------+******************+------->
// ^ filtered
// |
// newMaxLeft
if (minLeft > minRight) newMinRight = colStatLeft.min
if (maxLeft > maxRight) newMaxLeft = colStatRight.max

case _: GreaterThan | _: GreaterThanOrEqual =>
// the left side should be greater than the right side.
// If not, we need to adjust it to narrow the range.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            // Left > Right or Left >= Right
            //      minLeft     <      minRight
            // 0 ------+******************+------->
            //              filtered      ^
            //                            |
            //                        newMinLeft
            //
            //      maxLeft     <      maxRight
            // 0 ------+******************+------->
            //         ^    filtered
            //         |
            //     newMaxRight

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

// Left > Right or Left >= Right
// minLeft < minRight
// --------+******************+------->
// filtered ^
// |
// newMinLeft
//
// maxLeft < maxRight
// --------+******************+------->
// ^ filtered
// |
// newMaxRight
if (minLeft < minRight) newMinLeft = colStatRight.min
if (maxLeft < maxRight) newMaxRight = colStatLeft.max

case _: EqualTo | _: EqualNullSafe =>
// need to set new min to the larger min value, and
// set the new max to the smaller max value.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            // Left = Right or Left <=> Right
            //      minLeft     <      minRight
            // 0 ------+******************+------->
            //              filtered      ^
            //                            |
            //                        newMinLeft
            //
            //      minRight    <=     minLeft
            // 0 ------+******************+------->
            //              filtered      ^
            //                            |
            //                        newMinRight
            //
            //      maxLeft     <      maxRight
            // 0 ------+******************+------->
            //         ^    filtered
            //         |
            //     newMaxRight
            //
            //      maxRight    <=     maxLeft
            // 0 ------+******************+------->
            //         ^    filtered
            //         |
            //     newMaxLeft

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

// Left = Right or Left <=> Right
// minLeft < minRight
// --------+******************+------->
// filtered ^
// |
// newMinLeft
//
// minRight <= minLeft
// --------+******************+------->
// filtered ^
// |
// newMinRight
//
// maxLeft < maxRight
// --------+******************+------->
// ^ filtered
// |
// newMaxRight
//
// maxRight <= maxLeft
// --------+******************+------->
// ^ filtered
// |
// newMaxLeft
if (minLeft < minRight) {
newMinLeft = colStatRight.min
} else {
newMinRight = colStatLeft.min
}
if (maxLeft < maxRight) {
newMaxRight = colStatLeft.max
} else {
newMaxLeft = colStatRight.max
}
}

val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft,
max = newMaxLeft)
colStatsMap(attrLeft) = newStatsLeft
val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight,
max = newMaxRight)
colStatsMap(attrRight) = newStatsRight
}
}

Some(percent.toDouble)
}

}

class ColumnStatsMap {
Expand Down
Loading