Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask

import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._

private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
Expand Down Expand Up @@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
Await.result(future, timeout)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
throw new SparkException("Error communicating with MapOutputTracker", e)
}
}

/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) {
if (askTracker(message) != true) {
throw new SparkException("Error reply received from MapOutputTracker")
val response = askTracker(message)
if (response != true) {
throw new SparkException(
"Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
}
}

Expand Down Expand Up @@ -366,9 +369,9 @@ private[spark] object MapOutputTracker {
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
private def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
Expand Down Expand Up @@ -403,7 +406,7 @@ private[spark] object MapOutputTracker {
if (compressedSize == 0) {
0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
}