
Conversation

@viirya
Member

@viirya viirya commented Jul 9, 2018

What changes were proposed in this pull request?

Spark SQL currently doesn't support encoding `Option[Product]` as a top-level row, because in Spark SQL the entire top-level row can't be null.

However, for use cases like `Aggregator`, it is reasonable to use `Option[Product]` as the buffer and output column types. Due to the above limitation, this hasn't been supported so far.

This patch proposes to encode a top-level `Option[Product]` as a single struct column, so we can work around the restriction that the entire top-level row can't be null.

To summarize the encoding of `Product` and `Option[Product]`:

For `Product`:

1. At the root level, all fields are flattened into multiple columns. The `Product` can't be null, otherwise an exception is thrown.

```scala
val df = Seq((1 -> "a"), (2 -> "b")).toDF()
df.printSchema()

root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
```
2. At a non-root level, `Product` is a struct type column.

```scala
val df = Seq((1, (1 -> "a")), (2, (2 -> "b")), (3, null)).toDF()
df.printSchema()

root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
```

For `Option[Product]`:

1. It was not supported at the root level. After this change, it is a struct type column.

```scala
val df = Seq(Some(1 -> "a"), Some(2 -> "b"), None).toDF()
df.printSchema

root
 |-- value: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
```
2. At a non-root level, it is also a struct type column.

```scala
val df = Seq((1, Some(1 -> "a")), (2, Some(2 -> "b")), (3, None)).toDF()
df.printSchema

root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
```
3. For use cases like `Aggregator`, it was not supported either. After this change, we support using `Option[Product]` as the buffer/output column type.

```scala
val df = Seq(
    OptionBooleanIntData("bob", Some((true, 1))),
    OptionBooleanIntData("bob", Some((false, 2))),
    OptionBooleanIntData("bob", None)).toDF()

val group = df
    .groupBy("name")
    .agg(OptionBooleanIntAggregator("isGood").toColumn.alias("isGood"))
group.printSchema

root
 |-- name: string (nullable = true)
 |-- isGood: struct (nullable = true)
 |    |-- _1: boolean (nullable = false)
 |    |-- _2: integer (nullable = false)
```

The buffer and output types of `OptionBooleanIntAggregator` are both `Option[(Boolean, Int)]`.
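For reference, here is a minimal sketch of what the data and aggregator classes used above might look like; the real definitions live in the test suite, and the reduce/merge semantics shown here (AND the booleans, sum the ints) are an assumption for illustration:

```scala
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class OptionBooleanIntData(name: String, isGood: Option[(Boolean, Int)])

// Aggregates an Option[(Boolean, Int)] struct column, treating None as a missing value.
case class OptionBooleanIntAggregator(colName: String)
    extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean, Int)]] {

  def zero: Option[(Boolean, Int)] = None

  def reduce(buffer: Option[(Boolean, Int)], row: Row): Option[(Boolean, Int)] = {
    val index = row.fieldIndex(colName)
    val value = if (row.isNullAt(index)) {
      None
    } else {
      val nested = row.getStruct(index)
      Some((nested.getBoolean(0), nested.getInt(1)))
    }
    merge(buffer, value)
  }

  def merge(b1: Option[(Boolean, Int)], b2: Option[(Boolean, Int)]): Option[(Boolean, Int)] =
    (b1, b2) match {
      case (Some((a1, n1)), Some((a2, n2))) => Some((a1 && a2, n1 + n2))
      case (Some(v), None) => Some(v)
      case (None, Some(v)) => Some(v)
      case _ => None
    }

  def finish(reduction: Option[(Boolean, Int)]): Option[(Boolean, Int)] = reduction

  // The Option[Product] encoder this PR enables, used for both buffer and output.
  def bufferEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder()
  def outputEncoder: Encoder[Option[(Boolean, Int)]] = ExpressionEncoder()
}
```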

How was this patch tested?

Added test.

@SparkQA

SparkQA commented Jul 9, 2018

Test build #92729 has finished for PR 21732 at commit e1b5dee.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 9, 2018

retest this please.

@SparkQA

SparkQA commented Jul 9, 2018

Test build #92739 has finished for PR 21732 at commit e1b5dee.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 9, 2018

retest this please.

@SparkQA

SparkQA commented Jul 9, 2018

Test build #92748 has finished for PR 21732 at commit e1b5dee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 9, 2018

I'm wondering whether we should add encoders for `Option` of `Product` into object `Encoders`.

@viirya
Member Author

viirya commented Jul 9, 2018

cc @cloud-fan @hvanhovell

@cloud-fan
Contributor

How about we treat the top-level `Option[Product]` as a single struct column? Then we can get rid of this limitation entirely.

@viirya
Member Author

viirya commented Jul 10, 2018

That sounds like a much bigger behavior change?

@cloud-fan
Contributor

Yes it is, but it makes the encoder framework more consistent. And turning a failing case into a runnable one is a safe behavior change.

@viirya
Member Author

viirya commented Jul 17, 2018

Non-top-level and top-level encoders for `Option[Product]` differ slightly.

As you said, a top-level `Option[Product]` can only be encoded as a single struct column.

For the non-top-level case, we can't apply the same change, because it is already a struct column. We don't want to change its current behavior from a struct column to a struct column nested in a struct column.

This means we can remove the limitation on top-level `Option[Product]`, but it doesn't help much with the `Aggregator` issue here. We still need non-top-level `Option[Product]` encoders for the `Aggregator` case.

@cloud-fan Do you want to incorporate top-level `Option[Product]` encoders into this PR too? Or should we create another JIRA & PR for it? Thanks.

@viirya
Member Author

viirya commented Jul 19, 2018

ping @cloud-fan

@cloud-fan
Contributor

Non-top-level and top-level encoders for `Option[Product]` differ slightly.

Can we treat them the same, but flatten the `Option[Product]` at the end of encoder creation?

@viirya
Member Author

viirya commented Jul 19, 2018

At the end of encoder creation? You mean at the end of calling `ExpressionEncoder.apply()`? But that is used both for top-level encoders, e.g., `Dataset[Option[Product]]`, and non-top-level encoders, e.g., an `Aggregator`'s encoder. If we flatten it, doesn't that mean a top-level `Option[Product]` would be encoded as a row, not a struct column?

}

case class OptionBooleanIntAggregator(colName: String)
    extends Aggregator[Row, Option[(Boolean, Int)], Option[(Boolean, Int)]] {
Contributor

what's the expected schema after we apply an aggregator with Option[Product] as buffer/output?

Member Author

For a non-top-level encoder, the output schema of `Option[Product]` should be a struct column.

Contributor

Assuming non-top-level, `Option[Product]` is the same as `Product`?

Member Author

Yes. For non-top-level, `Option[Product]` is the same as `Product`. The difference is the additional `WrapOption` and `UnwrapOption` around the expressions.
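A quick way to see this from the user side (a sketch; assumes a Spark session with `spark.implicits._` in scope):

```scala
// A non-top-level Option[Product] yields the same schema as a plain Product column;
// the Option only changes how nulls are wrapped/unwrapped during encoding.
val withOption = Seq((1, Some(1 -> "a")), (2, None)).toDF()
val withoutOption = Seq((1, (1 -> "a")), (2, null)).toDF()

withOption.printSchema()
withoutOption.printSchema()
// Both print:
// root
//  |-- _1: integer (nullable = false)
//  |-- _2: struct (nullable = true)
//  |    |-- _1: integer (nullable = false)
//  |    |-- _2: string (nullable = true)
```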

@viirya
Member Author

viirya commented Jul 26, 2018

ping @cloud-fan @hvanhovell

@cloud-fan
Contributor

Again, can we always support `Option[Product]`, with some special handling for the top-level encoder expression?

@viirya
Member Author

viirya commented Jul 27, 2018

@cloud-fan We can. Just wondering if you think it is good to have that in this PR too?

@cloud-fan
Contributor

This PR is just special handling for `Option[Product]` in `Aggregator`; I think we won't need it once we have the more general solution, right?

@viirya viirya changed the title [SPARK-24762][SQL] Aggregator should be able to use Option of Product encoder [SPARK-24762][SQL] Enable Option of Product encoders Jul 30, 2018
@viirya viirya force-pushed the SPARK-24762 branch 2 times, most recently from b6c6e9f to 4f5628d on July 30, 2018 02:33
@SparkQA

SparkQA commented Jul 30, 2018

Test build #93763 has finished for PR 21732 at commit 2b73b33.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • * For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93762 has finished for PR 21732 at commit 6e32abe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • * For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93764 has finished for PR 21732 at commit b6c6e9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93765 has finished for PR 21732 at commit 4f5628d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • * For example, we build an encoder for `case class Data(a: Int, b: String)` and the real type

@cloud-fan
Contributor

last comment, LGTM otherwise

@SparkQA

SparkQA commented Nov 22, 2018

Test build #99177 has finished for PR 21732 at commit 29de9e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 22, 2018

Test build #99178 has finished for PR 21732 at commit dbd8678.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 24, 2018

Test build #99222 has finished for PR 21732 at commit 62fdb17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master, great work!

* flattened to top-level row, because in Spark SQL top-level row can't be null. This method
* returns true if `T` is serialized as struct and is not `Option` type.
*/
def isSerializedAsStructForTopLevel: Boolean = isSerializedAsStruct && !isOptionType
Contributor

can you send a followup PR to inline isOptionType if it's only used here?

Member Author

ok.

@asfgit asfgit closed this in 6339c8c Nov 26, 2018
asfgit pushed a commit that referenced this pull request Nov 27, 2018
## What changes were proposed in this pull request?

This is a follow-up of #21732. This patch inlines the `isOptionType` method.

## How was this patch tested?

Existing tests.

Closes #23143 from viirya/SPARK-24762-followup.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@HyukjinKwon
Member

Hm .. sorry for joining this party late. I was reading and testing it by myself.

```scala
scala> Seq((1, "a"), (2, "b")).toDF.show()
+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
+---+---+

scala> Seq(1, 2).toDF.show()
+-----+
|value|
+-----+
|    1|
|    2|
+-----+

scala> Seq(Some((1, "a")), Some((2, "b"))).toDF.show()
+------+
| value|
+------+
|[1, a]|
|[2, b]|
+------+

scala> Seq(Some(1), Some(2)).toDF.show()
+-----+
|value|
+-----+
|    1|
|    2|
+-----+
```

I think this behaviour can actually be controversial, if we interpret `Option` as an existent/missing value. Why did we interpret `Option` as `Tuple1`?

@viirya
Member Author

viirya commented Dec 27, 2018

@HyukjinKwon What do you mean, we interpret `Option` as `Tuple1`?

@cloud-fan
Contributor

`Seq(Some(1), Some(2))` is treated the same as `Seq(1, 2)`, because it's only a single column and we can make it nullable. So the `Some` here changes nothing but the column nullability.

`Seq((1, "a"), (2, "b"))` is special, as the dataset encoder assumes the top-level `Product` will never be null, and flattens it into 2 columns. If we add `Some` here, we can't do the flattening.
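To illustrate the nullability point (a sketch; assumes `spark.implicits._` is in scope):

```scala
// Without Option, the single column is non-nullable.
Seq(1, 2).toDF.printSchema()
// root
//  |-- value: integer (nullable = false)

// With Some, only the nullability changes.
Seq(Some(1), Some(2)).toDF.printSchema()
// root
//  |-- value: integer (nullable = true)
```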

@HyukjinKwon
Member

HyukjinKwon commented Dec 27, 2018

re: #21732 (comment) I was thinking both of the below

```scala
Seq(Some((1, "a")), Some((2, "b"))).toDF.show()
Seq((1, "a"), (2, "b")).toDF.show()
```

should produce the same result, since

```scala
Seq(Some(1), Some(2)).toDF.show()
Seq(1, 2).toDF.show()
```

produce the same results anyhow. Apparently that looks like why it was disallowed.

@viirya
Member Author

viirya commented Dec 27, 2018

Thanks for @cloud-fan's explanation. So I think @HyukjinKwon you mean: why do we interpret an `Option[Product]` like `Some((1, "a")), Some((2, "b"))` as a `Tuple2`?

For the following

```scala
Seq(Some((1, "a")), Some((2, "b"))).toDF.show()
Seq((1, "a"), (2, "b")).toDF.show()
```

we can't produce the same result, since a top-level null row is not allowed in Spark.
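For instance, after this change the `Option` rows become a single nullable struct column rather than flattened columns (output sketched; exact null rendering may vary by Spark version):

```scala
Seq(Some(1 -> "a"), Some(2 -> "b"), None).toDF.show()
// +------+
// | value|
// +------+
// |[1, a]|
// |[2, b]|
// |  null|
// +------+
```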

@HyukjinKwon
Member

HyukjinKwon commented Dec 27, 2018

Yea, then why did we allow the different result? I was thinking we were going to allow this only for aggregators.

@viirya
Member Author

viirya commented Dec 27, 2018

@HyukjinKwon there was a comment #21732 (comment) about it. This was originally meant to make `Option[Product]` work for aggregators, but that made the encoder framework inconsistent, because the created encoders could be used for aggregators but not for `Dataset`. So I think the idea is to make it consistent. Users are allowed to encode `Option[Product]` as a `Dataset`, but they should be aware of the encoding.

@HyukjinKwon
Member

Hm, for aggregators, I would consider this non-root level. Looks like they use the same encoder, but they can't be the same.

@cloud-fan
Contributor

@HyukjinKwon If we could go back, I'd say we should not have this optimization that flattens a top-level `Product` when encoding. It brings the assumption that a top-level `Product` can't be null, because in Spark the top-level `Row` can't be null.

Ideally `Option[T]` should be the same as `T` if `T` is nullable. They are just 2 different ways to represent null in Scala and Java. But because of the optimization I mentioned before, top-level `Product` is the only exception.

It's too late to revert that optimization, I think we should accept this special case.

@HyukjinKwon
Member

I didn't mean that we should revert .. was just checking the PRs in my queue and was just curious.

I mean, I understood the limitation but failed to understand why it's been allowed. We exposed `Option[T <: Product]` at the top level but the limitation is still there.

@HyukjinKwon
Member

Ah, but you're saying Product at top-level is an exception and the other cases are all coherent? hmm ... okie.

@cloud-fan
Contributor

Yes, top-level `Product` is the only exception, because of the flattening trick which has been there for years.

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

This was inspired while implementing apache#21732. For now, `ScalaReflection` needs to consider how `ExpressionEncoder` uses the generated serializers and deserializers, and `ExpressionEncoder` has a weird `flat` flag. After discussion with cloud-fan, it seems better to refactor `ExpressionEncoder`. It should make SPARK-24762 easier to do.

To summarize the proposed changes:

1. `serializerFor` and `deserializerFor` return expressions for serializing/deserializing an input expression for a given type. They are private and should not be called directly.
2. `serializerForType` and `deserializerForType` return an expression for serializing/deserializing an object of type T to/from the Spark SQL representation. They assume the input object/Spark SQL representation is located at ordinal 0 of a row.

So in other words, `serializerForType` and `deserializerForType` return expressions for atomically serializing/deserializing a JVM object to/from a Spark SQL value.

A serializer returned by `serializerForType` will serialize an object at `row(0)` to a corresponding Spark SQL representation, e.g. primitive type, array, map, struct.

A deserializer returned by `deserializerForType` will deserialize an input field at `row(0)` to an object of the given type.

3. The construction of `ExpressionEncoder` takes a pair of serializer and deserializer for type `T`. It uses them to create the serializer and deserializer for T <-> row serialization. Now `ExpressionEncoder` doesn't need to remember whether the serializer is flat or not. When we need to construct a new `ExpressionEncoder` based on existing ones, we only need to change the input location in the atomic serializer and deserializer.
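A hypothetical sketch of the construction flow described above; these are internal Catalyst APIs, and the exact signatures here are assumptions based on this commit message, not a definitive rendering:

```scala
import scala.reflect.classTag
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Data(a: Int, b: String)

// serializerForType: expression serializing an object (assumed at ordinal 0) to a Spark SQL value.
val serializer = ScalaReflection.serializerForType(ScalaReflection.localTypeOf[Data])
// deserializerForType: expression deserializing a Spark SQL value (assumed at ordinal 0) to an object.
val deserializer = ScalaReflection.deserializerForType(ScalaReflection.localTypeOf[Data])

// ExpressionEncoder is built from the serializer/deserializer pair; no `flat` flag needed.
val encoder = ExpressionEncoder[Data](serializer, deserializer, classTag[Data])
```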

## How was this patch tested?

Existing tests.

Closes apache#22749 from viirya/SPARK-24762-refactor.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
Closes apache#21732 from viirya/SPARK-24762.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#23143 from viirya/SPARK-24762-followup.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@viirya viirya deleted the SPARK-24762 branch December 27, 2023 18:22