Skip to content

Commit 59e5e38

Browse files
committed
[SPARK-6955] Perform port retries at NettyBlockTransferService level
Currently we're doing port retries in the TransportServer level, but this is not specified by the TransportContext API and it has other further-reaching impacts like causing undesirable behvior for the Yarn and Standalone shuffle services.
1 parent 008a60d commit 59e5e38

File tree

3 files changed

+100
-39
lines changed

3 files changed

+100
-39
lines changed

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,23 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
5858
securityManager.isSaslEncryptionEnabled()))
5959
}
6060
transportContext = new TransportContext(transportConf, rpcHandler)
61-
clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
62-
server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0),
63-
serverBootstrap.toList)
61+
clientFactory = transportContext.createClientFactory(bootstrap.toList)
62+
server = createServer(serverBootstrap.toList)
6463
appId = conf.getAppId
6564
logInfo("Server created on " + server.getPort)
6665
}
6766

67+
/** Creates and binds the TransportServer, possibly trying multiple ports. */
68+
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
69+
def startService(port: Int): (TransportServer, Int) = {
70+
val server = transportContext.createServer(port, bootstraps)
71+
(server, server.getPort)
72+
}
73+
74+
val portToTry = conf.getInt("spark.blockManager.port", 0)
75+
Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
76+
}
77+
6878
override def fetchBlocks(
6979
host: String,
7080
port: Int,
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty
19+
20+
import org.apache.spark.network.BlockDataManager
21+
import org.apache.spark.{SecurityManager, SparkConf}
22+
import org.mockito.Mockito.mock
23+
import org.scalatest._
24+
25+
class NettyBlockTransferServiceSuite extends FunSuite with BeforeAndAfterEach with ShouldMatchers {
26+
private var service0: NettyBlockTransferService = _
27+
private var service1: NettyBlockTransferService = _
28+
29+
override def afterEach() {
30+
if (service0 != null) {
31+
service0.close()
32+
service0 = null
33+
}
34+
35+
if (service1 != null) {
36+
service1.close()
37+
service1 = null
38+
}
39+
}
40+
41+
test("can bind to a random port") {
42+
service0 = createService(port = 0)
43+
service0.port should not be 0
44+
}
45+
46+
test("can bind to two random ports") {
47+
service0 = createService(port = 0)
48+
service1 = createService(port = 0)
49+
service0.port should not be service1.port
50+
}
51+
52+
test("can bind to a specific port") {
53+
val port = 17634
54+
service0 = createService(port)
55+
service0.port should be >= port
56+
service0.port should be <= (port + 10) // avoid testing equality in case of simultaneous tests
57+
}
58+
59+
test("can bind to a specific port twice and the second increments") {
60+
val port = 17634
61+
service0 = createService(port)
62+
service1 = createService(port)
63+
service0.port should be >= port
64+
service0.port should be <= (port + 10)
65+
service1.port should be (service0.port + 1)
66+
}
67+
68+
private def createService(port: Int): NettyBlockTransferService = {
69+
val conf = new SparkConf()
70+
.set("spark.app.id", s"test-${getClass.getName}")
71+
.set("spark.blockManager.port", port.toString)
72+
val securityManager = new SecurityManager(conf)
73+
val blockDataManager = mock(classOf[BlockDataManager])
74+
val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
75+
service.init(blockDataManager)
76+
service
77+
}
78+
}

network/common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.netty.channel.ChannelOption;
3232
import io.netty.channel.EventLoopGroup;
3333
import io.netty.channel.socket.SocketChannel;
34+
import org.apache.spark.network.util.JavaUtils;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

@@ -65,7 +66,12 @@ public TransportServer(
6566
this.appRpcHandler = appRpcHandler;
6667
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
6768

68-
init(portToBind);
69+
try {
70+
init(portToBind);
71+
} catch (RuntimeException e) {
72+
JavaUtils.closeQuietly(this);
73+
throw e;
74+
}
6975
}
7076

7177
public int getPort() {
@@ -114,7 +120,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
114120
}
115121
});
116122

117-
bindRightPort(portToBind);
123+
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
124+
channelFuture.syncUninterruptibly();
118125

119126
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
120127
logger.debug("Shuffle server started on port :" + port);
@@ -135,38 +142,4 @@ public void close() {
135142
}
136143
bootstrap = null;
137144
}
138-
139-
/**
140-
* Attempt to bind to the specified port up to a fixed number of retries.
141-
* If all attempts fail after the max number of retries, exit.
142-
*/
143-
private void bindRightPort(int portToBind) {
144-
int maxPortRetries = conf.portMaxRetries();
145-
146-
for (int i = 0; i <= maxPortRetries; i++) {
147-
int tryPort = -1;
148-
if (0 == portToBind) {
149-
// Do not increment port if tryPort is 0, which is treated as a special port
150-
tryPort = 0;
151-
} else {
152-
// If the new port wraps around, do not try a privilege port
153-
tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
154-
}
155-
try {
156-
channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
157-
channelFuture.syncUninterruptibly();
158-
return;
159-
} catch (Exception e) {
160-
logger.warn("Netty service could not bind on port " + tryPort +
161-
". Attempting the next port.");
162-
if (i >= maxPortRetries) {
163-
logger.error(e.getMessage() + ": Netty server failed after "
164-
+ maxPortRetries + " retries.");
165-
166-
// If it can't find a right port, it should exit directly.
167-
System.exit(-1);
168-
}
169-
}
170-
}
171-
}
172145
}

0 commit comments

Comments
 (0)