Conversation

@ueshin (Member) commented Nov 1, 2020

What changes were proposed in this pull request?

This is a backport of #30177.

Because the Python evaluation consumes the parent iterator in a separate thread, it can keep consuming data from the parent even after the task ends and the parent is closed. We should therefore use `ContextAwareIterator` to stop consuming once the task ends.
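
For illustration, here is a minimal sketch of what such a wrapper can look like, derived from the class signature reported in the test-build summary below; the PR's actual implementation may differ in detail:

```scala
import org.apache.spark.TaskContext

// Sketch only: serve elements from the parent iterator until the task
// completes or is interrupted. A consumer thread that outlives the task
// then sees hasNext == false instead of reading from the closed parent.
class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext)
  extends Iterator[IN] {

  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && iter.hasNext

  override def next(): IN = iter.next()
}
```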

Why are the changes needed?

A Python/Pandas UDF placed right after the off-heap vectorized reader can crash the executor.

For example:

```py
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# `path` is any writable location for the test Parquet file.
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This happens because the Python evaluation consumes the parent iterator in a separate thread, which keeps pulling data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, its freed off-heap memory may be accessed after the task ends, causing a segmentation fault that crashes the executor.
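
As a hedged sketch of where the wrapper comes into play, the snippet below wraps the parent iterator before a writer thread starts consuming it, using the `ContextAwareIterator` sketched above. The thread setup and the `writeToPython` helper are hypothetical stand-ins for illustration, not the PR's actual code:

```scala
import org.apache.spark.TaskContext

// Hypothetical stand-in for the code that ships rows to the Python worker.
def writeToPython(row: Array[Byte]): Unit = ()

// Sketch only: wrap the parent iterator with the task's context before the
// writer thread starts consuming it.
def startWriterThread(iter: Iterator[Array[Byte]]): Thread = {
  val context = TaskContext.get()
  val contextAwareIterator = new ContextAwareIterator(iter, context)
  val writer = new Thread("python-udf-writer") {
    override def run(): Unit =
      // Once the task completes, hasNext turns false, so this thread stops
      // reading from the closed parent and its freed off-heap column vectors.
      contextAwareIterator.foreach(writeToPython)
  }
  writer.setDaemon(true)
  writer.start()
  writer
}
```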

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests, and manually.

@SparkQA commented Nov 1, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35105/

@SparkQA commented Nov 1, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35105/

@HyukjinKwon (Member) commented

Merged to branch-2.4.

HyukjinKwon pushed a commit that referenced this pull request Nov 2, 2020

Closes #30218 from ueshin/issues/SPARK-33277/2.4/python_pandas_udf.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon closed this Nov 2, 2020
@SparkQA commented Nov 2, 2020

Test build #130501 has finished for PR 30218 at commit 001a586.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • `class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN]`
