Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.scheduler.{Location, MapStatus}
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util._
Expand Down Expand Up @@ -124,13 +124,13 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
/**
* Update the map output location (e.g. during migration).
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
def updateMapOutput(mapId: Long, loc: Location): Unit = withWriteLock {
try {
val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
mapStatusOpt match {
case Some(mapStatus) =>
logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
logInfo(s"Updating map output for $mapId to $loc")
mapStatus.updateLocation(loc)
invalidateSerializedMapOutputStatusCache()
case None =>
logWarning(s"Asked to update map output ${mapId} for untracked map status.")
Expand All @@ -146,9 +146,9 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
def removeMapOutput(mapIndex: Int, loc: Location): Unit = withWriteLock {
logDebug(s"Removing existing map output $mapIndex $loc")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == loc) {
Comment thread
Ngone51 marked this conversation as resolved.
_numAvailableOutputs -= 1
mapStatuses(mapIndex) = null
invalidateSerializedMapOutputStatusCache()
Expand Down Expand Up @@ -178,7 +178,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
* Removes all shuffle outputs which satisfies the filter. Note that this will also
* remove outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock {
def removeOutputsByFilter(f: Location => Boolean): Unit = withWriteLock {
for (mapIndex <- mapStatuses.indices) {
if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) {
_numAvailableOutputs -= 1
Expand Down Expand Up @@ -344,7 +344,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

// For testing
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
: Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
}

Expand All @@ -365,7 +365,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])]

/**
* Deletes map output status information for the specified shuffle stage.
Expand Down Expand Up @@ -502,7 +502,7 @@ private[spark] class MapOutputTrackerMaster(
}

/** Unregister map output information of the given shuffle, mapper and block manager */
def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: BlockManagerId): Unit = {
def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: Location): Unit = {
Comment thread
Ngone51 marked this conversation as resolved.
Outdated
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.removeMapOutput(mapIndex, bmAddress)
Comment thread
Ngone51 marked this conversation as resolved.
Outdated
Expand Down Expand Up @@ -643,10 +643,10 @@ private[spark] class MapOutputTrackerMaster(
: Seq[String] = {
if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
val locations = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
if (blockManagerIds.nonEmpty) {
blockManagerIds.get.map(_.host)
if (locations.nonEmpty) {
locations.get.map(_.host)
} else {
Nil
}
Expand All @@ -670,14 +670,14 @@ private[spark] class MapOutputTrackerMaster(
reducerId: Int,
numReducers: Int,
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {
: Option[Array[Location]] = {

val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
val locs = new HashMap[BlockManagerId, Long]
val locs = new HashMap[Location, Long]
var totalOutputSize = 0L
var mapIdx = 0
while (mapIdx < statuses.length) {
Expand Down Expand Up @@ -758,7 +758,7 @@ private[spark] class MapOutputTrackerMaster(
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
Expand Down Expand Up @@ -810,7 +810,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
val statuses = getStatuses(shuffleId, conf)
try {
Expand Down Expand Up @@ -989,9 +989,9 @@ private[spark] object MapOutputTracker extends Logging {
endPartition: Int,
statuses: Array[MapStatus],
startMapIndex : Int,
endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endMapIndex: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
val splitsByAddress = new HashMap[Location, ListBuffer[(BlockId, Long, Int)]]
val iter = statuses.iterator.zipWithIndex
for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
if (status == null) {
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.{AccumulableInfo, Location}
import org.apache.spark.util.{AccumulatorV2, Utils}

// ==============================================================================================
Expand Down Expand Up @@ -81,7 +80,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
bmAddress: Location, // Note that bmAddress can be null

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a backwardly incompatible change, which also impacts event files.
Unfortunately, it looks like most folks who worked on this in the past are no longer active.
+CC @tgravescs, @dongjoon-hyun for additional review.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the event files, we still log the json of BlockManagerId. So I think it's still compatible, no?

Besides, are you worried about possible backward incompatibility for code outside of Spark that references this class?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two changes here: a) change to public api (programmatic, REST api, etc), b) change to generated/persisted data.
Both need to be addressed.

Specifically about (b): with bmAddress changing to Location, it need not be a BlockManagerId anymore, so the JSON serde needs to account for that. Currently, it is a TODO here.

Spark history server, REST consumers, other apps depending on event files - will need a way to identify Location type and serde all valid Location types.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a) change to public api (programmatic, REST api, etc)

Technically, it really is a problem, although I can't imagine how users would use it as an API.

I also have a new idea: we can introduce a new fetch-failed class for custom locations and leave this one unchanged. For example, we can have CustomStorageFetchFailed. Thus, when the location is a BlockManagerId we use FetchFailed; otherwise, we use CustomStorageFetchFailed. WDYT?

b) change to generated/persisted data.

I think we won't change the data of BlockManagerId here. If we find the Location is a BlockManagerId, we'd still output as ("Block Manager Address" -> blockManagerAddress). So, IIUC, it won't cause problems.

The only problem is the custom location. It's new data, e.g., ("XXXLocation" -> XXXLocationJson). So it can be a problem if users use an older version of Spark to load event files, although I think this is really an unexpected usage.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have a new idea: we can introduce a new fetch-failed class for custom locations and leave this one unchanged. For example, we can have CustomStorageFetchFailed. Thus, when the location is a BlockManagerId we use FetchFailed; otherwise, we use CustomStorageFetchFailed. WDYT?

CustomStorageFetchFailed looks like a promising approach. We will need to think through what its implications would be, but on the face of it, it should address the immediate concerns IMO.
Thoughts @attilapiros, @tgravescs ?

The only problem is the custom location. It's new data, e.g., ("XXXLocation" -> XXXLocationJson). So it can be a problem if users use an older version of Spark to load event files, although I think this is really an unexpected usage.

There are a couple of issues here:

  • A simpler question of how to handle custom location - from programmatic and data point of view.
  • How to handle different shuffle impls being in play for the same event directory.
    • Deployments may have multiple shuffle infrastructures in use over the course of time (or different clusters with different configs and a shared history event dir), each with its own Location types.
    • How will the SHS, REST API, etc. understand which Location class is being used and how to parse it?

I actually don't have a good solution for this, other than adding some metadata per location record to indicate the 'type'.
Any other thoughts ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for a new class without introducing a breaking change. Thank you for pinging me, @mridulm .

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll have to add interfaces to Location in order to support JSON serde, e.g.,

public JValue serializeToJson();

public Location deserializeFromJson(JValue json);

How to handle different shuffle impls being in play for the same event directory.

Adding metadata is a good idea. We can have a format like:

"Mapstatus Location": {
  "type":  "xx.yy.zz", // qualified class name
  "content": { // content is generate by `Location.serializeToJson`
    "aaa":"bbb"
   }
}

With a constant format, end users and the SHS are able to parse them as well.

(One idea I had for the SHS is to add the location type as an extension of the event log file name, which is what compression does now. But I think it doesn't solve the problem for the REST case.)

BTW, I have added https://issues.apache.org/jira/browse/SPARK-35188 for this support.

shuffleId: Int,
mapId: Long,
mapIndex: Int,
Expand Down
34 changes: 22 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,23 @@ import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

trait Location extends Externalizable {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Externalizable would require the implementers to provide a zero-parameter constructor. Do you think it's acceptable or have any better idea? @hiboyang @attilapiros

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original BlockManagerId extends Externalizable, so I think using Externalizable for Location is OK.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have missed some context here, but why Externalizable and not Serializable ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because BlockManagerId already uses Externalizable, and we want to make BlockManagerId the default implementation of Location for Spark.

A possible way to let Location extend Serializable is to let BlockManagerId extend Serializable directly too. But we need to figure out whether serializers could handle topologyInfo_: Option[String] properly, as topologyInfo_ is currently handled manually in readExternal/writeExternal:

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
out.writeBoolean(topologyInfo_.isDefined)
// we only write topologyInfo if we have it
topologyInfo.foreach(out.writeUTF(_))
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
val isTopologyInfoAvailable = in.readBoolean()
topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Externalizable here is totally fine.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify: Externalizable takes precedence over Serializable.
We can continue to use Externalizable for BlockManagerId while not requiring other implementations to use it.

def host: String
def port: Int
def hostPort: String
def executorId: String = "unknown"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this only for convenience; it doesn't mean I have any preference for the interface.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, adding this here helps for this initial version. Could we add something like storageInfo: Option[Serializable], which could be used to store extra information for different disaggregated shuffle solutions? e.g. storageInfo could be remote storage file path(s) or remote shuffle server(s).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For storing extra information, I think implementors can add whatever they want as long as it's serializable, e.g., extra info can be Option[Serializable] or Map[String, String].

But I did think about adding a common StorageType class to Location. For example, a valid use case is that we could know from it whether the storage is reliable (e.g., location.storageType.isReliable) so we can decide whether to apply "decommission" on such storage.

@hiboyang hiboyang Mar 18, 2021

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or could we add topologyInfo: Option[String] in Location, like other fields host/port/... which are from BlockManagerId?

location.storageType.isReliable is a good idea for "decommission".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on discussion, if we are going to mirror everything in BlockManagerId, why not expose that directly with ability to subclass it and avoid Location altogether ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it (exposing BlockManagerId) depends on the use cases. We could probably expose it if BlockManagerId generally satisfied all the common use cases (e.g., BlockManager, HDFS, file server). That being said, I think the concept of BlockManagerId is not suitable for every case. For example, each executor corresponds to its own BlockManagerId, but in the case of HDFS, all the executors would share only one corresponding location.

Adding Location would be more flexible for users if they have specific requirements that BlockManagerId can't satisfy. Besides, I think it's safer for Spark as BlockManagerId is not only used for shuffle phase but also widely used in RDD cache.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or could we add topologyInfo: Option[String] in Location, like other fields host/port/... which are from BlockManagerId?

Do you know other cases that need it?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uber Remote Shuffle Service uses topologyInfo to store shuffle server information. Since we are more or less mirroring BlockManagerId, I suggest adding topologyInfo to Location as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think specifying these methods in Location violates the abstraction. I know this is convenient and avoids casting, but we should still avoid meaningless methods. In a disaggregated storage solution the executorId has no value, and host and port are probably not enough and never needed as separate entities; a URL-like construct would be more useful (or something else, but that is the responsibility of the specific subclass).

}

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task has shuffle files stored on as well as the sizes of outputs for each reducer, for passing
* on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task output is. */
def location: BlockManagerId
def location: Location

def updateLocation(newLoc: BlockManagerId): Unit
def updateLocation(newLoc: Location): Unit

/**
* Estimated size for the reduce block, in bytes.
Expand Down Expand Up @@ -66,7 +73,7 @@ private[spark] object MapStatus {
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

def apply(
loc: BlockManagerId,
loc: Location,
uncompressedSizes: Array[Long],
mapTaskId: Long): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
Expand Down Expand Up @@ -115,21 +122,21 @@ private[spark] object MapStatus {
* @param _mapTaskId unique task id for the task
*/
private[spark] class CompressedMapStatus(
private[this] var loc: BlockManagerId,
private[this] var loc: Location,
private[this] var compressedSizes: Array[Byte],
private[this] var _mapTaskId: Long)
extends MapStatus with Externalizable {

// For deserialization only
protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1)

def this(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) = {
def this(loc: Location, uncompressedSizes: Array[Long], mapTaskId: Long) = {
this(loc, uncompressedSizes.map(MapStatus.compressSize), mapTaskId)
}

override def location: BlockManagerId = loc
override def location: Location = loc

override def updateLocation(newLoc: BlockManagerId): Unit = {
override def updateLocation(newLoc: Location): Unit = {
loc = newLoc
}

Expand Down Expand Up @@ -168,7 +175,7 @@ private[spark] class CompressedMapStatus(
* @param _mapTaskId unique task id for the task
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var loc: Location,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
Expand All @@ -183,9 +190,9 @@ private[spark] class HighlyCompressedMapStatus private (

protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc
override def location: Location = loc

override def updateLocation(newLoc: BlockManagerId): Unit = {
override def updateLocation(newLoc: Location): Unit = {
loc = newLoc
}

Expand Down Expand Up @@ -216,7 +223,10 @@ private[spark] class HighlyCompressedMapStatus private (
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
// TODO(wuyi): config
val location = "org.apache.spark.storage.BlockManagerId"
loc = Utils.classForName(location).newInstance().asInstanceOf[Location]
loc.readExternal(in)
Comment thread
Ngone51 marked this conversation as resolved.
Outdated
numNonEmptyBlocks = -1 // SPARK-32436 Scala 2.13 doesn't initialize this during deserialization
emptyBlocks = new RoaringBitmap()
emptyBlocks.deserialize(in)
Expand All @@ -235,7 +245,7 @@ private[spark] class HighlyCompressedMapStatus private (

private[spark] object HighlyCompressedMapStatus {
def apply(
loc: BlockManagerId,
loc: Location,
uncompressedSizes: Array[Long],
mapTaskId: Long): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package org.apache.spark.shuffle
import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler.Location
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockFetcherIterator}
import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

Expand All @@ -30,7 +31,7 @@ import org.apache.spark.util.collection.ExternalSorter
*/
private[spark] class BlockStoreShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
blocksByAddress: Iterator[(Location, Seq[(BlockId, Long, Int)])],
context: TaskContext,
readMetrics: ShuffleReadMetricsReporter,
serializerManager: SerializerManager = SparkEnv.get.serializerManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.shuffle

import org.apache.spark.{FetchFailed, TaskContext, TaskFailedReason}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.Location
import org.apache.spark.util.Utils

/**
Expand All @@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
* (or risk triggering any other exceptions). See SPARK-19276.
*/
private[spark] class FetchFailedException(
bmAddress: BlockManagerId,
bmAddress: Location,
shuffleId: Int,
mapId: Long,
mapIndex: Int,
Expand All @@ -43,7 +43,7 @@ private[spark] class FetchFailedException(
extends Exception(message, cause) {

def this(
bmAddress: BlockManagerId,
bmAddress: Location,
shuffleId: Int,
mapTaskId: Long,
mapIndex: Int,
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.spark.storage

import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.io.{IOException, ObjectInput, ObjectOutput}

import com.google.common.cache.{CacheBuilder, CacheLoader}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler.Location
import org.apache.spark.storage.BlockManagerId.getCachedBlockManagerId
import org.apache.spark.util.Utils

/**
Expand All @@ -40,27 +42,27 @@ class BlockManagerId private (
private var host_ : String,
private var port_ : Int,
private var topologyInfo_ : Option[String])
extends Externalizable {
extends Location {

private def this() = this(null, null, 0, None) // For deserialization only

def executorId: String = executorId_
override def executorId: String = executorId_

if (null != host_) {
Utils.checkHost(host_)
assert (port_ > 0)
}

def hostPort: String = {
override def hostPort: String = {
// DEBUG code
Utils.checkHost(host)
assert (port > 0)
host + ":" + port
}

def host: String = host_
override def host: String = host_

def port: Int = port_
override def port: Int = port_

def topologyInfo: Option[String] = topologyInfo_

Expand All @@ -83,6 +85,7 @@ class BlockManagerId private (
port_ = in.readInt()
val isTopologyInfoAvailable = in.readBoolean()
topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
getCachedBlockManagerId(this)
}

@throws(classOf[IOException])
Expand Down Expand Up @@ -129,7 +132,7 @@ private[spark] object BlockManagerId {
def apply(in: ObjectInput): BlockManagerId = {
val obj = new BlockManagerId()
obj.readExternal(in)
getCachedBlockManagerId(obj)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A reminder for myself: cache is still needed for the use case in UpdateBlockInfo.

obj
Comment thread
Ngone51 marked this conversation as resolved.
}

/**
Expand Down
Loading