Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1621,9 +1621,11 @@ class Analyzer(

/** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */
private object AliasedGenerator {
def unapply(e: Expression): Option[(Generator, Seq[String])] = e match {
case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil))
case MultiAlias(g: Generator, names) if g.resolved => Some(g, names)
def unapply(e: Expression): Option[(Generator, Seq[String], Boolean)] = e match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document what the return value means (especially that boolean value, but also the Seq[String] that's preexisting)

case Alias(GeneratorOuter(g: Generator), name) if g.resolved => Some((g, name :: Nil, true))
case MultiAlias(GeneratorOuter(g: Generator), names) if g.resolved => Some(g, names, true)
case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil, false))
case MultiAlias(g: Generator, names) if g.resolved => Some(g, names, false)
case _ => None
}
}
Expand All @@ -1644,7 +1646,7 @@ class Analyzer(
var resolvedGenerator: Generate = null

val newProjectList = projectList.flatMap {
case AliasedGenerator(generator, names) if generator.childrenResolved =>
case AliasedGenerator(generator, names, outer) if generator.childrenResolved =>
// It's a sanity check, this should not happen as the previous case will throw
// exception earlier.
assert(resolvedGenerator == null, "More than one generator found in SELECT.")
Expand All @@ -1653,7 +1655,7 @@ class Analyzer(
Generate(
generator,
join = projectList.size > 1, // Only join if there are other expressions in SELECT.
outer = false,
outer = outer,
qualifier = None,
generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ object FunctionRegistry {
expression[NullIf]("nullif"),
expression[Nvl]("nvl"),
expression[Nvl2]("nvl2"),
expression[OuterExplode]("outer_explode"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name these ..._outer, e.g.: explode_outer. That makes them easier to find when someone is looking for them (same goes for naming in functions.scala).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

expression[OuterInline]("outer_inline"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this one to the function registry? It is not exposed in functions.scala.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to support df.selectExpr("outer_explode(array())").
Should I remove it?

expression[OuterPosExplode]("outer_posexplode"),
expression[PosExplode]("posexplode"),
expression[Rand]("rand"),
expression[Randn]("randn"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ case class Stack(children: Seq[Expression]) extends Generator {
}
}

case class GeneratorOuter(child: Generator) extends UnaryExpression
with Generator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the Unevaluable trait. Then you don't have to implement eval() and doGenCode()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't make it work with Unevaluable because Generator.eval is not the same as Unevaluable.eval.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that is fair.


final override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")

final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")

override def elementSchema: StructType = child.elementSchema
}
/**
* A base class for [[Explode]] and [[PosExplode]].
*/
Expand Down Expand Up @@ -233,11 +244,11 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with
if (position) {
new StructType()
.add("pos", IntegerType, nullable = false)
.add("key", kt, nullable = false)
.add("key", kt, nullable = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when this is false?

I think we should fix the output of Generate instead of doing this.

Copy link
Contributor Author

@bogdanrdc bogdanrdc Jan 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will see if I can fix it from Generate instead.

If it's false then outer_explode with empty map fails with this exception:
java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
createexternalrow(input[0, string, false].toString, input[1, string, true].toString, StructField(key,StringType,false), StructField(value,StringType,true))
:- input[0, string, false].toString
: +- input[0, string, false]
+- input[1, string, true].toString
+- input[1, string, true]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed it in Generate instead. makes more sense from there too.

.add("value", vt, valueContainsNull)
} else {
new StructType()
.add("key", kt, nullable = false)
.add("key", kt, nullable = true)
.add("value", vt, valueContainsNull)
}
}
Expand Down Expand Up @@ -300,7 +311,7 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with
case class Explode(child: Expression) extends ExplodeBase {
override val position: Boolean = false
}

class OuterExplode(child: Expression) extends GeneratorOuter(Explode(child))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this. Favor composition over inheritance. Just create a GeneratorOuter(Explode(child)) whenever you need this.

Copy link
Contributor Author

@bogdanrdc bogdanrdc Jan 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it like this so I can use it in FunctionRegistry.scala:
expression[OuterExplode]("outer_explode")
Is there a way I can still make it work with GeneratorOuter(Explode(child))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted to GeneratorOuter(Explode()) and created a new helper method in FunctionRegistry instead.
Actually the way I made these classes wasn't working. It was returning strange results, but not failing otherwise. I guess somewhere the OuterExplode class was not caught by a match. The generated code was different.

/**
* Given an input array produces a sequence of rows for each position and value in the array.
*
Expand All @@ -323,7 +334,7 @@ case class Explode(child: Expression) extends ExplodeBase {
case class PosExplode(child: Expression) extends ExplodeBase {
override val position = true
}

class OuterPosExplode(child: Expression) extends GeneratorOuter(PosExplode(child))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

/**
* Explodes an array of structs into a table.
*/
Expand Down Expand Up @@ -369,3 +380,5 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene
child.genCode(ctx)
}
}

class OuterInline(child: Expression) extends GeneratorOuter(Inline(child))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

5 changes: 1 addition & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ class Column(val expr: Expression) extends Logging {

// Leave an unaliased generator with an empty list of names since the analyzer will generate
// the correct defaults after the nested expression's type has been resolved.
case explode: Explode => MultiAlias(explode, Nil)
case explode: PosExplode => MultiAlias(explode, Nil)

case jt: JsonTuple => MultiAlias(jt, Nil)
case g: Generator => MultiAlias(g, Nil)

case func: UnresolvedFunction => UnresolvedAlias(func, Some(Column.generateAlias))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,20 @@ case class GenerateExec(

// Generate looping variables.
val index = ctx.freshName("index")
val numElements = ctx.freshName("numElements")

// In case of outer=true we need to make sure the loop is executed at-least once when the
// array/map contains no input.
// generateOuter is an int. it is set to 1 iff outer is true and the input is empty or null.
val generateOuter = ctx.freshName("generateOuter")
val isOuter = if (outer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we generate the expressions for the generateOuter variable here instead? It is always 0 when outer == false and $numElements == 0 ? 1 : 0 when it is true. This increases the probability that the generated code gets eliminated at compile time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simplified it by using index == -1 like you said and reverting to the old code. It was also easy to make it produce (null, null) for posexplode_outer instead of (-1, null). @rxin preferred to return null and not -1 (or even 0) for the position. So now, everything is null always.

"true"
} else {
"false"
}

// Add a check if the generate outer flag is true.
val checks = optionalCode(outer, data.isNull)
val checks = optionalCode(outer, s"($generateOuter == 1)")

// Add position
val position = if (e.position) {
Expand Down Expand Up @@ -199,21 +210,13 @@ case class GenerateExec(
(initArrayData, "", values)
}

// In case of outer=true we need to make sure the loop is executed at-least once when the
// array/map contains no input. We do this by setting the looping index to -1 if there is no
// input, evaluation of the array is prevented by a check in the accessor code.
val numElements = ctx.freshName("numElements")
val init = if (outer) {
s"$numElements == 0 ? -1 : 0"
} else {
"0"
}
val numOutput = metricTerm(ctx, "numOutputRows")
s"""
|${data.code}
|$initMapData
|int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements();
|for (int $index = $init; $index < $numElements; $index++) {
|int $generateOuter = ($numElements == 0 && $isOuter) ? 1 : 0;
|for (int $index = 0; $index < $numElements + $generateOuter; $index++) {
| $numOutput.add(1);
| $updateRowData
| ${consume(ctx, input ++ position ++ values)}
Expand Down
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2870,6 +2870,15 @@ object functions {
*/
def explode(e: Column): Column = withExpr { Explode(e.expr) }

/**
* Creates a new row for each element in the given array or map column.
* Unlike explode, if the array/map is null or empty then null is produced.
*
* @group collection_funcs
* @since 2.2.0
*/
def outer_explode(e: Column): Column = withExpr { new OuterExplode(e.expr) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming: explode_outer


/**
* Creates a new row for each element with position in the given array or map column.
*
Expand All @@ -2878,6 +2887,15 @@ object functions {
*/
def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) }

/**
* Creates a new row for each element with position in the given array or map column.
* Unlike posexplode, if the array/map is null or empty then the row (0, null) is produced.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update this the result.

*
* @group collection_funcs
* @since 2.2.0
*/
def outer_posexplode(e: Column): Column = withExpr { new OuterPosExplode(e.expr) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming: posexplode_outer


/**
* Extracts json object from a json string based on json path specified, and returns json string
* of the extracted json object. It will return null if the input json string is invalid.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
df.select(explode('intList)),
Row(1) :: Row(2) :: Row(3) :: Nil)
}
test("single outer_explode") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a blank line before this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add it to all the test cases - have a blank line (only one) separating them.

val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
checkAnswer(
df.select(outer_explode('intList)),
Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil)
}

test("single posexplode") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
checkAnswer(
df.select(posexplode('intList)),
Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil)
}
test("single outer_posexplode") {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
checkAnswer(
df.select(outer_posexplode('intList)),
Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(0, 0) :: Nil)
}

test("explode and other columns") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
Expand All @@ -109,6 +121,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(1, Seq(1, 2, 3), 2) ::
Row(1, Seq(1, 2, 3), 3) :: Nil)
}
test("outer_explode and other columns") {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")

checkAnswer(
df.select($"a", outer_explode('intList)),
Row(1, 1) ::
Row(1, 2) ::
Row(1, 3) ::
Row(2, 0) ::
Nil)

checkAnswer(
df.select($"*", outer_explode('intList)),
Row(1, Seq(1, 2, 3), 1) ::
Row(1, Seq(1, 2, 3), 2) ::
Row(1, Seq(1, 2, 3), 3) ::
Row(2, Seq(), 0) ::
Nil)
}

test("aliased explode") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
Expand All @@ -122,13 +153,33 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(6) :: Nil)
}

test("aliased outer_explode") {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")

checkAnswer(
df.select(outer_explode('intList).as('int)).select('int),
Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil)

checkAnswer(
df.select(explode('intList).as('int)).select(sum('int)),
Row(6) :: Nil)
}

test("explode on map") {
val df = Seq((1, Map("a" -> "b"))).toDF("a", "map")

checkAnswer(
df.select(explode('map)),
Row("a", "b"))
}
test("outer_explode on map") {
val df = Seq((1, Map("a" -> "b")), (2, Map[String, String]()),
(3, Map("c" -> "d"))).toDF("a", "map")

checkAnswer(
df.select(outer_explode('map)),
Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil)
}

test("explode on map with aliases") {
val df = Seq((1, Map("a" -> "b"))).toDF("a", "map")
Expand All @@ -138,6 +189,14 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row("a", "b"))
}

test("outer_explode on map with aliases") {
val df = Seq((3, None), (1, Some(Map("a" -> "b")))).toDF("a", "map")

checkAnswer(
df.select(outer_explode('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"),
Row("a", "b") :: Row(null, null) :: Nil)
}

test("self join explode") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
val exploded = df.select(explode('intList).as('i))
Expand Down Expand Up @@ -206,6 +265,18 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"),
Row(1) :: Row(2) :: Nil)
}
test("outer_inline") {
val df = Seq((1, "2"), (3, "4"), (5, "6")).toDF("col1", "col2")
val df2 = df.select(when('col1 === 1, null).otherwise(array(struct('col1, 'col2))).as("col1"))
checkAnswer(
df2.selectExpr("inline(col1)"),
Row(3, "4") :: Row(5, "6") :: Nil
)
checkAnswer(
df2.selectExpr("outer_inline(col1)"),
Row(0, null) :: Row(3, "4") :: Row(5, "6") :: Nil
)
}

test("SPARK-14986: Outer lateral view with empty generate expression") {
checkAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkSqlGeneration("SELECT map(1, 'a', 2, 'b')")
checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)")
checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2")
checkSqlGeneration("SELECT outer_explode(array())")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks right?

checkSqlGeneration("SELECT outer_posexplode(array())")
checkSqlGeneration("SELECT outer_inline(array(struct('a', 1)))")
checkSqlGeneration("SELECT rand(1)")
checkSqlGeneration("SELECT randn(3)")
checkSqlGeneration("SELECT struct(1,2,3)")
Expand Down