@@ -136,8 +136,18 @@ public class YarnShuffleService extends AuxiliaryService {
private DB db;

public YarnShuffleService() {
-    super("spark_shuffle");
-    logger.info("Initializing YARN shuffle service for Spark");
+    this("spark_shuffle");
Member

Is this still hardcoded? Should we use the configured SHUFFLE_SERVICE_NAME instead?

Author

It is still hardcoded. I haven't found a way to access the Spark configuration from that constructor, and org.apache.hadoop.yarn.server.api.AuxiliaryService requires the name at construction time. Do you have a suggestion for how that could be done?

Member

@viirya Oct 2, 2019

As I commented below (#26000 (comment)), if this is only for YARN, should it live in YarnShuffleService, like "spark.yarn.shuffle.stopOnFailure"?

Member

It is hardcoded here. Once the shuffle service name is configured, won't the two mismatch? Will that cause a problem?

Author

It is hardcoded here. HDP hardcodes another value, though (spark2_shuffle). Vanilla Spark would keep working as is and would use the name spark_shuffle, while the new configuration option would allow users to point Spark to a non-vanilla shuffle service.
The changes to that class are there only to test that a renamed service and the matching name in the configuration work together.

Author

It seems impossible to register the service with the name passed in the configuration because the configuration is passed after the class is instantiated.

Member

I see. So this config can only be used to let Spark choose which service to connect to. It cannot change the name of the shuffle service itself.

Author

Yes. I guess I could implement a workaround that reads the setting from the default Configuration, but, at least theoretically, nothing guarantees that this is the same configuration that gets passed during service initialization.
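
For readers following this thread, the ordering problem can be sketched in a few lines. This is only an illustration under stated assumptions: `AuxService` and `ShuffleService` below are hypothetical stand-ins, not Hadoop's `AuxiliaryService` or Spark's `YarnShuffleService`, and the idea that the name would arrive in the map passed to `init` is assumed for the example.

```java
import java.util.HashMap;
import java.util.Map;

final class LifecycleOrderSketch {
    // Hypothetical stand-in for an auxiliary service base class (not Hadoop's API).
    abstract static class AuxService {
        private final String name; // fixed when the constructor runs

        protected AuxService(String name) { this.name = name; }

        final String getName() { return name; }

        // Configuration is handed over only after the object already exists.
        abstract void init(Map<String, String> conf);
    }

    // Hypothetical stand-in for the shuffle service.
    static final class ShuffleService extends AuxService {
        private String configuredName; // known only once init() has run

        // A no-argument constructor is all a reflective instantiation can call.
        ShuffleService() { this("spark_shuffle"); }

        // Mirrors the extra constructor added in this PR for tests.
        ShuffleService(String serviceName) { super(serviceName); }

        @Override
        void init(Map<String, String> conf) {
            configuredName =
                conf.getOrDefault("spark.yarn.shuffle.service.name", "spark_shuffle");
        }

        String getConfiguredName() { return configuredName; }
    }

    // Construct first, configure second: the order described in the thread.
    static ShuffleService start(Map<String, String> conf) {
        ShuffleService service = new ShuffleService();
        service.init(conf);
        return service;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.yarn.shuffle.service.name", "spark2_shuffle");
        ShuffleService service = start(conf);
        System.out.println("registered name: " + service.getName());           // spark_shuffle
        System.out.println("configured name: " + service.getConfiguredName()); // spark2_shuffle
    }
}
```

By the time `init` sees the configured value, the name passed to the superclass constructor can no longer change, which is why the two can diverge.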

+  }
+
+  /**
+   * Instantiate YarnShuffleService with arbitrary service name.
+   * Used for tests.
+   * YARN doesn't pass service name or any parameters to AuxiliaryServices.
+   * When instantiated by YARN, constructor without arguments would be called.
+   */
+  protected YarnShuffleService(String serviceName) {
Contributor

So the name by itself isn't going to be enough. If you really want it configurable we are going to have to have the port configurable. For instance the config name for the port spark.shuffle.service.port needs to be able to be something like spark.shuffle.service.{serviceName}.port. Otherwise all the spark shuffle servers will try to get the same port and fail. The only other option will be to use 0 for ephemeral but

Author

The name specified here is actually useful only in tests. YARN's service instantiation logic doesn't even pass the service name used in the config to the instantiated service. I guess that's the main reason the names and ports are hardcoded or bound to non-namespaced configuration keys.
HDP works around that by providing different classpaths with different implementations for different versions of the service (spark_shuffle for Spark 1.6.x and spark2_shuffle for Spark 2+). The only way I see to pass different parameters to the same implementation of the service is to provide different configs on the classpath.

I will add a comment here stating that the name is actually only used for the tests, but otherwise would always be hardcoded to spark_shuffle.
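
To make the port concern raised earlier in this thread concrete, here is a minimal sketch of how a per-service port key could be resolved with a fallback to the existing global key. The key pattern `spark.shuffle.service.{serviceName}.port` is only the reviewer's suggestion and is not an existing Spark setting; `NamespacedPortSketch`, `portKey`, and `resolvePort` are hypothetical names, and the value 7447 is arbitrary.

```java
import java.util.Map;

final class NamespacedPortSketch {
    static final String GLOBAL_PORT_KEY = "spark.shuffle.service.port";
    static final int DEFAULT_PORT = 7337; // Spark's documented default for the global key

    // Hypothetical per-service key, following the pattern suggested in the review.
    static String portKey(String serviceName) {
        return "spark.shuffle.service." + serviceName + ".port";
    }

    // Prefer the per-service key, then the global key, then the default.
    static int resolvePort(Map<String, String> conf, String serviceName) {
        String value = conf.get(portKey(serviceName));
        if (value == null) {
            value = conf.get(GLOBAL_PORT_KEY);
        }
        return value == null ? DEFAULT_PORT : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
            "spark.shuffle.service.port", "7337",
            "spark.shuffle.service.spark2_shuffle.port", "7447");
        System.out.println(resolvePort(conf, "spark_shuffle"));  // 7337
        System.out.println(resolvePort(conf, "spark2_shuffle")); // 7447
    }
}
```

With a fallback like this, two differently named services on one node would each need their own per-service entry; otherwise both resolve to the global port and collide, which is the failure the reviewer describes.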

Contributor

I think there are a few things getting muddled together here -- one is how you'd support running two shuffle services, and the other is how a client could choose which shuffle service it talks to.

The client can already set the port for the shuffle server with spark.shuffle.service.port, it just can't set the name used in the ExecutorRunnable.

The other thing to add about why the names of the shuffle servers matter in YARN is that the name goes into yarn-site.xml, as described in the "Configuring the External Shuffle Service" section of running-on-yarn.md.
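
The name-matching requirement discussed here can be illustrated with a small, self-contained sketch. It assumes the node manager knows its auxiliary services by the comma-separated names listed under `yarn.nodemanager.aux-services`, and that the client's service data is keyed by the name it was configured with. `ServiceNameMatchSketch` and its methods are hypothetical and do not reproduce YARN's actual lookup code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ServiceNameMatchSketch {
    // Parse a comma-separated aux-services value into the set of registered names.
    static Set<String> registeredServices(String auxServicesValue) {
        Set<String> names = new LinkedHashSet<>();
        for (String name : auxServicesValue.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    // Return the service-data keys that no registered service would receive.
    static List<String> unmatched(Set<String> registered, Map<String, ByteBuffer> serviceData) {
        List<String> missing = new ArrayList<>();
        for (String name : serviceData.keySet()) {
            if (!registered.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Cluster side: the shuffle service is registered as spark2_shuffle.
        Set<String> registered = registeredServices("mapreduce_shuffle,spark2_shuffle");
        // Client side: service data still keyed by the hardcoded default name.
        Map<String, ByteBuffer> data =
            Collections.singletonMap("spark_shuffle", ByteBuffer.allocate(0));
        System.out.println(unmatched(registered, data)); // [spark_shuffle]
    }
}
```

Keying the service data by the configured name instead of the literal, as the ExecutorRunnable change in this PR does, is what lets the client side line up with a cluster registered under spark2_shuffle.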

+    super(serviceName);
+    logger.info("Initializing YARN shuffle service \"{}\" for Spark", serviceName);
instance = this;
}

8 changes: 8 additions & 0 deletions docs/running-on-yarn.md
@@ -492,6 +492,14 @@ To use a custom metrics.properties for the application master and executors, upd
If it is not set then the YARN application ID is used.
</td>
</tr>
+<tr>
+  <td><code>spark.yarn.shuffle.service.name</code></td>
Author

I added this option under the Spark Properties section rather than under the Configuring the External Shuffle Service section because it is a "client" setting, not a setting of the external shuffle service.

+  <td><code>spark_shuffle</code></td>
+  <td>
+    The name of the external shuffle service.
+    The external shuffle service itself is configured and started by YARN (see [Configuring the External Shuffle Service](#configuring-the-external-shuffle-service) for details). The name specified here must match the name used in YARN service implementation.
Contributor

I think it would help to mention that it must match the name given to the shuffle service in yarn-site.xml under yarn.nodemanager.aux-services.

+  </td>
+</tr>
</table>

#### Available patterns for SHS custom executor log URL
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
@@ -115,7 +116,8 @@ private[yarn] class ExecutorRunnable(
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
-      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
+      val serviceName = sparkConf.get(YARN_SHUFFLE_SERVICE_NAME)
+      ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
}

// Send the start request to the ContainerManager
@@ -320,4 +320,8 @@ package object config {
private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."


+  private[yarn] val YARN_SHUFFLE_SERVICE_NAME = ConfigBuilder("spark.yarn.shuffle.service.name")
+    .stringConf
+    .createWithDefault("spark_shuffle")
}
@@ -42,8 +42,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {

override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark2_shuffle")
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark2_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
yarnConfig
@@ -55,6 +55,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
logInfo("Shuffle service port = " + shuffleServicePort)

Map(
+      YARN_SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle",
SHUFFLE_SERVICE_ENABLED.key -> "true",
SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
MAX_EXECUTOR_FAILURES.key -> "1"