[SPARK-25591][PySpark][SQL] Avoid overwriting deserialized accumulator #22635
|
cc @HyukjinKwon |
|
Test build #96960 has finished for PR 22635 at commit
|
| _accumulatorRegistry[aid] = accum | ||
| return accum | ||
| # If this certain accumulator was deserialized, don't overwrite it. | ||
| if aid in _accumulatorRegistry: |
Should it be:
    if aid in _accumulatorRegistry and _accumulatorRegistry[aid]._deserialized is True
or:
    if aid in _accumulatorRegistry:
        _accumulatorRegistry[aid]._deserialized = True
        return _accumulatorRegistry[aid]
to make doubly sure that this function always returns a deserialized version of the accumulator?
We only save deserialized accumulators (_deserialized is True) into this dict.
That doesn't seem right, because the constructor for Accumulator has:
...
self._deserialized = False
_accumulatorRegistry[aid] = self
PS: This is the first time I'm looking at this code, so I'm not too familiar with it.
Yeah, but _deserialize_accumulator is only called during deserialization on the executors. The constructor saves accumulators in _accumulatorRegistry on the driver.
I see - got it 👍
|
Thanks for cc'ing me. Will take a look this week. |
|
Since this is a correctness fix, I think we should include it in 2.4 if it can make the release. cc @cloud-fan |
| data = data.withColumn("out1", func_udf(data["a"])) | ||
| data = data.withColumn("out2", func_udf2(data["b"])) | ||
| data.collect() | ||
| self.assertEqual(test_accum.value, 101) |
@viirya, can we just use int for data and accumulator as well in this test case?
Ok.
| _accumulatorRegistry[aid] = accum | ||
| return accum | ||
| # If this certain accumulator was deserialized, don't overwrite it. | ||
| if aid in _accumulatorRegistry: |
Ah, so the problem is this accumulator is de/serialized multiple times and _deserialize_accumulator modifies the global status multiple times. I see. LGTM.
Yes.
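The failure mode summarized above can be reproduced with a small simulation. This is a sketch under stated assumptions, not PySpark code: `ToyAccumulator`, `deserialize_overwriting`, `deserialize_guarded` and `run_two_udfs` are hypothetical names, the registry is a plain dict, and the increments 1 and 100 are picked only for illustration.

```python
class ToyAccumulator:
    """Hypothetical stand-in for an executor-side accumulator."""

    def __init__(self, aid, value):
        self.aid = aid
        self.value = value

    def add(self, term):
        self.value += term


def deserialize_overwriting(registry, aid, zero):
    # Behaviour before the fix, as described in the thread:
    # every deserialization replaces the registry entry.
    accum = ToyAccumulator(aid, zero)
    registry[aid] = accum
    return accum


def deserialize_guarded(registry, aid, zero):
    # Behaviour after the fix: reuse the entry if it already exists.
    if aid in registry:
        return registry[aid]
    accum = ToyAccumulator(aid, zero)
    registry[aid] = accum
    return accum


def run_two_udfs(deserialize):
    registry = {}
    # Two UDFs are loaded, and each one deserializes the same accumulator id.
    acc_in_udf1 = deserialize(registry, 0, 0)
    acc_in_udf2 = deserialize(registry, 0, 0)
    # Each UDF then updates the object it holds.
    acc_in_udf1.add(1)
    acc_in_udf2.add(100)
    # Only the object left in the registry would be reported back.
    return registry[0].value


lost_update = run_two_udfs(deserialize_overwriting)  # 100: the +1 is lost
correct = run_two_udfs(deserialize_guarded)          # 101
```

With the overwriting variant, the first UDF keeps updating an object that is no longer in the registry, so its update never reaches the reported value. With the guard, both UDFs share one object and both updates are counted.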
|
Nice catch @viirya LGTM. |
|
Thanks @HyukjinKwon |
|
Test build #97100 has finished for PR 22635 at commit
|
## What changes were proposed in this pull request?

If we use accumulators in more than one UDFs, it is possible to overwrite deserialized accumulators and its values. We should check if an accumulator was deserialized before overwriting it in accumulator registry.

## How was this patch tested?

Added test.

Closes #22635 from viirya/SPARK-25591.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit cb90617)
Signed-off-by: hyukjinkwon <[email protected]>
|
Merged to master and branch-2.4. |
|
@cloud-fan @viirya Any chance of this making it into 2.4 ? |
|
How about pandas UDF? How about using RDD APIs? Do we face the same issues? |
|
@AbdealiJK since RC3 is not cut, this will be in 2.4. |
|
Yes, the same issue exists in Pandas UDFs too (I quickly double-checked). This PR fixes it. FYI, both go through the same code path. |
|
@cloud-fan @gatorsmile @HyukjinKwon Thanks. Yes, Pandas UDFs have the same issue, and it is fixed by this PR. |
|
Please review https://issues.apache.org/jira/browse/SPARK-26019. I suspect this change might have introduced the SPARK-26019 regression. |
|
@Tagar I will look into it. Thanks. |
|
Thank you @viirya |
|
How is it related to the JIRA? It doesn't look related from a cursory look. Please leave some analysis next time, or at least test before and after the specific commit. Let me take a look anyway. |
|
This is fixed in 2.4.0, and your issue appears when going from 2.3.1 to 2.3.2. It's not related. |
|
Yeah, thanks @HyukjinKwon. I had an initial look, and it looks like it is not related. |
|
@viirya I apologize. As I mentioned in my comment in SPARK-26019, it's due to another change.
## What changes were proposed in this pull request?
If accumulators are used in more than one UDF, deserialized accumulators and their values can be overwritten. We should check whether an accumulator has already been deserialized before overwriting it in the accumulator registry.
## How was this patch tested?
Added test.