-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished #37922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
b91a344
6fb184d
75e2013
5b39ba3
7cff74d
d4132b3
3a0b6a4
0a9730d
0f24bb9
c848da6
e54bb75
4d2b940
5d3aa2d
d034985
11e7d9a
29f4918
3a7d99c
613a99a
96ba7e1
e06ef76
43085d1
371feae
6975bcc
475f8bd
0f1f5eb
7284b8f
3181e64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,7 @@ | |
| import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; | ||
| import org.apache.spark.network.shuffle.protocol.MergeStatuses; | ||
| import org.apache.spark.network.shuffle.protocol.PushBlockStream; | ||
| import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; | ||
| import org.apache.spark.network.shuffledb.DB; | ||
| import org.apache.spark.network.shuffledb.DBBackend; | ||
| import org.apache.spark.network.shuffledb.DBIterator; | ||
|
|
@@ -95,6 +96,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { | |
| public static final String MERGE_DIR_KEY = "mergeDir"; | ||
| public static final String ATTEMPT_ID_KEY = "attemptId"; | ||
| private static final int UNDEFINED_ATTEMPT_ID = -1; | ||
|
|
||
| /** | ||
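| * Sentinel value for the shuffleMergeId of a RemoveShuffleMerge request: when a request | ||
| * carries this value, the merged shuffle data of whichever shuffleMergeId is currently | ||
| * stored for that shuffle is deleted. | ||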
| */ | ||
| public static final int DELETE_ALL_MERGED_SHUFFLE = -1; | ||
mridulm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private static final String DB_KEY_DELIMITER = ";"; | ||
| private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler(); | ||
| // ByteBuffer to respond to client upon a successful merge of a pushed block | ||
|
|
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void removeShuffleMerge(RemoveShuffleMerge msg) { | ||
| AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); | ||
| if (appShuffleInfo.attemptId != msg.appAttemptId) { | ||
| throw new IllegalArgumentException( | ||
| String.format("The attempt id %s in this RemoveShuffleMerge message does not match " | ||
| + "with the current attempt id %s stored in shuffle service for application %s", | ||
| msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); | ||
| } | ||
| appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> { | ||
| if (mergePartitionsInfo == null) { | ||
| if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) { | ||
| return null; | ||
| } else { | ||
| writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId( | ||
| msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId)); | ||
| return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); | ||
| } | ||
| } | ||
| boolean deleteCurrentMergedShuffle = | ||
| msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE || | ||
| msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId; | ||
| int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ? | ||
| msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId; | ||
| AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = | ||
| new AppAttemptShuffleMergeId( | ||
| msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); | ||
mridulm marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( | ||
| msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId); | ||
|
||
| if(deleteCurrentMergedShuffle) { | ||
| // request to clean up shuffle we are currently hosting | ||
| if (!mergePartitionsInfo.isFinalized()) { | ||
| submitCleanupTask(() -> | ||
| closeAndDeleteOutdatedPartitions( | ||
| currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); | ||
| } else { | ||
| submitCleanupTask(() -> | ||
| deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, | ||
| mergePartitionsInfo.getReduceIds(), false)); | ||
| } | ||
| } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { | ||
|
||
| throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + | ||
| "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", | ||
| msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); | ||
mridulm marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { | ||
mridulm marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // cleanup request for newer shuffle - remove the outdated data we have. | ||
| if (!mergePartitionsInfo.isFinalized()) { | ||
| submitCleanupTask(() -> | ||
| closeAndDeleteOutdatedPartitions( | ||
mridulm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); | ||
| } else { | ||
| submitCleanupTask(() -> | ||
| deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, | ||
| mergePartitionsInfo.getReduceIds(), false)); | ||
| } | ||
| } | ||
| writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); | ||
| return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. | ||
| * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. | ||
|
|
@@ -470,6 +538,40 @@ void closeAndDeleteOutdatedPartitions( | |
| }); | ||
| } | ||
|
|
||
| void deleteMergedFiles( | ||
| AppAttemptShuffleMergeId appAttemptShuffleMergeId, | ||
| AppShuffleInfo appShuffleInfo, | ||
| int[] reduceIds, | ||
| boolean deleteFromDB) { | ||
| if(deleteFromDB) { | ||
wankunde marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); | ||
| } | ||
| int shuffleId = appAttemptShuffleMergeId.shuffleId; | ||
| int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId; | ||
| int dataFilesDeleteCnt = 0; | ||
| int indexFilesDeleteCnt = 0; | ||
| int metaFilesDeleteCnt = 0; | ||
| for (int reduceId : reduceIds) { | ||
| File dataFile = | ||
| appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); | ||
| if (dataFile.delete()) { | ||
mridulm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| dataFilesDeleteCnt++; | ||
| } | ||
| File indexFile = new File( | ||
| appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); | ||
| if (indexFile.delete()) { | ||
| indexFilesDeleteCnt++; | ||
| } | ||
| File metaFile = | ||
| appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); | ||
| if (metaFile.delete()) { | ||
| metaFilesDeleteCnt++; | ||
| } | ||
| } | ||
| logger.info("Delete {} data files, {} index files, {} meta files for {}", | ||
| dataFilesDeleteCnt, indexFilesDeleteCnt, metaFilesDeleteCnt, appAttemptShuffleMergeId); | ||
| } | ||
|
|
||
| /** | ||
| * Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId | ||
| * @param appAttemptShuffleMergeId | ||
|
|
@@ -712,6 +814,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { | |
| mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId, | ||
| bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), | ||
| Longs.toArray(sizes)); | ||
| appShuffleInfo.shuffles.get(msg.shuffleId).setReduceIds(Ints.toArray(reduceIds)); | ||
| } | ||
| logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed", | ||
| msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); | ||
|
|
@@ -1465,6 +1568,8 @@ public static class AppShuffleMergePartitionsInfo { | |
| private final int shuffleMergeId; | ||
| private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions; | ||
|
|
||
| private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]); | ||
|
|
||
| public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { | ||
| this.shuffleMergeId = shuffleMergeId; | ||
| this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER : | ||
|
|
@@ -1479,6 +1584,14 @@ public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() { | |
| public boolean isFinalized() { | ||
| return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; | ||
| } | ||
|
|
||
| public void setReduceIds(int[] reduceIds) { | ||
| this.reduceIds.set(reduceIds); | ||
| } | ||
|
|
||
| public int[] getReduceIds() { | ||
| return this.reduceIds.get(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1687,9 +1800,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) { | |
| try { | ||
| if (dataChannel.isOpen()) { | ||
| dataChannel.close(); | ||
| if (delete) { | ||
| dataFile.delete(); | ||
| } | ||
| } | ||
| if (delete) { | ||
| dataFile.delete(); | ||
mridulm marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } catch (IOException ioe) { | ||
| logger.warn("Error closing data channel for {} reduceId {}", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.network.shuffle.protocol; | ||
|
|
||
| import com.google.common.base.Objects; | ||
| import io.netty.buffer.ByteBuf; | ||
| import org.apache.commons.lang3.builder.ToStringBuilder; | ||
| import org.apache.commons.lang3.builder.ToStringStyle; | ||
|
|
||
| import org.apache.spark.network.protocol.Encoders; | ||
|
|
||
| /** | ||
| * Remove the merged data for a given shuffle. | ||
| * Returns {@link Boolean} | ||
| * | ||
| * @since 3.4.0 | ||
| */ | ||
| public class RemoveShuffleMerge extends BlockTransferMessage { | ||
| public final String appId; | ||
wankunde marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| public final int appAttemptId; | ||
| public final int shuffleId; | ||
| public final int shuffleMergeId; | ||
|
|
||
| public RemoveShuffleMerge( | ||
| String appId, | ||
| int appAttemptId, | ||
| int shuffleId, | ||
| int shuffleMergeId) { | ||
| this.appId = appId; | ||
| this.appAttemptId = appAttemptId; | ||
| this.shuffleId = shuffleId; | ||
| this.shuffleMergeId = shuffleMergeId; | ||
| } | ||
|
|
||
| @Override | ||
| protected Type type() { | ||
| return Type.REMOVE_SHUFFLE_MERGE; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) | ||
| .append("appId", appId) | ||
| .append("attemptId", appAttemptId) | ||
| .append("shuffleId", shuffleId) | ||
| .append("shuffleMergeId", shuffleMergeId) | ||
| .toString(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object other) { | ||
| if (other != null && other instanceof RemoveShuffleMerge) { | ||
| RemoveShuffleMerge o = (RemoveShuffleMerge) other; | ||
| return Objects.equal(appId, o.appId) | ||
| && appAttemptId == o.appAttemptId | ||
| && shuffleId == o.shuffleId | ||
| && shuffleMergeId == o.shuffleMergeId; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public int encodedLength() { | ||
| return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4; | ||
| } | ||
|
|
||
| @Override | ||
| public void encode(ByteBuf buf) { | ||
| Encoders.Strings.encode(buf, appId); | ||
| buf.writeInt(appAttemptId); | ||
| buf.writeInt(shuffleId); | ||
| buf.writeInt(shuffleMergeId); | ||
| } | ||
|
|
||
| public static RemoveShuffleMerge decode(ByteBuf buf) { | ||
| String appId = Encoders.Strings.decode(buf); | ||
| int attemptId = buf.readInt(); | ||
| int shuffleId = buf.readInt(); | ||
| int shuffleMergeId = buf.readInt(); | ||
| return new RemoveShuffleMerge(appId, attemptId, shuffleId, shuffleMergeId); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.