diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c170f99b112c..76cbc6051f78 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -136,8 +136,18 @@ public class YarnShuffleService extends AuxiliaryService { private DB db; public YarnShuffleService() { - super("spark_shuffle"); - logger.info("Initializing YARN shuffle service for Spark"); + this("spark_shuffle"); + } + + /** + * Instantiate YarnShuffleService with an arbitrary service name. + * Used for tests. + * YARN doesn't pass a service name or any other parameters to AuxiliaryServices. + * When instantiated by YARN, the constructor without arguments is called. + */ + protected YarnShuffleService(String serviceName) { + super(serviceName); + logger.info("Initializing YARN shuffle service \"{}\" for Spark", serviceName); instance = this; } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 418db41216cd..35ab29de45c5 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -492,6 +492,14 @@ To use a custom metrics.properties for the application master and executors, upd If it is not set then the YARN application ID is used. + + spark.yarn.shuffle.service.name + spark_shuffle + + The name of the external shuffle service. + The external shuffle service itself is configured and started by YARN (see [Configuring the External Shuffle Service](#configuring-the-external-shuffle-service) for details). The name specified here must match the name under which the service is registered with the YARN NodeManager (`yarn.nodemanager.aux-services`). 
+ + #### Available patterns for SHS custom executor log URL diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7046ad74056f..5853bef5a7af 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils @@ -115,7 +116,8 @@ private[yarn] class ExecutorRunnable( // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } - ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) + val serviceName = sparkConf.get(YARN_SHUFFLE_SERVICE_NAME) + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) } // Send the start request to the ContainerManager diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 4c187b2cc68e..f298c41130a0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -320,4 +320,8 @@ package object config { private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource." private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource." 
+ + private[yarn] val YARN_SHUFFLE_SERVICE_NAME = ConfigBuilder("spark.yarn.shuffle.service.name") + .stringConf + .createWithDefault("spark_shuffle") } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 8c62069a8dd6..97e0f258f146 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -42,8 +42,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { override def newYarnConfig(): YarnConfiguration = { val yarnConfig = new YarnConfiguration() - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark2_shuffle") + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark2_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0") yarnConfig @@ -55,6 +55,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { logInfo("Shuffle service port = " + shuffleServicePort) Map( + YARN_SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle", SHUFFLE_SERVICE_ENABLED.key -> "true", SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString, MAX_EXECUTOR_FAILURES.key -> "1"