From 98853b290a1c935abfcd619bf9ba288902a2f422 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Mon, 2 Nov 2015 12:35:10 +0100
Subject: [PATCH 01/10] Add multiple column support for count distinct.

---
 .../expressions/aggregate/Utils.scala        | 10 +++++++---
 .../expressions/conditionalExpressions.scala | 27 +++++++++++++++++++
 .../ConditionalExpressionSuite.scala         | 11 ++++++++
 3 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index ac23f727829b..3d79462288a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -55,10 +55,14 @@ object Utils {
         mode = aggregate.Complete,
         isDistinct = false)
 
-    // We do not support multiple COUNT DISTINCT columns for now.
-    case expressions.CountDistinct(children) if children.length == 1 =>
+    case expressions.CountDistinct(children) =>
+      val child = if (children.size > 1) {
+        DropAnyNull(CreateStruct(children))
+      } else {
+        children.head
+      }
       aggregate.AggregateExpression2(
-        aggregateFunction = aggregate.Count(children.head),
+        aggregateFunction = aggregate.Count(child),
         mode = aggregate.Complete,
         isDistinct = true)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index d532629984be..7812e4b87dfd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -419,3 +419,30 @@ case class Greatest(children: Seq[Expression]) extends Expression {
     """
   }
 }
+
+/** Operator that drops a row when it contains any nulls. */
+case class DropAnyNull(child: Expression) extends UnaryExpression {
+  override def nullable: Boolean = true
+  override def dataType: DataType = child.dataType
+
+  protected override def nullSafeEval(input: Any): InternalRow = {
+    val row = input.asInstanceOf[InternalRow]
+    if (row.anyNull) {
+      null
+    } else {
+      row
+    }
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    nullSafeCodeGen(ctx, ev, eval => {
+      s"""
+        if ($eval.anyNull()) {
+          ${ev.isNull} = true;
+        } else {
+          ${ev.value} = $eval;
+        }
+      """
+    })
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
index 0df673bb9fa0..1488e6c283b8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
@@ -231,4 +231,15 @@ class ConditionalExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
       checkConsistencyBetweenInterpretedAndCodegen(Greatest, dt, 2)
     }
   }
+
+  test("function dropAnyNull") {
+    val row = 'r.struct(
+      StructField("a", StringType, false),
+      StructField("b", StringType, true)).at(0)
+    val a = create_row("a", "q")
+    val b = create_row("b", null.asInstanceOf[String])
+    checkEvaluation(DropAnyNull(row), a, create_row(a))
+    checkEvaluation(DropAnyNull(row), null, create_row(b))
+    checkEvaluation(DropAnyNull(row), null, create_row(null.asInstanceOf[InternalRow]))
+  }
 }
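
Patch 01 turns a multi-column COUNT(DISTINCT a, b, ...) into a single-child COUNT over a struct: DropAnyNull discards any row whose struct contains a null, matching the SQL rule that a tuple with a NULL argument does not participate in COUNT(DISTINCT ...). A plain-Scala model of those semantics (an illustrative sketch, not Catalyst code):

    // Count distinct column tuples, skipping tuples that contain a null;
    // this is the behaviour Count(DropAnyNull(CreateStruct(children)))
    // evaluates to under the rewrite above.
    def countDistinctTuples(rows: Seq[Seq[Any]]): Int =
      rows.filter(row => !row.contains(null)).distinct.size

    val rows = Seq(Seq("a", "b"), Seq("a", "b"), Seq("x", "y"), Seq("x", null))
    println(countDistinctTuples(rows)) // 2: the ("x", null) tuple is dropped
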
From 45363cecfe218dafde038e782f0359aad8247286 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 6 Nov 2015 10:22:20 +0100
Subject: [PATCH 02/10] Added tests.

---
 .../spark/sql/DataFrameAggregateSuite.scala | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 2e679e7bc4e0..802d95dbfbbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -162,6 +162,30 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  test("multiple column distinct count") {
+    val df1 = Seq(
+      ("a", "b", "c"),
+      ("a", "b", "c"),
+      ("a", "b", "d"),
+      ("x", "y", "z"))
+      .toDF("key1", "key2", "key3")
+
+    checkAnswer(
+      df1.agg(countDistinct('key1, 'key2)),
+      Row(2)
+    )
+
+    checkAnswer(
+      df1.agg(countDistinct('key1, 'key2, 'key3)),
+      Row(3)
+    )
+
+    checkAnswer(
+      df1.groupBy('key1).agg(countDistinct('key2, 'key3)),
+      Seq(Row("a", 2), Row("x", 1))
+    )
+  }
+
   test("zero count") {
     val emptyTableData = Seq.empty[(Int, Int)].toDF("a", "b")
     checkAnswer(
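
Outside the test suite, the feature is reached through the public DataFrame and SQL APIs. A hedged usage sketch (it assumes a Spark 1.5-era sqlContext, a registered table t, and a DataFrame df with columns key1 and key2, none of which are defined here; both forms go through the new rewrite):

    import org.apache.spark.sql.functions.countDistinct

    // DataFrame API: count distinct (key1, key2) pairs.
    val viaDataFrame = df.agg(countDistinct(df("key1"), df("key2")))

    // Equivalent SQL form.
    val viaSql = sqlContext.sql("SELECT COUNT(DISTINCT key1, key2) FROM t")
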
From 89f90b91f83d930d78cf3cf61857f1db9d991f11 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 6 Nov 2015 10:33:17 +0100
Subject: [PATCH 03/10] Added NULL case to tests.

---
 .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 802d95dbfbbf..eb1ee266c5d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -167,12 +167,13 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
       ("a", "b", "c"),
       ("a", "b", "c"),
       ("a", "b", "d"),
-      ("x", "y", "z"))
+      ("x", "y", "z"),
+      ("x", "q", null.asInstanceOf[String]))
      .toDF("key1", "key2", "key3")
 
     checkAnswer(
       df1.agg(countDistinct('key1, 'key2)),
-      Row(2)
+      Row(3)
     )
 
     checkAnswer(
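
The expected answer for countDistinct('key1, 'key2) moves from 2 to 3 because only the columns named in the DISTINCT list matter: the new ("x", "q", null) row still contributes the non-null pair ("x", "q"). A plain-Scala check of that reasoning, with the pairs written out by hand:

    val pairs = Seq(
      ("a", "b"), ("a", "b"), ("a", "b"), // the three ("a", "b", _) rows
      ("x", "y"),                         // from ("x", "y", "z")
      ("x", "q"))                         // key3 is null, key1/key2 are not
    println(pairs.distinct.size) // 3
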
From f265e449d840fdb96296e3418174f7617ae4c8c8 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 7 Nov 2015 12:58:24 +0100
Subject: [PATCH 04/10] Add ExpectsInputTypes and some more tests.

---
 .../expressions/conditionalExpressions.scala     |  5 +++--
 .../expressions/ConditionalExpressionSuite.scala | 13 ++++++++-----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index 7812e4b87dfd..0d4af43978ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.TypeUtils
-import org.apache.spark.sql.types.{NullType, BooleanType, DataType}
+import org.apache.spark.sql.types._
 
 
 case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
@@ -421,9 +421,10 @@ case class Greatest(children: Seq[Expression]) extends Expression {
   }
 }
 
 /** Operator that drops a row when it contains any nulls. */
-case class DropAnyNull(child: Expression) extends UnaryExpression {
+case class DropAnyNull(child: Expression) extends UnaryExpression with ExpectsInputTypes {
   override def nullable: Boolean = true
   override def dataType: DataType = child.dataType
+  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)
 
   protected override def nullSafeEval(input: Any): InternalRow = {
     val row = input.asInstanceOf[InternalRow]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
index 1488e6c283b8..c1e3c17b8710 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
@@ -233,13 +233,16 @@ class ConditionalExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
   }
 
   test("function dropAnyNull") {
+    val drop = DropAnyNull(CreateStruct(Seq('a.string.at(0), 'b.string.at(1))))
+    val a = create_row("a", "q")
+    val nullStr: String = null
+    checkEvaluation(drop, a, a)
+    checkEvaluation(drop, null, create_row("b", nullStr))
+    checkEvaluation(drop, null, create_row(nullStr, nullStr))
+
     val row = 'r.struct(
       StructField("a", StringType, false),
       StructField("b", StringType, true)).at(0)
-    val a = create_row("a", "q")
-    val b = create_row("b", null.asInstanceOf[String])
-    checkEvaluation(DropAnyNull(row), a, create_row(a))
-    checkEvaluation(DropAnyNull(row), null, create_row(b))
-    checkEvaluation(DropAnyNull(row), null, create_row(null.asInstanceOf[InternalRow]))
+    checkEvaluation(DropAnyNull(row), null, create_row(null))
   }
 }
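
Mixing in ExpectsInputTypes moves the failure from execution time (a ClassCastException in nullSafeEval) to analysis time: the analyzer compares each child's data type against inputTypes and rejects the plan with a readable error. A minimal model of that contract (plain Scala with made-up type names, not the actual Catalyst trait):

    sealed trait DataType
    case object StructType extends DataType
    case object StringType extends DataType

    // Analysis-time check: first mismatch wins, Right(()) means the types line up.
    def checkInputTypes(expected: Seq[DataType], actual: Seq[DataType]): Either[String, Unit] =
      expected.zip(actual).collectFirst {
        case (e, a) if e != a => s"expected $e, found $a"
      }.toLeft(())

    checkInputTypes(Seq(StructType), Seq(StringType))
    // => Left("expected StructType, found StringType")
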
From 68f34b49a0985b1ed02edae28d1630b81ea4c657 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 7 Nov 2015 16:35:22 +0100
Subject: [PATCH 05/10] Add tests.

---
 .../sql/hive/execution/AggregationQuerySuite.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 7f6fe339232a..56264ba11e8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -523,14 +523,15 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
         |SELECT
         |  key,
         |  count(distinct value1),
-        |  count(distinct value2)
+        |  count(distinct value2),
+        |  count(distinct value1, value2)
         |FROM agg2
         |GROUP BY key
       """.stripMargin),
-      Row(null, 3, 3) ::
-      Row(1, 2, 3) ::
-      Row(2, 2, 1) ::
-      Row(3, 0, 1) :: Nil)
+      Row(null, 3, 3, 3) ::
+      Row(1, 2, 3, 3) ::
+      Row(2, 2, 1, 1) ::
+      Row(3, 0, 1, 0) :: Nil)
   }
 
   test("test count") {

From 4e75bdf64bd24b791e159b45cfd6d34254cc2307 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 7 Nov 2015 20:08:53 +0100
Subject: [PATCH 06/10] Allow struct as a grouping key in the new aggregate
 code path. Add small test for multiple column count distinct.

---
 .../sql/catalyst/expressions/aggregate/Utils.scala |  1 -
 .../sql/hive/execution/AggregationQuerySuite.scala | 12 +++++-------
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index 3d79462288a3..c0178b162e0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -33,7 +33,6 @@ object Utils {
     val hasComplexTypes = aggregate.groupingExpressions.map(_.dataType).exists {
       case array: ArrayType => true
       case map: MapType => true
-      case struct: StructType => true
       case _ => false
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 56264ba11e8e..8d6e9b5c30d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -516,22 +516,20 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
       Row(3, 4, 4, 3, null) :: Nil)
   }
 
-  test("multiple distinct column sets") {
+  test("single distinct multiple columns set") {
     checkAnswer(
       sqlContext.sql(
         """
          |SELECT
          |  key,
-         |  count(distinct value1),
-         |  count(distinct value2),
          |  count(distinct value1, value2)
          |FROM agg2
          |GROUP BY key
        """.stripMargin),
-      Row(null, 3, 3, 3) ::
-      Row(1, 2, 3, 3) ::
-      Row(2, 2, 1, 1) ::
-      Row(3, 0, 1, 0) :: Nil)
+      Row(null, 3) ::
+      Row(1, 3) ::
+      Row(2, 1) ::
+      Row(3, 0) :: Nil)
   }
 
   test("test count") {
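
Patch 06 matters because the rewritten plan groups on the CreateStruct of the distinct columns, so struct-typed grouping keys have to be admitted by the new aggregation path. With that restriction lifted, a struct can also appear directly as a GROUP BY key. A hedged sketch (the table t and its columns a and b are hypothetical; struct() is the standard Spark SQL function):

    // Grouping on a struct-typed key now runs on the new code path.
    val grouped = sqlContext.sql(
      "SELECT struct(a, b) AS k, count(*) FROM t GROUP BY struct(a, b)")
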
From d34e4cfc3bc423d7560d69f42a082b3969f82d42 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sat, 7 Nov 2015 21:34:18 +0100
Subject: [PATCH 07/10] Improve checking if a StructType is suitable as an
 aggregation group by key.

---
 .../expressions/aggregate/Utils.scala | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index c0178b162e0a..15c8b06bc3f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -22,25 +22,27 @@ import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType}
+import org.apache.spark.sql.types._
 
 /**
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
 */
 object Utils {
 
-  // Right now, we do not support complex types in the grouping key schema.
-  private def supportsGroupingKeySchema(aggregate: Aggregate): Boolean = {
-    val hasComplexTypes = aggregate.groupingExpressions.map(_.dataType).exists {
-      case array: ArrayType => true
-      case map: MapType => true
-      case _ => false
-    }
-    !hasComplexTypes
+  // Check if the DataType given cannot be part of a group by clause.
+  private def isUnGroupable(dt: DataType): Boolean = dt match {
+    case _: ArrayType | _: MapType => true
+    case s: StructType => s.fields.exists(f => isUnGroupable(f.dataType))
+    case _ => false
   }
 
+  // Right now, we do not support complex types in the grouping key schema.
+  private def supportsGroupingKeySchema(aggregate: Aggregate): Boolean =
+    !aggregate.groupingExpressions.exists(e => isUnGroupable(e.dataType))
+
   private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match {
     case p: Aggregate if supportsGroupingKeySchema(p) =>
+
       val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown {
         case expressions.Average(child) =>
           aggregate.AggregateExpression2(
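
The refined check recurses into struct fields: a struct is an acceptable grouping key only if every nested field is, while arrays and maps never are. A plain-Scala mirror of that recursion over a made-up mini type algebra (the names here are hypothetical, not Catalyst types):

    sealed trait DT
    case object IntDT extends DT
    case class ArrayDT(elem: DT) extends DT
    case class MapDT(key: DT, value: DT) extends DT
    case class StructDT(fields: Seq[DT]) extends DT

    // Same shape as the patched isUnGroupable: arrays and maps are always
    // rejected; structs are rejected if any nested field is.
    def isUnGroupable(dt: DT): Boolean = dt match {
      case _: ArrayDT | _: MapDT => true
      case StructDT(fields) => fields.exists(isUnGroupable)
      case _ => false
    }

    // A struct hiding an array anywhere inside is rejected:
    isUnGroupable(StructDT(Seq(IntDT, StructDT(Seq(ArrayDT(IntDT)))))) // true
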
From f19a6d40397768cc074a8c0e85a84912ffe338d2 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sun, 8 Nov 2015 12:26:57 +0100
Subject: [PATCH 08/10] Fix references in expand (so we can use this directly
 in a dataframe without analysis errors).

---
 .../spark/sql/catalyst/plans/logical/basicOperators.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index fb963e2f8f7e..09aac00a455f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -306,6 +306,9 @@ case class Expand(
     output: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
 
+  override def references: AttributeSet =
+    AttributeSet(projections.flatten.flatMap(_.references))
+
   override def statistics: Statistics = {
     // TODO shouldn't we factor in the size of the projection versus the size of the backing child
     // row?

From 82b9c60c4e3cfbcbef96bb53606e56eb1c73fcea Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sun, 8 Nov 2015 12:38:55 +0100
Subject: [PATCH 09/10] Improve attribute naming in MultipleDistinctRewriter
 (for debugging purposes).

---
 .../spark/sql/catalyst/expressions/aggregate/Utils.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index 15c8b06bc3f2..2f718e51e8f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -325,7 +325,7 @@ object MultipleDistinctRewriter extends Rule[LogicalPlan] {
     val gid = new AttributeReference("gid", IntegerType, false)()
     val groupByMap = a.groupingExpressions.collect {
       case ne: NamedExpression => ne -> ne.toAttribute
-      case e => e -> new AttributeReference(e.prettyName, e.dataType, e.nullable)()
+      case e => e -> new AttributeReference(e.prettyString, e.dataType, e.nullable)()
     }
     val groupByAttrs = groupByMap.map(_._2)
 
@@ -377,7 +377,7 @@ object MultipleDistinctRewriter extends Rule[LogicalPlan] {
     val regularAggOperatorMap = regularAggExprs.map { e =>
       // Perform the actual aggregation in the initial aggregate.
       val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrMap)
-      val operator = Alias(e.copy(aggregateFunction = af), e.toString)()
+      val operator = Alias(e.copy(aggregateFunction = af), e.prettyString)()
 
       // Select the result of the first aggregate in the last aggregate.
       val result = AggregateExpression2(
@@ -462,5 +462,5 @@ object MultipleDistinctRewriter extends Rule[LogicalPlan] {
     // NamedExpression. This is done to prevent collisions between distinct and regular aggregate
     // children, in this case attribute reuse causes the input of the regular aggregate to be bound
     // to the (nulled out) input of the distinct aggregate.
-    e -> new AttributeReference(e.prettyName, e.dataType, true)()
+    e -> new AttributeReference(e.prettyString, e.dataType, true)()
 }
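
The next patch fixes an ordering bug: regularAggChildAttrMap was materialized as a Map, and a Scala Map does not promise to iterate in insertion order, so the attributes appended to Expand's output could come out in a different order than the expressions in its projections. Keeping the Seq of pairs preserves positions, and a Map is derived from it only for lookups. A small plain-Scala demonstration of the underlying hazard:

    // A Seq keeps positions; a Map with more than four entries may not.
    val pairs = Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "e" -> 5)
    pairs.map(_._2)          // always List(1, 2, 3, 4, 5)
    pairs.toMap.values.toSeq // iteration order of a HashMap is unspecified
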
From 4e53aab60e342d4f95200091bc3d522e6f87f79e Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Sun, 8 Nov 2015 13:53:42 +0100
Subject: [PATCH 10/10] Fix bug in regular aggregation path of the
 MultipleDistinctRewriter: expressions and attributes didn't align.

---
 .../expressions/aggregate/Utils.scala         |  7 ++++---
 .../execution/AggregationQuerySuite.scala     | 26 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index 2f718e51e8f1..9b22ce261973 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -370,13 +370,14 @@ object MultipleDistinctRewriter extends Rule[LogicalPlan] {
     // Setup expand for the 'regular' aggregate expressions.
     val regularAggExprs = aggExpressions.filter(!_.isDistinct)
     val regularAggChildren = regularAggExprs.flatMap(_.aggregateFunction.children).distinct
-    val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair).toMap
+    val regularAggChildAttrMap = regularAggChildren.map(expressionAttributePair)
 
     // Setup aggregates for 'regular' aggregate expressions.
     val regularGroupId = Literal(0)
+    val regularAggChildAttrLookup = regularAggChildAttrMap.toMap
     val regularAggOperatorMap = regularAggExprs.map { e =>
       // Perform the actual aggregation in the initial aggregate.
-      val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrMap)
+      val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup)
       val operator = Alias(e.copy(aggregateFunction = af), e.prettyString)()
 
       // Select the result of the first aggregate in the last aggregate.
       val result = AggregateExpression2(
@@ -421,7 +421,7 @@ object MultipleDistinctRewriter extends Rule[LogicalPlan] {
     // Construct the expand operator.
     val expand = Expand(
       regularAggProjection ++ distinctAggProjections,
-      groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.values.toSeq,
+      groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
       a.child)
 
     // Construct the first aggregate operator. This de-duplicates all the children of
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 8d6e9b5c30d6..ea36c132bb19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -532,6 +532,32 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
       Row(3, 0) :: Nil)
   }
 
+  test("multiple distinct multiple columns sets") {
+    checkAnswer(
+      sqlContext.sql(
+        """
+          |SELECT
+          |  key,
+          |  count(distinct value1),
+          |  sum(distinct value1),
+          |  count(distinct value2),
+          |  sum(distinct value2),
+          |  count(distinct value1, value2),
+          |  count(value1),
+          |  sum(value1),
+          |  count(value2),
+          |  sum(value2),
+          |  count(*),
+          |  count(1)
+          |FROM agg2
+          |GROUP BY key
+        """.stripMargin),
+      Row(null, 3, 30, 3, 60, 3, 3, 30, 3, 60, 4, 4) ::
+      Row(1, 2, 40, 3, -10, 3, 3, 70, 3, -10, 3, 3) ::
+      Row(2, 2, 0, 1, 1, 1, 3, 1, 3, 3, 4, 4) ::
+      Row(3, 0, null, 1, 3, 0, 0, null, 1, 3, 2, 2) :: Nil)
+  }
+
   test("test count") {
     checkAnswer(
       sqlContext.sql(