Conversation

@fqaiser94
Contributor

@fqaiser94 fqaiser94 commented Jan 2, 2020

What changes were proposed in this pull request?

Added a new withField method to the Column class. This method allows users to add or replace a StructField in a StructType column, with semantics very similar to the withColumn method on Dataset.

Why are the changes needed?

Spark users often have to work with deeply nested data, e.g. to fix a data quality issue in an existing StructField. To do this with the existing Spark APIs, users have to rebuild the entire struct column.

For example, let's say you have the following deeply nested data structure, which has a data quality issue (the value 5 is missing):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(sc.parallelize(
      Seq(Row(Row(Row(1, 2, 3), Row(Row(4, null, 6), Row(7, 8, 9), Row(10, 11, 12)), Row(13, 14, 15))))),
      StructType(Seq(
        StructField("a", StructType(Seq(
          StructField("a", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType)))),
          StructField("b", StructType(Seq(
            StructField("a", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))),
            StructField("b", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))), 
            StructField("c", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType))))
          ))), 
          StructField("c", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType))))
        )))))).cache

data.show(false)
+-------------------------------------------------------------+
|a                                                            |
+-------------------------------------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+-------------------------------------------------------------+

Currently, to replace the missing value, users would have to do something like this:

val result = data.withColumn("a", 
  struct(
    $"a.a", 
    struct(
      struct(
        $"a.b.a.a", 
        lit(5).as("b"), 
        $"a.b.a.c"
      ).as("a"), 
      $"a.b.b", 
      $"a.b.c"
    ).as("b"), 
    $"a.c"
  ))

result.show(false)
+---------------------------------------------------------------+
|a                                                              |
+---------------------------------------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+---------------------------------------------------------------+

As you can see above, with the existing methods users must call the struct function and list all of the fields, including the ones they don't want to change. This is not ideal because:

  • it leads to complex, fragile code that cannot survive schema evolution.
  • this difficulty has been raised before (see SPARK-16483).

In contrast, with the method added in this PR, a user could simply do something like this:

val result = data.withColumn("a", 'a.withField("b.a.b", lit(5)))
result.show(false)
+---------------------------------------------------------------+
|a                                                              |
+---------------------------------------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+---------------------------------------------------------------+

This is the first of potentially several methods that could be added to the Column class to make it easier to manipulate nested data. Other methods under discussion in SPARK-22231 include drop and renameField; however, those should be added in separate PRs.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit tests were added and must pass in Jenkins.

Related JIRAs: SPARK-22231

Member

@HyukjinKwon HyukjinKwon left a comment


I don't think we need this since it can already be worked around (e.g., with withColumn and struct). Do any DBMSes have such a function that we could use as a reference?

@hvanhovell
Contributor

I would be supportive of this.

@hvanhovell
Contributor

ok to test

@HyukjinKwon
Member

I didn't notice that the investigation and discussion I wanted had already taken place in the JIRA. I am good with it.

@HyukjinKwon
Member

cc @rxin, @dbtsai, @viirya

@SparkQA

SparkQA commented Jan 2, 2020

Test build #116026 has finished for PR 27066 at commit 9b2b5dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AddField(struct: Expression, fieldName: String, field: Expression)

@rxin
Contributor

rxin commented Jan 2, 2020

Can we take a step back, come up with the list of functions we want to add, make sure they are coherent, and then get to PR reviews?

@fqaiser94 fqaiser94 force-pushed the SPARK-22231-withField branch from 6ac28c9 to b768da7 on March 30, 2020 at 01:23
@SparkQA

SparkQA commented Mar 30, 2020

Test build #120564 has finished for PR 27066 at commit 6ac28c9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 30, 2020

Test build #120565 has finished for PR 27066 at commit 3813002.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 31, 2020

Test build #120615 has finished for PR 27066 at commit a0eea6d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2020

Test build #120617 has finished for PR 27066 at commit e84a86d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2020

Test build #120640 has finished for PR 27066 at commit aa17612.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

Updated the code in this PR based on some lessons learned while developing an external library with this feature, including:

  • adding an equivalent method to the Python Column API
  • changing the name and implementation of the underlying Catalyst Expression from AddField to AddFields (plural), which potentially allows for optimization rules that collapse multiple AddFields calls into a single AddFields call (see example rule here; a rough sketch follows this list)
  • making the constructor for AddFields more SQL-friendly, i.e. it accepts just a Seq[Expression]
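
Roughly, such a collapse rule could look like the sketch below. This is only an illustration of the idea, not code from this PR: it assumes an AddFields(structExpr, fieldExprs) shape (the actual constructor takes just a Seq[Expression], as noted above) and relies on later fields overriding earlier ones when names collide.

// Hypothetical sketch of a rule collapsing nested AddFields calls into one.
// AddFields is the expression added in this PR; its exact package and constructor
// shape are assumed here for illustration only.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object CollapseAddFields extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // Two stacked AddFields over the same struct become a single AddFields call;
    // concatenating the field expressions preserves last-write-wins semantics.
    case AddFields(AddFields(struct, innerFields), outerFields) =>
      AddFields(struct, innerFields ++ outerFields)
  }
}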

All tests are passing now. Please feel free to leave a review/comments.

@rxin
Contributor

rxin commented Mar 31, 2020

Sorry for asking this, but can you break this into multiple pull requests? cc @cloud-fan for review.

@fqaiser94 fqaiser94 changed the title from [SPARK-22231][SQL] Add withField method to Column class to [SPARK-31317][SQL] Add withField method to Column class on Mar 31, 2020
@dbtsai
Member

dbtsai commented Mar 31, 2020

Can you show the plan of this implementation by explain(true)?

@fqaiser94
Contributor Author

@rxin no problem, I've removed the Python changes from this PR.
If the PR is still too big, I'm happy to also remove the changes to Column.scala and ColumnExpressionSuite.scala.

@fqaiser94
Contributor Author

@dbtsai here is an example explain output with the current implementation:

val df = spark.createDataFrame(sparkContext.parallelize(
  Row(Row(1, 2, 3)) :: Nil),
  StructType(Seq(StructField("a", StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", IntegerType),
    StructField("c", IntegerType)))))))

val result = df.withColumn("a", 'a.withField("d", lit(4)))

result.show()

+------------+
|           a|
+------------+
|[1, 2, 3, 4]|
+------------+


result.explain(true)

== Parsed Logical Plan ==
'Project [add_fields('a, d, 4) AS a#3]
+- LogicalRDD [a#1], false

== Analyzed Logical Plan ==
a: struct<a:int,b:int,c:int,d:int>
Project [add_fields(a#1, d, 4) AS a#3]
+- LogicalRDD [a#1], false

== Optimized Logical Plan ==
Project [add_fields(a#1, d, 4) AS a#3]
+- LogicalRDD [a#1], false

== Physical Plan ==
*(1) Project [add_fields(a#1, d, 4) AS a#3]
+- *(1) Scan ExistingRDD[a#1]

@SparkQA

SparkQA commented Apr 1, 2020

Test build #120652 has finished for PR 27066 at commit 503cc37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 1, 2020

Test build #120654 has finished for PR 27066 at commit b28c76b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 5, 2020

Test build #124957 has finished for PR 27066 at commit 4315e92.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

retest this please

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125072 has finished for PR 27066 at commit 4315e92.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 6, 2020

Test build #5051 has started for PR 27066 at commit 4315e92.

@fqaiser94
Contributor Author

fqaiser94 commented Jul 6, 2020

Not sure why, but it looks like someone aborted the build midway.
Can someone please kick off a new build? I don't think I'm on the Jenkins whitelist.

@viirya
Member

viirya commented Jul 6, 2020

retest this please

@SparkQA

SparkQA commented Jul 6, 2020

Test build #125125 has finished for PR 27066 at commit 4315e92.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

Looks like a flaky test failed, unfortunately.

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

Jenkins has been very unstable recently...

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125198 has finished for PR 27066 at commit 4315e92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4bbc343 Jul 7, 2020
@cloud-fan
Contributor

@fqaiser94 can you leave a comment in the JIRA ticket so that I can assign it to you? Thanks for your great work!

@fqaiser94
Contributor Author

Done, and thank you all for a great experience!

*
* {{{
* val df = sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
* df.select($"struct_col".withField("c", lit(3)))
Member

Have any of you tried to run these examples? The optimizer's ConstantFolding rule will break them.

Contributor

weird, we have tests to cover these examples. @fqaiser94 can you take a look?

Contributor Author

@fqaiser94 fqaiser94 Aug 3, 2020


I failed to write a test case to cover this scenario, my bad.
And yeah, I just tried this example again and I can see that it fails.
The issue is that I overrode foldable for this Unevaluable expression, so when foldable returns true, Spark tries to evaluate the expression and fails at that point.
I realized this recently as well, and in my PR for dropFields I've fixed the issue (basically I just don't override foldable anymore, which returns false by default).
I guess I should submit a follow-up PR to fix this immediately, with associated unit tests?
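
For context, the failure comes down to how ConstantFolding treats foldable expressions. Roughly (a paraphrased sketch, not the exact Spark source):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Paraphrased sketch of the relevant behaviour in Catalyst's ConstantFolding rule:
// any expression reporting foldable = true is eagerly evaluated and replaced with a
// Literal during optimization. An Unevaluable expression's eval() always throws, so
// claiming foldable = true makes optimization itself fail; leaving foldable at its
// default (false) avoids the problem.
object ConstantFoldingSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case l: Literal => l // literals are already folded
    case e if e.foldable => Literal.create(e.eval(), e.dataType)
  }
}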

Contributor


Yes, please

Contributor Author


Thanks for raising the issue, @gatorsmile.
I've created a JIRA and a PR to address it.

HyukjinKwon pushed a commit that referenced this pull request Sep 16, 2020
### What changes were proposed in this pull request?

This PR adds a `withField` method on the pyspark Column class to call the Scala API method added in #27066.

### Why are the changes needed?

To update the Python API to match a new feature in the Scala API.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test

Closes #29699 from Kimahriman/feature/pyspark-with-field.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>