Skip to content

Conversation

@fqaiser94
Copy link
Contributor

@fqaiser94 fqaiser94 commented Sep 17, 2020

What changes were proposed in this pull request?

  1. Refactored WithFields Expression to make it more extensible (now UpdateFields).
  2. Added a new dropFields method to the Column class. This method should allow users to drop a StructField in a StructType column (with similar semantics to the drop method on Dataset).

Why are the changes needed?

Often Spark users have to work with deeply nested data e.g. to fix a data quality issue with an existing StructField. To do this with the existing Spark APIs, users have to rebuild the entire struct column.

For example, let's say you have the following deeply nested data structure which has a data quality issue (5 is missing):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(sc.parallelize(
      Seq(Row(Row(Row(1, 2, 3), Row(Row(4, null, 6), Row(7, 8, 9), Row(10, 11, 12)), Row(13, 14, 15))))),
      StructType(Seq(
        StructField("a", StructType(Seq(
          StructField("a", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType)))),
          StructField("b", StructType(Seq(
            StructField("a", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))),
            StructField("b", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))), 
            StructField("c", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType))))
          ))), 
          StructField("c", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType))))
        )))))).cache

data.show(false)
+---------------------------------+                                             
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

Currently, to drop the missing value users would have to do something like this:

val result = data.withColumn("a", 
  struct(
    $"a.a", 
    struct(
      struct(
        $"a.b.a.a", 
        $"a.b.a.c"
      ).as("a"), 
      $"a.b.b", 
      $"a.b.c"
    ).as("b"), 
    $"a.c"
  ))

result.show(false)
+---------------------------------------------------------------+
|a                                                              |
+---------------------------------------------------------------+
|[[1, 2, 3], [[4, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+---------------------------------------------------------------+

As you can see above, with the existing methods users must call the struct function and list all fields, including fields they don't want to change. This is not ideal as:

this leads to complex, fragile code that cannot survive schema evolution.
SPARK-16483

In contrast, with the method added in this PR, a user could simply do something like this to get the same result:

val result = data.withColumn("a", 'a.dropFields("b.a.b"))
result.show(false)
+---------------------------------------------------------------+
|a                                                              |
+---------------------------------------------------------------+
|[[1, 2, 3], [[4, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+---------------------------------------------------------------+

This is the second of maybe 3 methods that could be added to the Column class to make it easier to manipulate nested data.
Other methods under discussion in SPARK-22231 include withFieldRenamed.
However, this should be added in a separate PR.

Does this PR introduce any user-facing change?

The documentation for Column.withField method has changed to include an additional note about how to write optimized queries when adding multiple nested Column directly.

How was this patch tested?

New unit tests were added. Jenkins must pass them.

Related JIRAs:

More discussion on this topic can be found here:

* [[CreateNamedStruct]] respectively inside of [[UpdateFields]].
*/
def apply(values: Seq[(StructField, Expression)]): Seq[(StructField, Expression)]
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya I'm not quite done with this PR yet but I wanted to share it with you early because some of the changes I'm making in here may be helpful for #29587 (assuming this PR is accepted). Specifically, it would be possible to implement sorting of fields in a struct simply by:

case class OrderStructFieldsByName() extends StructFieldsOperation {
  override def apply(values: Seq[(StructField, Expression)]): Seq[(StructField, Expression)] =
    values.sortBy { case (field, _) => field.name }
}

UpdateFields(structExpr, OrderStructFieldsByName() :: Nil)

@SparkQA
Copy link

SparkQA commented Sep 18, 2020

Test build #128840 has finished for PR 29795 at commit ac149a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

WithField("a2", UpdateFields(GetStructField('a1, 0), WithField("b3", 2) :: Nil)),
WithField("a2", UpdateFields(GetStructField('a1, 0), WithField("b3", 2) :: WithField("c3", 3) :: Nil))
// scalastyle:on line.size.limit
)).as("a1"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This first WithField in here is entirely redundant and ideally we would optimize this away as well.
However, in the interests of keeping this PR simple, I have opted to forgo writing any such optimizer rule.
If necessary, we can address this in a future PR.

UpdateFields('a1, Seq(
WithField("a2", UpdateFields(GetStructField('a1, 0), Seq(DropField("b3")))),
WithField("a2", UpdateFields(GetStructField('a1, 0), Seq(DropField("b3"), DropField("c3"))))
)).as("a1"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This first WithField in here is entirely redundant as well and ideally we would optimize this away as well.
However, in the interests of keeping this PR simple, I have opted to forgo writing any such optimizer rule.
If necessary, we can address this in a future PR.

* df.select($"struct_col".withField("a", $"struct_col.a".withField("c", lit(3)).withField("d", lit(4))))
* // result: {"a":{"a":1,"b":2,"c":3,"d":4}}
* }}}
*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the issues in master branch with the current Column.withField implementation is the size of the parsed logical plan scales non-linearly with the number of directly-add-nested-column operations. This results in the driver spending a considerable amount of time analyzing and optimizing the logical plan (literally minutes, if it ever completes).
Users can avoid this issue entirely by writing their queries in a performant manner.
For example:

  lazy val nullableStructLevel2: DataFrame = spark.createDataFrame(
    sparkContext.parallelize(Row(Row(Row(0))) :: Nil),
    StructType(Seq(
      StructField("a1", StructType(Seq(
        StructField("a2", StructType(Seq(
          StructField("col0", IntegerType, nullable = false))),
          nullable = true))),
        nullable = true))))

  val numColsToAdd = 100

  val expectedRows = Row(Row(Row(0 to numColsToAdd: _*))) :: Nil
  val expectedSchema =
    StructType(Seq(
      StructField("a1", StructType(Seq(
        StructField("a2", StructType((0 to numColsToAdd).map(num =>
          StructField(s"col$num", IntegerType, nullable = false))),
          nullable = true))),
        nullable = true)))

  test("good way of writing query") {
    // Spark can easily analyze and optimize the parsed logical plan in seconds
    checkAnswer(
      nullableStructLevel2
        .select(col("a1").withField("a2", (1 to numColsToAdd).foldLeft(col("a1.a2")) {
          (column, num) => column.withField(s"col$num", lit(num))
        }).as("a1")),
      expectedRows,
      expectedSchema)
  }

  test("bad way of writing the same query that will eventually fail with timeout exception with as little as numColsToAdd = 10") {
    checkAnswer(
      nullableStructLevel2
        .select((1 to numColsToAdd).foldLeft(col("a1")) {
          (column, num) => column.withField(s"a2.col$num", lit(num))
        }.as("a1")),
      expectedRows,
      expectedSchema)
  }

This issue and its solution is what I've attempted to capture here as part of the method doc.

There are other options here instead of method-doc-note:

  • We could potentially write some kind of optimization in updateFieldsHelper (I've bashed my head against this for a while but haven't been able to come up with anything satisfactory).
  • Remove the ability to change nested fields directly entirely. While this has the advantage that there will be absolutely no way to run into this "performance" issue, the user-experience definitely suffers for more advanced users who would know how to use these methods properly.

I've gone with what made most sense to me (method-doc-note) but am open to hearing other people's thoughts on the matter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the same issue happens in withColumn as well. I'm fine with method doc.

StructField("d", IntegerType, nullable = false),
StructField("e", IntegerType, nullable = false))),
nullable = false))))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect anyone will be surprised or feel that this is wrong but nevertheless, I did want to highlight this behaviour. Same goes for the two tests below.

@fqaiser94 fqaiser94 changed the title [SPARK-32511][SQL][WIP] Add dropFields method to Column class [SPARK-32511][SQL] Add dropFields method to Column class Sep 21, 2020
@SparkQA
Copy link

SparkQA commented Sep 21, 2020

Test build #128923 has finished for PR 29795 at commit 2f16213.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Copy link
Contributor Author

cc @cloud-fan @dbtsai @maropu @viirya

@SparkQA
Copy link

SparkQA commented Sep 23, 2020

Test build #128995 has finished for PR 29795 at commit 650d366.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 25, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33719/

@SparkQA
Copy link

SparkQA commented Sep 25, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33719/

* represents the next depth.
* @param nullable: This value is used to set the nullability of StructType columns.
*/
def nestedDf(maxDepth: Int, numColsAtEachDepth: Int, nullable: Boolean): DataFrame = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the nullable only controls the top-level column?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the nullable parameter affects the top-level and nested StructType columns.

@SparkQA
Copy link

SparkQA commented Sep 25, 2020

Test build #129098 has finished for PR 29795 at commit 53d83b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

To non-nullable StructTypes using performant method 4595 4927 470 0.0 Infinity 1.0X
To nullable StructTypes using performant method 5185 5516 468 0.0 Infinity 0.9X


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the benchmark up a little bit so that we can compare the performant and non-performant methods of updating multiple nested columns.

To non-nullable StructTypes using performant method 10 11 2 0.0 Infinity 1.0X
To nullable StructTypes using performant method 9 10 1 0.0 Infinity 1.0X
To non-nullable StructTypes using non-performant method 2457 2464 10 0.0 Infinity 0.0X
To nullable StructTypes using non-performant method 42641 43804 1644 0.0 Infinity 0.0X
Copy link
Contributor Author

@fqaiser94 fqaiser94 Sep 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As expected, this last result isn't great (43 seconds).
It's partially because of the non-performant method and partially because the optimizer rules aren't able to perfectly optimize complex nullable StructType scenarios (I've documented these scenarios in this commit).
It should be possible to improve the optimizer rules further in the future. I have a couple of simple ideas I'm toying around with but it will take me a while to reason/test if they are safe from a correctness point of view.

@SparkQA
Copy link

SparkQA commented Sep 27, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33771/

@SparkQA
Copy link

SparkQA commented Sep 27, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33771/

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Test build #129156 has finished for PR 29795 at commit cca6f37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33868/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33868/

@SparkQA
Copy link

SparkQA commented Sep 30, 2020

Test build #129251 has finished for PR 29795 at commit 7e51f35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ModifyNestedColumns

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 2793347 Oct 6, 2020
dongjoon-hyun pushed a commit that referenced this pull request Oct 6, 2020
… Maven

### What changes were proposed in this pull request?

This PR fixes the broken build for Scala 2.13 with Maven.
https://github.com/apache/spark/pull/29913/checks?check_run_id=1187826966

#29795 was merged though it doesn't successfully finish the build for Scala 2.13

### Why are the changes needed?

To fix the build.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`build/mvn -Pscala-2.13 -Phive -Phive-thriftserver -DskipTests package`

Closes #29954 from sarutak/hotfix-seq.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants