From 4c9f6d0310e0fb56ab071812bfd529f3ec74d028 Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 21 May 2017 19:08:25 +0200 Subject: [PATCH 1/3] Add posexplode and posexplode_outer --- python/pyspark/sql/functions.py | 65 +++++++++++++++++++++++++++++++++ python/pyspark/sql/tests.py | 13 ++++++- 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 240ae65a6178..121516663172 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1727,6 +1727,71 @@ def posexplode(col): return Column(jc) +@since(2.3) +def explode_outer(col): + """Returns a new row for each element in the given array or map. + Unlike explode, if the array/map is null or empty then null is produced. + + >>> df = spark.createDataFrame( + ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], + ... ("id", "an_array", "a_map") + ... ) + >>> df.select("id", "an_array", explode_outer("a_map")).show() + +---+----------+----+-----+ + | id| an_array| key|value| + +---+----------+----+-----+ + | 1|[foo, bar]| x| 1.0| + | 2| []|null| null| + | 3| null|null| null| + +---+----------+----+-----+ + + >>> df.select("id", "a_map", explode_outer("an_array")).show() + +---+-------------+----+ + | id| a_map| col| + +---+-------------+----+ + | 1|Map(x -> 1.0)| foo| + | 1|Map(x -> 1.0)| bar| + | 2| Map()|null| + | 3| null|null| + +---+-------------+----+ + """ + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.explode_outer(_to_java_column(col)) + return Column(jc) + + +@since(2.3) +def posexplode_outer(col): + """Returns a new row for each element with position in the given array or map. + Unlike posexplode, if the array/map is null or empty then the row (null, null) is produced. + + >>> df = spark.createDataFrame( + ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], + ... ("id", "an_array", "a_map") + ... ) + >>> df.select("id", "an_array", posexplode_outer("a_map")).show() + +---+----------+----+----+-----+ + | id| an_array| pos| key|value| + +---+----------+----+----+-----+ + | 1|[foo, bar]| 0| x| 1.0| + | 2| []|null|null| null| + | 3| null|null|null| null| + +---+----------+----+----+-----+ + >>> df.select("id", "a_map", posexplode_outer("an_array")).show() + +---+-------------+----+----+ + | id| a_map| pos| col| + +---+-------------+----+----+ + | 1|Map(x -> 1.0)| 0| foo| + | 1|Map(x -> 1.0)| 1| bar| + | 2| Map()|null|null| + | 3| null|null|null| + +---+-------------+----+----+ + """ + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.posexplode_outer(_to_java_column(col)) + return Column(jc) + + @ignore_unicode_prefix @since(1.6) def get_json_object(col, path): diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 31f932a36322..a867f23b6b8d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -258,8 +258,12 @@ def test_column_name_encoding(self): self.assertTrue(isinstance(columns[1], str)) def test_explode(self): - from pyspark.sql.functions import explode - d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})] + from pyspark.sql.functions import explode, explode_outer, posexplode_outer + d = [ + Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"}), + Row(a=1, intlist=[], mapfield={}), + Row(a=1, intlist=None, mapfield=None), + ] rdd = self.sc.parallelize(d) data = self.spark.createDataFrame(rdd) @@ -272,6 +276,11 @@ def test_explode(self): self.assertEqual(result[0][0], "a") self.assertEqual(result[0][1], "b") + self.assertEqual(data.select(posexplode_outer("intlist")).count(), 5) + self.assertEqual(data.select(posexplode_outer("mapfield")).count(), 3) + self.assertEqual(data.select(explode_outer("intlist")).count(), 5) + self.assertEqual(data.select(explode_outer("mapfield")).count(), 3) + def test_and_in_expression(self): self.assertEqual(4, self.df.filter((self.df.key <= 10) & (self.df.value <= "2")).count()) self.assertRaises(ValueError, lambda: (self.df.key <= 10) and (self.df.value <= "2")) From cc6ca84b5da0fc21a9b8516f6ba24135affc84be Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 21 May 2017 20:58:05 +0200 Subject: [PATCH 2/3] Fix style --- python/pyspark/sql/functions.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 121516663172..3416c4b118a0 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1731,11 +1731,11 @@ def posexplode(col): def explode_outer(col): """Returns a new row for each element in the given array or map. Unlike explode, if the array/map is null or empty then null is produced. - + >>> df = spark.createDataFrame( - ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], + ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ... ("id", "an_array", "a_map") - ... ) + ... ) >>> df.select("id", "an_array", explode_outer("a_map")).show() +---+----------+----+-----+ | id| an_array| key|value| @@ -1744,7 +1744,7 @@ def explode_outer(col): | 2| []|null| null| | 3| null|null| null| +---+----------+----+-----+ - + >>> df.select("id", "a_map", explode_outer("an_array")).show() +---+-------------+----+ | id| a_map| col| @@ -1764,11 +1764,11 @@ def explode_outer(col): def posexplode_outer(col): """Returns a new row for each element with position in the given array or map. Unlike posexplode, if the array/map is null or empty then the row (null, null) is produced. - + >>> df = spark.createDataFrame( - ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], + ... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ... ("id", "an_array", "a_map") - ... ) + ... ) >>> df.select("id", "an_array", posexplode_outer("a_map")).show() +---+----------+----+----+-----+ | id| an_array| pos| key|value| From 3e5c35d0a39097a67d19b78ffcf7b764f54bcfb8 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 21 Jun 2017 22:52:33 +0200 Subject: [PATCH 3/3] Extract tests similar to mimic explode --- python/pyspark/sql/tests.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a867f23b6b8d..3b308579a377 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -276,10 +276,17 @@ def test_explode(self): self.assertEqual(result[0][0], "a") self.assertEqual(result[0][1], "b") - self.assertEqual(data.select(posexplode_outer("intlist")).count(), 5) - self.assertEqual(data.select(posexplode_outer("mapfield")).count(), 3) - self.assertEqual(data.select(explode_outer("intlist")).count(), 5) - self.assertEqual(data.select(explode_outer("mapfield")).count(), 3) + result = [tuple(x) for x in data.select(posexplode_outer("intlist")).collect()] + self.assertEqual(result, [(0, 1), (1, 2), (2, 3), (None, None), (None, None)]) + + result = [tuple(x) for x in data.select(posexplode_outer("mapfield")).collect()] + self.assertEqual(result, [(0, 'a', 'b'), (None, None, None), (None, None, None)]) + + result = [x[0] for x in data.select(explode_outer("intlist")).collect()] + self.assertEqual(result, [1, 2, 3, None, None]) + + result = [tuple(x) for x in data.select(explode_outer("mapfield")).collect()] + self.assertEqual(result, [('a', 'b'), (None, None), (None, None)]) def test_and_in_expression(self): self.assertEqual(4, self.df.filter((self.df.key <= 10) & (self.df.value <= "2")).count())