Skip to content

Commit 05ff43c

Browse files
committed
Add to external shuffle client and add unit test
1 parent 66e5a24 commit 05ff43c

File tree

4 files changed

+343
-12
lines changed

4 files changed

+343
-12
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,33 @@ public void init(String appId) {
7777

7878
@Override
7979
public void fetchBlocks(
80-
String host,
81-
int port,
82-
String execId,
80+
final String host,
81+
final int port,
82+
final String execId,
8383
String[] blockIds,
8484
BlockFetchingListener listener) {
8585
assert appId != null : "Called before init()";
8686
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
8787
try {
88-
TransportClient client = clientFactory.createClient(host, port);
89-
new OneForOneBlockFetcher(client, blockIds, listener)
90-
.start(new ExternalShuffleMessages.OpenShuffleBlocks(appId, execId, blockIds));
88+
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
89+
new RetryingBlockFetcher.BlockFetchStarter() {
90+
@Override
91+
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
92+
throws IOException {
93+
TransportClient client = clientFactory.createClient(host, port);
94+
new OneForOneBlockFetcher(client, blockIds, listener)
95+
.start(new ExternalShuffleMessages.OpenShuffleBlocks(appId, execId, blockIds));
96+
}
97+
};
98+
99+
int maxRetries = conf.maxIORetries();
100+
if (maxRetries > 0) {
101+
// Note: RetryingBlockFetcher handles maxRetries == 0 correctly on its own; we bypass it here
102+
// only as a safeguard in case the retry logic itself has a bug.
103+
new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();
104+
} else {
105+
blockFetchStarter.createAndStart(blockIds, listener);
106+
}
91107
} catch (Exception e) {
92108
logger.error("Exception while beginning fetchBlocks", e);
93109
for (String blockId : blockIds) {

network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static interface BlockFetchStarter {
5757
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
5858
* issues.
5959
*/
60-
void createAndStart(String[] blockIds, BlockFetchingListener listener);
60+
void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
6161
}
6262

6363
/** Shared executor service used for waiting and retrying. */

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,16 @@ public void testFetchUnregisteredExecutor() throws Exception {
259259

260260
@Test
261261
public void testFetchNoServer() throws Exception {
262-
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
263-
FetchResult execFetch = fetchBlocks("exec-0",
264-
new String[] { "shuffle_1_0_0", "shuffle_1_0_1" }, 1 /* port */);
265-
assertTrue(execFetch.successBlocks.isEmpty());
266-
assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
262+
System.setProperty("spark.shuffle.io.maxIORetries", "0");
263+
try {
264+
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
265+
FetchResult execFetch = fetchBlocks("exec-0",
266+
new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
267+
assertTrue(execFetch.successBlocks.isEmpty());
268+
assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
269+
} finally {
270+
System.clearProperty("spark.shuffle.io.maxIORetries");
271+
}
267272
}
268273

269274
private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.shuffle;
19+
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.LinkedHashSet;
24+
import java.util.Map;
25+
26+
import com.google.common.collect.ImmutableMap;
27+
import com.google.common.collect.Sets;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
import org.mockito.invocation.InvocationOnMock;
32+
import org.mockito.stubbing.Answer;
33+
import org.mockito.stubbing.Stubber;
34+
35+
import static org.junit.Assert.*;
36+
import static org.mockito.Mockito.*;
37+
38+
import org.apache.spark.network.buffer.ManagedBuffer;
39+
import org.apache.spark.network.buffer.NioManagedBuffer;
40+
import org.apache.spark.network.util.SystemPropertyConfigProvider;
41+
import org.apache.spark.network.util.TransportConf;
42+
import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
43+
44+
/**
45+
* Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
46+
* fetch the lost blocks.
47+
*/
48+
public class RetryingBlockFetcherSuite {
49+
50+
ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
51+
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
52+
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
53+
54+
@Before
55+
public void beforeEach() {
56+
System.setProperty("spark.shuffle.io.maxIORetries", "2");
57+
System.setProperty("spark.shuffle.io.ioRetryWaitTime", "0");
58+
}
59+
60+
@After
61+
public void afterEach() {
62+
System.clearProperty("spark.shuffle.io.maxIORetries");
63+
System.clearProperty("spark.shuffle.io.ioRetryWaitTime");
64+
}
65+
66+
@Test
67+
public void testNoFailures() throws IOException {
68+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
69+
70+
Map[] interactions = new Map[] {
71+
// Immediately return both blocks successfully.
72+
ImmutableMap.<String, Object>builder()
73+
.put("b0", block0)
74+
.put("b1", block1)
75+
.build(),
76+
};
77+
78+
performInteractions(interactions, listener);
79+
80+
verify(listener).onBlockFetchSuccess("b0", block0);
81+
verify(listener).onBlockFetchSuccess("b1", block1);
82+
verifyNoMoreInteractions(listener);
83+
}
84+
85+
@Test
86+
public void testUnrecoverableFailure() throws IOException {
87+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
88+
89+
Map[] interactions = new Map[] {
90+
// b0 fails with a non-IOException error, so it is reported as failed without any retry.
91+
ImmutableMap.<String, Object>builder()
92+
.put("b0", new RuntimeException("Ouch!"))
93+
.put("b1", block1)
94+
.build(),
95+
};
96+
97+
performInteractions(interactions, listener);
98+
99+
verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any());
100+
verify(listener).onBlockFetchSuccess("b1", block1);
101+
verifyNoMoreInteractions(listener);
102+
}
103+
104+
@Test
105+
public void testSingleIOExceptionOnFirst() throws IOException {
106+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
107+
108+
Map[] interactions = new Map[] {
109+
// IOException will cause a retry. Since b0 fails, we will retry both.
110+
ImmutableMap.<String, Object>builder()
111+
.put("b0", new IOException("Connection failed or something"))
112+
.put("b1", block1)
113+
.build(),
114+
ImmutableMap.<String, Object>builder()
115+
.put("b0", block0)
116+
.put("b1", block1)
117+
.build(),
118+
};
119+
120+
performInteractions(interactions, listener);
121+
122+
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
123+
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
124+
verifyNoMoreInteractions(listener);
125+
}
126+
127+
@Test
128+
public void testSingleIOExceptionOnSecond() throws IOException {
129+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
130+
131+
Map[] interactions = new Map[] {
132+
// IOException will cause a retry. Since b1 fails, we will not retry b0.
133+
ImmutableMap.<String, Object>builder()
134+
.put("b0", block0)
135+
.put("b1", new IOException("Connection failed or something"))
136+
.build(),
137+
ImmutableMap.<String, Object>builder()
138+
.put("b1", block1)
139+
.build(),
140+
};
141+
142+
performInteractions(interactions, listener);
143+
144+
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
145+
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
146+
verifyNoMoreInteractions(listener);
147+
}
148+
149+
@Test
150+
public void testTwoIOExceptions() throws IOException {
151+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
152+
153+
Map[] interactions = new Map[] {
154+
// b0's IOException will trigger a retry; b1's will be ignored.
155+
ImmutableMap.<String, Object>builder()
156+
.put("b0", new IOException())
157+
.put("b1", new IOException())
158+
.build(),
159+
// Next, b0 is successful and b1 errors again, so we just request that one.
160+
ImmutableMap.<String, Object>builder()
161+
.put("b0", block0)
162+
.put("b1", new IOException())
163+
.build(),
164+
// b1 returns successfully within 2 retries.
165+
ImmutableMap.<String, Object>builder()
166+
.put("b1", block1)
167+
.build(),
168+
};
169+
170+
performInteractions(interactions, listener);
171+
172+
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
173+
verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
174+
verifyNoMoreInteractions(listener);
175+
}
176+
177+
@Test
178+
public void testThreeIOExceptions() throws IOException {
179+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
180+
181+
Map[] interactions = new Map[] {
182+
// b0's IOException will trigger a retry; b1's will be ignored.
183+
ImmutableMap.<String, Object>builder()
184+
.put("b0", new IOException())
185+
.put("b1", new IOException())
186+
.build(),
187+
// Next, b0 is successful and b1 errors again, so we just request that one.
188+
ImmutableMap.<String, Object>builder()
189+
.put("b0", block0)
190+
.put("b1", new IOException())
191+
.build(),
192+
// b1 errors again, but this was the last retry
193+
ImmutableMap.<String, Object>builder()
194+
.put("b1", new IOException())
195+
.build(),
196+
// This is not reached -- b1 has failed.
197+
ImmutableMap.<String, Object>builder()
198+
.put("b1", block1)
199+
.build(),
200+
};
201+
202+
performInteractions(interactions, listener);
203+
204+
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
205+
verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
206+
verifyNoMoreInteractions(listener);
207+
}
208+
209+
@Test
210+
public void testRetryAndUnrecoverable() throws IOException {
211+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
212+
213+
Map[] interactions = new Map[] {
214+
// b0's IOException will trigger a retry; subsequent messages will be ignored.
215+
ImmutableMap.<String, Object>builder()
216+
.put("b0", new IOException())
217+
.put("b1", new RuntimeException())
218+
.put("b2", block2)
219+
.build(),
220+
// Next, b0 is successful, b1 errors unrecoverably, and b2 triggers a retry.
221+
ImmutableMap.<String, Object>builder()
222+
.put("b0", block0)
223+
.put("b1", new RuntimeException())
224+
.put("b2", new IOException())
225+
.build(),
226+
// b2 succeeds in its last retry.
227+
ImmutableMap.<String, Object>builder()
228+
.put("b2", block2)
229+
.build(),
230+
};
231+
232+
performInteractions(interactions, listener);
233+
234+
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
235+
verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
236+
verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
237+
verifyNoMoreInteractions(listener);
238+
}
239+
240+
/**
241+
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
242+
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
243+
* means "respond to the next block fetch request with these Successful buffers and these Failure
244+
* exceptions". We verify that the expected block ids are exactly the ones requested.
245+
*
246+
* If multiple interactions are supplied, they will be used in order. This is useful for encoding
247+
* retries -- the first interaction may include an IOException, which causes a retry of some
248+
* subset of the original blocks in a second interaction.
249+
*/
250+
@SuppressWarnings("unchecked")
251+
private void performInteractions(final Map[] interactions, BlockFetchingListener listener)
252+
throws IOException {
253+
254+
TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
255+
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
256+
257+
Stubber stub = null;
258+
259+
// Contains all blockIds that are referenced across all interactions.
260+
final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
261+
262+
for (final Map<String, Object> interaction : interactions) {
263+
blockIds.addAll(interaction.keySet());
264+
265+
Answer<Void> answer = new Answer<Void>() {
266+
@Override
267+
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
268+
try {
269+
// Verify that the RetryingBlockFetcher requested the expected blocks.
270+
String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
271+
String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
272+
assertArrayEquals(desiredBlockIds, requestedBlockIds);
273+
274+
// Now actually invoke the success/failure callbacks on each block.
275+
BlockFetchingListener retryListener =
276+
(BlockFetchingListener) invocationOnMock.getArguments()[1];
277+
for (Map.Entry<String, Object> block : interaction.entrySet()) {
278+
String blockId = block.getKey();
279+
Object blockValue = block.getValue();
280+
281+
if (blockValue instanceof ManagedBuffer) {
282+
retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
283+
} else if (blockValue instanceof Exception) {
284+
retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
285+
} else {
286+
fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
287+
}
288+
}
289+
return null;
290+
} catch (Throwable e) {
291+
e.printStackTrace();
292+
throw e;
293+
}
294+
}
295+
};
296+
297+
// This is either the first stub, or should be chained behind the prior ones.
298+
if (stub == null) {
299+
stub = doAnswer(answer);
300+
} else {
301+
stub.doAnswer(answer);
302+
}
303+
}
304+
305+
assert stub != null;
306+
stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
307+
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
308+
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
309+
}
310+
}

0 commit comments

Comments
 (0)