-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19851][SQL] Add support for EVERY and ANY (SOME) aggregates #22809
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import java.util.Locale | |
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate | ||
| import org.apache.spark.sql.catalyst.expressions.codegen._ | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.Block._ | ||
| import org.apache.spark.sql.catalyst.trees.TreeNode | ||
|
|
@@ -282,6 +283,31 @@ trait RuntimeReplaceable extends UnaryExpression with Unevaluable { | |
| override lazy val canonicalized: Expression = child.canonicalized | ||
| } | ||
|
|
||
| /** | ||
| * An aggregate expression that gets rewritten (currently by the optimizer) into a | ||
| * different aggregate expression for evaluation. This is mainly used to provide compatibility | ||
| * with other databases. For example, we use this to support every, any/some aggregates by rewriting | ||
| * them with Min and Max respectively. | ||
| */ | ||
| trait UnevaluableAggrgate extends DeclarativeAggregate { | ||
|
||
|
|
||
| override def nullable: Boolean = true | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we set them always as nullable?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 most of the aggregates are nullable, no? Did you have a suggestion here?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this be nullable only if the incoming expression is?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 I think for aggregates, it's different? Please see
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right, I was missing that case, sorry, thanks. |
||
|
|
||
| override lazy val aggBufferAttributes = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate aggBufferAttributes: $this") | ||
|
|
||
| override lazy val initialValues: Seq[Expression] = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate initialValues: $this") | ||
|
|
||
| override lazy val updateExpressions: Seq[Expression] = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate updateExpressions: $this") | ||
|
|
||
| override lazy val mergeExpressions: Seq[Expression] = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate mergeExpressions: $this") | ||
|
|
||
| override lazy val evaluateExpression: Expression = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate evaluateExpression: $this") | ||
| } | ||
|
|
||
| /** | ||
| * Expressions that don't have SQL representation should extend this trait. Examples are | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate { | |
|
|
||
| override lazy val evaluateExpression: AttributeReference = max | ||
| } | ||
|
|
||
| abstract class AnyAggBase(arg: Expression) | ||
|
||
| extends UnevaluableAggrgate with ImplicitCastInputTypes { | ||
|
|
||
| override def children: Seq[Expression] = arg :: Nil | ||
|
|
||
| override def dataType: DataType = BooleanType | ||
|
|
||
| override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType) | ||
|
|
||
| override def checkInputDataTypes(): TypeCheckResult = { | ||
| arg.dataType match { | ||
| case dt if dt != BooleanType => | ||
| TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " + | ||
| s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].") | ||
| case _ => TypeCheckResult.TypeCheckSuccess | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @ExpressionDescription( | ||
| usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.") | ||
|
||
| case class AnyAgg(arg: Expression) extends AnyAggBase(arg) { | ||
| override def nodeName: String = "Any" | ||
| } | ||
|
|
||
| @ExpressionDescription( | ||
| usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.") | ||
|
||
| case class SomeAgg(arg: Expression) extends AnyAggBase(arg) { | ||
| override def nodeName: String = "Some" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,3 +57,27 @@ case class Min(child: Expression) extends DeclarativeAggregate { | |
|
|
||
| override lazy val evaluateExpression: AttributeReference = min | ||
| } | ||
|
|
||
| @ExpressionDescription( | ||
| usage = "_FUNC_(expr) - Returns true if all values of `expr` are true.") | ||
|
||
| case class EveryAgg(arg: Expression) | ||
| extends UnevaluableAggrgate with ImplicitCastInputTypes { | ||
|
|
||
| override def nodeName: String = "Every" | ||
|
|
||
| override def children: Seq[Expression] = arg :: Nil | ||
|
|
||
| override def dataType: DataType = BooleanType | ||
|
|
||
| override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType) | ||
|
|
||
| override def checkInputDataTypes(): TypeCheckResult = { | ||
| arg.dataType match { | ||
| case dt if dt != BooleanType => | ||
| TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " + | ||
| s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].") | ||
| case _ => TypeCheckResult.TypeCheckSuccess | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,24 +21,32 @@ import scala.collection.mutable | |
|
|
||
| import org.apache.spark.sql.catalyst.catalog.SessionCatalog | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.catalyst.rules._ | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
|
|
||
| /** | ||
| * Finds all [[RuntimeReplaceable]] expressions and replace them with the expressions that can | ||
| * be evaluated. This is mainly used to provide compatibility with other databases. | ||
| * For example, we use this to support "nvl" by replacing it with "coalesce". | ||
| * Finds all unevaluable expressions and replaces/rewrites them with semantically equivalent | ||
| * expressions that can be evaluated. Currently we replace two kinds of expressions: | ||
|
||
| * 1) [[RuntimeReplaceable]] expressions | ||
| * 2) [[UnevaluableAggrgate]] expressions such as Every, Some, Any | ||
| * This is mainly used to provide compatibility with other databases. | ||
| * A few examples: | ||
| * - we use this to support "nvl" by replacing it with "coalesce". | ||
| * - we use this to replace Every with Min, and Any/Some with Max. | ||
| */ | ||
| object ReplaceExpressions extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | ||
| case e: RuntimeReplaceable => e.child | ||
| case SomeAgg(arg) => Max(arg) | ||
| case AnyAgg(arg) => Max(arg) | ||
| case EveryAgg(arg) => Min(arg) | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unneeded change |
||
| /** | ||
| * Computes the current date and time to make sure we return the same result in a single query. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,6 +144,8 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite { | |
| assertSuccess(Sum('stringField)) | ||
| assertSuccess(Average('stringField)) | ||
| assertSuccess(Min('arrayField)) | ||
| assertSuccess(new EveryAgg('booleanField)) | ||
| assertSuccess(new AnyAgg('booleanField)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we add also |
||
|
|
||
| assertError(Min('mapField), "min does not support ordering on type") | ||
| assertError(Max('mapField), "max does not support ordering on type") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -80,3 +80,69 @@ SELECT 1 FROM range(10) HAVING true; | |
| SELECT 1 FROM range(10) HAVING MAX(id) > 0; | ||
|
|
||
| SELECT id FROM range(10) HAVING id > 0; | ||
|
|
||
| -- Test data | ||
| CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES | ||
| (1, true), (1, false), | ||
| (2, true), | ||
| (3, false), (3, null), | ||
| (4, null), (4, null), | ||
| (5, null), (5, true), (5, false) AS test_agg(k, v); | ||
|
|
||
| -- empty table | ||
| SELECT every(v), some(v), any(v) FROM test_agg WHERE 1 = 0; | ||
|
|
||
| -- all null values | ||
| SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 4; | ||
|
|
||
| -- aggregates filter out null values | ||
| SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 5; | ||
|
|
||
| -- group by | ||
| SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k; | ||
|
|
||
| -- having | ||
| SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) = false; | ||
| SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) IS NULL; | ||
|
|
||
| -- basic subquery path to make sure rewrite happens in both parent and child plans. | ||
| SELECT k, | ||
| Every(v) AS every | ||
| FROM test_agg | ||
| WHERE k = 2 | ||
| AND v IN (SELECT Any(v) | ||
| FROM test_agg | ||
| WHERE k = 1) | ||
| GROUP BY k; | ||
|
|
||
| -- basic subquery path to make sure rewrite happens in both parent and child plans. | ||
| SELECT k, | ||
| Every(v) AS every | ||
| FROM test_agg | ||
| WHERE k = 2 | ||
| AND v IN (SELECT Every(v) | ||
| FROM test_agg | ||
| WHERE k = 1) | ||
| GROUP BY k; | ||
|
|
||
| -- input type checking Int | ||
| SELECT every(1); | ||
|
|
||
| -- input type checking Short | ||
| SELECT some(1S); | ||
|
|
||
| -- input type checking Long | ||
| SELECT any(1L); | ||
|
|
||
| -- input type checking String | ||
| SELECT every("true"); | ||
|
|
||
| -- every/some/any aggregates are supported as window expressions. | ||
| SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan here are a few window tests. (fyi) |
||
| SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; | ||
| SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; | ||
|
|
||
| -- simple explain of queries having every/some/any aggregates. Optimized | ||
| -- plan should show the rewritten aggregate expression. | ||
| EXPLAIN EXTENDED SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unneeded newline