Skip to content

Commit c77acf0

Browse files
zhouyejoeMridul Muralidharan
authored andcommitted
[SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
### What changes were proposed in this pull request? This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. ### Summary of the change: When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt. This PR also refactored the management of the merged shuffle information to avoid concurrency issues. ### Why are the changes needed? Refer to the SPIP in SPARK-30602. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc. Closes apache#33078 from zhouyejoe/SPARK-35546. Authored-by: Ye Zhou <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent d6b974f commit c77acf0

File tree

17 files changed

+810
-391
lines changed

17 files changed

+810
-391
lines changed

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,4 +419,11 @@ public long mergedIndexCacheSize() {
419419
public int ioExceptionsThresholdDuringMerge() {
420420
return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
421421
}
422+
423+
/**
424+
* The application attemptID assigned from Hadoop YARN.
425+
*/
426+
public int appAttemptId() {
427+
return conf.getInt("spark.app.attempt.id", -1);
428+
}
422429
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ public void pushBlocks(
141141
RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
142142
(inputBlockId, inputListener) -> {
143143
TransportClient client = clientFactory.createClient(host, port);
144-
new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
145-
.start();
144+
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
145+
inputListener, buffersWithId).start();
146146
};
147147
int maxRetries = conf.maxIORetries();
148148
if (maxRetries > 0) {
@@ -168,7 +168,8 @@ public void finalizeShuffleMerge(
168168
checkInit();
169169
try {
170170
TransportClient client = clientFactory.createClient(host, port);
171-
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
171+
ByteBuffer finalizeShuffleMerge =
172+
new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId).toByteBuffer();
172173
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
173174
@Override
174175
public void onSuccess(ByteBuffer response) {

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,21 @@ public class OneForOneBlockPusher {
4545

4646
private final TransportClient client;
4747
private final String appId;
48+
private final int appAttemptId;
4849
private final String[] blockIds;
4950
private final BlockFetchingListener listener;
5051
private final Map<String, ManagedBuffer> buffers;
5152

5253
public OneForOneBlockPusher(
5354
TransportClient client,
5455
String appId,
56+
int appAttemptId,
5557
String[] blockIds,
5658
BlockFetchingListener listener,
5759
Map<String, ManagedBuffer> buffers) {
5860
this.client = client;
5961
this.appId = appId;
62+
this.appAttemptId = appAttemptId;
6063
this.blockIds = blockIds;
6164
this.listener = listener;
6265
this.buffers = buffers;
@@ -123,8 +126,9 @@ public void start() {
123126
throw new IllegalArgumentException(
124127
"Unexpected shuffle push block id format: " + blockIds[i]);
125128
}
126-
ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
127-
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
129+
ByteBuffer header =
130+
new PushBlockStream(appId, appAttemptId, Integer.parseInt(blockIdParts[1]),
131+
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
128132
client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
129133
new BlockPushCallback(i, blockIds[i]));
130134
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java

Lines changed: 324 additions & 222 deletions
Large diffs are not rendered by default.

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable {
3535
public final String[] localDirs;
3636
/** Number of subdirectories created within each localDir. */
3737
public final int subDirsPerLocalDir;
38-
/** Shuffle manager (SortShuffleManager) that the executor is using. */
38+
/**
39+
* Shuffle manager (SortShuffleManager) that the executor is using.
40+
* If this string contains semicolon, it will also include the meta information
41+
* for push based shuffle in JSON format. Example of the string with semicolon would be:
42+
* SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
43+
*/
3944
public final String shuffleManager;
4045

4146
@JsonCreator

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@
3232
*/
3333
public class FinalizeShuffleMerge extends BlockTransferMessage {
3434
public final String appId;
35+
public final int appAttemptId;
3536
public final int shuffleId;
3637

3738
public FinalizeShuffleMerge(
3839
String appId,
40+
int appAttemptId,
3941
int shuffleId) {
4042
this.appId = appId;
43+
this.appAttemptId = appAttemptId;
4144
this.shuffleId = shuffleId;
4245
}
4346

@@ -48,13 +51,14 @@ protected BlockTransferMessage.Type type() {
4851

4952
@Override
5053
public int hashCode() {
51-
return Objects.hashCode(appId, shuffleId);
54+
return Objects.hashCode(appId, appAttemptId, shuffleId);
5255
}
5356

5457
@Override
5558
public String toString() {
5659
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
5760
.append("appId", appId)
61+
.append("attemptId", appAttemptId)
5862
.append("shuffleId", shuffleId)
5963
.toString();
6064
}
@@ -64,25 +68,28 @@ public boolean equals(Object other) {
6468
if (other != null && other instanceof FinalizeShuffleMerge) {
6569
FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
6670
return Objects.equal(appId, o.appId)
71+
&& appAttemptId == appAttemptId
6772
&& shuffleId == o.shuffleId;
6873
}
6974
return false;
7075
}
7176

7277
@Override
7378
public int encodedLength() {
74-
return Encoders.Strings.encodedLength(appId) + 4;
79+
return Encoders.Strings.encodedLength(appId) + 4 + 4;
7580
}
7681

7782
@Override
7883
public void encode(ByteBuf buf) {
7984
Encoders.Strings.encode(buf, appId);
85+
buf.writeInt(appAttemptId);
8086
buf.writeInt(shuffleId);
8187
}
8288

8389
public static FinalizeShuffleMerge decode(ByteBuf buf) {
8490
String appId = Encoders.Strings.decode(buf);
91+
int attemptId = buf.readInt();
8592
int shuffleId = buf.readInt();
86-
return new FinalizeShuffleMerge(appId, shuffleId);
93+
return new FinalizeShuffleMerge(appId, attemptId, shuffleId);
8794
}
8895
}

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Objects;
2121
import io.netty.buffer.ByteBuf;
22+
2223
import org.apache.commons.lang3.builder.ToStringBuilder;
2324
import org.apache.commons.lang3.builder.ToStringStyle;
2425

@@ -34,15 +35,23 @@
3435
*/
3536
public class PushBlockStream extends BlockTransferMessage {
3637
public final String appId;
38+
public final int appAttemptId;
3739
public final int shuffleId;
3840
public final int mapIndex;
3941
public final int reduceId;
4042
// Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
4143
// blocks to be pushed.
4244
public final int index;
4345

44-
public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
46+
public PushBlockStream(
47+
String appId,
48+
int appAttemptId,
49+
int shuffleId,
50+
int mapIndex,
51+
int reduceId,
52+
int index) {
4553
this.appId = appId;
54+
this.appAttemptId = appAttemptId;
4655
this.shuffleId = shuffleId;
4756
this.mapIndex = mapIndex;
4857
this.reduceId = reduceId;
@@ -56,13 +65,14 @@ protected Type type() {
5665

5766
@Override
5867
public int hashCode() {
59-
return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index);
68+
return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex , reduceId, index);
6069
}
6170

6271
@Override
6372
public String toString() {
6473
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
6574
.append("appId", appId)
75+
.append("attemptId", appAttemptId)
6676
.append("shuffleId", shuffleId)
6777
.append("mapIndex", mapIndex)
6878
.append("reduceId", reduceId)
@@ -75,6 +85,7 @@ public boolean equals(Object other) {
7585
if (other != null && other instanceof PushBlockStream) {
7686
PushBlockStream o = (PushBlockStream) other;
7787
return Objects.equal(appId, o.appId)
88+
&& appAttemptId == o.appAttemptId
7889
&& shuffleId == o.shuffleId
7990
&& mapIndex == o.mapIndex
8091
&& reduceId == o.reduceId
@@ -85,12 +96,13 @@ public boolean equals(Object other) {
8596

8697
@Override
8798
public int encodedLength() {
88-
return Encoders.Strings.encodedLength(appId) + 16;
99+
return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4;
89100
}
90101

91102
@Override
92103
public void encode(ByteBuf buf) {
93104
Encoders.Strings.encode(buf, appId);
105+
buf.writeInt(appAttemptId);
94106
buf.writeInt(shuffleId);
95107
buf.writeInt(mapIndex);
96108
buf.writeInt(reduceId);
@@ -99,10 +111,11 @@ public void encode(ByteBuf buf) {
99111

100112
public static PushBlockStream decode(ByteBuf buf) {
101113
String appId = Encoders.Strings.decode(buf);
114+
int attemptId = buf.readInt();
102115
int shuffleId = buf.readInt();
103116
int mapIdx = buf.readInt();
104117
int reduceId = buf.readInt();
105118
int index = buf.readInt();
106-
return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index);
119+
return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, index);
107120
}
108121
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public void testBadMessages() {
243243
public void testFinalizeShuffleMerge() throws IOException {
244244
RpcResponseCallback callback = mock(RpcResponseCallback.class);
245245

246-
FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0);
246+
FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0);
247247
RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2);
248248
MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap},
249249
new int[]{3}, new long[]{30});

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testPushOne() {
5151
BlockFetchingListener listener = pushBlocks(
5252
blocks,
5353
blockIds,
54-
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));
54+
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
5555

5656
verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
5757
}
@@ -67,9 +67,9 @@ public void testPushThree() {
6767
BlockFetchingListener listener = pushBlocks(
6868
blocks,
6969
blockIds,
70-
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
71-
new PushBlockStream("app-id", 0, 1, 0, 1),
72-
new PushBlockStream("app-id", 0, 2, 0, 2)));
70+
Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0),
71+
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
72+
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
7373

7474
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
7575
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
@@ -87,9 +87,9 @@ public void testServerFailures() {
8787
BlockFetchingListener listener = pushBlocks(
8888
blocks,
8989
blockIds,
90-
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
91-
new PushBlockStream("app-id", 0, 1, 0, 1),
92-
new PushBlockStream("app-id", 0, 2, 0, 2)));
90+
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
91+
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
92+
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
9393

9494
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
9595
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
@@ -107,9 +107,9 @@ public void testHandlingRetriableFailures() {
107107
BlockFetchingListener listener = pushBlocks(
108108
blocks,
109109
blockIds,
110-
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
111-
new PushBlockStream("app-id", 0, 1, 0, 1),
112-
new PushBlockStream("app-id", 0, 2, 0, 2)));
110+
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
111+
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
112+
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
113113

114114
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
115115
verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
@@ -130,7 +130,7 @@ private static BlockFetchingListener pushBlocks(
130130
TransportClient client = mock(TransportClient.class);
131131
BlockFetchingListener listener = mock(BlockFetchingListener.class);
132132
OneForOneBlockPusher pusher =
133-
new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
133+
new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks);
134134

135135
Iterator<Map.Entry<String, ManagedBuffer>> blockIterator = blocks.entrySet().iterator();
136136
Iterator<BlockTransferMessage> msgIterator = expectMessages.iterator();

0 commit comments

Comments
 (0)