Conversation

@davies
Contributor

@davies davies commented Dec 15, 2014

UTF8Deserializer cannot be used inside BatchedSerializer, so always use PickleSerializer() when changing the batchSize in zip().

Also, if two RDDs already have the same batch size, they no longer need to be re-serialized.
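
The decision described above can be sketched in plain Python without a Spark installation. This is only an illustration of the rule, not the code in this patch: `Utf8Strings`, `Pickle`, `Batched`, `batch_size_of` and `plan_zip` are hypothetical stand-ins, and treating an unbatched serializer as batch size 1 is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Utf8Strings:
    """Hypothetical stand-in for a deserializer that only reads strings from the JVM."""


@dataclass(frozen=True)
class Pickle:
    """Hypothetical stand-in for a pickle-based serializer."""


@dataclass(frozen=True)
class Batched:
    """Hypothetical stand-in for a serializer that groups objects into batches."""
    inner: object
    batch_size: int


def batch_size_of(serializer):
    # Assumption: anything that is not batched counts as batch size 1.
    return serializer.batch_size if isinstance(serializer, Batched) else 1


def plan_zip(left, right):
    """Return the serializers each side should use before the two are zipped."""
    left_size, right_size = batch_size_of(left), batch_size_of(right)
    if left_size == right_size:
        # Same batch size already: nothing needs to be re-serialized.
        return left, right
    # Sizes differ: move both sides to a pickle-based batch of the smaller size,
    # never wrapping the string-only deserializer in a batch.
    target = Batched(Pickle(), min(left_size, right_size))
    return target, target
```

For example, `plan_zip(Utf8Strings(), Batched(Pickle(), 10))` moves both sides to `Batched(Pickle(), 1)`, while two inputs that already share a batch size are returned unchanged.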

@SparkQA

SparkQA commented Dec 15, 2014

Test build #24471 has started for PR 3706 at commit 379d2c8.

  • This patch merges cleanly.

Contributor

Can we reproduce the bug by creating an RDD of strings directly? That would be simpler than touching disk. It would also help to put the JIRA number in the comment.

Contributor Author

We can't generate an RDD with UTF8Deserializer right now; it's only used to read data from the JVM.

@SparkQA

SparkQA commented Dec 15, 2014

Test build #24472 has started for PR 3706 at commit e3ebf7c.

  • This patch merges cleanly.

@JoshRosen
Contributor

It looks like there are still a few lingering bugs related to zip. For example, the following program crashes:

text = sc.textFile("README.md")
numbers = text.map(lambda x: 1)
text.zip(numbers).count()  # Works fine
text.zip(numbers).count()  # A second time, this throws an error:

The error is

py4j.protocol.Py4JJavaError: An error occurred while calling o185.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 8.0 failed 1 times, most recent failure: Lost task 1.0 in stage 8.0 (TID 16, localhost): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
    at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:727)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:411)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:241)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1459)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:202)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Not sure if this is a new bug, but it is potentially an indicator that there are other lingering problems in zip.
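
One way to picture how this error message can arise is sketched below in plain Python. It assumes that the zip happens on serialized batches rather than on individual Python objects, so two sides holding the same objects can still expose different record counts. `batched` and `strict_zip` are hypothetical helpers written for this illustration, not Spark functions.

```python
def batched(items, batch_size):
    """Group items into lists of at most batch_size, as a batching serializer would."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def strict_zip(left, right):
    """Zip two record lists, refusing inputs whose record counts differ."""
    if len(left) != len(right):
        raise ValueError("Can only zip sequences with the same number of elements")
    return list(zip(left, right))


lines = ["a", "b", "c", "d", "e"]
ones = [1] * len(lines)

# Same number of objects on both sides, but a different number of records
# once each side is grouped with its own batch size.
left_records = batched(lines, 1)
right_records = batched(ones, 2)

# Re-batching both sides with one shared size lines the records up again.
aligned = strict_zip(batched(lines, 2), batched(ones, 2))
```

Calling `strict_zip(left_records, right_records)` raises, because the two sides no longer have the same number of records even though they hold the same number of objects.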

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24471 has finished for PR 3706 at commit 379d2c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24471/

@davies
Contributor Author

davies commented Dec 16, 2014

@JoshRosen Good catch! It's a bug in _reserialize() that was introduced in #2920. Thanks a lot!

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24473 has started for PR 3706 at commit 20ce3a3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24472 has finished for PR 3706 at commit e3ebf7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24472/

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24473 has finished for PR 3706 at commit 20ce3a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24473/

@JoshRosen
Contributor

Thanks for the update.

I've looked over this again and tried it out with a few more hand-written test cases. I've been unable to find any more bugs, so this looks good to me.

I'm going to merge this into master and add a backport-needed label in JIRA targeted for 1.2.1. Thanks!

@asfgit asfgit closed this in c246b95 Dec 16, 2014
@JoshRosen
Contributor

I've merged this into branch-1.2, so this fix will be included in Spark 1.2.1.

asfgit pushed a commit that referenced this pull request Dec 17, 2014
UTF8Deserializer can not be used in BatchedSerializer, so always use PickleSerializer() when change batchSize in zip().

Also, if two RDD have the same batch size already, they did not need re-serialize any more.

Author: Davies Liu <[email protected]>

Closes #3706 from davies/fix_4841 and squashes the following commits:

20ce3a3 [Davies Liu] fix bug in _reserialize()
e3ebf7c [Davies Liu] add comment
379d2c8 [Davies Liu] fix zip with textFile()

(cherry picked from commit c246b95)
Signed-off-by: Josh Rosen <[email protected]>