[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends. #30242
Conversation
Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status success

676c530 to 895d91d

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #130582 has finished for PR 30242 at commit

Kubernetes integration test starting

Test build #130584 has finished for PR 30242 at commit

Kubernetes integration test status success

Test build #130586 has finished for PR 30242 at commit

Test build #130589 has finished for PR 30242 at commit

Test build #130591 has finished for PR 30242 at commit

Kubernetes integration test starting

Test build #130616 has finished for PR 30242 at commit

Kubernetes integration test status success

Jenkins, retest this please.

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #130668 has finished for PR 30242 at commit

Test build #130674 has finished for PR 30242 at commit
Now the PySpark tests don't seem to fail.

Test build #130677 has finished for PR 30242 at commit

cc @zsxwing too

gentle ping

ping @zsxwing
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
```scala
while (thread == null && !failed.get()) {
  // Wait for a while since the writer thread might not reach to consuming the iterator yet.
  context.wait(10)
```
Did you mean `Thread.sleep(10)`? `Object.wait` is not supposed to be used like this.

I do mean `wait`. This will run within `synchronized(context)`, and we should release the lock for the writer thread while waiting.
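The distinction can be illustrated outside the JVM: Python's `threading.Condition.wait` behaves like `Object.wait` in that it releases the lock while blocked, which is exactly what lets the writer thread make progress here, whereas a plain sleep would keep holding the monitor. A minimal sketch (hypothetical thread names, not Spark code):

```python
import threading
import time

cond = threading.Condition()
events = []

def listener():
    # Analogue of a task-completion listener running under synchronized(context).
    with cond:
        events.append("listener: waiting, lock released")
        cond.wait(timeout=5.0)  # releases the lock while blocked, like Object.wait
        events.append("listener: resumed")

def writer():
    # Can acquire the lock only because the listener used wait(), not sleep().
    with cond:
        events.append("writer: acquired lock")
        cond.notify()

t1 = threading.Thread(target=listener)
t1.start()
time.sleep(0.1)  # let the listener reach wait()
t2 = threading.Thread(target=writer)
t2.start()
t1.join()
t2.join()
print(events)
```

Had the listener called `time.sleep` while holding the lock, the writer would have blocked on `with cond:` for the whole sleep, which mirrors the deadlock concern raised below.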
I didn't realize that. It's better not to rely on this in a listener; this is something we should consider improving in the future. It's a bad idea to hold an implicit lock when calling a user's listener, because it's pretty easy to cause a surprising deadlock.
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
```scala
val thread = new AtomicReference[Thread]()
```
```scala
if (iter.hasNext) {
```
Will this change the thread that `iter.hasNext` is running on? We can add the listeners without checking it.
Actually, this is to make sure the upstream iterator is initialized. The upstream iterator must be initialized earlier, as it might register another completion listener, and that listener should run later than this one.
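The ordering argument relies on Spark invoking task completion listeners in reverse order of registration (last registered, first run). A toy model of that behavior (hypothetical names, not Spark code) shows why forcing the upstream to register its listener first makes it run last:

```python
# Minimal stand-in for Spark's TaskContext listener bookkeeping.
class FakeTaskContext:
    def __init__(self):
        self._listeners = []

    def add_task_completion_listener(self, fn):
        self._listeners.append(fn)

    def mark_task_completed(self):
        # Listeners run in reverse order of registration (LIFO).
        for fn in reversed(self._listeners):
            fn()

order = []
ctx = FakeTaskContext()
# Upstream iterator initializes first and registers its cleanup listener...
ctx.add_task_completion_listener(lambda: order.append("upstream cleanup"))
# ...then this code registers its own listener, which therefore runs first.
ctx.add_task_completion_listener(lambda: order.append("stop writer thread"))
ctx.mark_task_completed()
print(order)
```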
```scala
failed.set(true)
}
```
```scala
context.addTaskCompletionListener[Unit] { _ =>
```
This assumes the task completion listener that stops the thread runs before this one; otherwise, it would hang forever. I'm wondering whether there is a better solution to avoid this implicit assumption.
The task completion listener will wait for the thread to stop within this listener, and the thread will stop soon, as it checks `!context.isCompleted() && !context.isInterrupted()`.
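The shutdown handshake described here can be sketched with Python threading primitives (assumed names, not the Spark implementation): the writer thread polls a completion flag, and the listener waits for it to exit before returning.

```python
import threading
import time

completed = threading.Event()  # analogue of context.isCompleted()
stopped = []

def writer_loop():
    # Analogue of the writer thread checking
    # !context.isCompleted() && !context.isInterrupted() each iteration.
    while not completed.is_set():
        time.sleep(0.01)  # pretend to consume the iterator
    stopped.append(True)  # exits promptly once the task completes

t = threading.Thread(target=writer_loop)
t.start()
completed.set()      # the task completion listener fires...
t.join(timeout=5.0)  # ...and waits for the writer thread to stop
print(stopped)
```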
Test build #131717 has finished for PR 30242 at commit
```scala
// Use `context.wait()` instead of `Thread.sleep()` here since the task completion listener
// works under `synchronized(context)`. We might need to consider improving this in the future.
// It's a bad idea to hold an implicit lock when calling a user's listener because it's
// pretty easy to cause a surprising deadlock.
```
This is a bit scary. Is there a better way?
> It's a bad idea to hold an implicit lock when calling a user's listener because it's pretty easy to cause a surprising deadlock.

Maybe we can fix this first. Then this listener doesn't need to rely on an implicit lock.
I see. Let me change the strategy here.
…g after the task ends

### What changes were proposed in this pull request?

This is a retry of #30177. This is not a complete fix, but it would take a long time to complete (#30242). As discussed offline, at least using `ContextAwareIterator` should be helpful enough for many cases.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

A Python/Pandas UDF right after an off-heap vectorized reader could cause an executor crash. E.g.:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread, and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30899 from ueshin/issues/SPARK-33277/context_aware_iterator.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…suming after the task ends

### What changes were proposed in this pull request?

This is a backport of #30899. This is not a complete fix, but it would take a long time to complete (#30242). As discussed offline, at least using `ContextAwareIterator` should be helpful enough for many cases.

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

A Python/Pandas UDF right after an off-heap vectorized reader could cause an executor crash. E.g.:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread, and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30913 from ueshin/issues/SPARK-33277/2.4/context_aware_iterator.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
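The `ContextAwareIterator` merged in the commit messages above is Scala; a minimal Python sketch of the same idea (with `FakeTaskContext` as a hypothetical stand-in for Spark's `TaskContext`) shows the behavior: stop consuming the upstream iterator once the task has completed or been interrupted, so the writer thread cannot touch data that the closed parent has already freed.

```python
class FakeTaskContext:
    def __init__(self):
        self.completed = False
        self.interrupted = False

class ContextAwareIterator:
    """Wraps a delegate iterator and stops yielding once the task ends."""

    def __init__(self, context, delegate):
        self.context = context
        self.delegate = delegate

    def __iter__(self):
        return self

    def __next__(self):
        # Mirrors: !context.isCompleted() && !context.isInterrupted() && iter.hasNext
        if self.context.completed or self.context.interrupted:
            raise StopIteration  # task ended: stop pulling from the parent
        return next(self.delegate)

ctx = FakeTaskContext()
it = ContextAwareIterator(ctx, iter(range(10)))
consumed = [next(it), next(it)]  # consumes normally while the task runs
ctx.completed = True             # task ends; the parent may be closed now
remaining = list(it)             # stops immediately instead of reading on
print(consumed, remaining)
```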
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?

This is a retry of #30177.

Makes the `TaskCompletion` event thread wait until the consuming thread ends, to avoid the race condition.

Why are the changes needed?

There are still sometimes crashes of executors, as discussed at #30177 (comment).

The race condition could happen between `!context.isCompleted() && !context.isInterrupted()` and `iter.hasNext` in the `hasNext` method. This is because the `TaskCompletion` event thread could close the upstream iterator even between them. We should make the event thread wait for a while until the consuming thread ends, which should happen soon, as the iterator returns `false` in `hasNext`.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.
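The race described above is a classic check-then-act window: the flag check and the call into the upstream are not atomic, so the `TaskCompletion` event thread can close the upstream between them. A sketch with hypothetical stand-in names (in Spark, with off-heap column vectors, the "read after close" is a segfault rather than a catchable error):

```python
class FakeUpstream:
    def __init__(self):
        self.closed = False

    def has_next(self):
        if self.closed:
            raise RuntimeError("read after close")
        return True

upstream = FakeUpstream()
flags = {"completed": False}

def has_next_racy(simulate_completion_between_check_and_use):
    ok = not flags["completed"]  # check the task flag
    if simulate_completion_between_check_and_use:
        # The TaskCompletion event thread can run exactly here,
        # after the check but before the use:
        flags["completed"] = True
        upstream.closed = True
    return ok and upstream.has_next()  # use: touches the closed upstream

try:
    has_next_racy(True)
    raced = False
except RuntimeError:
    raced = True
print(raced)
```

The check alone cannot close the window, which is why the PR makes the event thread wait for the consuming thread instead of relying purely on the flag check.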