Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

For simplicity, all LambdaVariables are globally unique, to avoid any potential conflicts. However, this causes a performance problem: we can never hit the codegen cache for encoder expressions that deal with collections (which means they contain LambdaVariables).

To overcome this problem, LambdaVariable should have per-query unique IDs. This PR does two things:

  1. refactor LambdaVariable to carry an ID, so that it's easier to change the ID.
  2. add an optimizer rule to reassign LambdaVariable IDs, which are per-query unique (see the sketch below).
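
For illustration, a minimal two-pass sketch of the reassignment idea, with LambdaVariable simplified to a plain case class (the real rule, ReassignLambdaVariableID, transforms the whole query plan; the negative new IDs match the design discussed in the review below):

```scala
import scala.collection.mutable

// Simplified stand-in for the real Catalyst expression, for illustration only.
case class LambdaVariable(name: String, id: Long)

// Pass 1: collect a mapping from the old globally-unique (positive) IDs to
// fresh per-query (negative) IDs. Pass 2: rewrite every variable using it.
def reassignIds(vars: Seq[LambdaVariable]): Seq[LambdaVariable] = {
  val oldToNew = mutable.Map.empty[Long, Long]
  vars.foreach { lv =>
    if (!oldToNew.contains(lv.id)) oldToNew(lv.id) = -(oldToNew.size + 1L)
  }
  vars.map(lv => lv.copy(id = oldToNew(lv.id)))
}
```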

How was this patch tested?

New tests.

@cloud-fan (Contributor Author) commented May 29, 2019

SparkQA commented May 29, 2019

Test build #105891 has finished for PR 24735 at commit 55677c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode

@cloud-fan force-pushed the dataset branch 2 times, most recently from 3959b7a to fe9e44a on May 29, 2019 at 08:02
SparkQA commented May 29, 2019

Test build #105898 has finished for PR 24735 at commit 3959b7a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode

SparkQA commented May 29, 2019

Test build #105899 has finished for PR 24735 at commit fe9e44a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode

SparkQA commented May 29, 2019

Test build #105913 has finished for PR 24735 at commit 88346d7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode

@rednaxelafx (Contributor) left a comment

In general I like the idea of normalizing lambda variable IDs to make higher-order functions and the codegen cache work better together. The implementation details need some polishing, though.

BTW, there are a few changes related to resolvedEnc that don't seem directly related to the topic of lambda variables. Can we separate those out into another PR instead? I like those improvements, but they look a bit confusing in the context of this PR.

Contributor

My earlier PR that added the whole-stage codegen ID used another metric for the same purpose:
e57f394#diff-0314224342bb8c30143ab784b3805d19R296

Should we try to make them use the exact same logic for checking whether or not the codegen cache was hit?

Contributor

Nit: "starts from"

Contributor

The two traversals over the plan here do make the intent clear: one pass to collect the old-to-new ID mappings, and another pass to actually do the transformation.

But efficiency-wise, these two traversals can easily be combined into one, right? Reducing the number of traversals can save a lot of time when dealing with large plans (see the sketch below).
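
For illustration, the two passes collapsed into a single traversal, under the same simplifications as the sketch in the PR description (LambdaVariable as a plain case class):

```scala
import scala.collection.mutable

case class LambdaVariable(name: String, id: Long)

// One traversal: allocate a fresh negative ID the first time each old ID is
// seen, and rewrite the variable in the same pass.
def reassignIdsOnePass(vars: Seq[LambdaVariable]): Seq[LambdaVariable] = {
  val oldToNew = mutable.Map.empty[Long, Long]
  vars.map { lv =>
    lv.copy(id = oldToNew.getOrElseUpdate(lv.id, -(oldToNew.size + 1L)))
  }
}
```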

Contributor Author

fixed.

Contributor

There's an implicit assumption here that within a plan, the LambdaVariables either all hold the original IDs (positive) or all hold the reassigned ones (negative). We should probably add a comment about that, because if positive and negative IDs are mixed together, you can actually get a conflict when you take the abs (e.g., IDs 1 and -1 both map to 1).

Contributor Author

fixed.

Contributor

I don't like this part. It might make codegen a bit easier to write, but it unnecessarily hoists local variables to Java object fields. Doesn't sound like a good idea to me.
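
To illustrate the concern, a toy example (not the actual generated code): hoisting a value that could live in a local into an object field keeps it alive for the object's lifetime and grows the object's footprint:

```scala
// Local variable: scoped to the call, no cross-call state.
class LocalStyle {
  def eval(x: Int): Int = { val tmp = x * 2; tmp + 1 }
}

// Hoisted to a field: lives as long as the object even though it is only
// needed inside eval; this is what codegen "mutable state" amounts to.
class FieldStyle {
  private var tmp: Int = 0
  def eval(x: Int): Int = { tmp = x * 2; tmp + 1 }
}
```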

Contributor Author

Note that this just moves code around: we already blindly put LambdaVariables into the mutable states. I agree that there is room to optimize this part; we can do it in follow-ups.

@cloud-fan (Contributor Author)

> BTW, there are a few changes related to resolvedEnc that don't seem directly related to the topic of lambda variables.

Actually they are related. When using ExpressionEncoder directly, I added special logic to apply the new rule. Then I found out that I needed to re-apply this special logic twice in Dataset. So I went ahead and refactored this part a bit, so that Dataset uses ExpressionEncoder directly.

Member

Should this rule be applied only Once instead of fixedPoint? Otherwise, the per-query unique IDs might conflict?

Contributor Author

The newly generated unique IDs are negative, so this rule is idempotent because it only catches positive IDs (see the sketch below).
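
A tiny sketch of the idempotency argument (illustrative only, reusing the simplified LambdaVariable from above): the rule's pattern only fires on positive IDs, so a second application finds nothing to rewrite:

```scala
case class LambdaVariable(name: String, id: Long)

// Already-reassigned (negative) IDs are left alone, so applying the rule
// twice changes nothing.
def maybeRewrite(lv: LambdaVariable, freshId: => Long): LambdaVariable =
  if (lv.id > 0) lv.copy(id = freshId) else lv
```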

Member

But if the rule were applied twice and, for some reason, the second run included the positive IDs, the new IDs would start from -1 again and conflict with the -1 from the first run, I think.

Contributor Author

The IDs should be all positive or all negative. Let me add some checks to ensure that.

Member

Thanks, adding the check sounds great.

SparkQA commented May 30, 2019

Test build #105936 has finished for PR 24735 at commit 8ac310c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode

Member

Probably add a comment about the ID? The IDs allocated here can't be used directly; they need to go through ReassignLambdaVariableID to be reassigned. If expressions containing LambdaVariable skip the normal query processing steps, we should still make sure ReassignLambdaVariableID is applied.

Contributor Author

> The IDs allocated here can't be used directly

Yes, they can. A globally unique ID is fine here; it just breaks the codegen cache.

Member

There we take the abs of positive and negative IDs; if a globally unique ID is used, won't we probably get a conflict?

Contributor Author

See the newly added check: if ReassignLambdaVariableID is applied, then all IDs are negative and unique; if the rule is not applied, then all IDs are positive and unique. (A sketch of the check follows.)
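
A minimal sketch of such a check, under the same simplified LambdaVariable as the earlier sketches:

```scala
case class LambdaVariable(name: String, id: Long)

// Either the rule ran (all IDs negative) or it did not (all IDs positive);
// a mix would let two different variables collide after taking the abs.
def checkLambdaVariableIds(vars: Seq[LambdaVariable]): Unit = {
  val hasPositive = vars.exists(_.id > 0)
  val hasNegative = vars.exists(_.id < 0)
  assert(!(hasPositive && hasNegative),
    "LambdaVariable IDs must be all positive or all negative within a query")
}
```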

Contributor Author

AFAIK this is safe as we never combine optimized plans.

Member

Should we just add a sanity check that the ID is negative?

Contributor Author

I've added a check in the rule.

SparkQA commented May 30, 2019

Test build #105957 has finished for PR 24735 at commit eafdf2d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented May 30, 2019

Test build #105969 has finished for PR 24735 at commit 053b3ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 12, 2019

Test build #106404 has finished for PR 24735 at commit 053b3ba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 12, 2019

Test build #106410 has finished for PR 24735 at commit 053b3ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 13, 2019

Test build #106457 has finished for PR 24735 at commit 053b3ba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 19, 2019

Test build #106655 has finished for PR 24735 at commit 053b3ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 22, 2019

Test build #106790 has finished for PR 24735 at commit 51211f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 22, 2019

Test build #106791 has finished for PR 24735 at commit 51211f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 23, 2019

Test build #106799 has finished for PR 24735 at commit 2713f38.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 24, 2019

Test build #106817 has finished for PR 24735 at commit 73c74df.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

SparkQA commented Jun 25, 2019

Test build #106854 has finished for PR 24735 at commit 73c74df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rednaxelafx (Contributor) left a comment

LGTM

@gatorsmile (Member) left a comment

LGTM

Thanks! Merged to master.

HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020
### What changes were proposed in this pull request?

This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads, then `createDataset`'s usage of the encoder may lead to incorrect or corrupt results, because the `Encoder`'s internal mutable state will be updated from multiple threads.

Here is an example demonstrating the problem:

```scala
import org.apache.spark.sql._

val enc = implicitly[Encoder[(Int, Int)]]

val datasets = (1 to 100).par.map { _ =>
  val pairs = (1 to 100).map(x => (x, x))
  spark.createDataset(pairs)(enc)
}

datasets.reduce(_ union _).collect().foreach {
  pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```

Before this PR's change, the above example fails because Spark produces corrupted records where different input records' fields have been co-mingled.

This bug is similar to SPARK-22355 / #19577, a similar problem in `Dataset.collect()`.

The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: use `.copy()`. For consistency, I used the same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.
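
For illustration, a toy model of the aliasing problem and the copy-based fix; this is not the actual `ExpressionEncoder` code, just a sketch of why `.copy()` helps when an encoder reuses one mutable row buffer:

```scala
// Toy stand-in for an internal row backed by a mutable buffer.
final class MutableRow(val values: Array[Int]) {
  def copy(): MutableRow = new MutableRow(values.clone())
}

// Toy encoder that writes every record into the same shared buffer.
final class ToyEncoder {
  private val reused = new MutableRow(new Array[Int](2))

  def toRow(a: Int, b: Int): MutableRow = {
    reused.values(0) = a
    reused.values(1) = b
    reused // caller receives the shared, still-mutable buffer
  }
}

val enc = new ToyEncoder
// Without .copy(), every element below would alias the same buffer (and end
// up holding the last written pair); copying snapshots each record instead.
val rows = Seq((1, 1), (2, 2)).map { case (a, b) => enc.toRow(a, b).copy() }
assert(rows.map(_.values.toSeq) == Seq(Seq(1, 1), Seq(2, 2)))
```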

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Tested manually using the example listed above.

Thanks to smcnamara-stripe for identifying this bug.

Closes #26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020 (same commit message as above).
HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020 (same commit message as above).
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020 (same commit message as above).