Conversation

@vundela commented Apr 20, 2017

What changes were proposed in this pull request?

In PySpark, when multiple threads are used, broadcast variables can be pickled with the wrong PythonRDD wrap function (because of a race condition between the threads on the Java side, via Py4J), which leads to the following exception:

16/01/08 17:10:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
raise Exception("Broadcast variable '%s' not loaded!" % bid)
Exception: (Exception("Broadcast variable '6' not loaded!",), <function _from_id at 0xce7a28>, (6L,))

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

This change fixes the race condition by making sure that broadcast variables are pickled with the same PythonRDD wrap function.

How was this patch tested?

  1. Reproduced the issue described in SPARK-12717, following the instructions in the JIRA (a rough reproduction sketch is shown after this list).
  2. Verified that the issue is fixed with the changes.
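A minimal reproduction sketch along those lines (hypothetical, not the exact script attached to the JIRA; the app name, thread count, and data are arbitrary):

```python
# Hypothetical repro: several Python threads each create a broadcast variable
# and immediately use it in an RDD action. With enough threads, pickling the
# command can race on the driver and executors fail with
# "Broadcast variable '<id>' not loaded!".
import threading
from pyspark import SparkContext

sc = SparkContext(appName="broadcast-race-repro")

def run(worker_id):
    b = sc.broadcast(list(range(100)))            # per-thread broadcast
    rdd = sc.parallelize(range(10), 2)
    result = rdd.map(lambda x: x + b.value[0]).collect()
    print(worker_id, result[:3])

threads = [threading.Thread(target=run, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sc.stop()
```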

Please review http://spark.apache.org/contributing.html before opening a pull request.

@vundela changed the title from "[SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcas…" to "[SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads" on Apr 20, 2017
@vanzin (Contributor) commented Apr 20, 2017

Please remove the boilerplate message at the end of your PR description.

ok to test

@maver1ck (Contributor)

I tested your patch in our environment.

The problem still exists.

Job aborted due to stage failure: Task 0 in stage 22.0 failed 8 times, most recent failure: Lost task 0.7 in stage 22.0 (TID 138, dwh-hn30.adpilot.co): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/worker.py", line 161, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/serializers.py", line 419, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
    raise Exception("Broadcast variable '%s' not loaded!" % bid)
Exception: Broadcast variable '22' not loaded!

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:

@maver1ck (Contributor)

The funny thing is that this code works for me with 4 threads and throws an exception with 10 threads.

@vundela (Author) commented Apr 20, 2017

Hi @maver1ck, Thanks for your time in testing the patch.

I did run the patch with 1000 threads and it works fine. Please check the jira for the log file.

Can you please check whether the changes are present in /grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/rdd.py?

My guess is that you might be missing the changes in pyspark.zip. Can you also let me know the steps you followed to test the patch?

@maver1ck (Contributor)

I checked the pyspark.zip of the running container and everything is in its place.
So I assume that there is more than one race condition in this code.

I'll try to prepare an example of the problem.

@maver1ck (Contributor)

OK, I did some additional tests.
The fix works only with Spark 2.1.
I had tried to apply it on 2.0.2, and that was the cause of my problem.

@vundela (Author) commented Apr 21, 2017

Thanks for testing @maver1ck. I will look into 1.6 and 2.0.2.

@maver1ck (Contributor)

@vundela
Great.
But I'm planning to migrate to 2.1 as soon as 2.1.1 is released.

@vundela (Author) commented Apr 21, 2017

Filed a PR to fix the issue in the Spark 1.6 branch.

@jsoltren

This seems reasonable to me.

There are some typos in the PR description. I think you meant "pickled" instead of "picked" in a few places.

Using threading.Lock seems okay here from my admittedly limited understanding of the deep details of Python, and my reading of https://docs.python.org/2/library/threading.html#lock-objects.

@vundela and I chatted off thread about this some. The precise race is this: the call to _wrap_function will define a number of broadcast variables. In the time between when the _wrap_function call finishes and self.ctx._jvm.PythonRDD executes, the RDD itself can be modified, perhaps changing broadcast variables and introducing the "Broadcast variable '%s' not loaded!" exception.

My understanding is that, due to the Global Interpreter Lock, this lock will cause all other execution to cease while this block of code runs, implicitly preventing any races. This is a very coarse grained lock for this action but it is as good as we can get. (Someone please correct me if I’m wrong here.)
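A toy model of that coarse-grained locking idea (illustrative only; the registry, wrap, and build functions below stand in for the pickle registry, _wrap_function, and the ctx._jvm.PythonRDD call, and are not pyspark internals):

```python
import threading

# Shared state that stands in for the SparkContext pickle registry: it is
# filled while a command is "wrapped" and read while the Java-side RDD is
# "built". Without a lock, another thread can clear/refill it in between.
_registry = []
_registry_lock = threading.Lock()

def wrap_command(broadcast_ids):
    # Stand-in for _wrap_function: registers the broadcasts being pickled.
    _registry.clear()
    _registry.extend(broadcast_ids)

def build_python_rdd():
    # Stand-in for the ctx._jvm.PythonRDD(...) call: reads the registry.
    return tuple(_registry)

def submit(broadcast_ids):
    # Coarse-grained fix: hold one lock across both steps so they are atomic.
    with _registry_lock:
        wrap_command(broadcast_ids)
        return build_python_rdd()

threads = [threading.Thread(target=lambda i=i: print(submit([i])))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```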

It would be good if the PR description captured some of the above discussion.

@vundela (Author) commented Apr 25, 2017

cc @holdenk. Can you please share your comments?

@holdenk (Contributor) commented Apr 25, 2017

Interesting, I don't think I have all of the required context to review, but I'll try and take a look this weekend (I've got some other things happening this week already).

@vundela (Author) commented Apr 25, 2017

Thanks for your time @holdenk.

@vundela (Author) commented May 11, 2017

ping @holdenk, checking if you have some time to review this.

@vanzin (Contributor) commented Jun 7, 2017

ok to test

@SparkQA commented Jun 7, 2017

Test build #77800 has finished for PR 17694 at commit 4670678.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member)

Hi @vundela, thanks for your PR. I was able to reproduce your issue, but I think your fix here uses too broad a lock, since the shared resource is the SparkContext pickle registry, which needs to be locked only while the Python command is being pickled. Please see #18695 and, if you wouldn't mind, check whether that solves your issue as well. Thanks!
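A hypothetical sketch of that narrower scope (placeholder names, not the actual #18695 change): the lock is held only while the command is pickled and the shared registry is snapshotted and cleared; building the Java-side RDD then happens outside the lock from the snapshot.

```python
import threading
import pickle

# Stand-in for the SparkContext pickle registry shared by all threads.
_registry = []
_registry_lock = threading.Lock()

def pickle_command(command, broadcast_ids):
    # Narrow scope: only the pickling step (which touches the shared
    # registry) runs under the lock.
    with _registry_lock:
        _registry.extend(broadcast_ids)   # broadcasts registered during pickling
        payload = pickle.dumps(command)
        snapshot = tuple(_registry)       # capture this thread's broadcasts
        _registry.clear()
    return payload, snapshot

def build_python_rdd(payload, snapshot):
    # No shared state is touched here, so this can run without the lock.
    return {"command": payload, "broadcasts": snapshot}

def submit(i):
    payload, snapshot = pickle_command(("map", i), [i])
    print(build_python_rdd(payload, snapshot))

threads = [threading.Thread(target=submit, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```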

@asfgit closed this in 3a45c7f on Aug 5, 2017
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances as in apache#18017

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <[email protected]>

Closes apache#18780 from HyukjinKwon/close-prs.