Skip to content

Commit 9b6e058

Browse files
author
Andrew Or
committed
Address various feedback
This includes moving the Yarn logic out of BM into Utils, updating a few test comments, adding a HadoopConfigProvider and using it in the Yarn shuffle service.
1 parent f48b20c commit 9b6e058

File tree

5 files changed

+68
-23
lines changed

5 files changed

+68
-23
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
7979
private val executorIdleTimeout = conf.getLong(
8080
"spark.dynamicAllocation.executorIdleTimeout", 600)
8181

82-
// Whether we are testing this class. This should only be used internally.
82+
// During testing, the methods to actually kill and add executors are mocked out
8383
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
8484

8585
validateSettings()

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import akka.actor.{ActorSystem, Props}
3030
import sun.nio.ch.DirectBuffer
3131

3232
import org.apache.spark._
33-
import org.apache.spark.deploy.SparkHadoopUtil
3433
import org.apache.spark.executor._
3534
import org.apache.spark.io.CompressionCodec
3635
import org.apache.spark.network._
@@ -41,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
4140
import org.apache.spark.serializer.Serializer
4241
import org.apache.spark.shuffle.ShuffleManager
4342
import org.apache.spark.shuffle.hash.HashShuffleManager
44-
import org.apache.spark.shuffle.sort.SortShuffleManager
4543
import org.apache.spark.util._
4644

4745
private[spark] sealed trait BlockValues
@@ -93,18 +91,7 @@ private[spark] class BlockManager(
9391

9492
private[spark]
9593
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
96-
97-
// In Yarn, the shuffle service port maybe set through the Hadoop config
98-
private val shuffleServicePortKey = "spark.shuffle.service.port"
99-
private val externalShuffleServicePort = {
100-
val sparkPort = conf.getInt(shuffleServicePortKey, 7337)
101-
if (SparkHadoopUtil.get.isYarnMode) {
102-
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
103-
hadoopConf.getInt(shuffleServicePortKey, sparkPort)
104-
} else {
105-
sparkPort
106-
}
107-
}
94+
private val externalShuffleServicePort = Utils.getExternalShuffleServicePort(conf)
10895

10996
// Check that we're not using external shuffle service with consolidated shuffle files.
11097
if (externalShuffleServiceEnabled

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.json4s._
4444
import tachyon.client.{TachyonFile,TachyonFS}
4545

4646
import org.apache.spark._
47+
import org.apache.spark.deploy.SparkHadoopUtil
4748
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
4849

4950
/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1767,6 +1768,21 @@ private[spark] object Utils extends Logging {
17671768
val manifest = new JarManifest(manifestUrl.openStream())
17681769
manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
17691770
}.getOrElse("Unknown")
1771+
1772+
/**
1773+
* Return the port used in the external shuffle service as specified through
1774+
* `spark.shuffle.service.port`. In Yarn, this is set in the Hadoop configuration.
1775+
*/
1776+
def getExternalShuffleServicePort(conf: SparkConf): Int = {
1777+
val shuffleServicePortKey = "spark.shuffle.service.port"
1778+
val sparkPort = conf.getInt(shuffleServicePortKey, 7337)
1779+
if (SparkHadoopUtil.get.isYarnMode) {
1780+
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
1781+
hadoopConf.getInt(shuffleServicePortKey, sparkPort)
1782+
} else {
1783+
sparkPort
1784+
}
1785+
}
17701786
}
17711787

17721788
/**

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,6 @@
2020
import java.lang.Override;
2121
import java.nio.ByteBuffer;
2222

23-
import org.apache.spark.network.TransportContext;
24-
import org.apache.spark.network.server.RpcHandler;
25-
import org.apache.spark.network.server.TransportServer;
26-
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
27-
import org.apache.spark.network.util.TransportConf;
28-
import org.apache.spark.network.util.SystemPropertyConfigProvider;
29-
3023
import org.apache.hadoop.conf.Configuration;
3124
import org.apache.hadoop.yarn.api.records.ApplicationId;
3225
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -38,6 +31,13 @@
3831
import org.slf4j.Logger;
3932
import org.slf4j.LoggerFactory;
4033

34+
import org.apache.spark.network.TransportContext;
35+
import org.apache.spark.network.server.RpcHandler;
36+
import org.apache.spark.network.server.TransportServer;
37+
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
38+
import org.apache.spark.network.util.TransportConf;
39+
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
40+
4141
/**
4242
* External shuffle service used by Spark on Yarn.
4343
*/
@@ -63,7 +63,7 @@ protected void serviceInit(Configuration conf) {
6363
try {
6464
int port = conf.getInt(
6565
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
66-
TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider());
66+
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
6767
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
6868
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
6969
shuffleServer = transportContext.createServer(port);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.yarn.util;
19+
20+
import java.util.NoSuchElementException;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
24+
import org.apache.spark.network.util.ConfigProvider;
25+
26+
/** Use the Hadoop configuration to obtain config values. */
27+
public class HadoopConfigProvider extends ConfigProvider {
28+
private final Configuration conf;
29+
30+
public HadoopConfigProvider(Configuration conf) {
31+
this.conf = conf;
32+
}
33+
34+
@Override
35+
public String get(String name) {
36+
String value = conf.get(name);
37+
if (value == null) {
38+
throw new NoSuchElementException(name);
39+
}
40+
return value;
41+
}
42+
}

0 commit comments

Comments
 (0)