Skip to content

Commit 5bf9b7e

Browse files
author
Andrew Or
committed
Address a few minor comments
1 parent 5b419b8 commit 5bf9b7e

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
115115
* If not, throw an appropriate exception.
116116
*/
117117
private def validateSettings(): Unit = {
118-
// Verify that bounds are valid
119118
if (minNumExecutors < 0 || maxNumExecutors < 0) {
120119
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
121120
}
@@ -126,21 +125,21 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
126125
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
127126
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
128127
}
129-
// Verify that timeouts are positive
130128
if (schedulerBacklogTimeout <= 0) {
131-
throw new SparkException(s"spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
129+
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
132130
}
133131
if (sustainedSchedulerBacklogTimeout <= 0) {
134132
throw new SparkException(
135-
s"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
133+
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
136134
}
137135
if (executorIdleTimeout <= 0) {
138-
throw new SparkException(s"spark.dynamicAllocation.executorIdleTimeout must be > 0!")
136+
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
139137
}
140-
// Verify that external shuffle service is enabled
138+
// Require external shuffle service for dynamic allocation
139+
// Otherwise, we may lose shuffle files when killing executors
141140
if (!conf.getBoolean("spark.shuffle.service.enabled", false)) {
142-
throw new SparkException(s"Dynamic allocation of executors requires the external " +
143-
s"shuffle service. You may enable this through spark.shuffle.service.enabled.")
141+
throw new SparkException("Dynamic allocation of executors requires the external " +
142+
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
144143
}
145144
}
146145

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private[spark] class BlockManager(
100100
val sparkPort = conf.getInt(shuffleServicePortKey, 7337)
101101
if (SparkHadoopUtil.get.isYarnMode) {
102102
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
103-
Option(hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort)
103+
hadoopConf.getInt(shuffleServicePortKey, sparkPort)
104104
} else {
105105
sparkPort
106106
}

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222

2323
import org.apache.spark.network.TransportContext;
2424
import org.apache.spark.network.server.RpcHandler;
25+
import org.apache.spark.network.server.TransportServer;
2526
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
2627
import org.apache.spark.network.util.TransportConf;
2728
import org.apache.spark.network.util.SystemPropertyConfigProvider;
2829

2930
import org.apache.hadoop.conf.Configuration;
30-
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
3131
import org.apache.hadoop.yarn.api.records.ApplicationId;
3232
import org.apache.hadoop.yarn.api.records.ContainerId;
3333
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
@@ -43,11 +43,13 @@
4343
*/
4444
public class YarnShuffleService extends AuxiliaryService {
4545
private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
46-
private static final JobTokenSecretManager secretManager = new JobTokenSecretManager();
4746

4847
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
4948
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
5049

50+
// Actual server that serves the shuffle files
51+
private TransportServer shuffleServer = null;
52+
5153
public YarnShuffleService() {
5254
super("spark_shuffle");
5355
logger.info("Initializing Yarn shuffle service for Spark");
@@ -64,7 +66,7 @@ protected void serviceInit(Configuration conf) {
6466
TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider());
6567
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
6668
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
67-
transportContext.createServer(port);
69+
shuffleServer = transportContext.createServer(port);
6870
logger.info("Started Yarn shuffle service for Spark on port " + port);
6971
} catch (Exception e) {
7072
logger.error("Exception in starting Yarn shuffle service for Spark", e);
@@ -100,4 +102,11 @@ public void stopContainer(ContainerTerminationContext context) {
100102
ContainerId containerId = context.getContainerId();
101103
logger.debug("Stopping container " + containerId + "!");
102104
}
105+
106+
@Override
107+
protected void serviceStop() {
108+
if (shuffleServer != null) {
109+
shuffleServer.close();
110+
}
111+
}
103112
}

0 commit comments

Comments
 (0)