Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,15 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
// A higher shuffleMergeId was seen for this shuffle ID, meaning a new stage attempt is
// being run for it. Close and clean up the files of the old shuffleMergeId; this
// happens during indeterminate stage retries.
AppAttemptShuffleMergeId appAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(
appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId);
AppAttemptShuffleMergeId currrentAppAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId,
shuffleId, latestShuffleMergeId);
logger.info("{}: creating a new shuffle merge metadata since received " +
"shuffleMergeId is higher than latest shuffleMergeId {}",
appAttemptShuffleMergeId, latestShuffleMergeId);
"shuffleMergeId {} is higher than latest shuffleMergeId {}",
currrentAppAttemptShuffleMergeId, shuffleMergeId, latestShuffleMergeId);
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
closeAndDeleteOutdatedPartitions(currrentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
} else {
// The request is for block with same shuffleMergeId as the latest shuffleMergeId
Expand Down Expand Up @@ -653,9 +653,11 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
} else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// If no blocks were pushed for the finalizeShuffleMerge shuffleMergeId, return
// empty MergeStatuses but clean up the older shuffleMergeId files.
AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
} else {
// This block covers:
// 1. finalization of determinate stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -1146,11 +1148,14 @@ public void testBlockFetchWithOlderShuffleMergeId() throws IOException {
@Test
public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
List<RemoteBlockPushResolver.AppAttemptShuffleMergeId> removedIds =
new CopyOnWriteArrayList<>();
pushResolver = new RemoteBlockPushResolver(conf, null) {
// Test hook around the resolver's cleanup routine: it records the id the cleanup was
// invoked for, runs the real cleanup, and then signals the waiting test body.
@Override
void closeAndDeleteOutdatedPartitions(
AppAttemptShuffleMergeId appAttemptShuffleMergeId,
Map<Integer, AppShufflePartitionInfo> partitions) {
// Remember which id was handed to the cleanup so the test can assert on its shuffleMergeId.
removedIds.add(appAttemptShuffleMergeId);
super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions);
// Released only after the superclass cleanup returns; the test calls closed.acquire()
// before checking removedIds and the on-disk files.
closed.release();
}
Expand All @@ -1167,6 +1172,10 @@ void closeAndDeleteOutdatedPartitions(
RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
pushResolver.validateAndGetAppShuffleInfo(testApp);
closed.acquire();
assertEquals(1, removedIds.size());
// For the previous merge id
assertEquals(1, removedIds.iterator().next().shuffleMergeId);
removedIds.clear();
assertFalse("Data files on the disk should be cleaned up",
appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
assertFalse("Meta files on the disk should be cleaned up",
Expand All @@ -1186,6 +1195,9 @@ void closeAndDeleteOutdatedPartitions(
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0));
closed.acquire();
assertEquals(1, removedIds.size());
assertEquals(2, removedIds.iterator().next().shuffleMergeId);
removedIds.clear();
stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
stream3.onComplete(stream3.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3));
Expand All @@ -1196,6 +1208,9 @@ void closeAndDeleteOutdatedPartitions(
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
closed.acquire();
assertEquals(1, removedIds.size());
assertEquals(3, removedIds.iterator().next().shuffleMergeId);
removedIds.clear();
// Do not finalize shuffleMergeId 4; this can happen during stage cancellation.
stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
stream4.onComplete(stream4.getID());
Expand All @@ -1204,6 +1219,10 @@ void closeAndDeleteOutdatedPartitions(
// but no blocks pushed for that shuffleMergeId
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5));
closed.acquire();
assertEquals(1, removedIds.size());
// For the previous merge id - here the cleanup is from finalizeShuffleMerge
assertEquals(4, removedIds.iterator().next().shuffleMergeId);
removedIds.clear();
assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned"
+ " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
mergeManager2, mergeManager2DB) == 1)
assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
mergeManager2, mergeManager2DB) == 2)
mergeManager2, mergeManager2DB) == 1)
s2.stop()

// Yarn Shuffle service comes back up without custom mergeManager
Expand Down