diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 027babbf57d6c..5a64845598ea5 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -176,6 +176,8 @@ Collection Functions explode_outer posexplode posexplode_outer + inline + inline_outer get get_json_object json_tuple diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 03c16db602f21..4fa289d3af675 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -5883,6 +5883,41 @@ def posexplode(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("posexplode", col) +def inline(col: "ColumnOrName") -> Column: + """ + Explodes an array of structs into a table. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + input column of values to explode. + + Returns + ------- + :class:`~pyspark.sql.Column` + generator expression with the inline exploded result. + + See Also + -------- + :meth:`explode` + + Examples + -------- + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([Row(structlist=[Row(a=1, b=2), Row(a=3, b=4)])]) + >>> df.select(inline(df.structlist)).show() + +---+---+ + | a| b| + +---+---+ + | 1| 2| + | 3| 4| + +---+---+ + """ + return _invoke_function_over_columns("inline", col) + + def explode_outer(col: "ColumnOrName") -> Column: """ Returns a new row for each element in the given array or map. @@ -5956,6 +5991,47 @@ def posexplode_outer(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("posexplode_outer", col) +def inline_outer(col: "ColumnOrName") -> Column: + """ + Explodes an array of structs into a table. + Unlike inline, if the array is null or empty then null is produced for each nested column. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + input column of values to explode. + + Returns + ------- + :class:`~pyspark.sql.Column` + generator expression with the inline exploded result. + + See Also + -------- + :meth:`explode_outer` + :meth:`inline` + + Examples + -------- + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([ + ... Row(id=1, structlist=[Row(a=1, b=2), Row(a=3, b=4)]), + ... Row(id=2, structlist=[]) + ... ]) + >>> df.select('id', inline_outer(df.structlist)).show() + +---+----+----+ + | id| a| b| + +---+----+----+ + | 1| 1| 2| + | 1| 3| 4| + | 2|null|null| + +---+----+----+ + """ + return _invoke_function_over_columns("inline_outer", col) + + def get_json_object(col: "ColumnOrName", path: str) -> Column: """ Extracts json object from a json string based on json path specified, and returns json string diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 102ebef831795..8f579745cef32 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -141,6 +141,22 @@ def test_explode(self): result = [tuple(x) for x in data.select(explode_outer("mapfield")).collect()] self.assertEqual(result, [("a", "b"), (None, None), (None, None)]) + def test_inline(self): + from pyspark.sql.functions import inline, inline_outer + + d = [ + Row(structlist=[Row(b=1, c=2), Row(b=3, c=4)]), + Row(structlist=[Row(b=None, c=5), None]), + Row(structlist=[]), + ] + data = self.spark.createDataFrame(d) + + result = [tuple(x) for x in data.select(inline(data.structlist)).collect()] + self.assertEqual(result, [(1, 2), (3, 4), (None, 5), (None, None)]) + + result = [tuple(x) for x in data.select(inline_outer(data.structlist)).collect()] + self.assertEqual(result, [(1, 2), (3, 4), (None, 5), (None, None), (None, None)]) + def test_basic_functions(self): rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) df = self.spark.read.json(rdd) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 69da277d5e604..11802db862746 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -4342,6 +4342,23 @@ object functions { */ def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) } + /** + * Creates a new row for each element in the given array of structs. + * + * @group collection_funcs + * @since 3.4.0 + */ + def inline(e: Column): Column = withExpr { Inline(e.expr) } + + /** + * Creates a new row for each element in the given array of structs. + * Unlike inline, if the array is null or empty then null is produced for each nested column. + * + * @group collection_funcs + * @since 3.4.0 + */ + def inline_outer(e: Column): Column = withExpr { GeneratorOuter(Inline(e.expr)) } + /** * Extracts json object from a json string based on json path specified, and returns json string * of the extracted json object. It will return null if the input json string is invalid. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 648f9cac216ec..25231fdecba28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -219,20 +219,21 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { test("inline raises exception on array of null type") { val m = intercept[AnalysisException] { - spark.range(2).selectExpr("inline(array())") + spark.range(2).select(inline(array())) }.getMessage assert(m.contains("data type mismatch")) } test("inline with empty table") { checkAnswer( - spark.range(0).selectExpr("inline(array(struct(10, 100)))"), + spark.range(0).select(inline(array(struct(lit(10), lit(100))))), Nil) } test("inline on literal") { checkAnswer( - spark.range(2).selectExpr("inline(array(struct(10, 100), struct(20, 200), struct(30, 300)))"), + spark.range(2).select(inline(array(struct(lit(10), lit(100)), struct(lit(20), lit(200)), + struct(lit(30), lit(300))))), Row(10, 100) :: Row(20, 200) :: Row(30, 300) :: Row(10, 100) :: Row(20, 200) :: Row(30, 300) :: Nil) } @@ -241,39 +242,39 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { val df = Seq((1, 2)).toDF("a", "b") checkAnswer( - df.selectExpr("inline(array(struct(a), struct(a)))"), + df.select(inline(array(struct('a), struct('a)))), Row(1) :: Row(1) :: Nil) checkAnswer( - df.selectExpr("inline(array(struct(a, b), struct(a, b)))"), + df.select(inline(array(struct('a, 'b), struct('a, 'b)))), Row(1, 2) :: Row(1, 2) :: Nil) // Spark think [struct, struct] is heterogeneous due to name difference. val m = intercept[AnalysisException] { - df.selectExpr("inline(array(struct(a), struct(b)))") + df.select(inline(array(struct('a), struct('b)))) }.getMessage assert(m.contains("data type mismatch")) checkAnswer( - df.selectExpr("inline(array(struct(a), named_struct('a', b)))"), + df.select(inline(array(struct('a), struct('b.alias("a"))))), Row(1) :: Row(2) :: Nil) // Spark think [struct, struct] is heterogeneous due to name difference. val m2 = intercept[AnalysisException] { - df.selectExpr("inline(array(struct(a), struct(2)))") + df.select(inline(array(struct('a), struct(lit(2))))) }.getMessage assert(m2.contains("data type mismatch")) checkAnswer( - df.selectExpr("inline(array(struct(a), named_struct('a', 2)))"), + df.select(inline(array(struct('a), struct(lit(2).alias("a"))))), Row(1) :: Row(2) :: Nil) checkAnswer( - df.selectExpr("struct(a)").selectExpr("inline(array(*))"), + df.select(struct('a)).select(inline(array("*"))), Row(1) :: Nil) checkAnswer( - df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"), + df.select(array(struct('a), struct('b.alias("a")))).selectExpr("inline(*)"), Row(1) :: Row(2) :: Nil) } @@ -282,11 +283,11 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { val df2 = df.select( when($"col1" === 1, null).otherwise(array(struct($"col1", $"col2"))).as("col1")) checkAnswer( - df2.selectExpr("inline(col1)"), + df2.select(inline('col1)), Row(3, "4") :: Row(5, "6") :: Nil ) checkAnswer( - df2.selectExpr("inline_outer(col1)"), + df2.select(inline_outer('col1)), Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil ) } @@ -405,14 +406,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { |) |as tbl(a, b) """.stripMargin) - df.createOrReplaceTempView("t1") checkAnswer( - sql("select inline(b) from t1"), + df.select(inline('b)), Row(0, 1) :: Row(null, null) :: Row(2, 3) :: Row(null, null) :: Nil) checkAnswer( - sql("select a, inline(b) from t1"), + df.select('a, inline('b)), Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil) }