From 2e74dba1a1c62f251ee1ef6411bd37bf20e94ca0 Mon Sep 17 00:00:00 2001
From: buzhihuojie
Date: Tue, 1 Nov 2016 20:50:55 -0700
Subject: [PATCH 1/3] improve doc for rangeBetween and rowsBetween

---
 .../apache/spark/sql/expressions/Window.scala | 49 +++++++++++++++++++
 .../spark/sql/expressions/WindowSpec.scala    | 49 +++++++++++++++++++
 2 files changed, 98 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index e8a0c5f43fe46..92611389b235b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -82,6 +82,29 @@ object Window {
    * "current row", while "-1" means the row before the current row, and "5" means the fifth row
    * after the current row.
    *
+   * A row based boundary is based on the position of the row within the partition.
+   * An offset indicates the number of rows above or below the current row at which the frame
+   * for the current row starts or ends. For instance, given a row based sliding frame with a
+   * lower bound offset of -1 and an upper bound offset of +2, the frame for the row with
+   * index 5 would range from index 4 to index 7.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  2|
+   *   |  1|       a|  3|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
    * @param start boundary start, inclusive.
    *              The frame is unbounded if this is the minimum long value.
    * @param end boundary end, inclusive.
@@ -101,6 +124,32 @@ object Window {
    * while "-1" means one off before the current row, and "5" means the five off after the
    * current row.
    *
+   * A range based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
+   * instance if the current ORDER BY expression has a value of 10 and the lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This, however,
+   * puts a number of constraints on the ORDER BY expressions: there can be only one expression
+   * and this expression must have a numerical data type. An exception can be made when the
+   * offset is 0: because no value modification is needed, multiple and non-numeric ORDER BY
+   * expressions are allowed in this case.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
    * @param start boundary start, inclusive.
    *              The frame is unbounded if this is the minimum long value.
    * @param end boundary end, inclusive.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 82bc8f152d6ea..242fe5a694cfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -86,6 +86,29 @@ class WindowSpec private[sql](
    * "current row", while "-1" means the row before the current row, and "5" means the fifth row
    * after the current row.
    *
+   * A row based boundary is based on the position of the row within the partition.
+   * An offset indicates the number of rows above or below the current row at which the frame
+   * for the current row starts or ends. For instance, given a row based sliding frame with a
+   * lower bound offset of -1 and an upper bound offset of +2, the frame for the row with
+   * index 5 would range from index 4 to index 7.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  2|
+   *   |  1|       a|  3|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
    * @param start boundary start, inclusive.
    *              The frame is unbounded if this is the minimum long value.
    * @param end boundary end, inclusive.
@@ -104,6 +127,32 @@ class WindowSpec private[sql](
    * while "-1" means one off before the current row, and "5" means the five off after the
    * current row.
    *
+   * A range based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
+   * instance if the current ORDER BY expression has a value of 10 and the lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This, however,
+   * puts a number of constraints on the ORDER BY expressions: there can be only one expression
+   * and this expression must have a numerical data type. An exception can be made when the
+   * offset is 0: because no value modification is needed, multiple and non-numeric ORDER BY
+   * expressions are allowed in this case.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
    * @param start boundary start, inclusive.
    *              The frame is unbounded if this is the minimum long value.
    * @param end boundary end, inclusive.
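Note: the two example tables in the patch above differ only on the tied id=1 rows in
category "a" (sums 2 and 3 under rowsBetween versus 4 and 4 under rangeBetween), which is
exactly the rows-vs-range distinction the new prose describes. A minimal standalone sketch
of that contrast, assuming a Spark shell where spark.implicits._ is already in scope (as
the scaladoc examples also assume):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    val df = Seq((1, "a"), (1, "a"), (2, "a")).toDF("id", "category")
    val w = Window.partitionBy('category).orderBy('id)

    // Row frames count physical rows: the two tied id=1 rows get different
    // frames, [1, 1] and [1, 2], hence different sums, 2 and 3.
    df.withColumn("rows_sum", sum('id) over w.rowsBetween(0, 1)).show()

    // Range frames compare ORDER BY values: both tied id=1 rows see every
    // row with id in [1, 2], i.e. {1, 1, 2}, hence the same sum, 4.
    df.withColumn("range_sum", sum('id) over w.rangeBetween(0, 1)).show()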
From 25d03a51351323967c1a217b6c3f8ec9c49521de Mon Sep 17 00:00:00 2001
From: buzhihuojie
Date: Tue, 1 Nov 2016 23:05:27 -0700
Subject: [PATCH 2/3] address

---
 .../main/scala/org/apache/spark/sql/expressions/Window.scala | 4 ++--
 .../scala/org/apache/spark/sql/expressions/WindowSpec.scala  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index 92611389b235b..fd2c2e138f871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -91,7 +91,7 @@ object Window {
    * {{{
    *   import org.apache.spark.sql.expressions.Window
    *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
@@ -136,7 +136,7 @@ object Window {
    * {{{
    *   import org.apache.spark.sql.expressions.Window
    *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 242fe5a694cfa..7497edd51b825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -95,7 +95,7 @@ class WindowSpec private[sql](
    * {{{
    *   import org.apache.spark.sql.expressions.Window
    *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
@@ -139,7 +139,7 @@ class WindowSpec private[sql](
    * {{{
    *   import org.apache.spark.sql.expressions.Window
    *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show
+   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show()
    *
    *   +---+--------+---+
    *   | id|category|sum|

From d76386ab8c30ad73a3951f5a80236a0df3fb4561 Mon Sep 17 00:00:00 2001
From: buzhihuojie
Date: Tue, 1 Nov 2016 23:10:59 -0700
Subject: [PATCH 3/3] fix line limit

---
 .../org/apache/spark/sql/expressions/Window.scala | 14 ++++++++++----
 .../apache/spark/sql/expressions/WindowSpec.scala | 14 ++++++++++----
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index fd2c2e138f871..ea40031a9f75e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -90,8 +90,11 @@ object Window {
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show()
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   df.withColumn("sum",
+   *       sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1))
+   *     .show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
@@ -135,8 +138,11 @@ object Window {
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show()
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   df.withColumn("sum",
+   *       sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1))
+   *     .show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 7497edd51b825..a8f729a7e1389 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -94,8 +94,11 @@ class WindowSpec private[sql](
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1)).show()
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   df.withColumn("sum",
+   *       sum('id) over Window.partitionBy('category).orderBy('id).rowsBetween(0,1))
+   *     .show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
@@ -138,8 +141,11 @@ class WindowSpec private[sql](
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")).toDF("id", "category")
-   *   df.withColumn("sum", sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1)).show()
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   df.withColumn("sum",
+   *       sum('id) over Window.partitionBy('category).orderBy('id).rangeBetween(0,1))
+   *     .show()
    *
    *   +---+--------+---+
    *   | id|category|sum|
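Note: per the @param documentation in the patches above, a boundary equal to the minimum
long value makes the frame start unbounded. A minimal sketch of that case on the same data,
again assuming a Spark shell with spark.implicits._ in scope: rowsBetween(Long.MinValue, 0)
gives a frame from the start of the partition through the current row, i.e. a per-partition
running total:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
      .toDF("id", "category")

    // Unbounded preceding through current row: a running sum per category.
    val running = Window.partitionBy('category).orderBy('id)
      .rowsBetween(Long.MinValue, 0)

    // Category "b" rows (id = 1, 2, 3) accumulate 1, 3, 6.
    df.withColumn("running_sum", sum('id) over running).show()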