From 0815e75b40a79f0d1b57f3592cc5e1f86f7dbca1 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sat, 6 Jun 2020 18:18:30 +0800 Subject: [PATCH 01/48] init --- .../network/shuffle/BlockStoreClient.java | 16 +++++ .../spark/network/BlockDataManager.scala | 2 + .../network/netty/NettyBlockRpcServer.scala | 8 +++ .../netty/NettyBlockTransferService.scala | 46 ++++++++++++++- .../apache/spark/storage/BlockManager.scala | 48 +++++++-------- .../storage/ShuffleBlockFetcherIterator.scala | 59 +++++++++++-------- 6 files changed, 130 insertions(+), 49 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index fbbe8ac0f1f9..d38b4999d926 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -18,9 +18,19 @@ package org.apache.spark.network.shuffle; import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import com.codahale.metrics.MetricSet; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; +import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors; +import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors; /** * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor @@ -61,4 +71,10 @@ public MetricSet shuffleMetrics() { // Return an empty MetricSet by default. return () -> Collections.emptyMap(); } + + public abstract void getHostLocalDirs( + String host, + int port, + String[] execIds, + CompletableFuture> hostLocalDirsCompletable); } diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 0bd5774b632b..4e349073f7f6 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -27,6 +27,8 @@ import org.apache.spark.storage.{BlockId, ShuffleBlockId, StorageLevel} private[spark] trait BlockDataManager { + def getLocalDiskDirs: Array[String] + /** * Interface to get host-local shuffle block data. Throws an exception if the block cannot be * found or cannot be read successfully. 
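Before the server-side pieces below, a minimal caller-side sketch (Scala) of the round trip the new getHostLocalDirs API is meant to drive. It is illustrative rather than part of the patch: the helper name printHostLocalDirs is hypothetical, `client` stands for any initialized BlockStoreClient, and the future's element type follows the CompletableFuture[java.util.Map[String, Array[String]]] shape used by the Scala callers later in this series.

import java.util.{Map => JMap}
import java.util.concurrent.CompletableFuture

import org.apache.spark.network.shuffle.BlockStoreClient

// Hypothetical helper: ask the BlockManager (or external shuffle service)
// listening at host:port for the local dirs of the given executors.
def printHostLocalDirs(
    client: BlockStoreClient,
    host: String,
    port: Int,
    execIds: Array[String]): Unit = {
  val dirsByExecFuture = new CompletableFuture[JMap[String, Array[String]]]()
  client.getHostLocalDirs(host, port, execIds, dirsByExecFuture)
  dirsByExecFuture.whenComplete { (dirsByExec, throwable) =>
    if (dirsByExec != null) {
      // One entry per requested executor: its DiskBlockManager local dirs,
      // carried back in a LocalDirsForExecutors response.
      dirsByExec.forEach((execId, dirs) => println(s"$execId -> ${dirs.mkString(", ")}"))
    } else {
      // The GetLocalDirsForExecutors request failed or could not be decoded.
      throwable.printStackTrace()
    }
  }
}
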
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index 62726f7e147c..c301becff524 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -113,6 +113,14 @@ class NettyBlockRpcServer( s"when there is not sufficient space available to store the block.") responseContext.onFailure(exception) } + + case req: GetLocalDirsForExecutors => + assert(req.appId == appId) + val execId = req.execIds.head + val dirs = blockManager.getLocalDiskDirs + responseContext + .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer) + } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 5d9cea068b09..1c282532ca57 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -19,7 +19,9 @@ package org.apache.spark.network.netty import java.io.IOException import java.nio.ByteBuffer -import java.util.{HashMap => JHashMap, Map => JMap} +import java.util +import java.util.{Arrays, HashMap => JHashMap, Map => JMap} +import java.util.concurrent.CompletableFuture import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} @@ -33,11 +35,11 @@ import org.apache.spark.ExecutorDeadException import org.apache.spark.internal.config import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} -import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientBootstrap, TransportClientFactory} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher} -import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} +import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, GetLocalDirsForExecutors, LocalDirsForExecutors, UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.serializer.JavaSerializer @@ -197,6 +199,44 @@ private[spark] class NettyBlockTransferService( result.future } + override def getHostLocalDirs( + host: String, + port: Int, + execIds: Array[String], + hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]) + : Unit = { + val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds) + try { + val client: TransportClient = clientFactory.createClient(host, port) + client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() { + override def onSuccess(response: ByteBuffer): Unit = { + try { + val msgObj: BlockTransferMessage = BlockTransferMessage.Decoder.fromByteBuffer(response) + hostLocalDirsCompletable.complete( + msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec) + } catch { + case t: Throwable => + logWarning(s"Error trying to get the host local dirs for " + + s"${getLocalDirsMessage.execIds.mkString(", ")} via " + + 
s"external shuffle service", t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + } finally client.close() + } + + override def onFailure(t: Throwable): Unit = { + logWarning(s"Error trying to get the host local dirs for " + + s"${getLocalDirsMessage.execIds.mkString(", ")} via external shuffle service", + t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + client.close() + } + }) + } catch { + case e@(_: IOException | _: InterruptedException) => + hostLocalDirsCompletable.completeExceptionally(e) + } + } + override def close(): Unit = { if (server != null) { server.close() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5072340f337b..b9fe1d28dc37 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -124,7 +124,7 @@ private[spark] class HostLocalDirManager( host: String, externalShuffleServicePort: Int) extends Logging { - private val executorIdToLocalDirsCache = + private[spark] val executorIdToLocalDirsCache = CacheBuilder .newBuilder() .maximumSize(cacheSize) @@ -135,27 +135,6 @@ private[spark] class HostLocalDirManager( import scala.collection.JavaConverters._ return executorIdToLocalDirsCache.asMap().asScala } - - private[spark] def getHostLocalDirs( - executorIds: Array[String])( - callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = { - val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] - externalBlockStoreClient.getHostLocalDirs( - host, - externalShuffleServicePort, - executorIds, - hostLocalDirsCompletable) - hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => - if (hostLocalDirs != null) { - callback(Success(hostLocalDirs)) - executorIdToLocalDirsCache.synchronized { - executorIdToLocalDirsCache.putAll(hostLocalDirs) - } - } else { - callback(Failure(throwable)) - } - } - } } /** @@ -212,7 +191,7 @@ private[spark] class BlockManager( private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory - private val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf) + private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf) var blockManagerId: BlockManagerId = _ @@ -265,6 +244,29 @@ private[spark] class BlockManager( shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver] } + override def getLocalDiskDirs: Array[String] = diskBlockManager.localDirsString + + private[spark] def getHostLocalDirs( + host: String, + port: Int, + executorIds: Array[String])( + callback: Try[java.util.Map[String, Array[String]]] => Unit) + : Unit = { + val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] + blockStoreClient.getHostLocalDirs( + host, + port, + executorIds, + hostLocalDirsCompletable) + hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => + if (hostLocalDirs != null) { + callback(Success(hostLocalDirs)) + } else { + callback(Failure(throwable)) + } + } + } + /** * Abstraction for storing blocks from bytes, whether they start in memory or on disk. 
* diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a2843da0561e..60a551ca57a3 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -301,7 +301,7 @@ final class ShuffleBlockFetcherIterator( numBlocksToFetch += mergedBlockInfos.size localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex)) localBlockBytes += mergedBlockInfos.map(_.size).sum - } else if (hostLocalDirReadingEnabled && address.host == blockManager.blockManagerId.host) { + } else if (address.host == blockManager.blockManagerId.host) { checkBlockSizes(blockInfos) val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded( blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch) @@ -462,8 +462,9 @@ final class ShuffleBlockFetcherIterator( * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we * track in-memory are the ManagedBuffer references themselves. */ - private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = { - val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs() + private[this] def fetchHostLocalBlocks(): Unit = { + val cachedDirsByExec = blockManager.hostLocalDirManager + .map(_.getCachedHostLocalDirs()).getOrElse(Map.empty[String, Array[String]]) val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = hostLocalBlocksByExecutor .map { case (hostLocalBmId, bmInfos) => @@ -477,26 +478,38 @@ final class ShuffleBlockFetcherIterator( if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") - val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) { - case Success(dirs) => - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => - blockInfos.takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( - blockId, - mapIndex, - dirs.get(hostLocalBmId.executorId), - hostLocalBmId) + val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId) + + val addressesAnExecutors = blockManager.hostLocalDirManager.map { _ => + val host = blockManager.blockManagerId.host + val port = blockManager.externalShuffleServicePort + Seq((host, port, execIdsWithoutDirs.toArray)) + }.getOrElse { + hostLocalBlocksByExecutor.keysIterator.map { bm => + (bm.host, bm.port, Array(bm.executorId)) + } + } + addressesAnExecutors.foreach { case (host, port, executorIds) => + blockManager.getHostLocalDirs(host, port, executorIds) { + case Success(dirs) => + immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => + blockInfos.takeWhile { case (blockId, _, mapIndex) => + fetchHostLocalBlock( + blockId, + mapIndex, + dirs.get(hostLocalBmId.executorId), + hostLocalBmId) + } } - } - logDebug(s"Got host-local blocks (without cached executors' dir) in " + - s"${Utils.getUsedTimeNs(startTimeNs)}") - - case Failure(throwable) => - logError(s"Error occurred while fetching host local blocks", throwable) - val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head - val (blockId, _, mapIndex) = blockInfoSeq.head - results.put(FailureFetchResult(blockId, 
mapIndex, hostLocalBmId, throwable)) + logDebug(s"Got host-local blocks (without cached executors' dir) in " + + s"${Utils.getUsedTimeNs(startTimeNs)}") + + case Failure(throwable) => + logError(s"Error occurred while fetching host local blocks", throwable) + val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head + val (blockId, _, mapIndex) = blockInfoSeq.head + results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, throwable)) + } } } if (hostLocalBlocksWithCachedDirs.nonEmpty) { @@ -537,7 +550,7 @@ final class ShuffleBlockFetcherIterator( logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}") if (hostLocalBlocks.nonEmpty) { - blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks) + fetchHostLocalBlocks() } } From dc6e7b8072f6aa19a02a5fdc08138d9b40b62753 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 10 Jun 2020 00:02:35 +0800 Subject: [PATCH 02/48] update --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index d38b4999d926..3e8a0c908442 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -72,9 +72,10 @@ public MetricSet shuffleMetrics() { return () -> Collections.emptyMap(); } - public abstract void getHostLocalDirs( + public void getHostLocalDirs( String host, int port, String[] execIds, - CompletableFuture> hostLocalDirsCompletable); + CompletableFuture> hostLocalDirsCompletable) { + } } From b1c6ac0d495382b5997e1886afe56d2a26adcb94 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 22 Jun 2020 17:59:20 +0800 Subject: [PATCH 03/48] fix --- .../storage/ShuffleBlockFetcherIterator.scala | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 60a551ca57a3..6a1ae77cfbde 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -492,13 +492,31 @@ final class ShuffleBlockFetcherIterator( addressesAnExecutors.foreach { case (host, port, executorIds) => blockManager.getHostLocalDirs(host, port, executorIds) { case Success(dirs) => - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => + if (blockManager.hostLocalDirManager.isDefined) { + immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => + blockInfos.takeWhile { case (blockId, _, mapIndex) => + logInfo(s"Got local dirs ${dirs.get(hostLocalBmId.executorId).mkString(", ")} " + + s"from ${executorIds.head}:$host:$port through hostLocalDirManager.") + fetchHostLocalBlock( + blockId, + mapIndex, + dirs.get(hostLocalBmId.executorId), + hostLocalBmId) + } + } + } else { + // FIXME: write in ugly way temporally + val execId = executorIds.head + logInfo(s"Got local dirs ${dirs.get(execId).mkString(", ")} " + + s"from ${executorIds.head}:$host:$port through executor-executor.") + val (bm, blockInfos) = + immutableHostLocalBlocksWithoutDirs.find(_._1.executorId == execId).get blockInfos.takeWhile { case 
(blockId, _, mapIndex) => fetchHostLocalBlock( blockId, mapIndex, - dirs.get(hostLocalBmId.executorId), - hostLocalBmId) + dirs.get(execId), + bm) } } logDebug(s"Got host-local blocks (without cached executors' dir) in " + From 64037cf19f8d186b9bc90a065ef916cb08fa3bf8 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 20:43:52 +0800 Subject: [PATCH 04/48] update --- .../spark/internal/config/package.scala | 9 +-- .../apache/spark/storage/BlockManager.scala | 59 ++++++++--------- .../storage/ShuffleBlockFetcherIterator.scala | 66 +++++++------------ .../shuffle/HostLocalShuffleFetchSuite.scala | 24 +++++++ .../ShuffleBlockFetcherIteratorSuite.scala | 8 +-- 5 files changed, 83 insertions(+), 83 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b308115935d6..43ef5d66d0a0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1415,10 +1415,11 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + - s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + - "blocks requested from those block managers which are running on the same host are read " + - "from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + + s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + + s" is disabled), shuffle blocks requested from those block managers which are running on " + + s"the same host are read from the disk directly instead of being fetched as remote blocks" + + s" over the network.") .version("3.0.0") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b9fe1d28dc37..2c289825c049 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -120,9 +120,7 @@ private[spark] class ByteBufferBlockData( private[spark] class HostLocalDirManager( futureExecutionContext: ExecutionContext, cacheSize: Int, - externalBlockStoreClient: ExternalBlockStoreClient, - host: String, - externalShuffleServicePort: Int) extends Logging { + blockStoreClient: BlockStoreClient) extends Logging { private[spark] val executorIdToLocalDirsCache = CacheBuilder @@ -135,6 +133,29 @@ private[spark] class HostLocalDirManager( import scala.collection.JavaConverters._ return executorIdToLocalDirsCache.asMap().asScala } + + private[spark] def getHostLocalDirs( + host: String, + port: Int, + executorIds: Array[String])( + callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = { + val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] + blockStoreClient.getHostLocalDirs( + host, + port, + executorIds, + hostLocalDirsCompletable) + hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => + if (hostLocalDirs != null) { + callback(Success(hostLocalDirs)) + executorIdToLocalDirsCache.synchronized { + 
executorIdToLocalDirsCache.putAll(hostLocalDirs) + } + } else { + callback(Failure(throwable)) + } + } + } } /** @@ -246,27 +267,6 @@ private[spark] class BlockManager( override def getLocalDiskDirs: Array[String] = diskBlockManager.localDirsString - private[spark] def getHostLocalDirs( - host: String, - port: Int, - executorIds: Array[String])( - callback: Try[java.util.Map[String, Array[String]]] => Unit) - : Unit = { - val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] - blockStoreClient.getHostLocalDirs( - host, - port, - executorIds, - hostLocalDirsCompletable) - hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => - if (hostLocalDirs != null) { - callback(Success(hostLocalDirs)) - } else { - callback(Failure(throwable)) - } - } - } - /** * Abstraction for storing blocks from bytes, whether they start in memory or on disk. * @@ -496,15 +496,12 @@ private[spark] class BlockManager( hostLocalDirManager = if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && - !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { - externalBlockStoreClient.map { blockStoreClient => - new HostLocalDirManager( + !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) && + (externalBlockStoreClient.isDefined || !conf.get(config.DYN_ALLOCATION_ENABLED)) ) { + Some(new HostLocalDirManager( futureExecutionContext, conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), - blockStoreClient, - blockManagerId.host, - externalShuffleServicePort) - } + blockStoreClient)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 6a1ae77cfbde..2491f01ca65e 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -290,9 +290,6 @@ final class ShuffleBlockFetcherIterator( var hostLocalBlockBytes = 0L var remoteBlockBytes = 0L - val hostLocalDirReadingEnabled = - blockManager.hostLocalDirManager != null && blockManager.hostLocalDirManager.isDefined - for ((address, blockInfos) <- blocksByAddress) { if (address.executorId == blockManager.blockManagerId.executorId) { checkBlockSizes(blockInfos) @@ -301,7 +298,8 @@ final class ShuffleBlockFetcherIterator( numBlocksToFetch += mergedBlockInfos.size localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex)) localBlockBytes += mergedBlockInfos.map(_.size).sum - } else if (address.host == blockManager.blockManagerId.host) { + } else if (blockManager.hostLocalDirManager.isDefined && + address.host == blockManager.blockManagerId.host) { checkBlockSizes(blockInfos) val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded( blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch) @@ -462,15 +460,13 @@ final class ShuffleBlockFetcherIterator( * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we * track in-memory are the ManagedBuffer references themselves. 
*/ - private[this] def fetchHostLocalBlocks(): Unit = { - val cachedDirsByExec = blockManager.hostLocalDirManager - .map(_.getCachedHostLocalDirs()).getOrElse(Map.empty[String, Array[String]]) + private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = { + val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs() val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = hostLocalBlocksByExecutor .map { case (hostLocalBmId, bmInfos) => (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) }.partition(_._3.isDefined) - val bmId = blockManager.blockManagerId val immutableHostLocalBlocksWithoutDirs = hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) => hostLocalBmId -> bmInfos @@ -478,61 +474,47 @@ final class ShuffleBlockFetcherIterator( if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") - val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId) + val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - val addressesAnExecutors = blockManager.hostLocalDirManager.map { _ => + val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { val host = blockManager.blockManagerId.host val port = blockManager.externalShuffleServicePort - Seq((host, port, execIdsWithoutDirs.toArray)) - }.getOrElse { - hostLocalBlocksByExecutor.keysIterator.map { bm => - (bm.host, bm.port, Array(bm.executorId)) - } + Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) + } else { + hostLocalBlocksByExecutor.keysIterator + .filter(exec => execIdsWithoutDirs.contains(exec.executorId)) + .map(bm => (bm.host, bm.port, Array(bm))).toSeq } - addressesAnExecutors.foreach { case (host, port, executorIds) => - blockManager.getHostLocalDirs(host, port, executorIds) { + + dirFetchRequests.foreach { case (host, port, bmIds) => + hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) { case Success(dirs) => - if (blockManager.hostLocalDirManager.isDefined) { - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => - blockInfos.takeWhile { case (blockId, _, mapIndex) => - logInfo(s"Got local dirs ${dirs.get(hostLocalBmId.executorId).mkString(", ")} " + - s"from ${executorIds.head}:$host:$port through hostLocalDirManager.") + bmIds.foreach { bmId => + immutableHostLocalBlocksWithoutDirs(bmId) + .takeWhile { case (blockId, _, mapIndex) => fetchHostLocalBlock( blockId, mapIndex, - dirs.get(hostLocalBmId.executorId), - hostLocalBmId) + dirs.get(bmId.executorId), + bmId) } - } - } else { - // FIXME: write in ugly way temporally - val execId = executorIds.head - logInfo(s"Got local dirs ${dirs.get(execId).mkString(", ")} " + - s"from ${executorIds.head}:$host:$port through executor-executor.") - val (bm, blockInfos) = - immutableHostLocalBlocksWithoutDirs.find(_._1.executorId == execId).get - blockInfos.takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( - blockId, - mapIndex, - dirs.get(execId), - bm) - } } logDebug(s"Got host-local blocks (without cached executors' dir) in " + s"${Utils.getUsedTimeNs(startTimeNs)}") case Failure(throwable) => logError(s"Error occurred while fetching host local blocks", throwable) - val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head + val bmId = bmIds.head + val blockInfoSeq = 
immutableHostLocalBlocksWithoutDirs(bmId) val (blockId, _, mapIndex) = blockInfoSeq.head - results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, throwable)) + results.put(FailureFetchResult(blockId, mapIndex, bmId, throwable)) } } } if (hostLocalBlocksWithCachedDirs.nonEmpty) { logDebug(s"Synchronous fetching host-local blocks with cached executors' dir: " + s"${hostLocalBlocksWithCachedDirs.mkString(", ")}") + val bmId = blockManager.blockManagerId hostLocalBlocksWithCachedDirs.foreach { case (_, blockInfos, localDirs) => blockInfos.foreach { case (blockId, _, mapIndex) => if (!fetchHostLocalBlock(blockId, mapIndex, localDirs.get, bmId)) { @@ -568,7 +550,7 @@ final class ShuffleBlockFetcherIterator( logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}") if (hostLocalBlocks.nonEmpty) { - fetchHostLocalBlocks() + blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala new file mode 100644 index 000000000000..9853f1420c7c --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkFunSuite
+
+class HostLocalShuffleFetchSuite extends SparkFunSuite {
+
+}

From 285fd70eff3989213e3b2016a016779758613a84 Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Tue, 23 Jun 2020 21:11:50 +0800
Subject: [PATCH 05/48] add test

---
 .../shuffle/HostLocalShuffleFetchSuite.scala  | 42 ++++++++++++++++++-
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
index 9853f1420c7c..0bdfdce41e03 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
@@ -17,8 +17,46 @@
 
 package org.apache.spark.shuffle
 
-import org.apache.spark.SparkFunSuite
+import org.scalatest.Matchers
 
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
 
+/**
+ * Tests host-local shuffle reading when the external shuffle service is disabled.
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {
+  test("read host local shuffle from disk with external shuffle service disabled") {
+    val conf = new SparkConf()
+      .set(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(SHUFFLE_SERVICE_ENABLED, false)
+      .set(DYN_ALLOCATION_ENABLED, false)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+    sc.getConf.get(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(false)
+    sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+    sc.env.blockManager.blockStoreClient.getClass should equal(classOf[NettyBlockTransferService])
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+
+    val rdd = 
sc.parallelize(0 until 1000, 10) + .map { i => (i, 1) } + .reduceByKey(_ + _) + + rdd.count() + rdd.count() + + val cachedExecutors = rdd.mapPartitions { _ => + SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => + localDirManager.getCachedHostLocalDirs().keySet.iterator + }.getOrElse(Iterator.empty) + }.collect().toSet + + // both executors are caching the dirs of the other one + cachedExecutors should equal(sc.getExecutorIds().toSet) + + // Now Spark will not receive FetchFailed as host local blocks are read from the cached local + // disk directly + rdd.collect().map(_._2).sum should equal(1000) + } } From 904335a69b719d586ce114bdf5d90d2bc3a12c53 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 21:27:46 +0800 Subject: [PATCH 06/48] update --- .../apache/spark/network/shuffle/BlockStoreClient.java | 8 -------- .../spark/network/netty/NettyBlockTransferService.scala | 2 +- .../spark/storage/ShuffleBlockFetcherIterator.scala | 9 +++------ 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 3e8a0c908442..8897750af3b6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -18,19 +18,11 @@ package org.apache.spark.network.shuffle; import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import com.codahale.metrics.MetricSet; -import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; -import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors; -import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors; /** * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 1c282532ca57..9c4234016cd7 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -20,7 +20,7 @@ package org.apache.spark.network.netty import java.io.IOException import java.nio.ByteBuffer import java.util -import java.util.{Arrays, HashMap => JHashMap, Map => JMap} +import java.util.{HashMap => JHashMap, Map => JMap} import java.util.concurrent.CompletableFuture import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 2491f01ca65e..0c45d11e03a4 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -488,15 +488,12 @@ final class ShuffleBlockFetcherIterator( dirFetchRequests.foreach { case (host, port, bmIds) => hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) { - case Success(dirs) => + case 
Success(dirsByExecId) => bmIds.foreach { bmId => + val dirs = dirsByExecId.get(bmId.executorId) immutableHostLocalBlocksWithoutDirs(bmId) .takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( - blockId, - mapIndex, - dirs.get(bmId.executorId), - bmId) + fetchHostLocalBlock(blockId, mapIndex, dirs, bmId) } } logDebug(s"Got host-local blocks (without cached executors' dir) in " + From 9e3c875b61f1137d44092ada67992d6739cb5ad3 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 22:15:31 +0800 Subject: [PATCH 07/48] remove spark --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2c289825c049..218c8c832468 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -128,7 +128,7 @@ private[spark] class HostLocalDirManager( .maximumSize(cacheSize) .build[String, Array[String]]() - private[spark] def getCachedHostLocalDirs() + private def getCachedHostLocalDirs() : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized { import scala.collection.JavaConverters._ return executorIdToLocalDirsCache.asMap().asScala From 5e12b68b45f80f8445a0c585b8386bf2bf33932d Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 22:16:31 +0800 Subject: [PATCH 08/48] remove spark --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 218c8c832468..efa662f086af 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -122,13 +122,13 @@ private[spark] class HostLocalDirManager( cacheSize: Int, blockStoreClient: BlockStoreClient) extends Logging { - private[spark] val executorIdToLocalDirsCache = + private val executorIdToLocalDirsCache = CacheBuilder .newBuilder() .maximumSize(cacheSize) .build[String, Array[String]]() - private def getCachedHostLocalDirs() + private[spark] def getCachedHostLocalDirs() : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized { import scala.collection.JavaConverters._ return executorIdToLocalDirsCache.asMap().asScala From 3960b4463b832fd871023367d051b40fcb7f6eec Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 22:45:48 +0800 Subject: [PATCH 09/48] update getHostLocalDirs --- .../netty/NettyBlockTransferService.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 9c4234016cd7..8e9a85a22522 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -203,36 +203,35 @@ private[spark] class NettyBlockTransferService( host: String, port: Int, execIds: Array[String], - hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]) - : Unit = { + hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = { val 
getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds) try { - val client: TransportClient = clientFactory.createClient(host, port) + val client = clientFactory.createClient(host, port) client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() { override def onSuccess(response: ByteBuffer): Unit = { try { - val msgObj: BlockTransferMessage = BlockTransferMessage.Decoder.fromByteBuffer(response) + val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response) hostLocalDirsCompletable.complete( msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec) } catch { case t: Throwable => - logWarning(s"Error trying to get the host local dirs for " + - s"${getLocalDirsMessage.execIds.mkString(", ")} via " + - s"external shuffle service", t.getCause) + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", + t.getCause) hostLocalDirsCompletable.completeExceptionally(t) - } finally client.close() + } finally { + client.close() + } } override def onFailure(t: Throwable): Unit = { - logWarning(s"Error trying to get the host local dirs for " + - s"${getLocalDirsMessage.execIds.mkString(", ")} via external shuffle service", + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", t.getCause) hostLocalDirsCompletable.completeExceptionally(t) client.close() } }) } catch { - case e@(_: IOException | _: InterruptedException) => + case e: IOException | InterruptedException => hostLocalDirsCompletable.completeExceptionally(e) } } From 27c5f5c9394b37807f66164650df2a9ac46e4c10 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 23:03:44 +0800 Subject: [PATCH 10/48] add comment --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 8897750af3b6..775c508e7beb 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -64,6 +64,12 @@ public MetricSet shuffleMetrics() { return () -> Collections.emptyMap(); } + /** + * Request the local disk directories, which are specified by DiskBlockManager, for the executors + * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager + * (when this is a NettyBlockTransferService). Note there's only one executor when this is a + * NettyBlockTransferService. 
+ */ public void getHostLocalDirs( String host, int port, From 8a5811f1b9a2841e2c544e375116107fb8e2d1a5 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 30 Jun 2020 17:32:02 +0800 Subject: [PATCH 11/48] fix build --- .../spark/network/netty/NettyBlockTransferService.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 8e9a85a22522..239e66a49f55 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -231,7 +231,9 @@ private[spark] class NettyBlockTransferService( } }) } catch { - case e: IOException | InterruptedException => + case e: IOException => + hostLocalDirsCompletable.completeExceptionally(e) + case e: InterruptedException => hostLocalDirsCompletable.completeExceptionally(e) } } From 05598f8174bb4173f45ef0938b11c388c838d270 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 30 Jun 2020 17:38:43 +0800 Subject: [PATCH 12/48] update test --- .../apache/spark/shuffle/HostLocalShuffleFetchSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala index 0bdfdce41e03..b94ab1aa6c23 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala @@ -43,8 +43,8 @@ class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalS .map { i => (i, 1) } .reduceByKey(_ + _) - rdd.count() - rdd.count() + assert(rdd.count() === 1000) + assert(rdd.count() === 1000) val cachedExecutors = rdd.mapPartitions { _ => SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => @@ -55,8 +55,6 @@ class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalS // both executors are caching the dirs of the other one cachedExecutors should equal(sc.getExecutorIds().toSet) - // Now Spark will not receive FetchFailed as host local blocks are read from the cached local - // disk directly rdd.collect().map(_._2).sum should equal(1000) } } From 23181b66ce500256b89c14f31ad4cc90bd2d938a Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 2 Jul 2020 20:47:06 +0800 Subject: [PATCH 13/48] update conf doc --- .../main/scala/org/apache/spark/internal/config/package.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 43ef5d66d0a0..cb92f1391107 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1419,7 +1419,8 @@ package object config { s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + s" is disabled), shuffle blocks requested from those block managers which are running on " + s"the same host are read from the disk directly instead of being fetched as remote blocks" + - s" over the network.") + s" over the network. 
Note that for k8s workloads, this only works when nodes are using " + + s"non-isolated container storage.") .version("3.0.0") .booleanConf .createWithDefault(true) From eb6eb9af3a715af57442d793fd5f82103ae969d3 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 16:15:05 +0800 Subject: [PATCH 14/48] rename to getLocalDirs --- .../apache/spark/network/netty/NettyBlockRpcServer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index c301becff524..70ff11b643b0 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -114,9 +114,9 @@ class NettyBlockRpcServer( responseContext.onFailure(exception) } - case req: GetLocalDirsForExecutors => - assert(req.appId == appId) - val execId = req.execIds.head + case getLocalDirs: GetLocalDirsForExecutors => + assert(getLocalDirs.appId == appId) + val execId = getLocalDirs.execIds.head val dirs = blockManager.getLocalDiskDirs responseContext .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer) From 5fdf1bcd73b00ddb931550adced5ca65bf118854 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 16:21:11 +0800 Subject: [PATCH 15/48] update comment --- .../java/org/apache/spark/network/shuffle/BlockStoreClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 775c508e7beb..293fcd253023 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -68,7 +68,7 @@ public MetricSet shuffleMetrics() { * Request the local disk directories, which are specified by DiskBlockManager, for the executors * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager * (when this is a NettyBlockTransferService). Note there's only one executor when this is a - * NettyBlockTransferService. + * NettyBlockTransferService because we ask one specific executor at a time. 
*/ public void getHostLocalDirs( String host, From d192db882cf4f4f5b1135a24e45cb255f386dd5f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 16:24:23 +0800 Subject: [PATCH 16/48] correct to bmId --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 0c45d11e03a4..cbd02ec1bfa6 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -482,8 +482,8 @@ final class ShuffleBlockFetcherIterator( Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) } else { hostLocalBlocksByExecutor.keysIterator - .filter(exec => execIdsWithoutDirs.contains(exec.executorId)) - .map(bm => (bm.host, bm.port, Array(bm))).toSeq + .filter(bmId => execIdsWithoutDirs.contains(bmId.executorId)) + .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq } dirFetchRequests.foreach { case (host, port, bmIds) => From db2500aba08eeccd32182eac5041d53b6cfdc211 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 16:33:34 +0800 Subject: [PATCH 17/48] simplify to immutableHostLocalBlocksWithoutDirs --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index cbd02ec1bfa6..c88c2d50bdf8 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -474,15 +474,12 @@ final class ShuffleBlockFetcherIterator( if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") - val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { val host = blockManager.blockManagerId.host val port = blockManager.externalShuffleServicePort Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) } else { - hostLocalBlocksByExecutor.keysIterator - .filter(bmId => execIdsWithoutDirs.contains(bmId.executorId)) + immutableHostLocalBlocksWithoutDirs.keys .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq } From 5c767da1144beffef605f90af57ebfe342d8d85b Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 16:34:38 +0800 Subject: [PATCH 18/48] rename to blockInfos --- .../spark/storage/ShuffleBlockFetcherIterator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index c88c2d50bdf8..eee91110a267 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -464,12 +464,12 @@ final class ShuffleBlockFetcherIterator( val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs() val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = 
hostLocalBlocksByExecutor - .map { case (hostLocalBmId, bmInfos) => - (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) + .map { case (hostLocalBmId, blockInfos) => + (hostLocalBmId, blockInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) }.partition(_._3.isDefined) val immutableHostLocalBlocksWithoutDirs = - hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) => - hostLocalBmId -> bmInfos + hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, blockInfos, _) => + hostLocalBmId -> blockInfos }.toMap if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + From 0d254920b6b5123f06ee285352e766f19265a766 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 21 Jul 2020 11:05:35 +0800 Subject: [PATCH 19/48] add assert --- .../org/apache/spark/network/netty/NettyBlockRpcServer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index 70ff11b643b0..12fcf732ffd7 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -116,6 +116,7 @@ class NettyBlockRpcServer( case getLocalDirs: GetLocalDirsForExecutors => assert(getLocalDirs.appId == appId) + assert(getLocalDirs.execIds.length == 1) val execId = getLocalDirs.execIds.head val dirs = blockManager.getLocalDiskDirs responseContext From f6cfcbc971631972ff8700391a0650b8313ee869 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 21 Jul 2020 14:48:28 +0800 Subject: [PATCH 20/48] refactor --- .../network/shuffle/BlockStoreClient.java | 68 +++++++++++++++++-- .../shuffle/ExternalBlockStoreClient.java | 42 ------------ .../spark/network/BlockTransferService.scala | 2 +- .../netty/NettyBlockTransferService.scala | 51 ++------------ 4 files changed, 70 insertions(+), 93 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 293fcd253023..79c424bdf527 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -18,17 +18,32 @@ package org.apache.spark.network.shuffle; import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import com.codahale.metrics.MetricSet; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.client.TransportClientFactory; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; +import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors; +import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor * or external service. 
 */
 public abstract class BlockStoreClient implements Closeable {
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+  protected volatile TransportClientFactory clientFactory;
+  protected String appId;
 
   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
@@ -65,15 +80,60 @@ public MetricSet shuffleMetrics() {
   }
 
   /**
-   * Request the local disk directories, which are specified by DiskBlockManager, for the executors
-   * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager
-   * (when this is a NettyBlockTransferService). Note there's only one executor when this is a
-   * NettyBlockTransferService because we ask one specific executor at a time.
+   * Request the local disk directories for executors that are located on the same host as the
+   * current BlockStoreClient (it can be an ExternalBlockStoreClient or a
+   * NettyBlockTransferService).
+   *
+   * @param host the host of the BlockManager or ExternalShuffleService; it is the same host
+   *             as the current BlockStoreClient's.
+   * @param port the port of the BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids whose local directories are requested.
+   *                There can be multiple executor Ids when BlockStoreClient is implemented
+   *                by ExternalBlockStoreClient, since the request handler, the
+   *                ExternalShuffleService, can serve multiple executors on the same node;
+   *                there is only one executor Id when it is implemented by
+   *                NettyBlockTransferService, because we ask one specific executor at a time.
+   * @param hostLocalDirsCompletable a CompletableFuture that is completed with a map from
+   *                                 executor Id to its local directories if the request handler
+   *                                 replies successfully, or exceptionally otherwise.
    */
   public void getHostLocalDirs(
       String host,
       int port,
       String[] execIds,
       CompletableFuture> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+              ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+              Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
+              t.getCause());
+            hostLocalDirsCompletable.completeExceptionally(t);
+          } finally {
+            client.close();
+          }
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          logger.warn("Error trying to get the host local dirs for " +
+            Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
+            t.getCause());
+          hostLocalDirsCompletable.completeExceptionally(t);
+          client.close();
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      hostLocalDirsCompletable.completeExceptionally(e);
+    }
   }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 9fdb6322c966..49b5eb6a84c0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -47,16 +47,11 @@ * (via BlockTransferService), which has the downside of losing the data if we lose the executors. */ public class ExternalBlockStoreClient extends BlockStoreClient { - private static final Logger logger = LoggerFactory.getLogger(ExternalBlockStoreClient.class); - private final TransportConf conf; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; private final long registrationTimeoutMs; - protected volatile TransportClientFactory clientFactory; - protected String appId; - /** * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled, * then secretKeyHolder may be null. @@ -188,43 +183,6 @@ public void onFailure(Throwable e) { return numRemovedBlocksFuture; } - public void getHostLocalDirs( - String host, - int port, - String[] execIds, - CompletableFuture> hostLocalDirsCompletable) { - checkInit(); - GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds); - try { - TransportClient client = clientFactory.createClient(host, port); - client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() { - @Override - public void onSuccess(ByteBuffer response) { - try { - BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); - hostLocalDirsCompletable.complete( - ((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); - } catch (Throwable t) { - logger.warn("Error trying to get the host local dirs for " + - Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service", - t.getCause()); - hostLocalDirsCompletable.completeExceptionally(t); - } - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Error trying to get the host local dirs for " + - Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service", - t.getCause()); - hostLocalDirsCompletable.completeExceptionally(t); - } - }); - } catch (IOException | InterruptedException e) { - hostLocalDirsCompletable.completeExceptionally(e); - } - } - @Override public void close() { checkInit(); diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index 70a159f3eeec..98129b62b53d 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.ThreadUtils * BlockTransferService contains both client and server inside. 
*/ private[spark] -abstract class BlockTransferService extends BlockStoreClient with Logging { +abstract class BlockTransferService extends BlockStoreClient { /** * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 239e66a49f55..b50f20c1f35a 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -67,8 +67,6 @@ private[spark] class NettyBlockTransferService( private[this] var transportContext: TransportContext = _ private[this] var server: TransportServer = _ - private[this] var clientFactory: TransportClientFactory = _ - private[this] var appId: String = _ override def init(blockDataManager: BlockDataManager): Unit = { val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager) @@ -82,7 +80,7 @@ private[spark] class NettyBlockTransferService( clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) server = createServer(serverBootstrap.toList) appId = conf.getAppId - logInfo(s"Server created on ${hostName}:${server.getPort}") + logger.info(s"Server created on ${hostName}:${server.getPort}") } /** Creates and binds the TransportServer, possibly trying multiple ports. */ @@ -115,7 +113,7 @@ private[spark] class NettyBlockTransferService( blockIds: Array[String], listener: BlockFetchingListener, tempFileManager: DownloadFileManager): Unit = { - logTrace(s"Fetch blocks from $host:$port (executor id $execId)") + logger.trace(s"Fetch blocks from $host:$port (executor id $execId)") try { val maxRetries = transportConf.maxIORetries() val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { @@ -148,7 +146,7 @@ private[spark] class NettyBlockTransferService( } } catch { case e: Exception => - logError("Exception while beginning fetchBlocks", e) + logger.error("Exception while beginning fetchBlocks", e) blockIds.foreach(listener.onBlockFetchFailure(_, e)) } } @@ -176,12 +174,12 @@ private[spark] class NettyBlockTransferService( blockId.isShuffle) val callback = new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { - logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") + logger.trace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") result.success((): Unit) } override def onFailure(e: Throwable): Unit = { - logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e) + logger.error(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e) result.failure(e) } } @@ -199,45 +197,6 @@ private[spark] class NettyBlockTransferService( result.future } - override def getHostLocalDirs( - host: String, - port: Int, - execIds: Array[String], - hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = { - val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds) - try { - val client = clientFactory.createClient(host, port) - client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() { - override def onSuccess(response: ByteBuffer): Unit = { - try { - val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response) - hostLocalDirsCompletable.complete( - 
msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec) - } catch { - case t: Throwable => - logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", - t.getCause) - hostLocalDirsCompletable.completeExceptionally(t) - } finally { - client.close() - } - } - - override def onFailure(t: Throwable): Unit = { - logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", - t.getCause) - hostLocalDirsCompletable.completeExceptionally(t) - client.close() - } - }) - } catch { - case e: IOException => - hostLocalDirsCompletable.completeExceptionally(e) - case e: InterruptedException => - hostLocalDirsCompletable.completeExceptionally(e) - } - } - override def close(): Unit = { if (server != null) { server.close() From 0a144e379aac4ebb31d2d0b0147caae029ee46db Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Jul 2020 13:44:16 +0800 Subject: [PATCH 21/48] fix NettyBlockTransferServiceSuite --- .../spark/network/netty/NettyBlockTransferServiceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala index baa878eb1404..fa1a75d07605 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala @@ -118,8 +118,8 @@ class NettyBlockTransferServiceSuite .thenAnswer(_ => {hitExecutorDeadException = true}) service0 = createService(port, driverEndpointRef) - val clientFactoryField = service0.getClass.getField( - "org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory") + val clientFactoryField = service0.getClass .getSuperclass.getSuperclass.getDeclaredField("clientFactory") clientFactoryField.setAccessible(true) clientFactoryField.set(service0, clientFactory) From b44f1cb618024ef452c3907eb128575417ac8601 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Jul 2020 14:18:20 +0800 Subject: [PATCH 22/48] add createMockBlockManager() helper --- .../ShuffleBlockFetcherIteratorSuite.scala | 77 ++++++------------- 1 file changed, 24 insertions(+), 53 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 210fa6502969..3666e6565127 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -66,6 +66,14 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT transfer } + private def createMockBlockManager(): BlockManager = { + val blockManager = mock(classOf[BlockManager]) + val localBmId = BlockManagerId("test-client", "test-local-host", 1) + doReturn(localBmId).when(blockManager).blockManagerId + doReturn(None).when(blockManager).hostLocalDirManager + blockManager + } + private def initHostLocalDirManager( blockManager: BlockManager, hostLocalDirs: Map[String, Array[String]]): Unit = { @@ -114,9 +122,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("successful 3 local + 4 host local + 2 remote reads") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-local-client", "test-local-host", 1) - 
doReturn(localBmId).when(blockManager).blockManagerId + val blockManager = createMockBlockManager() + val localBmId = blockManager.blockManagerId // Make sure blockManager.getBlockData would return the blocks val localBlocks = Map[BlockId, ManagedBuffer]( @@ -199,9 +206,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("error during accessing host local dirs for executors") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-local-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId + val blockManager = createMockBlockManager() val hostLocalBlocks = Map[BlockId, ManagedBuffer]( ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()) @@ -252,10 +257,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("Hit maxBytesInFlight limitation before maxBlocksInFlightPerAddress") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() val remoteBmId1 = BlockManagerId("test-remote-client-1", "test-remote-host1", 1) val remoteBmId2 = BlockManagerId("test-remote-client-2", "test-remote-host2", 2) val blockId1 = ShuffleBlockId(0, 1, 0) @@ -297,10 +299,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("Hit maxBlocksInFlightPerAddress limitation before maxBytesInFlight") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() val remoteBmId = BlockManagerId("test-remote-client-1", "test-remote-host", 2) val blockId1 = ShuffleBlockId(0, 1, 0) val blockId2 = ShuffleBlockId(0, 2, 0) @@ -344,10 +343,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() + val localBmId = blockManager.blockManagerId // Make sure blockManager.getBlockData would return the merged block val localBlocks = Seq[BlockId]( ShuffleBlockId(0, 0, 0), @@ -431,10 +428,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("fetch continuous blocks in batch should respect maxBytesInFlight") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return the merged block val remoteBmId1 = BlockManagerId("test-client-1", "test-client-1", 1) val remoteBmId2 = BlockManagerId("test-client-2", "test-client-2", 2) @@ -490,10 +484,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("fetch continuous blocks in batch should respect maxBlocksInFlightPerAddress") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-local-host", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks 
would return the merged block val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 1) val remoteBlocks = Seq( @@ -545,10 +536,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("release current unexhausted buffer in case the task completes early") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( @@ -613,10 +601,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("fail all blocks if any of the remote request fails") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( @@ -703,10 +688,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("retry corrupt blocks") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( @@ -781,9 +763,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT test("big blocks are also checked for corruption") { val streamLength = 10000L - val blockManager = mock(classOf[BlockManager]) - val localBlockManagerId = BlockManagerId("local-client", "local-client", 1) - doReturn(localBlockManagerId).when(blockManager).blockManagerId + val blockManager = createMockBlockManager() // This stream will throw IOException when the first byte is read val corruptBuffer1 = mockCorruptBuffer(streamLength, 0) @@ -902,10 +882,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("retry corrupt blocks (disabled)") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( @@ -967,10 +944,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT test("Blocks should be shuffled to disk when size of the request is above the" + " threshold(maxReqSizeShuffleToMem).") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", "test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() val diskBlockManager = mock(classOf[DiskBlockManager]) val tmpDir = Utils.createTempDir() doReturn{ @@ -1032,10 +1006,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT } test("fail zero-size blocks") { - val blockManager = mock(classOf[BlockManager]) - val localBmId = BlockManagerId("test-client", 
"test-client", 1) - doReturn(localBmId).when(blockManager).blockManagerId - + val blockManager = createMockBlockManager() // Make sure remote blocks would return val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) val blocks = Map[BlockId, ManagedBuffer]( From 342ae608d532eb577a0a8a0898e785df3fee284e Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Jul 2020 14:22:33 +0800 Subject: [PATCH 23/48] add comment --- .../apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 3666e6565127..c1fa02cf23cb 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -70,6 +70,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT val blockManager = mock(classOf[BlockManager]) val localBmId = BlockManagerId("test-client", "test-local-host", 1) doReturn(localBmId).when(blockManager).blockManagerId + // By default, the mock BlockManager returns None for hostLocalDirManager. One could + // still use initHostLocalDirManager() to specify a custom hostLocalDirManager. doReturn(None).when(blockManager).hostLocalDirManager blockManager } From 613ffab812f22a0706a1a64ada7ec717ab0924ab Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 23 Jul 2020 15:31:56 +0800 Subject: [PATCH 24/48] update --- .../spark/internal/config/package.scala | 16 +++++++----- .../apache/spark/storage/BlockManager.scala | 25 +++++++++++++------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index cb92f1391107..642b4c339677 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1415,12 +1415,16 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + - s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + - s" is disabled), shuffle blocks requested from those block managers which are running on " + - s"the same host are read from the disk directly instead of being fetched as remote blocks" + - s" over the network. Note that for k8s workloads, this only works when nodes are using " + - s"non-isolated container storage.") + .doc("When enabled, shuffle blocks requested from those block managers which are running " + + "on the same host are read from the disk directly instead of being fetched as remote " + + "blocks over the network. Note that for k8s workloads, this only works when nodes are " + + "using non-isolated container storage." + + s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." + + " And make sure that one of the following requirements are satisfied:\n" + + s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" + + s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" + + s"3. 
dynamic allocation is enabled with shuffle tracking " + + s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});") .version("3.0.0") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index efa662f086af..2fffd3b3421d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -494,17 +494,26 @@ private[spark] class BlockManager( registerWithExternalShuffleServer() } - hostLocalDirManager = - if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && - !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) && - (externalBlockStoreClient.isDefined || !conf.get(config.DYN_ALLOCATION_ENABLED)) ) { - Some(new HostLocalDirManager( - futureExecutionContext, - conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), - blockStoreClient)) + hostLocalDirManager = { + val canUseHostLocalReading = conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && + !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) + val externalShuffleServiceEnabled = externalBlockStoreClient.isDefined + val dynamicAllocationDisabled = !conf.get(config.DYN_ALLOCATION_ENABLED) + val dynamicAllocationEnabledWithShuffleTacking = conf.get(config.DYN_ALLOCATION_ENABLED) && + conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) + + if (canUseHostLocalReading && ( + externalShuffleServiceEnabled || + dynamicAllocationDisabled || + dynamicAllocationEnabledWithShuffleTacking)) { + Some(new HostLocalDirManager( + futureExecutionContext, + conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), + blockStoreClient)) } else { None } + } logInfo(s"Initialized BlockManager: $blockManagerId") } From cd3e30dc8745d64aef17ccecacbefda465e3b461 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 24 Jul 2020 15:45:53 +0800 Subject: [PATCH 25/48] simplify the requirement --- .../apache/spark/internal/config/package.scala | 15 +++++---------- .../org/apache/spark/storage/BlockManager.scala | 13 ++----------- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 642b4c339677..168eec600cf9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1415,16 +1415,11 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc("When enabled, shuffle blocks requested from those block managers which are running " + - "on the same host are read from the disk directly instead of being fetched as remote " + - "blocks over the network. Note that for k8s workloads, this only works when nodes are " + - "using non-isolated container storage." + - s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." + - " And make sure that one of the following requirements are satisfied:\n" + - s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" + - s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" + - s"3. 
dynamic allocation is enabled with shuffle tracking " + s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled), shuffle " + "blocks requested from those block managers which are running on the same host are " + "read from the disk directly instead of being fetched as remote blocks over the " + "network. Note that for k8s workloads, this only works when nodes are using " + "non-isolated container storage.") .version("3.0.0") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2fffd3b3421d..129b3f786662 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -495,17 +495,8 @@ private[spark] class BlockManager( } hostLocalDirManager = { - val canUseHostLocalReading = conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && - !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) - val externalShuffleServiceEnabled = externalBlockStoreClient.isDefined - val dynamicAllocationDisabled = !conf.get(config.DYN_ALLOCATION_ENABLED) - val dynamicAllocationEnabledWithShuffleTacking = conf.get(config.DYN_ALLOCATION_ENABLED) && - conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) - - if (canUseHostLocalReading && ( - externalShuffleServiceEnabled || - dynamicAllocationDisabled || - dynamicAllocationEnabledWithShuffleTacking)) { + if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && + !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { Some(new HostLocalDirManager( futureExecutionContext, conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), blockStoreClient)) } else { None } } From da8d78ac260e015badc35947b464f57b0de75475 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 24 Jul 2020 15:50:45 +0800 Subject: [PATCH 26/48] fix log --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 79c424bdf527..fdb2aee2065a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -115,8 +115,7 @@ public void onSuccess(ByteBuffer response) { ((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); } catch (Throwable t) { logger.warn("Error trying to get the host local dirs for " + - Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service", - t.getCause()); + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); } finally { client.close(); @@ -126,8 +125,7 @@ public void onSuccess(ByteBuffer response) { @Override public void onFailure(Throwable t) { logger.warn("Error trying to get the host local dirs for " + - Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service", - t.getCause()); + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); client.close(); } From 7d9036a0c55b6e8e5ecf73279fd90aa9112fb2c0 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 24 Jul 2020 15:52:14 +0800 Subject: [PATCH 27/48] fix s (remove unneeded string interpolators) --- 
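Note (not part of the applied diff): this commit drops the `s` string-interpolator prefix from log literals that interpolate nothing. In plain Scala, the prefix only matters when the string actually contains a $-substitution — a minimal standalone sketch:

    val n = 2
    s"failed after $n retries"                          // $n is interpolated, so the s prefix is needed
    "Error occurred while fetching host local blocks"   // no $-substitution, so a plain literal suffices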
.../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index eee91110a267..8a68d1f51362 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -493,11 +493,11 @@ final class ShuffleBlockFetcherIterator( fetchHostLocalBlock(blockId, mapIndex, dirs, bmId) } } - logDebug(s"Got host-local blocks (without cached executors' dir) in " + + logDebug("Got host-local blocks (without cached executors' dir) in " + s"${Utils.getUsedTimeNs(startTimeNs)}") case Failure(throwable) => - logError(s"Error occurred while fetching host local blocks", throwable) + logError("Error occurred while fetching host local blocks", throwable) val bmId = bmIds.head val blockInfoSeq = immutableHostLocalBlocksWithoutDirs(bmId) val (blockId, _, mapIndex) = blockInfoSeq.head From 0372bd8a21383a021a96953ed430fc9212f8d62a Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 26 Aug 2020 09:55:43 +0800 Subject: [PATCH 28/48] fix Matcher --- .../org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala index b94ab1aa6c23..91cf8a9cfddc 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.shuffle -import org.scalatest.Matchers +import org.scalatest.matchers.must.Matchers +import org.scalatest.matchers.should.Matchers._ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils} import org.apache.spark.internal.config._ From 17f1b60a49da1d64e8df03400ba96bed750440ae Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 26 Aug 2020 10:24:43 +0800 Subject: [PATCH 29/48] fix java style --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 6 +++--- .../spark/network/shuffle/ExternalBlockStoreClient.java | 4 ---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index fdb2aee2065a..bcc9c5f5d30f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -93,9 +93,9 @@ public MetricSet shuffleMetrics() { * handler, ExternalShuffleService, can serve multiple executors on the same node. * Or, only one executor Id if BlockStoreClient is implemented by * NettyBlockTransferService. - * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id to its - * local directories if the request handler replies successfully. - * Otherwise, it contains a specific error. + * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id + * to its local directories if the request handler replies + * successfully. Otherwise, it contains a specific error. 
*/ public void getHostLocalDirs( String host, diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 49b5eb6a84c0..bac48bfcc548 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -30,10 +29,7 @@ import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientBootstrap; -import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.network.TransportContext; import org.apache.spark.network.crypto.AuthClientBootstrap; From 530d63c20d483d24196ae0b661d8fccdb25be2cc Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 27 Aug 2020 09:24:06 +0800 Subject: [PATCH 30/48] don't close client --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index bcc9c5f5d30f..72dd9f16a761 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -117,8 +117,6 @@ public void onSuccess(ByteBuffer response) { logger.warn("Error trying to get the host local dirs for " + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); - } finally { - client.close(); } } @@ -127,7 +125,6 @@ public void onFailure(Throwable t) { logger.warn("Error trying to get the host local dirs for " + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); - client.close(); } }); } catch (IOException | InterruptedException e) { From b14d6116ad5443dbe5eeb49f004c2562cc54bcde Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 27 Aug 2020 09:34:48 +0800 Subject: [PATCH 31/48] comment style --- .../apache/spark/network/shuffle/BlockStoreClient.java | 5 ++--- .../spark/network/netty/NettyBlockTransferService.scala | 8 ++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 72dd9f16a761..92853df87f4a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -83,7 +83,6 @@ public MetricSet shuffleMetrics() { * Request the local disk directories for executors which are located at the same host with * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService). 
* - * - * @param host the host of BlockManager or ExternalShuffleService. It's the same with current * BlockStoreClient. * @param port the port of BlockManager or ExternalShuffleService. @@ -94,8 +93,8 @@ public MetricSet shuffleMetrics() { * Or, only one executor Id if BlockStoreClient is implemented by * NettyBlockTransferService. * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id - * to its local directories if the request handler replies - * successfully. Otherwise, it contains a specific error. + * to its local directories if the request handler replies + * successfully. Otherwise, it contains a specific error. */ public void getHostLocalDirs( String host, diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index b50f20c1f35a..788424fff469 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -113,7 +113,9 @@ private[spark] class NettyBlockTransferService( blockIds: Array[String], listener: BlockFetchingListener, tempFileManager: DownloadFileManager): Unit = { - logger.trace(s"Fetch blocks from $host:$port (executor id $execId)") + if (logger.isTraceEnabled) { + logger.trace(s"Fetch blocks from $host:$port (executor id $execId)") + } try { val maxRetries = transportConf.maxIORetries() val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { @@ -174,7 +176,9 @@ private[spark] class NettyBlockTransferService( blockId.isShuffle) val callback = new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { - logger.trace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") + if (logger.isTraceEnabled) { + logger.trace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") + } result.success((): Unit) } From 3f7ea0b9adb2958772b2b0b9cc560735a6134b35 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 27 Aug 2020 09:42:04 +0800 Subject: [PATCH 32/48] add comment --- .../main/scala/org/apache/spark/network/BlockDataManager.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 4e349073f7f6..62fbc166167d 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -27,6 +27,9 @@ private[spark] trait BlockDataManager { + /** + * Get the local directories that are used by BlockManager to save the blocks to disk + */ def getLocalDiskDirs: Array[String] /** From 2aa71f659bb3be4ee7a7ae71beaf953fc77c4433 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 27 Aug 2020 13:04:34 +0800 Subject: [PATCH 33/48] refactor --- .../apache/spark/storage/BlockManager.scala | 13 ++- .../storage/ShuffleBlockFetcherIterator.scala | 93 +++++++++++-------- .../spark/ExternalShuffleServiceSuite.scala | 2 +- .../shuffle/HostLocalShuffleFetchSuite.scala | 2 +- .../ShuffleBlockFetcherIteratorSuite.scala | 4 +- 5 files changed, 64 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 129b3f786662..4c1c7eeef2be 100644 ---
a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -128,17 +128,16 @@ private[spark] class HostLocalDirManager( .maximumSize(cacheSize) .build[String, Array[String]]() - private[spark] def getCachedHostLocalDirs() - : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized { - import scala.collection.JavaConverters._ - return executorIdToLocalDirsCache.asMap().asScala - } + private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] = + executorIdToLocalDirsCache.synchronized { + executorIdToLocalDirsCache.asMap().asScala.toMap + } private[spark] def getHostLocalDirs( host: String, port: Int, executorIds: Array[String])( - callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = { + callback: Try[Map[String, Array[String]]] => Unit): Unit = { val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] blockStoreClient.getHostLocalDirs( host, @@ -147,7 +146,7 @@ private[spark] class HostLocalDirManager( hostLocalDirsCompletable) hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => if (hostLocalDirs != null) { - callback(Success(hostLocalDirs)) + callback(Success(hostLocalDirs.asScala.toMap)) executorIdToLocalDirsCache.synchronized { executorIdToLocalDirsCache.putAll(hostLocalDirs) } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 8a68d1f51362..f3b5b8a36db3 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -210,13 +210,18 @@ final class ShuffleBlockFetcherIterator( while (iter.hasNext) { val result = iter.next() result match { - case SuccessFetchResult(_, _, address, _, buf, _) => + case SuccessFetchResult(blockId, mapIndex, address, _, buf, _) => if (address != blockManager.blockManagerId) { - shuffleMetrics.incRemoteBytesRead(buf.size) - if (buf.isInstanceOf[FileSegmentManagedBuffer]) { - shuffleMetrics.incRemoteBytesReadToDisk(buf.size) + if (hostLocalBlocks.contains(blockId -> mapIndex)) { + shuffleMetrics.incLocalBlocksFetched(1) + shuffleMetrics.incLocalBytesRead(buf.size) + } else { + shuffleMetrics.incRemoteBytesRead(buf.size) + if (buf.isInstanceOf[FileSegmentManagedBuffer]) { + shuffleMetrics.incRemoteBytesReadToDisk(buf.size) + } + shuffleMetrics.incRemoteBlocksFetched(1) } - shuffleMetrics.incRemoteBlocksFetched(1) } buf.release() case _ => @@ -461,63 +466,73 @@ final class ShuffleBlockFetcherIterator( * track in-memory are the ManagedBuffer references themselves. 
*/ private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = { - val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs() - val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = - hostLocalBlocksByExecutor - .map { case (hostLocalBmId, blockInfos) => - (hostLocalBmId, blockInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) - }.partition(_._3.isDefined) - val immutableHostLocalBlocksWithoutDirs = - hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, blockInfos, _) => - hostLocalBmId -> blockInfos - }.toMap - if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { + val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs + val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = { + val (hasCache, noCache) = hostLocalBlocksByExecutor.partition { case (hostLocalBmId, _) => + cachedDirsByExec.contains(hostLocalBmId.executorId) + } + (hasCache.toMap, noCache.toMap) + } + + if (hostLocalBlocksWithMissingDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + - s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") + s"${hostLocalBlocksWithMissingDirs.mkString(", ")}") + + // If the external shuffle service is enabled, we'll fetch the local directories for + // multiple executors from the external shuffle service, which is located on the same host + // as the executors, in a single request. Otherwise, we'll fetch the local directories from + // those executors directly, one by one. The number of fetch requests won't be large since + // a single host rarely runs many executors at the same time in practice. val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { val host = blockManager.blockManagerId.host val port = blockManager.externalShuffleServicePort - Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) + Seq((host, port, hostLocalBlocksWithMissingDirs.keys.toArray)) } else { - immutableHostLocalBlocksWithoutDirs.keys - .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq + hostLocalBlocksWithMissingDirs.keys.map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq } dirFetchRequests.foreach { case (host, port, bmIds) => hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) { case Success(dirsByExecId) => - bmIds.foreach { bmId => - val dirs = dirsByExecId.get(bmId.executorId) - immutableHostLocalBlocksWithoutDirs(bmId) - .takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock(blockId, mapIndex, dirs, bmId) - } - } - logDebug("Got host-local blocks (without cached executors' dir) in " + - s"${Utils.getUsedTimeNs(startTimeNs)}") + fetchMultipleHostLocalBlocks( + hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains), + dirsByExecId, + cached = false) case Failure(throwable) => logError("Error occurred while fetching host local blocks", throwable) val bmId = bmIds.head - val blockInfoSeq = immutableHostLocalBlocksWithoutDirs(bmId) + val blockInfoSeq = hostLocalBlocksWithMissingDirs(bmId) val (blockId, _, mapIndex) = blockInfoSeq.head results.put(FailureFetchResult(blockId, mapIndex, bmId, throwable)) } } } + if (hostLocalBlocksWithCachedDirs.nonEmpty) { logDebug(s"Synchronous fetching host-local blocks with cached executors' dir: " + s"${hostLocalBlocksWithCachedDirs.mkString(", ")}") - val bmId = blockManager.blockManagerId - hostLocalBlocksWithCachedDirs.foreach { case (_, blockInfos, localDirs) => - blockInfos.foreach { case (blockId, _, mapIndex) => - if (!fetchHostLocalBlock(blockId, mapIndex, 
localDirs.get, bmId)) { - return - } - } + fetchMultipleHostLocalBlocks(hostLocalBlocksWithCachedDirs, cachedDirsByExec, cached = true) + } + } + + private def fetchMultipleHostLocalBlocks( + bmIdToBlocks: Map[BlockManagerId, Seq[(BlockId, Long, Int)]], + localDirsByExecId: Map[String, Array[String]], + cached: Boolean) + : Unit = { + // We use `forall` because once there's a block fetch is failed, `fetchHostLocalBlock` will put + // a `FailureFetchResult` immediately to the `results`. So there's no reason to fetch the + // remaining blocks. + val allFetchSucceed = bmIdToBlocks.forall { case (bmId, blockInfos) => + blockInfos.forall { case (blockId, _, mapIndex) => + fetchHostLocalBlock(blockId, mapIndex, localDirsByExecId(bmId.executorId), bmId) } - logDebug(s"Got host-local blocks (with cached executors' dir) in " + - s"${Utils.getUsedTimeNs(startTimeNs)}") + } + if (allFetchSucceed) { + logDebug(s"Got host-local blocks from ${bmIdToBlocks.keys.mkString(", ")} " + + s"(${if (cached) "with" else "without"} cached executors' dir) " + + s"in ${Utils.getUsedTimeNs(startTimeNs)}") } } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 9026447e5a98..d4a10f158c1e 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -127,7 +127,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi val cachedExecutors = rdd.mapPartitions { _ => SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => - localDirManager.getCachedHostLocalDirs().keySet.iterator + localDirManager.getCachedHostLocalDirs.keySet.iterator }.getOrElse(Iterator.empty) }.collect().toSet diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala index 91cf8a9cfddc..eb5ffc180a09 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala @@ -49,7 +49,7 @@ class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalS val cachedExecutors = rdd.mapPartitions { _ => SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => - localDirManager.getCachedHostLocalDirs().keySet.iterator + localDirManager.getCachedHostLocalDirs.keySet.iterator }.getOrElse(Iterator.empty) }.collect().toSet diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index c1fa02cf23cb..99c43b12d655 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -204,7 +204,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT // 2 remote blocks are read from the same block manager verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any()) - assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs().size === 1) + assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs.size === 1) } test("error during accessing host local dirs for executors") { @@ -426,7 +426,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with 
PrivateMethodT verify(blockManager, times(1)) .getHostLocalShuffleData(any(), meq(Array("local-dir"))) - assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs().size === 1) + assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs.size === 1) } test("fetch continuous blocks in batch should respect maxBytesInFlight") { From 054bf69e7cdb49510f62b5f8d5a234cf6c6a3b36 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sun, 30 Aug 2020 11:56:43 +0800 Subject: [PATCH 34/48] reorg import --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 92853df87f4a..84a6ca724f10 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -26,14 +26,15 @@ import java.util.concurrent.CompletableFuture; import com.codahale.metrics.MetricSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors; import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor From 5fbd6bb78f69e876fd42cfdc780173c590f2b7d9 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:42:24 +0800 Subject: [PATCH 35/48] should be --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 84a6ca724f10..926f199c03d1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -84,8 +84,8 @@ public MetricSet shuffleMetrics() { * Request the local disk directories for executors which are located at the same host with * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService). * - * @param host the host of BlockManager or ExternalShuffleService. It's the same with current - * BlockStoreClient. + * @param host the host of BlockManager or ExternalShuffleService. It should be the same host + * with current BlockStoreClient. * @param port the port of BlockManager or ExternalShuffleService. * @param execIds a collection of executor Ids, which specifies the target executors that we * want to get their local directories. 
There could be multiple executor Ids if From 8c5fdb3040b266dac5f879eab7d9b5e84a8e3446 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:49:10 +0800 Subject: [PATCH 36/48] move checkInit to BlockStoreClient --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 6 +++++- .../spark/network/shuffle/ExternalBlockStoreClient.java | 4 ---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 926f199c03d1..24803d7bfad4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -80,6 +80,10 @@ public MetricSet shuffleMetrics() { return () -> Collections.emptyMap(); } + protected void checkInit() { + assert appId != null : "Called before init()"; + } + /** * Request the local disk directories for executors which are located at the same host with * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService). @@ -102,7 +106,7 @@ public void getHostLocalDirs( int port, String[] execIds, CompletableFuture> hostLocalDirsCompletable) { - assert appId != null : "Called before init()"; + checkInit(); GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds); try { TransportClient client = clientFactory.createClient(host, port); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index bac48bfcc548..76e23e7c69d2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -63,10 +63,6 @@ public ExternalBlockStoreClient( this.registrationTimeoutMs = registrationTimeoutMs; } - protected void checkInit() { - assert appId != null : "Called before init()"; - } - /** * Initializes the BlockStoreClient, specifying this Executor's appId. * Must be called before any other method on the BlockStoreClient. 
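Note (illustration only, not part of the series): with clientFactory, appId, and checkInit() now shared in BlockStoreClient, both ExternalBlockStoreClient and NettyBlockTransferService expose the same getHostLocalDirs contract. A minimal caller-side sketch in Scala, assuming an already-initialized client; the host, port, and executor id are placeholder values:

    import java.util.concurrent.CompletableFuture
    import scala.collection.JavaConverters._

    val dirsFuture = new CompletableFuture[java.util.Map[String, Array[String]]]()
    // "host-1", 7337, and "exec-1" are hypothetical values for this sketch
    client.getHostLocalDirs("host-1", 7337, Array("exec-1"), dirsFuture)
    dirsFuture.whenComplete { (dirsByExec, error) =>
      if (dirsByExec != null) {
        // the reply maps each executor id to its local disk directories
        dirsByExec.asScala.foreach { case (execId, dirs) =>
          println(s"$execId -> ${dirs.mkString(",")}")
        }
      } else {
        // RPC failures and decoding errors both complete the future exceptionally
        println(s"lookup failed: $error")
      }
    }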
From 9aa6974152a2ace2e077b2380cc279f1fa31ba0e Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:52:22 +0800 Subject: [PATCH 37/48] fix indent --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 2 +- .../main/scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 24803d7bfad4..81a587c11b46 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -116,7 +116,7 @@ public void onSuccess(ByteBuffer response) { try { BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); hostLocalDirsCompletable.complete( - ((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); + ((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); } catch (Throwable t) { logger.warn("Error trying to get the host local dirs for " + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4c1c7eeef2be..b41fc3b218da 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -130,7 +130,7 @@ private[spark] class HostLocalDirManager( private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized { - executorIdToLocalDirsCache.asMap().asScala.toMap + executorIdToLocalDirsCache.asMap().asScala.toMap } private[spark] def getHostLocalDirs( @@ -495,7 +495,7 @@ private[spark] class BlockManager( hostLocalDirManager = { if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && - !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { + !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { Some(new HostLocalDirManager( futureExecutionContext, conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f3b5b8a36db3..dbf1d3374be8 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -519,8 +519,7 @@ final class ShuffleBlockFetcherIterator( private def fetchMultipleHostLocalBlocks( bmIdToBlocks: Map[BlockManagerId, Seq[(BlockId, Long, Int)]], localDirsByExecId: Map[String, Array[String]], - cached: Boolean) - : Unit = { + cached: Boolean): Unit = { // We use `forall` because once there's a block fetch is failed, `fetchHostLocalBlock` will put // a `FailureFetchResult` immediately to the `results`. So there's no reason to fetch the // remaining blocks. 
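Note on the `forall` comment above (a standalone illustration, not part of the series): Scala's `forall` evaluates elements only until the first one that yields false, which is what lets the fetch loop stop once one block has failed and its `FailureFetchResult` has been queued:

    var attempts = 0
    val allSucceeded = Seq(1, 2, 3, 4).forall { n =>
      attempts += 1
      n < 2   // false at n == 2, so 3 and 4 are never visited
    }
    // allSucceeded == false, attempts == 2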
From 09665e22836ce9ef6cc93c5c863af81ce890cd17 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:53:48 +0800 Subject: [PATCH 38/48] improve error message --- .../org/apache/spark/network/shuffle/BlockStoreClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 81a587c11b46..e762bd207163 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -118,7 +118,7 @@ public void onSuccess(ByteBuffer response) { hostLocalDirsCompletable.complete( ((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); } catch (Throwable t) { - logger.warn("Error trying to get the host local dirs for " + + logger.warn("Error while trying to get the host local dirs for " + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); } @@ -126,7 +126,7 @@ public void onSuccess(ByteBuffer response) { @Override public void onFailure(Throwable t) { - logger.warn("Error trying to get the host local dirs for " + + logger.warn("Error while trying to get the host local dirs for " + Arrays.toString(getLocalDirsMessage.execIds), t.getCause()); hostLocalDirsCompletable.completeExceptionally(t); } From 6d2190631782ddada1b3c60b4a90f22cc08fd92f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 10:55:40 +0800 Subject: [PATCH 39/48] remove k8s tip --- .../main/scala/org/apache/spark/internal/config/package.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 168eec600cf9..4b6770b3194d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1417,9 +1417,7 @@ package object config { ConfigBuilder("spark.shuffle.readHostLocalDisk") .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled), shuffle " + "blocks requested from those block managers which are running on the same host are " + - "read from the disk directly instead of being fetched as remote blocks over the " + - "network. 
Note that for k8s workloads, this only works when nodes are using " +
-      "non-isolated container storage.")
+      "read from the disk directly instead of being fetched as remote blocks over the network.")
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)

From 5ea8f24e2f5d1aaa78675454eff89f0a38fc6505 Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Mon, 31 Aug 2020 11:14:20 +0800
Subject: [PATCH 40/48] reply with error

---
 .../network/netty/NettyBlockRpcServer.scala   | 20 ++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 12fcf732ffd7..81fb0c93ae87 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -115,13 +115,19 @@ class NettyBlockRpcServer(
       }
 
     case getLocalDirs: GetLocalDirsForExecutors =>
-      assert(getLocalDirs.appId == appId)
-      assert(getLocalDirs.execIds.length == 1)
-      val execId = getLocalDirs.execIds.head
-      val dirs = blockManager.getLocalDiskDirs
-      responseContext
-        .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer)
-
+      val isIncorrectAppId = getLocalDirs.appId != appId
+      val execNum = getLocalDirs.execIds.length
+      if (isIncorrectAppId || execNum != 1) {
+        val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
+          s"${if (isIncorrectAppId) s"incorrect application id: ${getLocalDirs.appId};" else ""}" +
+          s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);" else ""}"
+        responseContext.onFailure(new IllegalStateException(errorMsg))
+      } else {
+        val execId = getLocalDirs.execIds.head
+        val dirs = blockManager.getLocalDiskDirs
+        responseContext
+          .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer)
+      }
   }
 }

From 57f3e1d9381ea67720551c9c1fcab3e2bff8afbc Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Mon, 31 Aug 2020 11:16:02 +0800
Subject: [PATCH 41/48] improve comment

---
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index dbf1d3374be8..1773c8785947 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -520,7 +520,7 @@ final class ShuffleBlockFetcherIterator(
       bmIdToBlocks: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       localDirsByExecId: Map[String, Array[String]],
       cached: Boolean): Unit = {
-    // We use `forall` because once there's a block fetch is failed, `fetchHostLocalBlock` will put
+    // We use `forall` because once there's a failed block fetch, `fetchHostLocalBlock` will put
     // a `FailureFetchResult` immediately to the `results`. So there's no reason to fetch the
    // remaining blocks.
val allFetchSucceed = bmIdToBlocks.forall { case (bmId, blockInfos) => From d70e7573d721243c13f0f316c1cef2a264a9b87f Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 11:16:30 +0800 Subject: [PATCH 42/48] allFetchSucceed -> allFetchSucceeded --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 1773c8785947..57b6a38ae6b6 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -523,12 +523,12 @@ final class ShuffleBlockFetcherIterator( // We use `forall` because once there's a failed block fetch, `fetchHostLocalBlock` will put // a `FailureFetchResult` immediately to the `results`. So there's no reason to fetch the // remaining blocks. - val allFetchSucceed = bmIdToBlocks.forall { case (bmId, blockInfos) => + val allFetchSucceeded = bmIdToBlocks.forall { case (bmId, blockInfos) => blockInfos.forall { case (blockId, _, mapIndex) => fetchHostLocalBlock(blockId, mapIndex, localDirsByExecId(bmId.executorId), bmId) } } - if (allFetchSucceed) { + if (allFetchSucceeded) { logDebug(s"Got host-local blocks from ${bmIdToBlocks.keys.mkString(", ")} " + s"(${if (cached) "with" else "without"} cached executors' dir) " + s"in ${Utils.getUsedTimeNs(startTimeNs)}") From 6b97be552b1d5a78f476c4a97d794c567222d9bf Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 31 Aug 2020 20:05:41 +0800 Subject: [PATCH 43/48] combine the tests --- .../spark/ExternalShuffleServiceSuite.scala | 44 ------ .../shuffle/HostLocalShuffleFetchSuite.scala | 61 --------- .../HostLocalShuffleReadingSuite.scala | 126 ++++++++++++++++++ 3 files changed, 126 insertions(+), 105 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index d4a10f158c1e..48c1cc5906f3 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -100,50 +100,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi e.getMessage should include ("Fetch failure will not retry stage due to testing config") } - test("SPARK-27651: read host local shuffle blocks from disk and avoid network remote fetches") { - val confWithHostLocalRead = - conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) - confWithHostLocalRead.set(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5) - sc = new SparkContext("local-cluster[2,1,1024]", "test", confWithHostLocalRead) - sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true) - sc.env.blockManager.externalShuffleServiceEnabled should equal(true) - sc.env.blockManager.hostLocalDirManager.isDefined should equal(true) - sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) - - // In a slow machine, one executor may register hundreds of milliseconds ahead of the other one. 
- // If we don't wait for all executors, it's possible that only one executor runs all jobs. Then - // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch - // local blocks from the local BlockManager and won't send requests to ExternalShuffleService. - // In this case, we won't receive FetchFailed. And it will make this test fail. - // Therefore, we should wait until all executors are up - TestUtils.waitUntilExecutorsUp(sc, 2, 60000) - - val rdd = sc.parallelize(0 until 1000, 10) - .map { i => (i, 1) } - .reduceByKey(_ + _) - - rdd.count() - rdd.count() - - val cachedExecutors = rdd.mapPartitions { _ => - SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => - localDirManager.getCachedHostLocalDirs.keySet.iterator - }.getOrElse(Iterator.empty) - }.collect().toSet - - // both executors are caching the dirs of the other one - cachedExecutors should equal(sc.getExecutorIds().toSet) - - // Invalidate the registered executors, disallowing access to their shuffle blocks (without - // deleting the actual shuffle files, so we could access them without the shuffle service). - // As directories are already cached there is no request to external shuffle service. - rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */) - - // Now Spark will not receive FetchFailed as host local blocks are read from the cached local - // disk directly - rdd.collect().map(_._2).sum should equal(1000) - } - test("SPARK-25888: using external shuffle service fetching disk persisted blocks") { val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled) diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala deleted file mode 100644 index eb5ffc180a09..000000000000 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.shuffle - -import org.scalatest.matchers.must.Matchers -import org.scalatest.matchers.should.Matchers._ - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils} -import org.apache.spark.internal.config._ -import org.apache.spark.network.netty.NettyBlockTransferService - -/** - * This test suite is used to test host local shuffle reading with external shuffle service disabled - */ -class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext { - test("read host local shuffle from disk with external shuffle service disabled") { - val conf = new SparkConf() - .set(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) - .set(SHUFFLE_SERVICE_ENABLED, false) - .set(DYN_ALLOCATION_ENABLED, false) - sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) - sc.getConf.get(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true) - sc.env.blockManager.externalShuffleServiceEnabled should equal(false) - sc.env.blockManager.hostLocalDirManager.isDefined should equal(true) - sc.env.blockManager.blockStoreClient.getClass should equal(classOf[NettyBlockTransferService]) - TestUtils.waitUntilExecutorsUp(sc, 2, 60000) - - val rdd = sc.parallelize(0 until 1000, 10) - .map { i => (i, 1) } - .reduceByKey(_ + _) - - assert(rdd.count() === 1000) - assert(rdd.count() === 1000) - - val cachedExecutors = rdd.mapPartitions { _ => - SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => - localDirManager.getCachedHostLocalDirs.keySet.iterator - }.getOrElse(Iterator.empty) - }.collect().toSet - - // both executors are caching the dirs of the other one - cachedExecutors should equal(sc.getExecutorIds().toSet) - - rdd.collect().map(_._2).sum should equal(1000) - } -} diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala new file mode 100644 index 000000000000..4b5aaf3e6300 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle + +import org.scalatest.matchers.must.Matchers +import org.scalatest.matchers.should.Matchers._ + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.network.TransportContext +import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} +import org.apache.spark.network.server.TransportServer +import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalBlockStoreClient} +import org.apache.spark.util.Utils + +/** + * This's an end to end test suite used to test the host local shuffle reading. + */ +class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with LocalSparkContext { + var rpcHandler: ExternalBlockHandler = null + var server: TransportServer = null + var transportContext: TransportContext = null + + override def afterEach(): Unit = { + Option(rpcHandler).foreach { handler => + Utils.tryLogNonFatalError{ + server.close() + } + Utils.tryLogNonFatalError{ + handler.close() + } + Utils.tryLogNonFatalError{ + transportContext.close() + } + server = null + rpcHandler = null + transportContext = null + } + super.afterEach() + } + + Seq(true, false).foreach { isESSEnabled => /* ESS: external shuffle service */ + val conf = new SparkConf() + .set(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) + + val (essStatus, blockStoreClientClass) = if (isESSEnabled) { + // LocalSparkCluster will disable the ExternalShuffleService by default. Therefore, + // we have to manually setup an server which embedded with ExternalBlockHandler to + // mimic a ExternalShuffleService. Then, executors on the Worker can successfully + // find a ExternalShuffleService to connect. + val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2) + rpcHandler = new ExternalBlockHandler(transportConf, null) + transportContext = new TransportContext(transportConf, rpcHandler) + server = transportContext.createServer() + conf.set(SHUFFLE_SERVICE_PORT, server.getPort) + + ("enabled (SPARK-27651)", classOf[ExternalBlockStoreClient]) + } else { + ("disabled (SPARK-32077)", classOf[NettyBlockTransferService]) + } + + test(s"host local shuffle reading with external shuffle service $essStatus") { + conf.set(SHUFFLE_SERVICE_ENABLED, isESSEnabled) + .set(STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5) + sc = new SparkContext("local-cluster[2,1,1024]", "test-host-local-shuffle-reading", conf) + // In a slow machine, one executor may register hundreds of milliseconds ahead of the other + // one. If we don't wait for all executors, it's possible that only one executor runs all + // jobs. Then all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will + // directly fetch local blocks from the local BlockManager and won't send requests to + // BlockStoreClient. In this case, we won't receive FetchFailed. And it will make this + // test fail. 
Therefore, we should wait until all executors are up
+      TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+
+      sc.getConf.get(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+      sc.env.blockManager.externalShuffleServiceEnabled should equal(isESSEnabled)
+      sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+      sc.env.blockManager.blockStoreClient.getClass should equal(blockStoreClientClass)
+
+      val rdd = sc.parallelize(0 until 1000, 10)
+        .map(i => (i, 1)).reduceByKey(_ + _)
+
+      // raise a job and trigger the shuffle fetching during the job
+      assert(rdd.count() === 1000)
+
+      val cachedExecutors = rdd.mapPartitions { _ =>
+        SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager =>
+          localDirManager.getCachedHostLocalDirs.keySet.iterator
+        }.getOrElse(Iterator.empty)
+      }.collect().toSet
+
+      // both executors are caching the dirs of the other one
+      cachedExecutors should equal(sc.getExecutorIds().toSet)
+
+      Option(rpcHandler).foreach { handler =>
+        // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+        // deleting the actual shuffle files, so we could access them without the shuffle service).
+        // As directories are already cached there is no request to external shuffle service.
+        handler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
+      }
+
+      val (localBytesRead, remoteBytesRead) = rdd.map { case (_, _) =>
+        val shuffleReadMetrics = TaskContext.get().taskMetrics().shuffleReadMetrics
+        (shuffleReadMetrics.localBytesRead, shuffleReadMetrics.remoteBytesRead)
+      }.collect().unzip
+      // Spark should read the shuffle data locally from the cached directories on the same host,
+      // so there's no remote fetching at all.
+      assert(localBytesRead.sum > 0)
+      assert(remoteBytesRead.sum === 0)
+    }
+  }
+}

From 5e98eca669c4a141c6fb33c87e44ce850490f562 Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Tue, 1 Sep 2020 13:36:00 +0800
Subject: [PATCH 44/48] remove unnecessary brackets

---
 .../apache/spark/network/netty/NettyBlockTransferService.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 788424fff469..806fbf52795b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -80,7 +80,7 @@ private[spark] class NettyBlockTransferService(
     clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
     server = createServer(serverBootstrap.toList)
     appId = conf.getAppId
-    logger.info(s"Server created on ${hostName}:${server.getPort}")
+    logger.info(s"Server created on $hostName:${server.getPort}")
   }
 
   /** Creates and binds the TransportServer, possibly trying multiple ports. 
 */

From 8f16c177bec4e9571bb5f868bbb74f06a3586844 Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Tue, 1 Sep 2020 13:49:13 +0800
Subject: [PATCH 45/48] check executorId

---
 .../spark/network/netty/NettyBlockRpcServer.scala | 15 ++++++++++-----
 .../org/apache/spark/storage/BlockManager.scala   |  2 +-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 81fb0c93ae87..5f831dc666ca 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithI
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
 import org.apache.spark.network.shuffle.protocol._
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BlockId, ShuffleBlockBatchId, ShuffleBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockBatchId, ShuffleBlockId, StorageLevel}
 
 /**
  * Serves requests to open blocks by simply registering one chunk per block requested.
@@ -123,10 +123,15 @@ class NettyBlockRpcServer(
         s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);" else ""}"
         responseContext.onFailure(new IllegalStateException(errorMsg))
       } else {
-        val execId = getLocalDirs.execIds.head
-        val dirs = blockManager.getLocalDiskDirs
-        responseContext
-          .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer)
+        val expectedExecId = blockManager.asInstanceOf[BlockManager].executorId
+        val actualExecId = getLocalDirs.execIds.head
+        if (actualExecId != expectedExecId) {
+          responseContext.onFailure(new IllegalStateException(
+            s"Invalid executor id: $actualExecId, expected $expectedExecId."))
+        } else {
+          responseContext.onSuccess(new LocalDirsForExecutors(
+            Map(actualExecId -> blockManager.getLocalDiskDirs).asJava).toByteBuffer)
+        }
       }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b41fc3b218da..25f6b251edb6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -164,7 +164,7 @@ private[spark] class HostLocalDirManager(
  * Note that [[initialize()]] must be called before the BlockManager is usable.
  */
 private[spark] class BlockManager(
-    executorId: String,
+    val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,

From 7c381903592c0c0f42ab6fc7eb45a8fcd7d19105 Mon Sep 17 00:00:00 2001
From: "yi.wu" 
Date: Tue, 1 Sep 2020 13:53:31 +0800
Subject: [PATCH 46/48] use _

---
 .../apache/spark/shuffle/HostLocalShuffleReadingSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala
index 4b5aaf3e6300..fd807f645101 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala
@@ -32,9 +32,9 @@ import org.apache.spark.util.Utils
 * This is an end-to-end test suite used to test host-local shuffle reading. 
*/ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with LocalSparkContext { - var rpcHandler: ExternalBlockHandler = null - var server: TransportServer = null - var transportContext: TransportContext = null + var rpcHandler: ExternalBlockHandler = _ + var server: TransportServer = _ + var transportContext: TransportContext = _ override def afterEach(): Unit = { Option(rpcHandler).foreach { handler => From a35807ba6abc933671ff4516efbd138bdf83591c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 1 Sep 2020 14:26:00 +0800 Subject: [PATCH 47/48] indent --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 25f6b251edb6..ff0f38a2479b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -129,9 +129,9 @@ private[spark] class HostLocalDirManager( .build[String, Array[String]]() private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] = - executorIdToLocalDirsCache.synchronized { - executorIdToLocalDirsCache.asMap().asScala.toMap - } + executorIdToLocalDirsCache.synchronized { + executorIdToLocalDirsCache.asMap().asScala.toMap + } private[spark] def getHostLocalDirs( host: String, From a23ab1721b1225e0a95fb27660fe81847287c622 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 1 Sep 2020 14:41:50 +0800 Subject: [PATCH 48/48] update test --- .../HostLocalShuffleReadingSuite.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala index fd807f645101..12c40f4462c7 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala @@ -92,7 +92,13 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with Loca sc.env.blockManager.blockStoreClient.getClass should equal(blockStoreClientClass) val rdd = sc.parallelize(0 until 1000, 10) - .map(i => (i, 1)).reduceByKey(_ + _) + .map { i => + SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => + // No shuffle fetch yet. So the cache must be empty + assert(localDirManager.getCachedHostLocalDirs.isEmpty) + } + (i, 1) + }.reduceByKey(_ + _) // raise a job and trigger the shuffle fetching during the job assert(rdd.count() === 1000) @@ -113,14 +119,17 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with Loca handler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */) } - val (localBytesRead, remoteBytesRead) = rdd.map { case (_, _) => + val (local, remote) = rdd.map { case (_, _) => val shuffleReadMetrics = TaskContext.get().taskMetrics().shuffleReadMetrics - (shuffleReadMetrics.localBytesRead, shuffleReadMetrics.remoteBytesRead) + ((shuffleReadMetrics.localBytesRead, shuffleReadMetrics.localBlocksFetched), + (shuffleReadMetrics.remoteBytesRead, shuffleReadMetrics.remoteBlocksFetched)) }.collect().unzip // Spark should read the shuffle data locally from the cached directories on the same host, // so there's no remote fetching at all. 
- assert(localBytesRead.sum > 0) - assert(remoteBytesRead.sum === 0) + val (localBytesRead, localBlocksFetched) = local.unzip + val (remoteBytesRead, remoteBlocksFetched) = remote.unzip + assert(localBytesRead.sum > 0 && localBlocksFetched.sum > 0) + assert(remoteBytesRead.sum === 0 && remoteBlocksFetched.sum === 0) } } }
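Taken together, patches 40 and 45 leave the GetLocalDirsForExecutors handler validating the application id, the executor count, and the executor id before it answers, replying with a failure instead of crashing on an assert. A condensed sketch of that validation order, where the request type and the Either-based reply are hypothetical stand-ins for Spark's RPC classes rather than the real API:

object GetLocalDirsValidationSketch {
  final case class GetLocalDirsRequest(appId: String, execIds: Seq[String])

  // Mirrors the checks the final handler performs: application id and
  // executor count (patch 40), then executor id (patch 45); on success it
  // returns the executor's local disk directories.
  def handle(
      req: GetLocalDirsRequest,
      serverAppId: String,
      serverExecId: String,
      localDirs: Array[String]): Either[String, Map[String, Array[String]]] = {
    if (req.appId != serverAppId) {
      Left(s"incorrect application id: ${req.appId}")
    } else if (req.execIds.length != 1) {
      Left(s"incorrect executor number: ${req.execIds.length} (expected 1)")
    } else if (req.execIds.head != serverExecId) {
      Left(s"invalid executor id: ${req.execIds.head}, expected $serverExecId")
    } else {
      Right(Map(serverExecId -> localDirs))
    }
  }

  def main(args: Array[String]): Unit = {
    val dirs = Array("/tmp/blockmgr-1")  // hypothetical local dirs
    assert(handle(GetLocalDirsRequest("app-1", Seq("exec-1")), "app-1", "exec-1", dirs).isRight)
    assert(handle(GetLocalDirsRequest("app-1", Seq("exec-2")), "app-1", "exec-1", dirs).isLeft)
  }
}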