Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ class LocalSparkCluster(
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
val _conf = conf.clone()
.setIfMissing("spark.master.rest.enabled", "false")
.set("spark.shuffle.service.enabled", "false")

/* Start the Master */
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,22 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
}
transportContext = new TransportContext(transportConf, rpcHandler)
clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0),
serverBootstrap.toList)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
logInfo("Server created on " + server.getPort)
}

/**
 * Creates the TransportServer and binds it, possibly trying multiple ports.
 *
 * The first port attempted is read from "spark.blockManager.port" (0 when unset); the
 * attempt itself is handed to Utils.startServiceOnPort.
 */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
  val requestedPort = conf.getInt("spark.blockManager.port", 0)

  // Binds a server on the candidate port and reports the port it actually ended up on.
  def bindOn(candidatePort: Int): (TransportServer, Int) = {
    val candidate = transportContext.createServer(candidatePort, bootstraps)
    (candidate, candidate.getPort)
  }

  val (boundServer, _) =
    Utils.startServiceOnPort(requestedPort, bindOn, conf, getClass.getName)
  boundServer
}

override def fetchBlocks(
host: String,
port: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import org.apache.spark.network.BlockDataManager
import org.apache.spark.{SecurityManager, SparkConf}
import org.mockito.Mockito.mock
import org.scalatest._

class NettyBlockTransferServiceSuite extends FunSuite with BeforeAndAfterEach with ShouldMatchers {
private var service0: NettyBlockTransferService = _
private var service1: NettyBlockTransferService = _

/**
 * Closes whichever services the finished test created and clears both references.
 *
 * The cleanup is nested in try/finally so that an exception thrown while closing
 * service0 cannot prevent service1 from being closed, and so that neither reference
 * survives into the next test. super.afterEach() runs last, so that any cleanup
 * stacked by other mixed-in traits still executes.
 */
override def afterEach() {
  try {
    if (service0 != null) {
      service0.close()
    }
  } finally {
    service0 = null
    try {
      if (service1 != null) {
        service1.close()
      }
    } finally {
      service1 = null
      super.afterEach()
    }
  }
}

// Configuring port 0 must leave the service reporting a non-zero port.
test("can bind to a random port") {
  service0 = createService(port = 0)
  val boundPort = service0.port
  boundPort should not be 0
}

// Two services that are both configured with port 0 must report different ports.
test("can bind to two random ports") {
  service0 = createService(port = 0)
  service1 = createService(port = 0)
  val (firstPort, secondPort) = (service0.port, service1.port)
  firstPort should not be secondPort
}

test("can bind to a specific port") {
val port = 17634
service0 = createService(port)
service0.port should be >= port
service0.port should be <= (port + 10) // avoid testing equality in case of simultaneous tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems flaky. Isn't the previous check enough? Otherwise, instead of 10 I'd use 1024 which is the actual limit in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can increase the number, no problem, though I guess we should use Utils.portMaxRetries instead. However, note that even 10 is very unlikely to be hit: that would mean there are 5-10 tests running concurrently at this exact part of the build, or else we're very, very unlucky.

}

// Both services ask for the same fixed port; the second is expected to land on the port
// directly after the one the first service obtained.
test("can bind to a specific port twice and the second increments") {
  val requestedPort = 17634
  service0 = createService(requestedPort)
  service1 = createService(requestedPort)
  val firstPort = service0.port
  // A small window above the requested port is tolerated rather than exact equality.
  firstPort should be >= requestedPort
  firstPort should be <= (requestedPort + 10)
  service1.port should be (firstPort + 1)
}

/**
 * Builds and initializes a NettyBlockTransferService whose "spark.blockManager.port" is
 * set to `port`, backed by a mocked BlockDataManager and a single core.
 */
private def createService(port: Int): NettyBlockTransferService = {
  val sparkConf = new SparkConf()
    .set("spark.app.id", s"test-${getClass.getName}")
    .set("spark.blockManager.port", port.toString)
  val transferService =
    new NettyBlockTransferService(sparkConf, new SecurityManager(sparkConf), numCores = 1)
  transferService.init(mock(classOf[BlockDataManager]))
  transferService
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import org.apache.spark.network.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,7 +66,12 @@ public TransportServer(
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

init(portToBind);
try {
init(portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}

public int getPort() {
Expand Down Expand Up @@ -114,7 +120,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
});

bindRightPort(portToBind);
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
channelFuture.syncUninterruptibly();

port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
Expand All @@ -135,38 +142,4 @@ public void close() {
}
bootstrap = null;
}

/**
 * Binds the server bootstrap, starting at {@code portToBind} and making up to
 * {@code conf.portMaxRetries()} further attempts when a bind fails.
 * If every attempt fails, the error is logged and the JVM exits with status -1.
 *
 * @param portToBind first port to try; 0 is passed through unchanged on every attempt
 */
private void bindRightPort(int portToBind) {
  final int retryLimit = conf.portMaxRetries();
  int attempt = 0;

  while (attempt <= retryLimit) {
    // Port 0 is treated as a special port and is never incremented. Any other port is
    // advanced by the attempt number and, if it wraps around, kept out of the
    // privileged range below 1024.
    final int candidatePort = (portToBind == 0)
        ? 0
        : ((portToBind + attempt - 1024) % (65536 - 1024)) + 1024;

    try {
      channelFuture = bootstrap.bind(new InetSocketAddress(candidatePort));
      channelFuture.syncUninterruptibly();
      return;
    } catch (Exception e) {
      logger.warn("Netty service could not bind on port " + candidatePort +
        ". Attempting the next port.");
      if (attempt >= retryLimit) {
        logger.error(e.getMessage() + ": Netty server failed after "
          + retryLimit + " retries.");

        // No usable port was found, so exit directly.
        System.exit(-1);
      }
    }

    attempt++;
  }
}
}