From b7f21ef4310a8fde11d0f345e0d9b4313a15c810 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Thu, 27 Jun 2024 16:46:26 +0800
Subject: [PATCH 1/2] [SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track migrated shuffle files on the destination executor.

### Why are the changes needed?

This fixes a long-standing decommission bug where migrated shuffle files can't be cleaned up from the executor. Normally, shuffle files are tracked in `taskIdMapsForShuffle` during map task execution. Upon receiving a `RemoveShuffle(shuffleId)` request from the driver, the executor cleans up those shuffle files by looking them up in `taskIdMapsForShuffle`. However, shuffle files migrated by decommission are never registered in the destination executor's `taskIdMapsForShuffle`, so they can't be deleted.

Note that this bug only affects shuffle removal on the executor. Shuffle removal on the external shuffle service (when `spark.shuffle.service.removeShuffle` is enabled and the executor that stored the shuffle files is gone) doesn't rely on `taskIdMapsForShuffle`; it locates the shuffle file directly from the specific shuffle block id, so it isn't affected.
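In other words, the fix simply registers the map task ids of migrated index/data blocks in the same bookkeeping structure that local writes already use. As a rough, self-contained sketch of that bookkeeping (simplified stand-ins rather than the real Spark classes, which use `OpenHashSet` and live in `SortShuffleManager` and `IndexShuffleBlockResolver`):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Simplified stand-in for the executor-side bookkeeping this PR extends.
object ShuffleTrackingSketch {
  // shuffleId -> ids of the map tasks whose shuffle files live on this executor
  private val taskIdMapsForShuffle = new ConcurrentHashMap[Int, mutable.Set[Long]]()

  // Called when a map task writes its output locally and, with this fix, also
  // when a migrated index/data block is saved on the destination executor.
  def track(shuffleId: Int, mapId: Long): Unit = {
    val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => mutable.Set.empty[Long])
    mapTaskIds.add(mapId)
  }

  // Called on RemoveShuffle(shuffleId): the files to delete are resolved from the
  // tracked map task ids, so anything never registered here (e.g. migrated files
  // before this fix) is silently left behind on disk.
  def filesToRemove(shuffleId: Int): Seq[String] = {
    Option(taskIdMapsForShuffle.remove(shuffleId)).toSeq.flatMap(_.toSeq).flatMap { mapId =>
      Seq(s"shuffle_${shuffleId}_${mapId}_0.data", s"shuffle_${shuffleId}_${mapId}_0.index")
    }
  }

  def main(args: Array[String]): Unit = {
    track(0, 3L)              // e.g. a block received via decommission migration
    println(filesToRemove(0)) // List(shuffle_0_3_0.data, shuffle_0_3_0.index)
  }
}
```

The actual change registers the ids when a migrated index or data block is committed on the receiving executor (see the `blockId match { ... }` block added to `IndexShuffleBlockResolver` below), so `RemoveShuffle` then treats those files like any locally written output.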
### Does this PR introduce _any_ user-facing change?

No. (The change is internal; common users won't notice any difference.)

### How was this patch tested?

Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu
Signed-off-by: Yi Wu
---
 .../LocalDiskShuffleExecutorComponents.java   |  3 +-
 .../shuffle/IndexShuffleBlockResolver.scala   | 20 ++++-
 .../shuffle/sort/SortShuffleManager.scala     |  3 +-
 .../sort/UnsafeShuffleWriterSuite.java        |  6 +-
 ...kManagerDecommissionIntegrationSuite.scala | 76 +++++++++++++++++++
 5 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index eb4d9d9abc8e3..861a8e623a6e5 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -56,7 +56,8 @@ public void initializeExecutor(String appId, String execId, Map
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
     }
-    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
+    blockResolver =
+      new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
   }
 
   @Override
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 919b0f5f7c135..299f299249b9a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
+import java.util.{Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -37,6 +38,7 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.OpenHashSet
 
 /**
  * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -52,7 +54,8 @@ import org.apache.spark.util.Utils
 private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     // var for testing
-    var _blockManager: BlockManager = null)
+    var _blockManager: BlockManager = null,
+    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
   extends ShuffleBlockResolver
   with Logging with MigratableResolver {
 
@@ -270,6 +273,21 @@ private[spark] class IndexShuffleBlockResolver(
         throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
       }
     }
+    blockId match {
+      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+        val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+          shuffleId, _ => new OpenHashSet[Long](8)
+        )
+        mapTaskIds.add(mapId)
+
+      case ShuffleDataBlockId(shuffleId, mapId, _) =>
+        val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+          shuffleId, _ => new OpenHashSet[Long](8)
+        )
+        mapTaskIds.add(mapId)
+
+      case _ => // Unreachable
+    }
     blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 79dff6f87534a..4234d0ec5fd04 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
 
   private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
 
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  override val shuffleBlockResolver =
+    new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
 
   /**
    * Obtains a [[ShuffleHandle]] to pass to tasks.
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 1fa17b908699f..ed3a3b887c304 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -314,7 +314,8 @@ public void writeWithoutSpilling() throws Exception {
 
   @Test
   public void writeChecksumFileWithoutSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@ public void writeChecksumFileWithoutSpill() throws Exception {
 
   @Test
   public void writeChecksumFileWithSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index d9d2e6102f120..2ba348222f7be 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.storage
 
+import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
+import org.apache.commons.io.FileUtils
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
@@ -352,4 +354,78 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     import scala.language.reflectiveCalls
     assert(listener.removeReasonValidated)
   }
+
+  test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {
+
+    val sparkTempDir = System.getProperty("java.io.tmpdir")
+
+    def shuffleFiles: Seq[File] = {
+      FileUtils
+        .listFiles(new File(sparkTempDir), Array("data", "index"), true)
+        .asScala
+        .toSeq
+    }
+
+    val existingShuffleFiles = shuffleFiles
+
+    val conf = new SparkConf()
+      .setAppName("SPARK-46957")
+      .setMaster("local-cluster[2,1,1024]")
+      .set(config.DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+    sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
+    var isDecommissionedExecutorRemoved = false
+    val execToDecommission = sc.getExecutorIds().head
+    sc.addSparkListener(new SparkListener {
+      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
+          shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
+        }
+      }
+
+      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+        assert(execToDecommission === executorRemoved.executorId)
+        isDecommissionedExecutorRemoved = true
+      }
+    })
+
+    // Run a job to create shuffle data
+    val result = sc.parallelize(1 to 1000, 10)
+      .map { i => (i % 2, i) }
+      .reduceByKey(_ + _).collect()
+
+    assert(result.head === (0, 250500))
+    assert(result.tail.head === (1, 250000))
+    sc.schedulerBackend
+      .asInstanceOf[StandaloneSchedulerBackend]
+      .decommissionExecutor(
+        execToDecommission,
+        ExecutorDecommissionInfo("test", None),
+        adjustTargetNumExecutors = true
+      )
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(isDecommissionedExecutorRemoved)
+      // Ensure there are shuffle data have been migrated
+      assert(shuffleBlockUpdates.size >= 2)
+    }
+
+    val shuffleId = shuffleBlockUpdates
+      .find(_.isInstanceOf[ShuffleIndexBlockId])
+      .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
+      .get
+
+    val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
+    assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
+
+    // Remove the shuffle data
+    sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
+    }
+  }
 }

From dbc4da4ea94904c763cd843d4ca99de7d441993f Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Thu, 27 Jun 2024 19:27:40 +0800
Subject: [PATCH 2/2] fix

---
 .../shuffle/sort/io/LocalDiskShuffleExecutorComponents.java  | 5 ++++-
 .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 4 ++--
 .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java  | 4 ++--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index 861a8e623a6e5..38f0a60f8b0dd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
@@ -57,7 +58,9 @@ public void initializeExecutor(String appId, String execId, Map
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
     }
     blockResolver =
-      new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
+      new IndexShuffleBlockResolver(
+        sparkConf, blockManager, Collections.emptyMap() /* Shouldn't be accessed */
+      );
   }
 
   @Override
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 299f299249b9a..34eea575bbfd2 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
-import java.util.{Map => JMap}
+import java.util.{Collections, Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -55,7 +55,7 @@ private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     // var for testing
     var _blockManager: BlockManager = null,
-    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
+    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = Collections.emptyMap())
   extends ShuffleBlockResolver
   with Logging with MigratableResolver {
 
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index ed3a3b887c304..472d03baeae05 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -315,7 +315,7 @@ public void writeWithoutSpilling() throws Exception {
   @Test
   public void writeChecksumFileWithoutSpill() throws Exception {
     IndexShuffleBlockResolver blockResolver =
-      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -346,7 +346,7 @@ public void writeChecksumFileWithoutSpill() throws Exception {
   @Test
   public void writeChecksumFileWithSpill() throws Exception {
     IndexShuffleBlockResolver blockResolver =
-      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
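For reference, a rough, standalone sketch of a job run with the same settings the new integration test enables. The string keys below are assumed to be the spelled-out forms of `config.DECOMMISSION_ENABLED`, `config.STORAGE_DECOMMISSION_ENABLED`, and `config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED`; the decommission trigger and `removeShuffle` call are only noted in comments, since the test drives them through internal APIs:

```scala
// Minimal sketch of the scenario the test covers, under the assumptions stated above.
import org.apache.spark.{SparkConf, SparkContext}

object DecommissionShuffleMigrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-46957-sketch")
      .setMaster("local-cluster[2,1,1024]")
      .set("spark.decommission.enabled", "true")
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // Create shuffle output on both executors.
      val counts = sc.parallelize(1 to 1000, 10).map(i => (i % 2, i)).reduceByKey(_ + _).collect()
      println(counts.mkString(", "))
      // Decommissioning one executor (via the scheduler backend, as in the test)
      // migrates its shuffle files to the peer; with this fix the peer registers
      // them in taskIdMapsForShuffle, so a later RemoveShuffle deletes them too.
    } finally {
      sc.stop()
    }
  }
}
```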