From 0a2d0d7950186edbda0f036f5f0d242480bbbf9f Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 21 Dec 2021 20:05:07 -0500 Subject: [PATCH 01/24] Remove shuffle blocks using the shuffle service for released executors --- .../shuffle/ExternalBlockStoreClient.java | 4 +- .../scala/org/apache/spark/SparkEnv.scala | 6 +- .../shuffle/IndexShuffleBlockResolver.scala | 7 ++ .../spark/shuffle/ShuffleBlockResolver.scala | 8 +++ .../storage/BlockManagerMasterEndpoint.scala | 54 ++++++++++++--- .../spark/ExternalShuffleServiceSuite.scala | 68 ++++++++++++++++++- .../BlockManagerReplicationSuite.scala | 3 +- .../spark/storage/BlockManagerSuite.scala | 3 +- .../streaming/ReceivedBlockHandlerSuite.scala | 3 +- 9 files changed, 135 insertions(+), 21 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index d2df77658ccd..b066d99e8ef8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -299,7 +299,7 @@ public void onSuccess(ByteBuffer response) { BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks); } catch (Throwable t) { - logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) + + logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) + " via external shuffle service from executor: " + execId, t); numRemovedBlocksFuture.complete(0); } @@ -307,7 +307,7 @@ public void onSuccess(ByteBuffer response) { @Override public void onFailure(Throwable e) { - logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) + + logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) + " via external shuffle service from executor: " + execId, e); numRemovedBlocksFuture.complete(0); } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d07614a5e212..19467e7eca12 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -343,12 +343,14 @@ object SparkEnv extends Logging { isLocal, conf, listenerBus, - if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) { + if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { externalShuffleClient } else { None }, blockManagerInfo, - mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)), + mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], + shuffleManager, + isDriver)), registerOrLookupEndpoint( BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME, new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)), diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index f1485ec99789..72bf3f06d4bb 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -597,6 +597,13 @@ private[spark] class IndexShuffleBlockResolver( } } + override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { + Seq( + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), + 
ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + ) + } + override def stop(): Unit = {} } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala index 0f35f8c983d6..de9937b2394e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala @@ -41,6 +41,14 @@ trait ShuffleBlockResolver { */ def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer + /** + * Retrive a list of BlockIds for a given shuffle map. Used to delete shuffle files + * from the external shuffle service after the associated executor has been removed. + */ + def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { + Seq.empty + } + /** * Retrieve the data for the specified merged shuffle block as multiple chunks. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b96befce2c0d..edc584045ad0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -36,6 +36,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend} +import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} @@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint( externalBlockStoreClient: Option[ExternalBlockStoreClient], blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo], mapOutputTracker: MapOutputTrackerMaster, + shuffleManager: ShuffleManager, isDriver: Boolean) extends IsolatedRpcEndpoint with Logging { @@ -104,9 +106,9 @@ class BlockManagerMasterEndpoint( private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver) logInfo("BlockManagerMasterEndpoint up") - // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED) - // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)` - private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined + + private val externalShuffleServiceRddFetchEnabled: Boolean = + externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) private lazy val driverEndpoint = @@ -311,16 +313,46 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { - // Nothing to do in the BlockManagerMasterEndpoint data structures + // Find all shuffle blocks on executors that are no longer running + val blocksToDeleteByShuffleService = + new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] + mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus => + shuffleStatus.mapStatuses + .filter(_.location.port == externalShuffleServicePort) + .foreach { mapStatus => + val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + val blocksToDel = + shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, 
mapStatus.mapId) + blocksToDel.foreach(blocks.add(_)) + } + } + val removeMsg = RemoveShuffle(shuffleId) - Future.sequence( - blockManagerInfo.values.map { bm => - bm.storageEndpoint.ask[Boolean](removeMsg).recover { - // use false as default value means no shuffle data were removed - handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => + bm.storageEndpoint.ask[Boolean](removeMsg).recover { + // use false as default value means no shuffle data were removed + handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + } + }.toSeq + + val removeShuffleBlockViaExtShuffleServiceFutures = + externalBlockStoreClient.map { shuffleClient => + blocksToDeleteByShuffleService.map { case (bmId, blockIds) => + Future[Boolean] { + val numRemovedBlocks = shuffleClient.removeBlocks( + bmId.host, + bmId.port, + bmId.executorId, + blockIds.map(_.toString).toArray) + numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, + TimeUnit.SECONDS) == blockIds.size + } } - }.toSeq - ) + }.getOrElse(Seq.empty) + + Future.sequence(removeShuffleFromExecutorsFutures ++ + removeShuffleBlockViaExtShuffleServiceFutures) } /** diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 48c1cc5906f3..dab93ba3560b 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -17,6 +17,11 @@ package org.apache.spark +import java.io.File + +import scala.concurrent.Promise +import scala.concurrent.duration.Duration + import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers._ @@ -26,9 +31,9 @@ import org.apache.spark.internal.config import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.TransportServer -import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalBlockStoreClient} -import org.apache.spark.storage.{RDDBlockId, StorageLevel} -import org.apache.spark.util.Utils +import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient} +import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel} +import org.apache.spark.util.{ThreadUtils, Utils} /** * This suite creates an external shuffle server and routes all shuffle fetches through it. 
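In the removeShuffle change above, the driver walks the MapOutputTrackerMaster statuses for the shuffle, collects the index and data block IDs returned by IndexShuffleBlockResolver.getBlocksForShuffle, groups them by the BlockManagerId pointing at the external shuffle service, and then issues one ExternalBlockStoreClient.removeBlocks call per location. The following is a minimal, self-contained sketch of just that grouping step; Loc, MapOutput, and the string block names are illustration-only stand-ins for BlockManagerId, MapStatus, and BlockId, not the real Spark classes.

import scala.collection.mutable

// Illustration-only stand-ins for BlockManagerId and MapStatus (not the real Spark classes).
case class Loc(executorId: String, host: String, port: Int)
case class MapOutput(location: Loc, mapId: Long)

object RemoveShuffleSketch {

  // Mirrors IndexShuffleBlockResolver.getBlocksForShuffle: one index block and one data
  // block per map output, represented here as plain strings.
  def blocksForShuffle(shuffleId: Int, mapId: Long): Seq[String] =
    Seq(s"shuffle_${shuffleId}_${mapId}_0.index", s"shuffle_${shuffleId}_${mapId}_0.data")

  // Group the block names to delete by the location that serves them, keeping only map
  // outputs whose reported port is the external shuffle service port.
  def blocksToDeleteByService(
      shuffleId: Int,
      mapOutputs: Seq[MapOutput],
      shuffleServicePort: Int): Map[Loc, Set[String]] = {
    val grouped = new mutable.HashMap[Loc, mutable.HashSet[String]]
    mapOutputs.filter(_.location.port == shuffleServicePort).foreach { m =>
      val blocks = grouped.getOrElseUpdate(m.location, new mutable.HashSet[String])
      blocks ++= blocksForShuffle(shuffleId, m.mapId)
    }
    grouped.map { case (loc, blocks) => loc -> blocks.toSet }.toMap
  }

  def main(args: Array[String]): Unit = {
    val outputs = Seq(
      MapOutput(Loc("1", "host-a", 7337), mapId = 0L),
      MapOutput(Loc("1", "host-a", 7337), mapId = 1L))
    // Both map outputs end up grouped under the single shuffle-service location, with an
    // index and a data block name for each.
    println(blocksToDeleteByService(0, outputs, shuffleServicePort = 7337))
  }
}

In the actual patch, each entry of the resulting map then drives shuffleClient.removeBlocks(bmId.host, bmId.port, bmId.executorId, blockIds.map(_.toString).toArray), with each future resolving to whether all requested blocks were removed.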
@@ -138,4 +143,61 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi rpcHandler.applicationRemoved(sc.conf.getAppId, true) } } + + test("SPARK-37618: external shuffle service removes shuffle blocks from deallocated executors") { + // Use local disk reading to get location of shuffle files on disk + val confWithLocalDiskReading = + conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) + sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) + sc.env.blockManager.externalShuffleServiceEnabled should equal(true) + sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) + try { + val rdd = sc.parallelize(0 until 100, 2) + .map { i => (i, 1) } + .repartition(1) + + rdd.count() + + val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 0).toSeq + + val dirManager = sc.env.blockManager.hostLocalDirManager + .getOrElse(fail("No host local dir manager")) + + val promises = mapOutputs.map { case (bmid, blocks) => + val promise = Promise[Seq[File]]() + dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) { + case scala.util.Success(res) => res.foreach { case (eid, dirs) => + val files = blocks.flatMap { case (blockId, _, _) => + val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] + Seq( + ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, + ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name), + ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, + ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name) + ) + } + promise.success(files) + } + case scala.util.Failure(error) => promise.failure(error) + } + promise.future + } + val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) + assert(filesToCheck.length == 4) + filesToCheck.foreach { f => assert(f.exists()) } + + sc.killExecutors(sc.getExecutorIds()) + eventually(timeout(2.seconds), interval(100.milliseconds)) { + assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty) + } + + sc.env.blockManager.master.removeShuffle(0, true) + + filesToCheck.foreach { f => assert(!f.exists()) } + } finally { + rpcHandler.applicationRemoved(sc.conf.getAppId, true) + } + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index fc7b7a440697..14e1ee5b09d5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -102,7 +102,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, isDriver = true)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager, + isDriver = true)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) allStores.clear() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
index 0f99ea819f67..45e05b2cc2da 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -188,7 +188,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE liveListenerBus = spy(new LiveListenerBus(conf)) master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - liveListenerBus, None, blockManagerInfo, mapOutputTracker, isDriver = true)), + liveListenerBus, None, blockManagerInfo, mapOutputTracker, shuffleManager, + isDriver = true)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index a3b5b38904a2..dcf82d5e2c28 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -93,7 +93,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, isDriver = true)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, shuffleManager, + isDriver = true)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) From 92af8aec4041f9dc0a53c1967727e47a09fd1a03 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 4 Jan 2022 17:51:48 -0500 Subject: [PATCH 02/24] Check for existing executor --- .../spark/storage/BlockManagerMasterEndpoint.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index edc584045ad0..7a1e5e3704e3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -317,14 +317,17 @@ class BlockManagerMasterEndpoint( val blocksToDeleteByShuffleService = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus => - shuffleStatus.mapStatuses - .filter(_.location.port == externalShuffleServicePort) - .foreach { mapStatus => + shuffleStatus.mapStatuses.foreach { mapStatus => + // Port should always be external shuffle port if external shuffle is enabled + val isShufflePort = mapStatus.location.port == externalShuffleServicePort + val executorDeallocated = !blockManagerIdByExecutor.contains(mapStatus.location.executorId) + if (isShufflePort && executorDeallocated) { val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, new mutable.HashSet[BlockId]) val blocksToDel = shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) blocksToDel.foreach(blocks.add(_)) + } } } From 2a0dfaca9ccebb8c4acd00b063fd229c27f22c55 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 12 Jan 2022 18:43:09 -0500 
Subject: [PATCH 03/24] Fix to work through the context cleaner --- core/src/main/scala/org/apache/spark/ContextCleaner.scala | 4 +++- .../scala/org/apache/spark/ExternalShuffleServiceSuite.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 091b5e1600d9..c70347b619ae 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -235,8 +235,10 @@ private[spark] class ContextCleaner( try { if (mapOutputTrackerMaster.containsShuffle(shuffleId)) { logDebug("Cleaning shuffle " + shuffleId) - mapOutputTrackerMaster.unregisterShuffle(shuffleId) + // Shuffle must be removed before unregistered from the output tracker + // to find blocks served by the shuffle service on deallocated executors shuffleDriverComponents.removeShuffle(shuffleId, blocking) + mapOutputTrackerMaster.unregisterShuffle(shuffleId) listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) logDebug("Cleaned shuffle " + shuffleId) } else { diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index dab93ba3560b..6fd4578f6c04 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -193,7 +193,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty) } - sc.env.blockManager.master.removeShuffle(0, true) + sc.cleaner.foreach(_.doCleanupShuffle(0, true)) filesToCheck.foreach { f => assert(!f.exists()) } } finally { From 1190470c074445b18e25847766671cbfcdd561d5 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 14 Jan 2022 17:28:48 -0500 Subject: [PATCH 04/24] Create shuffle files as group writable --- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 2 ++ .../main/scala/org/apache/spark/ContextCleaner.scala | 2 +- .../spark/shuffle/IndexShuffleBlockResolver.scala | 2 ++ core/src/main/scala/org/apache/spark/util/Utils.scala | 11 +++++++++++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 6c5025d1822f..bdcc85f9628c 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -88,6 +88,8 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I lastPartitionId = reducePartitionId; if (outputTempFile == null) { outputTempFile = Utils.tempFileWith(outputFile); + // SPARK-37618: Create the file as group writable so it can be deleted by the shuffle service + Utils.createFileAsGroupWritable(outputTempFile); } if (outputFileChannel != null) { currChannelPosition = outputFileChannel.position(); diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index c70347b619ae..a6fa28b8ae8e 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -235,7 +235,7 @@ private[spark] class ContextCleaner( try { if 
(mapOutputTrackerMaster.containsShuffle(shuffleId)) { logDebug("Cleaning shuffle " + shuffleId) - // Shuffle must be removed before unregistered from the output tracker + // Shuffle must be removed before it's unregistered from the output tracker // to find blocks served by the shuffle service on deallocated executors shuffleDriverComponents.removeShuffle(shuffleId, blocking) mapOutputTrackerMaster.unregisterShuffle(shuffleId) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 72bf3f06d4bb..04664bcf05a3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -336,6 +336,8 @@ private[spark] class IndexShuffleBlockResolver( dataTmp: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) + // SPARK-37618: Create the file as group writable so it can be deleted by the shuffle service + Utils.createFileAsGroupWritable(indexTmp) val checksumEnabled = checksums.nonEmpty val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 17bec9f666ae..0f3dcd70034f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -27,6 +27,7 @@ import java.nio.ByteBuffer import java.nio.channels.{Channels, FileChannel, WritableByteChannel} import java.nio.charset.StandardCharsets import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions import java.security.SecureRandom import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ @@ -2745,6 +2746,16 @@ private[spark] object Utils extends Logging { new File(path.getAbsolutePath + "." + UUID.randomUUID()) } + /** + * Creates a file with group write permission. + */ + def createFileAsGroupWritable(file: File): Unit = { + val perms = PosixFilePermissions.fromString("rw-rw----") + val path = file.toPath + Files.createFile(path) + Files.setPosixFilePermissions(path, perms) + } + /** * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows), * this is formatted as PID@hostname. 
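The createFileAsGroupWritable helper introduced in this commit is just java.nio file creation followed by an explicit permission assignment. Below is a runnable, hedged sketch of the same calls so the permission handling can be tried in isolation; the object name and the temp-directory demo location are illustration-only, and POSIX permissions require a POSIX-compliant filesystem. Later commits in this series drop this helper again in favor of making the block-manager subdirectories group writable.

import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

object GroupWritableFileSketch {

  // Same shape as the helper added in this commit: create the file, then assign
  // "rw-rw----" so a shuffle service running as another user in the same group can
  // modify (and eventually delete) it.
  def createFileAsGroupWritable(file: File): Unit = {
    val perms = PosixFilePermissions.fromString("rw-rw----")
    val path = file.toPath
    Files.createFile(path)
    Files.setPosixFilePermissions(path, perms)
  }

  def main(args: Array[String]): Unit = {
    // Illustration-only location; the patch applies this to temporary shuffle output files.
    val demo = new File(Files.createTempDirectory("perm-demo").toFile, "shuffle.data.tmp")
    createFileAsGroupWritable(demo)
    // Expected output on a POSIX filesystem: a permission set containing OWNER_READ,
    // OWNER_WRITE, GROUP_READ and GROUP_WRITE.
    println(Files.getPosixFilePermissions(demo.toPath))
  }
}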
From d12b6b2369c4fc57657560c2b2827210b37c664d Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 16 Jan 2022 09:21:45 -0500 Subject: [PATCH 05/24] Make sure external shuffle is used and clean some things up --- .../storage/BlockManagerMasterEndpoint.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7a1e5e3704e3..2a16178bda43 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -316,17 +316,20 @@ class BlockManagerMasterEndpoint( // Find all shuffle blocks on executors that are no longer running val blocksToDeleteByShuffleService = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] - mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus => - shuffleStatus.mapStatuses.foreach { mapStatus => - // Port should always be external shuffle port if external shuffle is enabled - val isShufflePort = mapStatus.location.port == externalShuffleServicePort - val executorDeallocated = !blockManagerIdByExecutor.contains(mapStatus.location.executorId) - if (isShufflePort && executorDeallocated) { - val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, - new mutable.HashSet[BlockId]) - val blocksToDel = - shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) - blocksToDel.foreach(blocks.add(_)) + if (externalBlockStoreClient.isDefined) { + mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => + shuffleStatus.mapStatuses.foreach { mapStatus => + // Port should always be external shuffle port if external shuffle is enabled + val isShufflePort = mapStatus.location.port == externalShuffleServicePort + val executorDeallocated = + !blockManagerIdByExecutor.contains(mapStatus.location.executorId) + if (isShufflePort && executorDeallocated) { + val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + val blocksToDel = + shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) + blocks ++= blocksToDel + } } } } @@ -339,7 +342,7 @@ class BlockManagerMasterEndpoint( } }.toSeq - val removeShuffleBlockViaExtShuffleServiceFutures = + val removeShuffleFromShuffleServicesFutures = externalBlockStoreClient.map { shuffleClient => blocksToDeleteByShuffleService.map { case (bmId, blockIds) => Future[Boolean] { @@ -355,7 +358,7 @@ class BlockManagerMasterEndpoint( }.getOrElse(Seq.empty) Future.sequence(removeShuffleFromExecutorsFutures ++ - removeShuffleBlockViaExtShuffleServiceFutures) + removeShuffleFromShuffleServicesFutures) } /** From b7641416db70638ee759db3e43935e7cd3aba487 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 16 Jan 2022 10:25:14 -0500 Subject: [PATCH 06/24] Create disk block dirs as group writable rather than files --- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 2 -- .../spark/shuffle/IndexShuffleBlockResolver.scala | 2 -- .../org/apache/spark/storage/DiskBlockManager.scala | 6 +++++- core/src/main/scala/org/apache/spark/util/Utils.scala | 11 ----------- 4 files changed, 5 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index bdcc85f9628c..6c5025d1822f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -88,8 +88,6 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I lastPartitionId = reducePartitionId; if (outputTempFile == null) { outputTempFile = Utils.tempFileWith(outputFile); - // SPARK-37618: Create the file as group writable so it can be deleted by the shuffle service - Utils.createFileAsGroupWritable(outputTempFile); } if (outputFileChannel != null) { currChannelPosition = outputFileChannel.position(); diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 04664bcf05a3..72bf3f06d4bb 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -336,8 +336,6 @@ private[spark] class IndexShuffleBlockResolver( dataTmp: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) - // SPARK-37618: Create the file as group writable so it can be deleted by the shuffle service - Utils.createFileAsGroupWritable(indexTmp) val checksumEnabled = checksums.nonEmpty val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index c6a22972d2a0..efc325d0ef19 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import java.io.{File, IOException} import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions import java.util.UUID import scala.collection.mutable.HashMap @@ -94,7 +95,10 @@ private[spark] class DiskBlockManager( } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) if (!newDir.exists()) { - Files.createDirectory(newDir.toPath) + // SPARK-37618: Create dir as group writable so it can be deleted by the shuffle service + val path = newDir.toPath + Files.createDirectory(path) + Files.setPosixFilePermissions(path, PosixFilePermissions.fromString("rwxrwx---")) } subDirs(dirId)(subDirId) = newDir newDir diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0f3dcd70034f..17bec9f666ae 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -27,7 +27,6 @@ import java.nio.ByteBuffer import java.nio.channels.{Channels, FileChannel, WritableByteChannel} import java.nio.charset.StandardCharsets import java.nio.file.Files -import java.nio.file.attribute.PosixFilePermissions import java.security.SecureRandom import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ @@ -2746,16 +2745,6 @@ private[spark] object Utils extends Logging { new File(path.getAbsolutePath + "." + UUID.randomUUID()) } - /** - * Creates a file with group write permission. 
- */ - def createFileAsGroupWritable(file: File): Unit = { - val perms = PosixFilePermissions.fromString("rw-rw----") - val path = file.toPath - Files.createFile(path) - Files.setPosixFilePermissions(path, perms) - } - /** * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows), * this is formatted as PID@hostname. From b837362dfffb5a5f7562eecfb08f24d4b0aa2a58 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 16 Jan 2022 12:37:43 -0500 Subject: [PATCH 07/24] Add test for block manager sub dir being group writable --- .../apache/spark/storage/DiskBlockManagerSuite.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index b36eeb767e2e..5279362c511c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.io.{File, FileWriter} import java.nio.file.{Files, Paths} -import java.nio.file.attribute.PosixFilePermissions +import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions} import java.util.HashMap import com.fasterxml.jackson.core.`type`.TypeReference @@ -141,6 +141,15 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B assert(attemptId.equals("1")) } + test("SPARK-37618: Sub dirs are group writable") { + val blockId = new TestBlockId("test") + val newFile = diskBlockManager.getFile(blockId) + val parentDir = newFile.getParentFile() + assert(parentDir.exists && parentDir.isDirectory) + val permission = Files.getPosixFilePermissions(parentDir.toPath) + assert(permission.contains(PosixFilePermission.GROUP_WRITE)) + } + def writeToFile(file: File, numBytes: Int): Unit = { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) From f6b756008ca2bb113946c5fb0764eb8d5c6a79c7 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 18 Jan 2022 19:41:25 -0500 Subject: [PATCH 08/24] Use the create dir with 770 helper --- .../spark/storage/DiskBlockManager.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index efc325d0ef19..1bd037c3aa7c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -18,8 +18,6 @@ package org.apache.spark.storage import java.io.{File, IOException} -import java.nio.file.Files -import java.nio.file.attribute.PosixFilePermissions import java.util.UUID import scala.collection.mutable.HashMap @@ -95,10 +93,9 @@ private[spark] class DiskBlockManager( } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) if (!newDir.exists()) { - // SPARK-37618: Create dir as group writable so it can be deleted by the shuffle service - val path = newDir.toPath - Files.createDirectory(path) - Files.setPosixFilePermissions(path, PosixFilePermissions.fromString("rwxrwx---")) + // SPARK-37618: Create dir as group writable so files within can be deleted by the + // shuffle service + createDirWithPermission770(newDir) } subDirs(dirId)(subDirId) = newDir newDir @@ -248,9 +245,13 @@ private[spark] class DiskBlockManager( * Create a directory that 
is writable by the group. * Grant the permission 770 "rwxrwx---" to the directory so the shuffle server can * create subdirs/files within the merge folder. - * TODO: Find out why can't we create a dir using java api with permission 770 - * Files.createDirectories(mergeDir.toPath, PosixFilePermissions.asFileAttribute( - * PosixFilePermissions.fromString("rwxrwx---"))) + * We can't use java.nio.files.Files.setPosixPermissions because Java doesn't support + * maintaining or adding the setgid bit when assigning permissions. The Hadoop + * RawLocalFileSystem also doesn't support this. Yarn uses this + * mechanism to make sure all subdirectories and files are assigned the group of + * the container executor. + * + * See https://bugs.openjdk.java.net/browse/JDK-8137404 */ def createDirWithPermission770(dirToCreate: File): Unit = { var attempts = 0 From 3689b9ed570178015d8d0a2c33c177bafd0faf28 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 23 Jan 2022 10:48:26 -0500 Subject: [PATCH 09/24] World readable block file approach --- .../spark/storage/DiskBlockManager.scala | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 1bd037c3aa7c..8371e0235a94 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -18,6 +18,8 @@ package org.apache.spark.storage import java.io.{File, IOException} +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermission import java.util.UUID import scala.collection.mutable.HashMap @@ -95,7 +97,11 @@ private[spark] class DiskBlockManager( if (!newDir.exists()) { // SPARK-37618: Create dir as group writable so files within can be deleted by the // shuffle service - createDirWithPermission770(newDir) + val path = newDir.toPath + Files.createDirectory(path) + val currentPerms = Files.getPosixFilePermissions(path) + currentPerms.add(PosixFilePermission.GROUP_WRITE) + Files.setPosixFilePermissions(path, currentPerms) } subDirs(dirId)(subDirId) = newDir newDir @@ -167,13 +173,31 @@ private[spark] class DiskBlockManager( } } + /** + * SPARK-37618: Makes sure that the file is created as world readable. This is to get + * around the fact that making the block manager sub dirs group writable removes + * the setgid bit in secure Yarn environments, which prevents the shuffle service + * from being able ot read shuffle files. The outer directories will still not be + * world executable, so this doesn't allow access to these files except for the + * running user and shuffle service. + */ + def createWorldReadableFile(file: File): Unit = { + val path = file.toPath + Files.createFile(path) + val currentPerms = Files.getPosixFilePermissions(path) + currentPerms.add(PosixFilePermission.OTHERS_READ) + Files.setPosixFilePermissions(path, currentPerms) + } + /** Produces a unique block id and File suitable for storing local intermediate results. */ def createTempLocalBlock(): (TempLocalBlockId, File) = { var blockId = new TempLocalBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempLocalBlockId(UUID.randomUUID()) } - (blockId, getFile(blockId)) + val file = getFile(blockId) + createWorldReadableFile(file) + (blockId, file) } /** Produces a unique block id and File suitable for storing shuffled intermediate results. 
*/ @@ -182,7 +206,9 @@ private[spark] class DiskBlockManager( while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } - (blockId, getFile(blockId)) + val file = getFile(blockId) + createWorldReadableFile(file) + (blockId, file) } /** From 7be49a0108cb7a58c17b853fec7f093c2fb9d522 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 25 Jan 2022 07:21:01 -0500 Subject: [PATCH 10/24] Create final shuffle files correctly with world readable --- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 2 +- .../sort/io/LocalDiskSingleSpillMapOutputWriter.java | 2 +- .../spark/shuffle/IndexShuffleBlockResolver.scala | 11 ++++++++--- .../org/apache/spark/storage/DiskBlockManager.scala | 10 ++++++++++ 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 6c5025d1822f..7181691f348e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -87,7 +87,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I } lastPartitionId = reducePartitionId; if (outputTempFile == null) { - outputTempFile = Utils.tempFileWith(outputFile); + outputTempFile = blockResolver.createTempFile(outputFile); } if (outputFileChannel != null) { currChannelPosition = outputFileChannel.position(); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java index 6a994b49d3a2..f566c42af144 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java @@ -49,7 +49,7 @@ public void transferMapSpillFile( // The map spill file already has the proper format, and it contains all of the partition data. // So just transfer it directly to the destination without any merging. File outputFile = blockResolver.getDataFile(shuffleId, mapId); - File tempFile = Utils.tempFileWith(outputFile); + File tempFile = blockResolver.createTempFile(outputFile); Files.move(mapSpillFile.toPath(), tempFile.toPath()); blockResolver .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile); diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 72bf3f06d4bb..ba54555311e7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver( shuffleFiles.map(_.length()).sum } + /** Create a temporary file that will be renamed to the final resulting file */ + def createTempFile(file: File): File = { + blockManager.diskBlockManager.createTempFileWith(file) + } + /** * Get the shuffle data file. 
* @@ -234,7 +239,7 @@ private[spark] class IndexShuffleBlockResolver( throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " + s"${blockId.getClass().getSimpleName()}") } - val fileTmp = Utils.tempFileWith(file) + val fileTmp = createTempFile(file) val channel = Channels.newChannel( serializerManager.wrapStream(blockId, new FileOutputStream(fileTmp))) @@ -335,7 +340,7 @@ private[spark] class IndexShuffleBlockResolver( checksums: Array[Long], dataTmp: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) - val indexTmp = Utils.tempFileWith(indexFile) + val indexTmp = createTempFile(indexFile) val checksumEnabled = checksums.nonEmpty val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) { @@ -343,7 +348,7 @@ private[spark] class IndexShuffleBlockResolver( "The size of partition lengths and checksums should be equal") val checksumFile = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)) - (Some(checksumFile), Some(Utils.tempFileWith(checksumFile))) + (Some(checksumFile), Some(createTempFile(checksumFile))) } else { (None, None) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 8371e0235a94..183908287e1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -189,6 +189,16 @@ private[spark] class DiskBlockManager( Files.setPosixFilePermissions(path, currentPerms) } + /** + * Creates a temporary version of the given file with world readable permissions. + * Used to create block files that will be renamed to the final version of the file. + */ + def createTempFileWith(file: File): File = { + val tmpFile = Utils.tempFileWith(file) + createWorldReadableFile(tmpFile) + tmpFile + } + /** Produces a unique block id and File suitable for storing local intermediate results. */ def createTempLocalBlock(): (TempLocalBlockId, File) = { var blockId = new TempLocalBlockId(UUID.randomUUID()) From 4e6219152c498b0e687315e8b7861e82eb14e96e Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 25 Jan 2022 10:08:33 -0500 Subject: [PATCH 11/24] Update LocalDiskSingleSpillMapOutputWriter.java --- .../shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java index f566c42af144..6a994b49d3a2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java @@ -49,7 +49,7 @@ public void transferMapSpillFile( // The map spill file already has the proper format, and it contains all of the partition data. // So just transfer it directly to the destination without any merging. 
File outputFile = blockResolver.getDataFile(shuffleId, mapId); - File tempFile = blockResolver.createTempFile(outputFile); + File tempFile = Utils.tempFileWith(outputFile); Files.move(mapSpillFile.toPath(), tempFile.toPath()); blockResolver .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile); From 940a9342dfa06bb682a1999ba355d1a0b630ae2f Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 25 Jan 2022 19:19:16 -0500 Subject: [PATCH 12/24] Fix tests and lint --- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 1 - .../org/apache/spark/storage/DiskBlockManager.scala | 10 +++------- .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 8 ++++++++ .../sort/BypassMergeSortShuffleWriterSuite.scala | 11 +++++++++++ .../shuffle/sort/IndexShuffleBlockResolverSuite.scala | 5 +++++ .../io/LocalDiskShuffleMapOutputWriterSuite.scala | 5 +++++ 6 files changed, 32 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 7181691f348e..efe508d1361c 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -36,7 +36,6 @@ import org.apache.spark.internal.config.package$; import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage; -import org.apache.spark.util.Utils; /** * Implementation of {@link ShuffleMapOutputWriter} that replicates the functionality of shuffle diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 183908287e1d..01360fc815a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -281,13 +281,9 @@ private[spark] class DiskBlockManager( * Create a directory that is writable by the group. * Grant the permission 770 "rwxrwx---" to the directory so the shuffle server can * create subdirs/files within the merge folder. - * We can't use java.nio.files.Files.setPosixPermissions because Java doesn't support - * maintaining or adding the setgid bit when assigning permissions. The Hadoop - * RawLocalFileSystem also doesn't support this. Yarn uses this - * mechanism to make sure all subdirectories and files are assigned the group of - * the container executor. 
- * - * See https://bugs.openjdk.java.net/browse/JDK-8137404 + * TODO: Find out why can't we create a dir using java api with permission 770 + * Files.createDirectories(mergeDir.toPath, PosixFilePermissions.asFileAttribute( + * PosixFilePermissions.fromString("rwxrwx---"))) */ def createDirWithPermission770(dirToCreate: File): Unit = { var attempts = 0 diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index cd25f32cca89..c4ee261b38fb 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -128,6 +128,10 @@ public void setUp() throws Exception { }); when(shuffleBlockResolver.getDataFile(anyInt(), anyLong())).thenReturn(mergedOutputFile); + when(shuffleBlockResolver.createTempFile(any(File.class))).thenAnswer(invocationOnMock -> { + File file = (File) invocationOnMock.getArguments()[0]; + return Utils.tempFileWith(file); + }); Answer renameTempAnswer = invocationOnMock -> { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; @@ -159,6 +163,10 @@ public void setUp() throws Exception { spillFilesCreated.add(file); return Tuple2$.MODULE$.apply(blockId, file); }); + when(diskBlockManager.createTempFileWith(any(File.class))).thenAnswer(invocationOnMock -> { + File file = (File) invocationOnMock.getArguments()[0]; + return Utils.tempFileWith(file); + }); when(taskContext.taskMetrics()).thenReturn(taskMetrics); when(shuffleDep.serializer()).thenReturn(serializer); diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 38ed702d0e4c..83bd3b0a9977 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -111,6 +111,12 @@ class BypassMergeSortShuffleWriterSuite blockId = args(0).asInstanceOf[BlockId]) } + when(blockResolver.createTempFile(any(classOf[File]))) + .thenAnswer { invocationOnMock => + val file = invocationOnMock.getArguments()(0).asInstanceOf[File] + Utils.tempFileWith(file) + } + when(diskBlockManager.createTempShuffleBlock()) .thenAnswer { _ => val blockId = new TempShuffleBlockId(UUID.randomUUID) @@ -266,6 +272,11 @@ class BypassMergeSortShuffleWriterSuite temporaryFilesCreated += file (blockId, file) } + when(diskBlockManager.createTempFileWith(any(classOf[File]))) + .thenAnswer { invocationOnMock => + val file = invocationOnMock.getArguments()(0).asInstanceOf[File] + Utils.tempFileWith(file) + } val numPartition = shuffleHandle.dependency.partitioner.numPartitions val writer = new BypassMergeSortShuffleWriter[Int, Int]( diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index 21704b1c6732..de12f6840a1a 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -56,6 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa any[BlockId], any[Option[Array[String]]])).thenAnswer( (invocation: InvocationOnMock) => new File(tempDir, 
invocation.getArguments.head.toString)) when(diskBlockManager.localDirs).thenReturn(Array(tempDir)) + when(diskBlockManager.createTempFileWith(any(classOf[File]))) + .thenAnswer { invocationOnMock => + val file = invocationOnMock.getArguments()(0).asInstanceOf[File] + Utils.tempFileWith(file) + } conf.set("spark.app.id", appId) } diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala index 35d9b4ab1f76..6c9ec8b71a42 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala @@ -74,6 +74,11 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA .set("spark.app.id", "example.spark.app") .set("spark.shuffle.unsafe.file.output.buffer", "16k") when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile) + when(blockResolver.createTempFile(any(classOf[File]))) + .thenAnswer { invocationOnMock => + val file = invocationOnMock.getArguments()(0).asInstanceOf[File] + Utils.tempFileWith(file) + } when(blockResolver.writeMetadataFileAndCommit( anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File]))) .thenAnswer { invocationOnMock => From e4a71afe4c032344873debabf244693f6ee0c68b Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 29 Jan 2022 09:09:41 -0500 Subject: [PATCH 13/24] Rework some conditional checks --- .../storage/BlockManagerMasterEndpoint.scala | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 2a16178bda43..6e1e7644c9b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -296,18 +296,22 @@ class BlockManagerMasterEndpoint( } }.toSeq - val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient => - blocksToDeleteByShuffleService.map { case (bmId, blockIds) => - Future[Int] { - val numRemovedBlocks = shuffleClient.removeBlocks( - bmId.host, - bmId.port, - bmId.executorId, - blockIds.map(_.toString).toArray) - numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS) + val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) { + externalBlockStoreClient.map { shuffleClient => + blocksToDeleteByShuffleService.map { case (bmId, blockIds) => + Future[Int] { + val numRemovedBlocks = shuffleClient.removeBlocks( + bmId.host, + bmId.port, + bmId.executorId, + blockIds.map(_.toString).toArray) + numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS) + } } - } - }.getOrElse(Seq.empty) + }.getOrElse(Seq.empty) + } else { + Seq.empty + } Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures) } @@ -319,16 +323,17 @@ class BlockManagerMasterEndpoint( if (externalBlockStoreClient.isDefined) { mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => shuffleStatus.mapStatuses.foreach { mapStatus => - // Port should always be external shuffle port if external shuffle is enabled - val isShufflePort = mapStatus.location.port == 
externalShuffleServicePort - val executorDeallocated = - !blockManagerIdByExecutor.contains(mapStatus.location.executorId) - if (isShufflePort && executorDeallocated) { - val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, - new mutable.HashSet[BlockId]) + // Port should always be external shuffle port if external shuffle is enabled so + // also check if the executor has been deallocated + if (mapStatus.location.port == externalShuffleServicePort && + !blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { val blocksToDel = shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) - blocks ++= blocksToDel + if (blocksToDel.nonEmpty) { + val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + blocks ++= blocksToDel + } } } } From 2b4373fcf9d43486aa3634c29514c8778ad9ed47 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 2 Feb 2022 20:25:35 -0500 Subject: [PATCH 14/24] Add feature flag --- .../spark/internal/config/package.scala | 8 ++ .../storage/BlockManagerMasterEndpoint.scala | 4 +- .../spark/ExternalShuffleServiceSuite.scala | 102 ++++++++++-------- 3 files changed, 66 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index dbec61a1fdb7..6432b6337598 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -654,6 +654,14 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED = + ConfigBuilder("spark.shuffle.service.remove.shuffle.enabled") + .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " + + "deallocated executors.") + .version("3.3.0") + .booleanConf + .createWithDefault(true) + private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED = ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) .doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. 
" + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 6e1e7644c9b0..924f2f1265b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -107,6 +107,8 @@ class BlockManagerMasterEndpoint( logInfo("BlockManagerMasterEndpoint up") + private val externalShuffleServiceRemoveShuffleEnabled: Boolean = + externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) @@ -320,7 +322,7 @@ class BlockManagerMasterEndpoint( // Find all shuffle blocks on executors that are no longer running val blocksToDeleteByShuffleService = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] - if (externalBlockStoreClient.isDefined) { + if (externalShuffleServiceRemoveShuffleEnabled) { mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => shuffleStatus.mapStatuses.foreach { mapStatus => // Port should always be external shuffle port if external shuffle is enabled so diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 6fd4578f6c04..5f52af5a6ad0 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -145,59 +145,67 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi } test("SPARK-37618: external shuffle service removes shuffle blocks from deallocated executors") { - // Use local disk reading to get location of shuffle files on disk - val confWithLocalDiskReading = - conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) - sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) - sc.env.blockManager.externalShuffleServiceEnabled should equal(true) - sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) - try { - val rdd = sc.parallelize(0 until 100, 2) - .map { i => (i, 1) } - .repartition(1) - - rdd.count() - - val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 0).toSeq - - val dirManager = sc.env.blockManager.hostLocalDirManager - .getOrElse(fail("No host local dir manager")) - - val promises = mapOutputs.map { case (bmid, blocks) => - val promise = Promise[Seq[File]]() - dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) { - case scala.util.Success(res) => res.foreach { case (eid, dirs) => - val files = blocks.flatMap { case (blockId, _, _) => - val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] - Seq( - ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, - ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, - shuffleBlockId.reduceId).name), - ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, - ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, - shuffleBlockId.reduceId).name) - ) + for (enabled <- Seq(true, false)) { + // Use local disk reading to get location of shuffle files on disk + val confWithLocalDiskReading = 
conf.clone + .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) + .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled) + sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading) + sc.env.blockManager.externalShuffleServiceEnabled should equal(true) + sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) + try { + val rdd = sc.parallelize(0 until 100, 2) + .map { i => (i, 1) } + .repartition(1) + + rdd.count() + + val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 0).toSeq + + val dirManager = sc.env.blockManager.hostLocalDirManager + .getOrElse(fail("No host local dir manager")) + + val promises = mapOutputs.map { case (bmid, blocks) => + val promise = Promise[Seq[File]]() + dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) { + case scala.util.Success(res) => res.foreach { case (eid, dirs) => + val files = blocks.flatMap { case (blockId, _, _) => + val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] + Seq( + ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, + ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name), + ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, + ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name) + ) + } + promise.success(files) } - promise.success(files) + case scala.util.Failure(error) => promise.failure(error) } - case scala.util.Failure(error) => promise.failure(error) + promise.future } - promise.future - } - val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) - assert(filesToCheck.length == 4) - filesToCheck.foreach { f => assert(f.exists()) } + val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) + assert(filesToCheck.length == 4) + assert(filesToCheck.forall(_.exists())) - sc.killExecutors(sc.getExecutorIds()) - eventually(timeout(2.seconds), interval(100.milliseconds)) { - assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty) - } + sc.killExecutors(sc.getExecutorIds()) + eventually(timeout(2.seconds), interval(100.milliseconds)) { + assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty) + } - sc.cleaner.foreach(_.doCleanupShuffle(0, true)) + sc.cleaner.foreach(_.doCleanupShuffle(0, true)) - filesToCheck.foreach { f => assert(!f.exists()) } - } finally { - rpcHandler.applicationRemoved(sc.conf.getAppId, true) + if (enabled) { + assert(filesToCheck.forall(!_.exists())) + } else { + assert(filesToCheck.forall(_.exists())) + } + } finally { + rpcHandler.applicationRemoved(sc.conf.getAppId, true) + sc.stop() + } } } } From 9698b09ffca1e10b7d0408c4f935a261dfd7dd6f Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 27 Feb 2022 13:14:32 -0500 Subject: [PATCH 15/24] Change config name --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 6432b6337598..fec6dd9cf16e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -655,7 +655,7 @@ package object config { .createWithDefault(false) private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED = - 
ConfigBuilder("spark.shuffle.service.remove.shuffle.enabled") + ConfigBuilder("spark.shuffle.service.removeShuffle") .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " + "deallocated executors.") .version("3.3.0") From 126955be9f193b9e88caf119642da85d23571189 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 16 Mar 2022 21:04:18 -0400 Subject: [PATCH 16/24] Only change permissions if removing shuffle through external shuffle is enabled --- .../spark/storage/DiskBlockManager.scala | 33 +++++++++++-------- .../spark/ExternalShuffleServiceSuite.scala | 15 +++++---- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 01360fc815a5..d213ef4be169 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -78,6 +78,9 @@ private[spark] class DiskBlockManager( private val shutdownHook = addShutdownHook() + private val shuffleServiceRemoveShuffleEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) && + conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) + /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath(). @@ -95,13 +98,16 @@ private[spark] class DiskBlockManager( } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) if (!newDir.exists()) { - // SPARK-37618: Create dir as group writable so files within can be deleted by the - // shuffle service val path = newDir.toPath Files.createDirectory(path) - val currentPerms = Files.getPosixFilePermissions(path) - currentPerms.add(PosixFilePermission.GROUP_WRITE) - Files.setPosixFilePermissions(path, currentPerms) + if (shuffleServiceRemoveShuffleEnabled) { + // SPARK-37618: Create dir as group writable so files within can be deleted by the + // shuffle service in a secure setup. This will remove the setgid bit so files created + // within won't be created with the parent folder group. + val currentPerms = Files.getPosixFilePermissions(path) + currentPerms.add(PosixFilePermission.GROUP_WRITE) + Files.setPosixFilePermissions(path, currentPerms) + } } subDirs(dirId)(subDirId) = newDir newDir @@ -177,7 +183,7 @@ private[spark] class DiskBlockManager( * SPARK-37618: Makes sure that the file is created as world readable. This is to get * around the fact that making the block manager sub dirs group writable removes * the setgid bit in secure Yarn environments, which prevents the shuffle service - * from being able ot read shuffle files. The outer directories will still not be + * from being able to read shuffle files. The outer directories will still not be * world executable, so this doesn't allow access to these files except for the * running user and shuffle service. */ @@ -195,7 +201,12 @@ private[spark] class DiskBlockManager( */ def createTempFileWith(file: File): File = { val tmpFile = Utils.tempFileWith(file) - createWorldReadableFile(tmpFile) + if (shuffleServiceRemoveShuffleEnabled) { + // SPARK-37618: we need to make the file world readable because the parent will + // lose the setgid bit when making it group writable. Without this the shuffle + // service can't read the shuffle files in a secure setup. 
+ createWorldReadableFile(tmpFile) + } tmpFile } @@ -205,9 +216,7 @@ private[spark] class DiskBlockManager( while (getFile(blockId).exists()) { blockId = new TempLocalBlockId(UUID.randomUUID()) } - val file = getFile(blockId) - createWorldReadableFile(file) - (blockId, file) + (blockId, getFile(blockId)) } /** Produces a unique block id and File suitable for storing shuffled intermediate results. */ @@ -216,9 +225,7 @@ private[spark] class DiskBlockManager( while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } - val file = getFile(blockId) - createWorldReadableFile(file) - (blockId, file) + (blockId, getFile(blockId)) } /** diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 5f52af5a6ad0..b23202d1556e 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -172,13 +172,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi val files = blocks.flatMap { case (blockId, _, _) => val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] Seq( - ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, - ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, - shuffleBlockId.reduceId).name), - ExecutorDiskUtils.getFile(dirs, sc.env.blockManager.subDirsPerLocalDir, - ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, - shuffleBlockId.reduceId).name) - ) + ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name, + ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId, + shuffleBlockId.reduceId).name + ).map { blockId => + new File(ExecutorDiskUtils.getFilePath(dirs, + sc.env.blockManager.subDirsPerLocalDir, blockId)) + } } promise.success(files) } From cc7938459393b38b2850f9e30df548163a8a3ce8 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 16 Mar 2022 21:16:10 -0400 Subject: [PATCH 17/24] Update description, add to markdown, and reorder logic --- .../apache/spark/internal/config/package.scala | 4 +++- .../storage/BlockManagerMasterEndpoint.scala | 16 ++++++++-------- docs/configuration.md | 11 +++++++++++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index fec6dd9cf16e..b6b3247d7507 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -657,7 +657,9 @@ package object config { private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED = ConfigBuilder("spark.shuffle.service.removeShuffle") .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " + - "deallocated executors.") + "deallocated executors when the shuffle is no longer needed. 
Without this enabled, " + + "shuffle data on executors that are deallocated will remain on disk until the " + + "application ends.") .version("3.3.0") .booleanConf .createWithDefault(true) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 924f2f1265b3..65bf80f614bb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -319,6 +319,14 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { + val removeMsg = RemoveShuffle(shuffleId) + val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => + bm.storageEndpoint.ask[Boolean](removeMsg).recover { + // use false as default value means no shuffle data were removed + handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + } + }.toSeq + // Find all shuffle blocks on executors that are no longer running val blocksToDeleteByShuffleService = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] @@ -341,14 +349,6 @@ class BlockManagerMasterEndpoint( } } - val removeMsg = RemoveShuffle(shuffleId) - val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => - bm.storageEndpoint.ask[Boolean](removeMsg).recover { - // use false as default value means no shuffle data were removed - handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) - } - }.toSeq - val removeShuffleFromShuffleServicesFutures = externalBlockStoreClient.map { shuffleClient => blocksToDeleteByShuffleService.map { case (bmId, blockIds) => diff --git a/docs/configuration.md b/docs/configuration.md index ae3f422f34b3..8414697ec17e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -966,6 +966,17 @@ Apart from these, the following properties are also available, and may be useful 2.3.0 + + spark.shuffle.service.removeShuffle + true + + Whether to use the ExternalShuffleService for deleting shuffle blocks for + deallocated executors when the shuffle is no longer needed. Without this enabled, + shuffle data on executors that are deallocated will remain on disk until the + application ends. 
+ + 3.3.0 + spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE From 02057b85657d27506faf15fa300f546c61caa379 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 17 Mar 2022 06:59:26 -0400 Subject: [PATCH 18/24] Enable settings for test --- .../org/apache/spark/storage/DiskBlockManagerSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 5279362c511c..79ef00a6fafe 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -142,6 +142,11 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B } test("SPARK-37618: Sub dirs are group writable") { + val conf = testConf.clone + conf.set("spark.local.dir", rootDirs) + conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.shuffle.service.removeShufle", "true") + val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false) val blockId = new TestBlockId("test") val newFile = diskBlockManager.getFile(blockId) val parentDir = newFile.getParentFile() From b55eb200d1e601b3f1663c551d16ebecc8ed14c1 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 17 Mar 2022 19:36:28 -0400 Subject: [PATCH 19/24] Add permission changing back to temp shuffle block --- .../org/apache/spark/storage/DiskBlockManager.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index d213ef4be169..8400e8b50cfd 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -225,7 +225,14 @@ private[spark] class DiskBlockManager( while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } - (blockId, getFile(blockId)) + val tmpFile = getFile(blockId) + if (shuffleServiceRemoveShuffleEnabled) { + // SPARK-37618: we need to make the file world readable because the parent will + // lose the setgid bit when making it group writable. Without this the shuffle + // service can't read the shuffle files in a secure setup. + createWorldReadableFile(tmpFile) + } + (blockId, tmpFile) } /** From 84f2929ccbf1e195822ebea21097480c1aac66cd Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 18 Mar 2022 08:06:20 -0400 Subject: [PATCH 20/24] Apply suggestions for comment change Co-authored-by: wuyi --- .../main/scala/org/apache/spark/storage/DiskBlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 8400e8b50cfd..ee2bfbdbff53 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -196,7 +196,7 @@ private[spark] class DiskBlockManager( } /** - * Creates a temporary version of the given file with world readable permissions. + * Creates a temporary version of the given file with world readable permissions (if required). * Used to create block files that will be renamed to the final version of the file. 
*/ def createTempFileWith(file: File): File = { From 61aa9f0b683ba4b0d0138dd6490c452ec3b1d491 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 18 Mar 2022 08:07:44 -0400 Subject: [PATCH 21/24] Fix typo, add negative test, and remove duplicate check --- .../spark/shuffle/ShuffleBlockResolver.scala | 2 +- .../storage/BlockManagerMasterEndpoint.scala | 6 ++---- .../spark/storage/DiskBlockManagerSuite.scala | 16 +++++++++++++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala index de9937b2394e..c8fde8d2d39d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala @@ -42,7 +42,7 @@ trait ShuffleBlockResolver { def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer /** - * Retrive a list of BlockIds for a given shuffle map. Used to delete shuffle files + * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files * from the external shuffle service after the associated executor has been removed. */ def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 65bf80f614bb..259a1e070a40 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -333,10 +333,8 @@ class BlockManagerMasterEndpoint( if (externalShuffleServiceRemoveShuffleEnabled) { mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => shuffleStatus.mapStatuses.foreach { mapStatus => - // Port should always be external shuffle port if external shuffle is enabled so - // also check if the executor has been deallocated - if (mapStatus.location.port == externalShuffleServicePort && - !blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { + // Check if the executor has been deallocated + if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { val blocksToDel = shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) if (blocksToDel.nonEmpty) { diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 79ef00a6fafe..58fe40f9adeb 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -141,18 +141,28 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B assert(attemptId.equals("1")) } - test("SPARK-37618: Sub dirs are group writable") { + test("SPARK-37618: Sub dirs are group writable when removing from shuffle service enabled") { val conf = testConf.clone conf.set("spark.local.dir", rootDirs) conf.set("spark.shuffle.service.enabled", "true") - conf.set("spark.shuffle.service.removeShufle", "true") + conf.set("spark.shuffle.service.removeShuffle", "false") val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false) val blockId = new TestBlockId("test") val newFile = diskBlockManager.getFile(blockId) val parentDir = newFile.getParentFile() assert(parentDir.exists 
&& parentDir.isDirectory) val permission = Files.getPosixFilePermissions(parentDir.toPath) - assert(permission.contains(PosixFilePermission.GROUP_WRITE)) + assert(!permission.contains(PosixFilePermission.GROUP_WRITE)) + + assert(parentDir.delete()) + + conf.set("spark.shuffle.service.removeShuffle", "true") + val diskBlockManager2 = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false) + val newFile2 = diskBlockManager2.getFile(blockId) + val parentDir2 = newFile2.getParentFile() + assert(parentDir2.exists && parentDir2.isDirectory) + val permission2 = Files.getPosixFilePermissions(parentDir2.toPath) + assert(permission2.contains(PosixFilePermission.GROUP_WRITE)) } def writeToFile(file: File, numBytes: Int): Unit = { From d34be201ca60d3fd6b8b5f1287ac7dd423a9e74c Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 18 Mar 2022 14:57:05 -0600 Subject: [PATCH 22/24] Default config to false --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b6b3247d7507..7ea7c05515bd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -662,7 +662,7 @@ package object config { "application ends.") .version("3.3.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED = ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) diff --git a/docs/configuration.md b/docs/configuration.md index 8414697ec17e..494d4fc78737 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -968,7 +968,7 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.service.removeShuffle - true + false Whether to use the ExternalShuffleService for deleting shuffle blocks for deallocated executors when the shuffle is no longer needed. 
Without this enabled, From ad0f9eb6fdea088d19b33a7cd6c857502ec771e3 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 22 Mar 2022 07:43:59 -0600 Subject: [PATCH 23/24] Update permissions for RDD blocks if shuffle service fetching is enabled and add permission checks to tests --- .../spark/storage/DiskBlockManager.scala | 14 ++++-- .../org/apache/spark/storage/DiskStore.scala | 10 ++++ .../spark/ExternalShuffleServiceSuite.scala | 50 ++++++++++++++++++- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ee2bfbdbff53..50f0b09bd2e0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -78,8 +78,12 @@ private[spark] class DiskBlockManager( private val shutdownHook = addShutdownHook() - private val shuffleServiceRemoveShuffleEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) && - conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) + // If either of these features are enabled, we must change permissions on block manager + // directories and files to accomodate the shuffle service deleting files in a secure environment + private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && ( + conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) || + conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) + ) /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with @@ -100,7 +104,7 @@ private[spark] class DiskBlockManager( if (!newDir.exists()) { val path = newDir.toPath Files.createDirectory(path) - if (shuffleServiceRemoveShuffleEnabled) { + if (permissionChangingRequired) { // SPARK-37618: Create dir as group writable so files within can be deleted by the // shuffle service in a secure setup. This will remove the setgid bit so files created // within won't be created with the parent folder group. @@ -201,7 +205,7 @@ private[spark] class DiskBlockManager( */ def createTempFileWith(file: File): File = { val tmpFile = Utils.tempFileWith(file) - if (shuffleServiceRemoveShuffleEnabled) { + if (permissionChangingRequired) { // SPARK-37618: we need to make the file world readable because the parent will // lose the setgid bit when making it group writable. Without this the shuffle // service can't read the shuffle files in a secure setup. @@ -226,7 +230,7 @@ private[spark] class DiskBlockManager( blockId = new TempShuffleBlockId(UUID.randomUUID()) } val tmpFile = getFile(blockId) - if (shuffleServiceRemoveShuffleEnabled) { + if (permissionChangingRequired) { // SPARK-37618: we need to make the file world readable because the parent will // lose the setgid bit when making it group writable. Without this the shuffle // service can't read the shuffle files in a secure setup. 
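For reference, a minimal standalone sketch of the permission handling the SPARK-37618 comments above describe, assuming a POSIX file system. It is not part of the patch: the helper names and the /tmp path are hypothetical and only illustrate the java.nio.file calls the change relies on. As the patch comments note, rewriting a directory's permission set to add GROUP_WRITE also drops any inherited setgid bit, which is why files created inside are made world readable separately.

    import java.nio.file.{Files, Path, Paths}
    import java.nio.file.attribute.PosixFilePermission

    object PermissionSketch {
      // Create a block-manager-style sub-directory that the shuffle service's group can
      // delete files from. Setting the permission bits explicitly clears any setgid bit
      // the parent directory would otherwise have propagated.
      def createGroupWritableDir(dir: Path): Path = {
        Files.createDirectories(dir)
        val perms = Files.getPosixFilePermissions(dir)
        perms.add(PosixFilePermission.GROUP_WRITE)
        Files.setPosixFilePermissions(dir, perms)
        dir
      }

      // Make a file readable by the (differently owned) shuffle service process.
      def makeWorldReadable(file: Path): Path = {
        if (!Files.exists(file)) Files.createFile(file)
        val perms = Files.getPosixFilePermissions(file)
        perms.add(PosixFilePermission.OTHERS_READ)
        Files.setPosixFilePermissions(file, perms)
        file
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical paths, for illustration only.
        val subDir = createGroupWritableDir(Paths.get("/tmp/blockmgr-sketch/0a"))
        makeWorldReadable(subDir.resolve("shuffle_0_0_0.data"))
      }
    }
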
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index f0334c56962c..a3351affc2a2 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -50,6 +50,9 @@ private[spark] class DiskStore( private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS) private val blockSizes = new ConcurrentHashMap[BlockId, Long]() + private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) && + conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) + def getSize(blockId: BlockId): Long = blockSizes.get(blockId) /** @@ -71,6 +74,13 @@ private[spark] class DiskStore( logDebug(s"Attempting to put block $blockId") val startTimeNs = System.nanoTime() val file = diskManager.getFile(blockId) + + // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must + // make the file world readable, as it will not be owned by the gropu running the shuffle + // service in a secure environment + if (shuffleServiceFetchRddEnabled) { + diskManager.createWorldReadableFile(file) + } val out = new CountingWritableChannel(openForWrite(file)) var threwException: Boolean = true try { diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index b23202d1556e..dd3d90f3124d 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark import java.io.File +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermission import scala.concurrent.Promise import scala.concurrent.duration.Duration @@ -106,7 +108,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi } test("SPARK-25888: using external shuffle service fetching disk persisted blocks") { - val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) + val confWithRddFetchEnabled = conf.clone + .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) + .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled) sc.env.blockManager.externalShuffleServiceEnabled should equal(true) sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient]) @@ -118,13 +122,42 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi rdd.count() val blockId = RDDBlockId(rdd.id, 0) - eventually(timeout(2.seconds), interval(100.milliseconds)) { + val bms = eventually(timeout(2.seconds), interval(100.milliseconds)) { val locations = sc.env.blockManager.master.getLocations(blockId) assert(locations.size === 2) assert(locations.map(_.port).contains(server.getPort), "external shuffle service port should be contained") + locations } + val dirManager = sc.env.blockManager.hostLocalDirManager + .getOrElse(fail("No host local dir manager")) + + val promises = bms.map { case bmid => + val promise = Promise[File]() + dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) { + case scala.util.Success(res) => res.foreach { case (eid, dirs) => + val file = new File(ExecutorDiskUtils.getFilePath(dirs, + sc.env.blockManager.subDirsPerLocalDir, blockId.name)) + promise.success(file) + } + case 
scala.util.Failure(error) => promise.failure(error) + } + promise.future + } + val filesToCheck = promises.map(p => ThreadUtils.awaitResult(p, Duration(2, "sec"))) + + filesToCheck.foreach(f => { + val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath) + assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE)) + + // On most operating systems the default umask will make this test pass + // even if the permission isn't changed. To properly test this, run the + // test with a umask of 0027 + val perms = Files.getPosixFilePermissions(f.toPath) + assert(perms.contains(PosixFilePermission.OTHERS_READ)) + }) + sc.killExecutors(sc.getExecutorIds()) eventually(timeout(2.seconds), interval(100.milliseconds)) { @@ -191,6 +224,19 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi assert(filesToCheck.length == 4) assert(filesToCheck.forall(_.exists())) + if (enabled) { + filesToCheck.foreach(f => { + val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath) + assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE)) + + // On most operating systems the default umask will make this test pass + // even if the permission isn't changed. To properly test this, run the + // test with a umask of 0027 + val perms = Files.getPosixFilePermissions(f.toPath) + assert(perms.contains(PosixFilePermission.OTHERS_READ)) + }) + } + sc.killExecutors(sc.getExecutorIds()) eventually(timeout(2.seconds), interval(100.milliseconds)) { assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty) From c39cdf385436a58309dfcd986cdad8f5179854e0 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 24 Mar 2022 11:10:19 -0600 Subject: [PATCH 24/24] Update comments and use locking withMapStatuses --- .../storage/BlockManagerMasterEndpoint.scala | 20 ++++++++++--------- .../spark/storage/DiskBlockManager.scala | 4 +++- .../org/apache/spark/storage/DiskStore.scala | 6 +++--- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 259a1e070a40..4d8ba9b3e4e0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -332,15 +332,17 @@ class BlockManagerMasterEndpoint( new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] if (externalShuffleServiceRemoveShuffleEnabled) { mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => - shuffleStatus.mapStatuses.foreach { mapStatus => - // Check if the executor has been deallocated - if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { - val blocksToDel = - shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) - if (blocksToDel.nonEmpty) { - val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, - new mutable.HashSet[BlockId]) - blocks ++= blocksToDel + shuffleStatus.withMapStatuses { mapStatuses => + mapStatuses.foreach { mapStatus => + // Check if the executor has been deallocated + if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { + val blocksToDel = + shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) + if (blocksToDel.nonEmpty) { + val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + blocks ++= blocksToDel + 
} } } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 50f0b09bd2e0..e29f3fc1b805 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -79,7 +79,9 @@ private[spark] class DiskBlockManager( private val shutdownHook = addShutdownHook() // If either of these features are enabled, we must change permissions on block manager - // directories and files to accomodate the shuffle service deleting files in a secure environment + // directories and files to accomodate the shuffle service deleting files in a secure environment. + // Parent directories are assumed to be restrictive to prevent unauthorized users from accessing + // or modifying world readable files. private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && ( conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) || conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index a3351affc2a2..d45947db6934 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -75,9 +75,9 @@ private[spark] class DiskStore( val startTimeNs = System.nanoTime() val file = diskManager.getFile(blockId) - // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must - // make the file world readable, as it will not be owned by the gropu running the shuffle - // service in a secure environment + // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must make + // the file world readable, as it will not be owned by the group running the shuffle service + // in a secure environment. This is due to changing directory permissions to allow deletion, if (shuffleServiceFetchRddEnabled) { diskManager.createWorldReadableFile(file) }
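
For context on how the finished feature is meant to be used, here is a minimal, hypothetical driver-side setup enabling shuffle removal through the external shuffle service. Only spark.shuffle.service.removeShuffle comes from this patch series (and it now defaults to false); the other properties are pre-existing Spark settings, and dynamic allocation is simply one common way executors end up deallocated.

    import org.apache.spark.{SparkConf, SparkContext}

    object RemoveShuffleExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("remove-shuffle-example")
          .set("spark.shuffle.service.enabled", "true")       // external shuffle service required
          .set("spark.shuffle.service.removeShuffle", "true") // opt in to the SPARK-37618 behaviour
          .set("spark.dynamicAllocation.enabled", "true")     // lets executors be released

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(0 until 100, 2).map(i => (i, 1)).repartition(1)
        rdd.count()
        // Once the shuffle is garbage collected (or cleaned explicitly, as the suite does with
        // sc.cleaner.foreach(_.doCleanupShuffle(0, true))), index and data files left behind by
        // deallocated executors are deleted through the shuffle service instead of lingering
        // until the application ends.
        sc.stop()
      }
    }

As the new test comments point out, verifying the permission changes themselves requires running with a restrictive umask (for example 0027), since a permissive default umask already leaves the files group and world readable.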