2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -176,6 +176,8 @@ Collection Functions
explode_outer
posexplode
posexplode_outer
inline
inline_outer
get
get_json_object
json_tuple
76 changes: 76 additions & 0 deletions python/pyspark/sql/functions.py
@@ -5883,6 +5883,41 @@ def posexplode(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("posexplode", col)


def inline(col: "ColumnOrName") -> Column:
Contributor Author: Added to that file

"""
Explodes an array of structs into a table.

.. versionadded:: 3.4.0

Parameters
----------
col : :class:`~pyspark.sql.Column` or str
input column of values to explode.

Returns
-------
:class:`~pyspark.sql.Column`
generator expression with the inline exploded result.

See Also
--------
:meth:`explode`

Examples
--------
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(structlist=[Row(a=1, b=2), Row(a=3, b=4)])])
>>> df.select(inline(df.structlist)).show()
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
"""
return _invoke_function_over_columns("inline", col)
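
For intuition, inline behaves like explode followed by flattening the struct column; a rough equivalence sketch (assuming a running SparkSession named spark, as in the doctests):

from pyspark.sql import Row
from pyspark.sql.functions import explode, inline

df = spark.createDataFrame([Row(structlist=[Row(a=1, b=2), Row(a=3, b=4)])])

# One row per struct, with the struct fields promoted to top-level columns.
df.select(inline(df.structlist)).show()

# Equivalent spelling: explode to one struct per row, then flatten with "s.*".
df.select(explode(df.structlist).alias("s")).select("s.*").show()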


def explode_outer(col: "ColumnOrName") -> Column:
"""
Returns a new row for each element in the given array or map.
@@ -5956,6 +5991,47 @@ def posexplode_outer(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("posexplode_outer", col)


def inline_outer(col: "ColumnOrName") -> Column:
"""
Explodes an array of structs into a table.
Unlike inline, if the array is null or empty, then null is produced for each nested column.

.. versionadded:: 3.4.0

Member: Can we add Parameters and Returns sections? See also
https://numpydoc.readthedocs.io/en/latest/format.html#sections

Contributor (@itholic, Sep 5, 2022): And we could also add a See Also section? e.g.

    See Also
    --------
    :meth:`explode_outer`
    :meth:`inline`

Contributor Author: The only other See Also sections I see right now are for
element_at/get. The existing generators don't reference each other yet; is that
something that should be added?

Contributor Author: Added parameters/returns

Contributor: There are no strict rules about where See Also must be placed, but I
suggested it just because it would be good to show related functions for user
convenience. WDYT, @HyukjinKwon?

Member: yeah, it's good to have

Parameters
----------
col : :class:`~pyspark.sql.Column` or str
input column of values to explode.

Returns
-------
:class:`~pyspark.sql.Column`
generator expression with the inline exploded result.

See Also
--------
:meth:`explode_outer`
:meth:`inline`

Examples
--------
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([
... Row(id=1, structlist=[Row(a=1, b=2), Row(a=3, b=4)]),
... Row(id=2, structlist=[])
... ])
>>> df.select('id', inline_outer(df.structlist)).show()
+---+----+----+
| id| a| b|
+---+----+----+
| 1| 1| 2|
| 1| 3| 4|
| 2|null|null|
+---+----+----+
"""
return _invoke_function_over_columns("inline_outer", col)
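
To make the null-handling contrast concrete, a small sketch (assuming a running SparkSession named spark; the schema string and column names are illustrative):

from pyspark.sql.functions import inline, inline_outer

df = spark.createDataFrame(
    [(1, [(1, 2), (3, 4)]), (2, None)],
    "id int, structlist array<struct<a: int, b: int>>",
)

# inline drops rows whose array is null or empty...
df.select("id", inline("structlist")).show()

# ...while inline_outer keeps them, producing null for each struct field.
df.select("id", inline_outer("structlist")).show()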


def get_json_object(col: "ColumnOrName", path: str) -> Column:
"""
Extracts json object from a json string based on json path specified, and returns json string
16 changes: 16 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -141,6 +141,22 @@ def test_explode(self):
result = [tuple(x) for x in data.select(explode_outer("mapfield")).collect()]
self.assertEqual(result, [("a", "b"), (None, None), (None, None)])

def test_inline(self):
from pyspark.sql.functions import inline, inline_outer

d = [
Row(structlist=[Row(b=1, c=2), Row(b=3, c=4)]),
Row(structlist=[Row(b=None, c=5), None]),
Row(structlist=[]),
]
data = self.spark.createDataFrame(d)

result = [tuple(x) for x in data.select(inline(data.structlist)).collect()]
self.assertEqual(result, [(1, 2), (3, 4), (None, 5), (None, None)])

result = [tuple(x) for x in data.select(inline_outer(data.structlist)).collect()]
self.assertEqual(result, [(1, 2), (3, 4), (None, 5), (None, None), (None, None)])

def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.spark.read.json(rdd)
17 changes: 17 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4342,6 +4342,23 @@ object functions {
*/
def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) }

/**
* Creates a new row for each element in the given array of structs.
*
* @group collection_funcs
* @since 3.4.0
*/
def inline(e: Column): Column = withExpr { Inline(e.expr) }

/**
* Creates a new row for each element in the given array of structs.
* Unlike inline, if the array is null or empty then null is produced for each nested column.
*
* @group collection_funcs
* @since 3.4.0
*/
def inline_outer(e: Column): Column = withExpr { GeneratorOuter(Inline(e.expr)) }

/**
* Extracts json object from a json string based on json path specified, and returns json string
* of the extracted json object. It will return null if the input json string is invalid.
sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -219,20 +219,21 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {

test("inline raises exception on array of null type") {
val m = intercept[AnalysisException] {
spark.range(2).selectExpr("inline(array())")
Contributor: can we leave existing UTs alone and add extra tests in this file?

Contributor Author: I was trying to update things to match how the explode
functions were being tested, which is mostly via the Scala functions. I can
duplicate some of them with the Scala function instead if that would be
preferred.
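
For the PySpark side of this PR, both call styles exist as well; a minimal sketch (assuming a running SparkSession named spark):

from pyspark.sql import Row
from pyspark.sql.functions import inline

df = spark.createDataFrame([Row(structlist=[Row(a=1, b=2), Row(a=3, b=4)])])

# SQL-expression style, like the selectExpr-based tests being replaced.
df.selectExpr("inline(structlist)").show()

# Function style, like the select(inline(...)) calls in the updated suite.
df.select(inline("structlist")).show()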

spark.range(2).select(inline(array()))
}.getMessage
assert(m.contains("data type mismatch"))
}

test("inline with empty table") {
checkAnswer(
spark.range(0).selectExpr("inline(array(struct(10, 100)))"),
spark.range(0).select(inline(array(struct(lit(10), lit(100))))),
Nil)
}

test("inline on literal") {
checkAnswer(
spark.range(2).selectExpr("inline(array(struct(10, 100), struct(20, 200), struct(30, 300)))"),
spark.range(2).select(inline(array(struct(lit(10), lit(100)), struct(lit(20), lit(200)),
struct(lit(30), lit(300))))),
Row(10, 100) :: Row(20, 200) :: Row(30, 300) ::
Row(10, 100) :: Row(20, 200) :: Row(30, 300) :: Nil)
}
@@ -241,39 +242,39 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
val df = Seq((1, 2)).toDF("a", "b")

checkAnswer(
df.selectExpr("inline(array(struct(a), struct(a)))"),
df.select(inline(array(struct('a), struct('a)))),
Row(1) :: Row(1) :: Nil)

checkAnswer(
df.selectExpr("inline(array(struct(a, b), struct(a, b)))"),
df.select(inline(array(struct('a, 'b), struct('a, 'b)))),
Row(1, 2) :: Row(1, 2) :: Nil)

// Spark thinks [struct<a:int>, struct<b:int>] is heterogeneous due to name difference.
val m = intercept[AnalysisException] {
df.selectExpr("inline(array(struct(a), struct(b)))")
df.select(inline(array(struct('a), struct('b))))
}.getMessage
assert(m.contains("data type mismatch"))

checkAnswer(
df.selectExpr("inline(array(struct(a), named_struct('a', b)))"),
df.select(inline(array(struct('a), struct('b.alias("a"))))),
Row(1) :: Row(2) :: Nil)

// Spark thinks [struct<a:int>, struct<col1:int>] is heterogeneous due to name difference.
val m2 = intercept[AnalysisException] {
df.selectExpr("inline(array(struct(a), struct(2)))")
df.select(inline(array(struct('a), struct(lit(2)))))
}.getMessage
assert(m2.contains("data type mismatch"))

checkAnswer(
df.selectExpr("inline(array(struct(a), named_struct('a', 2)))"),
df.select(inline(array(struct('a), struct(lit(2).alias("a"))))),
Row(1) :: Row(2) :: Nil)

checkAnswer(
df.selectExpr("struct(a)").selectExpr("inline(array(*))"),
df.select(struct('a)).select(inline(array("*"))),
Row(1) :: Nil)

checkAnswer(
df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"),
df.select(array(struct('a), struct('b.alias("a")))).selectExpr("inline(*)"),
Row(1) :: Row(2) :: Nil)
}

@@ -282,11 +283,11 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
val df2 = df.select(
when($"col1" === 1, null).otherwise(array(struct($"col1", $"col2"))).as("col1"))
checkAnswer(
df2.selectExpr("inline(col1)"),
df2.select(inline('col1)),
Row(3, "4") :: Row(5, "6") :: Nil
)
checkAnswer(
df2.selectExpr("inline_outer(col1)"),
df2.select(inline_outer('col1)),
Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil
)
}
@@ -405,14 +406,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
|)
|as tbl(a, b)
""".stripMargin)
df.createOrReplaceTempView("t1")

checkAnswer(
sql("select inline(b) from t1"),
df.select(inline('b)),
Row(0, 1) :: Row(null, null) :: Row(2, 3) :: Row(null, null) :: Nil)

checkAnswer(
sql("select a, inline(b) from t1"),
df.select('a, inline('b)),
Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil)
}
