diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850c..c3a2e9a883ac 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -227,15 +227,15 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
           // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
           // run for the shuffle ID. Close and clean up old shuffleMergeId files,
           // happens in the indeterminate stage retries
-          AppAttemptShuffleMergeId appAttemptShuffleMergeId =
-              new AppAttemptShuffleMergeId(
-                  appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId);
+          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+              new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId,
+                  shuffleId, latestShuffleMergeId);
           logger.info("{}: creating a new shuffle merge metadata since received " +
-              "shuffleMergeId is higher than latest shuffleMergeId {}",
-              appAttemptShuffleMergeId, latestShuffleMergeId);
+              "shuffleMergeId {} is higher than latest shuffleMergeId {}",
+              currentAppAttemptShuffleMergeId, shuffleMergeId, latestShuffleMergeId);
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(currentAppAttemptShuffleMergeId,
+                  mergePartitionsInfo.shuffleMergePartitions));
           return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
         } else {
           // The request is for block with same shuffleMergeId as the latest shuffleMergeId
@@ -653,9 +653,11 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
         // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
         // empty MergeStatuses but cleanup the older shuffleMergeId files.
+        AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+            msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
         submitCleanupTask(() ->
             closeAndDeleteOutdatedPartitions(
-                appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
       } else {
         // This block covers:
         // 1. finalization of determinate stage
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index eb2c1d9fa5cb..6a595ee346d0 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,7 +28,9 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -1146,11 +1148,14 @@ public void testBlockFetchWithOlderShuffleMergeId() throws IOException {
   @Test
   public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
+    List<RemoteBlockPushResolver.AppAttemptShuffleMergeId> removedIds =
+        new CopyOnWriteArrayList<>();
     pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
       void closeAndDeleteOutdatedPartitions(
           AppAttemptShuffleMergeId appAttemptShuffleMergeId,
           Map<Integer, AppShufflePartitionInfo> partitions) {
+        removedIds.add(appAttemptShuffleMergeId);
         super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions);
         closed.release();
       }
@@ -1167,6 +1172,10 @@ void closeAndDeleteOutdatedPartitions(
     RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
         pushResolver.validateAndGetAppShuffleInfo(testApp);
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    // For the previous merge id
+    assertEquals(1, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     assertFalse("Data files on the disk should be cleaned up",
         appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
     assertFalse("Meta files on the disk should be cleaned up",
@@ -1186,6 +1195,9 @@ void closeAndDeleteOutdatedPartitions(
     pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    assertEquals(2, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     stream3.onComplete(stream3.getID());
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3));
@@ -1196,6 +1208,9 @@ void closeAndDeleteOutdatedPartitions(
     pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    assertEquals(3, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     // Do not finalize shuffleMergeId 4 can happen during stage cancellation.
     stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
     stream4.onComplete(stream4.getID());
@@ -1204,6 +1219,10 @@ void closeAndDeleteOutdatedPartitions(
     // but no blocks pushed for that shuffleMergeId
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    // For the previous merge id - here the cleanup is from finalizeShuffleMerge
+    assertEquals(4, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned"
         + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
     assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned"
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 16fa42056921..075a21c399e0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
       mergeManager2, mergeManager2DB) == 1)
     assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
-      mergeManager2, mergeManager2DB) == 2)
+      mergeManager2, mergeManager2DB) == 1)
     s2.stop()
 
     // Yarn Shuffle service comes back up without custom mergeManager
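
The common thread in both RemoteBlockPushResolver.java hunks is which shuffleMergeId the asynchronous cleanup is keyed to: the outdated one (latestShuffleMergeId in getOrCreateAppShufflePartitionInfo, mergePartitionsInfo.shuffleMergeId in finalizeShuffleMerge) rather than the newly received higher one, which is exactly what the new assertions on removedIds in the test verify. The stand-alone sketch below illustrates that pattern with simplified, hypothetical stand-in names (it is not the Spark classes): the outdated id is captured before the new one is registered, and only that outdated id is handed to the cleanup task.

// Minimal illustrative sketch (assumed, simplified names; not the Spark implementation)
// of keying an asynchronous cleanup by the OUTDATED merge id rather than the incoming one.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MergeIdCleanupSketch {
  // mergeId -> opaque partition state for that merge attempt
  private final Map<Integer, String> partitionsByMergeId = new ConcurrentHashMap<>();
  private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();
  private volatile int latestMergeId = -1;

  /** Registers a higher merge id and schedules cleanup of the superseded one. */
  public synchronized void onMergeId(int incomingMergeId) {
    if (incomingMergeId > latestMergeId) {
      // Capture the old id before overwriting it; the cleanup must reference this id,
      // not incomingMergeId, so the superseded files/entries are the ones removed.
      final int outdatedMergeId = latestMergeId;
      final String outdated = partitionsByMergeId.remove(outdatedMergeId);
      if (outdated != null) {
        cleanupExecutor.submit(() ->
            System.out.println("cleaning up partitions of outdated mergeId " + outdatedMergeId));
      }
      latestMergeId = incomingMergeId;
      partitionsByMergeId.put(incomingMergeId, "partitions-" + incomingMergeId);
    }
  }
}

Capturing outdatedMergeId before the latest id is overwritten mirrors how the patch constructs the AppAttemptShuffleMergeId from the pre-existing merge id before calling submitCleanupTask.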