-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13721][SQL] Support outer generators in DataFrame API #16608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -163,9 +163,11 @@ object FunctionRegistry { | |
| expression[Abs]("abs"), | ||
| expression[Coalesce]("coalesce"), | ||
| expression[Explode]("explode"), | ||
| expressionGeneratorOuter[Explode]("explode_outer"), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might need an update on
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gatorsmile it uses the expression description of the underlying expression: https://github.com/apache/spark/pull/16608/files#diff-2c0350957ac4932d3f63796eceaeae08R517
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hvanhovell
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would we need an update? What is the extra information you want to convey? Do you want to add a generic line saying that an outer generator might produce nulls instead of filtering out the row?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Update the description and provide an example to users? Maybe hardcode the function name instead of using
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not super enthusiastic about this. We have three options here:
I am fine with any. |
||
| expression[Greatest]("greatest"), | ||
| expression[If]("if"), | ||
| expression[Inline]("inline"), | ||
| expressionGeneratorOuter[Inline]("inline_outer"), | ||
| expression[IsNaN]("isnan"), | ||
| expression[IfNull]("ifnull"), | ||
| expression[IsNull]("isnull"), | ||
|
|
@@ -176,6 +178,7 @@ object FunctionRegistry { | |
| expression[Nvl]("nvl"), | ||
| expression[Nvl2]("nvl2"), | ||
| expression[PosExplode]("posexplode"), | ||
| expressionGeneratorOuter[PosExplode]("posexplode_outer"), | ||
| expression[Rand]("rand"), | ||
| expression[Randn]("randn"), | ||
| expression[Stack]("stack"), | ||
|
|
@@ -508,4 +511,12 @@ object FunctionRegistry { | |
| new ExpressionInfo(clazz.getCanonicalName, name) | ||
| } | ||
| } | ||
| private def expressionGeneratorOuter[T <: Generator : ClassTag] | ||
|
||
| (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { | ||
|
||
| val regularGen = expression[T](name) | ||
|
||
| val outerBuilder = (args: Seq[Expression]) => { | ||
| GeneratorOuter(regularGen._2._2(args).asInstanceOf[Generator]) | ||
| } | ||
| (name, (expressionInfo[GeneratorOuter](name), outerBuilder)) | ||
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -204,6 +204,17 @@ case class Stack(children: Seq[Expression]) extends Generator { | |
| } | ||
| } | ||
|
|
||
| case class GeneratorOuter(child: Generator) extends UnaryExpression | ||
| with Generator { | ||
|
||
|
|
||
| final override def eval(input: InternalRow = null): TraversableOnce[InternalRow] = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") | ||
|
|
||
| final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") | ||
|
|
||
| override def elementSchema: StructType = child.elementSchema | ||
| } | ||
| /** | ||
| * A base class for [[Explode]] and [[PosExplode]]. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,7 +104,10 @@ case class Generate( | |
| def qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q => | ||
| // prepend the new qualifier to the existed one | ||
| generatorOutput.map(a => a.withQualifier(Some(q))) | ||
| }.getOrElse(generatorOutput) | ||
| }.getOrElse(generatorOutput).map { | ||
|
||
| // if outer, make all attributes nullable, otherwise keep existing nullability | ||
| a => a.withNullability(outer || a.nullable) | ||
| } | ||
|
|
||
| def output: Seq[Attribute] = { | ||
| if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2870,6 +2870,15 @@ object functions { | |
| */ | ||
| def explode(e: Column): Column = withExpr { Explode(e.expr) } | ||
|
|
||
| /** | ||
| * Creates a new row for each element in the given array or map column. | ||
| * Unlike explode, if the array/map is null or empty then null is produced. | ||
| * | ||
| * @group collection_funcs | ||
| * @since 2.2.0 | ||
| */ | ||
| def explode_outer(e: Column): Column = withExpr { GeneratorOuter(Explode(e.expr)) } | ||
|
|
||
| /** | ||
| * Creates a new row for each element with position in the given array or map column. | ||
| * | ||
|
|
@@ -2878,6 +2887,15 @@ object functions { | |
| */ | ||
| def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) } | ||
|
|
||
| /** | ||
| * Creates a new row for each element with position in the given array or map column. | ||
| * Unlike posexplode, if the array/map is null or empty then the row (0, null) is produced. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update this the result. |
||
| * | ||
| * @group collection_funcs | ||
| * @since 2.2.0 | ||
| */ | ||
| def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) } | ||
|
|
||
| /** | ||
| * Extracts json object from a json string based on json path specified, and returns json string | ||
| * of the extracted json object. It will return null if the input json string is invalid. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,9 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { | |
| checkSqlGeneration("SELECT map(1, 'a', 2, 'b')") | ||
| checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)") | ||
| checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2") | ||
| checkSqlGeneration("SELECT outer_explode(array())") | ||
|
||
| checkSqlGeneration("SELECT outer_posexplode(array())") | ||
| checkSqlGeneration("SELECT outer_inline(array(struct('a', 1)))") | ||
| checkSqlGeneration("SELECT rand(1)") | ||
| checkSqlGeneration("SELECT randn(3)") | ||
| checkSqlGeneration("SELECT struct(1,2,3)") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document what the return value means (especially that boolean value, but also the Seq[String] that's preexisting)