diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 686003e2c51dc..4569906b11112 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
@@ -40,7 +40,7 @@ private[storage] class BlockManagerDecommissioner(
     conf: SparkConf,
     bm: BlockManager) extends Logging {
 
-  private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
+  private[storage] val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
   private val blockSavedOnDecommissionedBlockManagerException =
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
               // Confirm peer is not the fallback BM ID because fallbackStorage would already
               // have been used in the try-block above so there's no point trying again
               && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-              fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+              try {
+                fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+              } catch {
+                case e: FileNotFoundException =>
+                  logWarning(s"Skipping block $shuffleBlockInfo, block deleted.", e)
+                case NonFatal(e) =>
+                  logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                  keepRunning = false
+              }
             } else if (e.getCause != null && e.getCause.getMessage != null
               && e.getCause.getMessage
               .contains(blockSavedOnDecommissionedBlockManagerException)) {
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 83c9707bfc273..3160846637373 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,16 +16,17 @@
  */
 package org.apache.spark.storage
 
-import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
+import java.io.{DataOutputStream, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
 import java.nio.file.Files
 
 import scala.concurrent.duration._
+import scala.reflect.runtime.{universe => ru}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, never, verify, when}
+import org.mockito.Mockito.{atLeastOnce, mock, never, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
@@ -229,6 +230,152 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-45579: ignore deleted files") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+
+    val ids = Set((1, 1L, 1))
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf, bm)
+    val indexFile = indexShuffleBlockResolver.getIndexFile(1, 1L)
+    val dataFile = indexShuffleBlockResolver.getDataFile(1, 1L)
+    indexFile.createNewFile()
+    dataFile.createNewFile()
+
+    val resolver = mock(classOf[IndexShuffleBlockResolver])
+    when(resolver.getStoredShuffles())
+      .thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSeq)
+    ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
+      when(resolver.getMigrationBlocks(mc.any()))
+        .thenReturn(List(
+          (ShuffleIndexBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer])),
+          (ShuffleDataBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer]))))
+      when(resolver.getIndexFile(shuffleId, mapId)).thenReturn(indexFile)
+      when(resolver.getDataFile(shuffleId, mapId)).thenReturn(dataFile)
+    }
+
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("test", "fake", 7337)))
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    when(bm.master).thenReturn(bmm)
+    val blockTransferService = mock(classOf[BlockTransferService])
+    when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
+      mc.any(), mc.any())).thenThrow(new IOException)
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+    when(bm.migratableResolver).thenReturn(resolver)
+    when(bm.getMigratableRDDBlocks()).thenReturn(Seq())
+
+    val decommissioner = new BlockManagerDecommissioner(conf, bm)
+    val mirror = ru.runtimeMirror(decommissioner.getClass.getClassLoader)
+    val im = mirror.reflect(decommissioner)
+    val classSymbol = mirror.staticClass("org.apache.spark.storage.BlockManagerDecommissioner")
+    val fallbackStorageTermSymbol =
+      classSymbol.info
+        .decl(ru.TermName("fallbackStorage"))
+        .asTerm
+    val fallbackStorageField = im.reflectField(fallbackStorageTermSymbol)
+    val mockFallbackStorage = mock(classOf[FallbackStorage])
+    when(mockFallbackStorage.copy(mc.any(), mc.any()))
+      .thenAnswer(_ => throw new FileNotFoundException())
+    fallbackStorageField.set(Some(mockFallbackStorage))
+
+    try {
+      decommissioner.start()
+      val fallbackStorage = new FallbackStorage(conf)
+      eventually(timeout(10.second), interval(1.seconds)) {
+        // uploadBlockSync fails with IOException, so the fallback storage path is exercised
+        verify(blockTransferService, atLeastOnce())
+          .uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+
+        Seq("shuffle_1_1_0.index", "shuffle_1_1_0.data").foreach { filename =>
+          assert(!fallbackStorage.exists(shuffleId = 1, filename))
+        }
+        assert(decommissioner.numMigratedShuffles.get() > 0)
+      }
+    } finally {
+      decommissioner.stop()
+    }
+  }
+
+  test("SPARK-45579: abort for other errors") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+
+    val ids = Set((1, 1L, 1))
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf, bm)
+    val indexFile = indexShuffleBlockResolver.getIndexFile(1, 1L)
+    val dataFile = indexShuffleBlockResolver.getDataFile(1, 1L)
+    indexFile.createNewFile()
+    dataFile.createNewFile()
+
+    val resolver = mock(classOf[IndexShuffleBlockResolver])
+    when(resolver.getStoredShuffles())
+      .thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSeq)
+    ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
+      when(resolver.getMigrationBlocks(mc.any()))
+        .thenReturn(List(
+          (ShuffleIndexBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer])),
+          (ShuffleDataBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer]))))
+      when(resolver.getIndexFile(shuffleId, mapId)).thenReturn(indexFile)
+      when(resolver.getDataFile(shuffleId, mapId)).thenReturn(dataFile)
+    }
+    val bmIds = Seq(BlockManagerId("test", "fake", 7337),
+      BlockManagerId("test1", "fake", 7337),
+      BlockManagerId("test2", "fake", 7337))
+    when(bm.getPeers(mc.any()))
+      .thenReturn(bmIds)
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    when(bm.master).thenReturn(bmm)
+    val blockTransferService = mock(classOf[BlockTransferService])
+    when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
+      mc.any(), mc.any())).thenThrow(new IOException)
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+    when(bm.migratableResolver).thenReturn(resolver)
+    when(bm.getMigratableRDDBlocks()).thenReturn(Seq())
+
+    val decommissioner = new BlockManagerDecommissioner(conf, bm)
+    val mirror = ru.runtimeMirror(decommissioner.getClass.getClassLoader)
+    val im = mirror.reflect(decommissioner)
+    val classSymbol = mirror.staticClass("org.apache.spark.storage.BlockManagerDecommissioner")
+    val fallbackStorageTermSymbol =
+      classSymbol.info
+        .decl(ru.TermName("fallbackStorage"))
+        .asTerm
+    val fallbackStorageField = im.reflectField(fallbackStorageTermSymbol)
+    val mockFallbackStorage = mock(classOf[FallbackStorage])
+    when(mockFallbackStorage.copy(mc.any(), mc.any()))
+      .thenAnswer(_ => throw new RuntimeException())
+    fallbackStorageField.set(Some(mockFallbackStorage))
+
+    try {
+      decommissioner.start()
+      val fallbackStorage = new FallbackStorage(conf)
+      eventually(timeout(10.second), interval(1.seconds)) {
+        // uploadBlockSync fails with IOException, so the fallback storage path is exercised
+        verify(blockTransferService, atLeastOnce())
+          .uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+
+        Seq("shuffle_1_1_0.index", "shuffle_1_1_0.data").foreach { filename =>
+          assert(!fallbackStorage.exists(shuffleId = 1, filename))
+        }
+        assert(decommissioner.numMigratedShuffles.get() > 0)
+      }
+    } finally {
+      decommissioner.stop()
+    }
+  }
+
   test("Upload from all decommissioned executors") {
     sc = new SparkContext(getSparkConf(2, 2))
     withSpark(sc) { sc =>