Skip to content

Commit 962770c

Browse files
committed
Disable port retry in StandaloneWorkerShuffleService and clone the Configuration in YarnShuffleService
1 parent 7126547 commit 962770c

File tree

4 files changed

+18
-7
lines changed

4 files changed

+18
-7
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
3939
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
4040
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
4141

42-
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
42+
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0,
43+
disablePortRetry = true)
4344
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
4445
private val transportContext: TransportContext = {
4546
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler

core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@ object SparkTransportConf {
4343
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
4444
* use the given number of cores, rather than all of the machine's cores.
4545
* This restriction will only occur if these properties are not already set.
46+
* @param disablePortRetry if true, the server will not retry its port. It is better for a
47+
*                         long-running server to disable retries, since the server and its
48+
*                         clients have agreed on a specific port.
4649
*/
47-
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
50+
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0,
51+
disablePortRetry: Boolean = false): TransportConf = {
4852
val conf = _conf.clone
4953

5054
// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
@@ -55,6 +59,10 @@ object SparkTransportConf {
5559
conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
5660
conf.set("spark.shuffle.io.clientThreads",
5761
conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
62+
63+
if (disablePortRetry) {
64+
conf.set("spark.port.maxRetries", "0")
65+
}
5866

5967
new TransportConf(new ConfigProvider {
6068
override def get(name: String): String = conf.get(name)

network/common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public void close() {
122122
}
123123

124124
/**
125-
* Attempt to bind to the specified port up to a fixed number of retries.
125+
* Attempt to bind on the given port, or fail after a number of attempts.
126+
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
126127
* If all attempts fail after the max number of retries, exit.
127128
*/
128129
private void bindRightPort(int portToBind) {

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,25 +98,26 @@ private boolean isAuthenticationEnabled() {
9898
*/
9999
@Override
100100
protected void serviceInit(Configuration conf) {
101+
Configuration newConf = new Configuration(conf);
101102

102103
// It is better to let the NodeManager shut down rather than retry the port
103104
// when `spark.shuffle.service.port` conflicts while starting
104105
// the Spark YARN shuffle server, because the retry mechanism would make the
105106
// shuffle port inconsistent and cause clients to fail to find it.
106-
conf.setInt("spark.port.maxRetries", 0);
107+
newConf.setInt("spark.port.maxRetries", 0);
107108

108-
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
109+
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(newConf));
109110
// If authentication is enabled, set up the shuffle server to use a
110111
// special RPC handler that filters out unauthenticated fetch requests
111-
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
112+
boolean authEnabled = newConf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
112113
blockHandler = new ExternalShuffleBlockHandler(transportConf);
113114
RpcHandler rpcHandler = blockHandler;
114115
if (authEnabled) {
115116
secretManager = new ShuffleSecretManager();
116117
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
117118
}
118119

119-
int port = conf.getInt(
120+
int port = newConf.getInt(
120121
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
121122
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
122123
shuffleServer = transportContext.createServer(port);

0 commit comments

Comments
 (0)