From 9e1aacb7e12df630494a35146aa0559517d39d15 Mon Sep 17 00:00:00 2001
From: nooberfsh
Date: Fri, 28 Jun 2019 11:37:55 +0800
Subject: [PATCH 1/3] Allow more aggregations on KeyValueGroupedDataset

---
 .../spark/sql/KeyValueGroupedDataset.scala    | 65 +++++++++++++++++++
 .../org/apache/spark/sql/DatasetSuite.scala   | 64 ++++++++++++++++++
 2 files changed, 129 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index a3cbea9021f2..ab61584e7238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -520,6 +520,71 @@ class KeyValueGroupedDataset[K, V] private[sql](
       col4: TypedColumn[V, U4]): Dataset[(K, U1, U2, U3, U4)] =
     aggUntyped(col1, col2, col3, col4).asInstanceOf[Dataset[(K, U1, U2, U3, U4)]]
 
+  /**
+    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+    * and the result of computing these aggregations over all elements in the group.
+    *
+    * @since 2.4.0
+    */
+  def agg[U1, U2, U3, U4, U5](
+      col1: TypedColumn[V, U1],
+      col2: TypedColumn[V, U2],
+      col3: TypedColumn[V, U3],
+      col4: TypedColumn[V, U4],
+      col5: TypedColumn[V, U5]): Dataset[(K, U1, U2, U3, U4, U5)] =
+    aggUntyped(col1, col2, col3, col4, col5).asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5)]]
+
+  /**
+    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+    * and the result of computing these aggregations over all elements in the group.
+    *
+    * @since 2.4.0
+    */
+  def agg[U1, U2, U3, U4, U5, U6](
+      col1: TypedColumn[V, U1],
+      col2: TypedColumn[V, U2],
+      col3: TypedColumn[V, U3],
+      col4: TypedColumn[V, U4],
+      col5: TypedColumn[V, U5],
+      col6: TypedColumn[V, U6]): Dataset[(K, U1, U2, U3, U4, U5, U6)] =
+    aggUntyped(col1, col2, col3, col4, col5, col6)
+      .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6)]]
+
+  /**
+    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+    * and the result of computing these aggregations over all elements in the group.
+    *
+    * @since 2.4.0
+    */
+  def agg[U1, U2, U3, U4, U5, U6, U7](
+      col1: TypedColumn[V, U1],
+      col2: TypedColumn[V, U2],
+      col3: TypedColumn[V, U3],
+      col4: TypedColumn[V, U4],
+      col5: TypedColumn[V, U5],
+      col6: TypedColumn[V, U6],
+      col7: TypedColumn[V, U7]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7)] =
+    aggUntyped(col1, col2, col3, col4, col5, col6, col7)
+      .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6, U7)]]
+
+  /**
+    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+    * and the result of computing these aggregations over all elements in the group.
+    *
+    * @since 2.4.0
+    */
+  def agg[U1, U2, U3, U4, U5, U6, U7, U8](
+      col1: TypedColumn[V, U1],
+      col2: TypedColumn[V, U2],
+      col3: TypedColumn[V, U3],
+      col4: TypedColumn[V, U4],
+      col5: TypedColumn[V, U5],
+      col6: TypedColumn[V, U6],
+      col7: TypedColumn[V, U7],
+      col8: TypedColumn[V, U8]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)] =
+    aggUntyped(col1, col2, col3, col4, col5, col6, col7, col8)
+      .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)]]
+
   /**
    * Returns a [[Dataset]] that contains a tuple with each key and the number of items present
    * for that key.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4b08a4b0d1a0..ff6143162ff2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -603,6 +603,70 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 30L, 32L, 2L, 15.0), ("b", 3L, 5L, 2L, 1.5), ("c", 1L, 2L, 1L, 1.0))
   }
 
+  test("typed aggregation: expr, expr, expr, expr, expr") {
+    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+    checkDatasetUnorderly(
+      ds.groupByKey(_._1).agg(
+        sum("_2").as[Long],
+        sum($"_2" + 1).as[Long],
+        count("*").as[Long],
+        avg("_2").as[Double],
+        countDistinct("*").as[Long]),
+      ("a", 30L, 32L, 2L, 15.0, 2L), ("b", 3L, 5L, 2L, 1.5, 2L), ("c", 1L, 2L, 1L, 1.0, 1L))
+  }
+
+  test("typed aggregation: expr, expr, expr, expr, expr, expr") {
+    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+    checkDatasetUnorderly(
+      ds.groupByKey(_._1).agg(
+        sum("_2").as[Long],
+        sum($"_2" + 1).as[Long],
+        count("*").as[Long],
+        avg("_2").as[Double],
+        countDistinct("*").as[Long],
+        max("_2").as[Long]),
+      ("a", 30L, 32L, 2L, 15.0, 2L, 20L),
+      ("b", 3L, 5L, 2L, 1.5, 2L, 2L),
+      ("c", 1L, 2L, 1L, 1.0, 1L, 1L))
+  }
+
+  test("typed aggregation: expr, expr, expr, expr, expr, expr, expr") {
+    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+    checkDatasetUnorderly(
+      ds.groupByKey(_._1).agg(
+        sum("_2").as[Long],
+        sum($"_2" + 1).as[Long],
+        count("*").as[Long],
+        avg("_2").as[Double],
+        countDistinct("*").as[Long],
+        max("_2").as[Long],
+        min("_2").as[Long]),
+      ("a", 30L, 32L, 2L, 15.0, 2L, 20L, 10L),
+      ("b", 3L, 5L, 2L, 1.5, 2L, 2L, 1L),
+      ("c", 1L, 2L, 1L, 1.0, 1L, 1L, 1L))
+  }
+
+  test("typed aggregation: expr, expr, expr, expr, expr, expr, expr, expr") {
+    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+    checkDatasetUnorderly(
+      ds.groupByKey(_._1).agg(
+        sum("_2").as[Long],
+        sum($"_2" + 1).as[Long],
+        count("*").as[Long],
+        avg("_2").as[Double],
+        countDistinct("*").as[Long],
+        max("_2").as[Long],
+        min("_2").as[Long],
+        mean("_2").as[Double]),
+      ("a", 30L, 32L, 2L, 15.0, 2L, 20L, 10L, 15.0),
+      ("b", 3L, 5L, 2L, 1.5, 2L, 2L, 1L, 1.5),
+      ("c", 1L, 2L, 1L, 1.0, 1L, 1L, 1L, 1.0))
+  }
+
   test("cogroup") {
     val ds1 = Seq(1 -> "a", 3 -> "abc", 5 -> "hello", 3 -> "foo").toDS()
     val ds2 = Seq(2 -> "q", 3 -> "w", 5 -> "e", 5 -> "r").toDS()

From e1a3442a533373376860618bb62f974f05362989 Mon Sep 17 00:00:00 2001
From: nooberfsh
Date: Mon, 1 Jul 2019 19:33:22 +0800
Subject: [PATCH 2/3] Format agg comment

---
 .../spark/sql/KeyValueGroupedDataset.scala    | 40 +++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index ab61584e7238..fbd59c9096e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -521,11 +521,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
     aggUntyped(col1, col2, col3, col4).asInstanceOf[Dataset[(K, U1, U2, U3, U4)]]
 
   /**
-    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
-    * and the result of computing these aggregations over all elements in the group.
-    *
-    * @since 2.4.0
-    */
+   * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+   * and the result of computing these aggregations over all elements in the group.
+   *
+   * @since 1.6.0
+   */
   def agg[U1, U2, U3, U4, U5](
       col1: TypedColumn[V, U1],
       col2: TypedColumn[V, U2],
@@ -535,11 +535,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
     aggUntyped(col1, col2, col3, col4, col5).asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5)]]
 
   /**
-    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
-    * and the result of computing these aggregations over all elements in the group.
-    *
-    * @since 2.4.0
-    */
+   * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+   * and the result of computing these aggregations over all elements in the group.
+   *
+   * @since 1.6.0
+   */
   def agg[U1, U2, U3, U4, U5, U6](
       col1: TypedColumn[V, U1],
       col2: TypedColumn[V, U2],
@@ -551,11 +551,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
       .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6)]]
 
   /**
-    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
-    * and the result of computing these aggregations over all elements in the group.
-    *
-    * @since 2.4.0
-    */
+   * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+   * and the result of computing these aggregations over all elements in the group.
+   *
+   * @since 2.4.0
+   */
   def agg[U1, U2, U3, U4, U5, U6, U7](
       col1: TypedColumn[V, U1],
       col2: TypedColumn[V, U2],
@@ -568,11 +568,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
       .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6, U7)]]
 
   /**
-    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
-    * and the result of computing these aggregations over all elements in the group.
-    *
-    * @since 2.4.0
-    */
+   * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+   * and the result of computing these aggregations over all elements in the group.
+   *
+   * @since 2.4.0
+   */
   def agg[U1, U2, U3, U4, U5, U6, U7, U8](
       col1: TypedColumn[V, U1],
       col2: TypedColumn[V, U2],

From 352013884694e9ba7d4fbf2c0c1ba259c2fb3174 Mon Sep 17 00:00:00 2001
From: nooberfsh
Date: Mon, 8 Jul 2019 10:29:05 +0800
Subject: [PATCH 3/3] Fix agg `since` version

---
 .../org/apache/spark/sql/KeyValueGroupedDataset.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index fbd59c9096e5..0da52d432d25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -524,7 +524,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
    * and the result of computing these aggregations over all elements in the group.
    *
-   * @since 1.6.0
+   * @since 3.0.0
    */
   def agg[U1, U2, U3, U4, U5](
       col1: TypedColumn[V, U1],
@@ -538,7 +538,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
    * and the result of computing these aggregations over all elements in the group.
    *
-   * @since 1.6.0
+   * @since 3.0.0
    */
   def agg[U1, U2, U3, U4, U5, U6](
       col1: TypedColumn[V, U1],
@@ -554,7 +554,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
    * and the result of computing these aggregations over all elements in the group.
    *
-   * @since 2.4.0
+   * @since 3.0.0
   */
   def agg[U1, U2, U3, U4, U5, U6, U7](
       col1: TypedColumn[V, U1],
@@ -571,7 +571,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
    * and the result of computing these aggregations over all elements in the group.
    *
-   * @since 2.4.0
+   * @since 3.0.0
    */
   def agg[U1, U2, U3, U4, U5, U6, U7, U8](
       col1: TypedColumn[V, U1],
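
For context, a minimal usage sketch of the overloads this series adds, mirroring the
five-column test in DatasetSuite above. The SparkSession setup (master, app name) is
assumed demo boilerplate and is not part of the patches; only the
groupByKey(...).agg(...) call with five typed columns exercises the new code.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    // Assumed setup: a throwaway local session for demonstration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-agg-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

    // New in this series: agg accepts five (and up to eight) TypedColumns,
    // yielding one tuple per key with one result field per aggregation.
    val result = ds.groupByKey(_._1).agg(
      sum("_2").as[Long],
      sum($"_2" + 1).as[Long],
      count("*").as[Long],
      avg("_2").as[Double],
      countDistinct("*").as[Long])
    // result: Dataset[(String, Long, Long, Long, Double, Long)], e.g.
    // ("a", 30, 32, 2, 15.0, 2), ("b", 3, 5, 2, 1.5, 2), ("c", 1, 2, 1, 1.0, 1)

    result.show()
    spark.stop()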