Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ public class YarnShuffleService extends AuxiliaryService {
private DB db;

public YarnShuffleService() {
super("spark_shuffle");
this("spark_shuffle");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still hardcoded? Should we use configured SHUFFLE_SERVICE_NAME?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still hardcoded. I haven't found a way to access Spark configuration from that constructor and org.apache.hadoop.yarn.server.api.AuxiliaryService requires the name. Do you have a suggestion of how that could be done?

Copy link
Member

@viirya viirya Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I commented below #26000 (comment), if this is just for yarn, put it in YarnShuffleService, like "spark.yarn.shuffle.stopOnFailure"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hardcoded here. Once the shuffle service name is configured, won't they mismatch? Will it cause problem?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hardcoded here. HDP hardcodes another value though (spark2_shuffle). While vanilla Spark would keep working as is and would use the name spark_shuffle, the new configuration option would allow users to point Spark to non-vanilla shuffle service.
The changes to that class are done only to test that changing the name of the service and in the configuration play nicely together.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems impossible to register the service with the name passed in the configuration because the configuration is passed after the class is instantiated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So this config can only be used to let Spark choose which service to connect. It cannot change the name of Shuffle Service.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I guess I could implement a workaround, which would get the config setting from the default Configuration, but that, at least theoretically, wouldn't guarantee that the exact configuration would be passed during service initialization.

}

public YarnShuffleService(String serviceName) {
Copy link
Member

@viirya viirya Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it needed to add new constructor? You could not just read configured service name inside original constructor and do initialization?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly to be able to test it since yarn.nodemanager.aux-services.spark_shuffle.class option assumes no-args constructor. Maybe make it protected?

super(serviceName);
logger.info("Initializing YARN shuffle service for Spark");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets change the log statement to have the servicename in it

instance = this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ package object config {
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

private[spark] val SHUFFLE_SERVICE_NAME =
ConfigBuilder("spark.shuffle.service.name").stringConf.createWithDefault("spark_shuffle")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for yarn external shuffle service, right? If so, just put in YarnShuffleService and name as spark.yarn.shuffle.service.name?

Copy link
Author

@nonsleepr nonsleepr Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's spark.shuffle.service.port right above it, which specifies the port of the said service. If the ports match but the name doesn't, the service wouldn't be located.

EDIT: I would think being consistent with the names is more important than properly namespacing the option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand you correctly. Only yarn needs this config, cannot it be in YarnShuffleService like "spark.yarn.shuffle.stopOnFailure"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only need to make sure ExecutorRunnable, YarnShuffleService uses the same config and configuring it correctly in yarn-site.xml. Isn't? Or I miss anything here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Okay. I'm convinced.
Will move the option and the docs to YARN namespace.


private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab")
.doc("Location of user's keytab.")
.stringConf.createOptional
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,13 @@ Apart from these, the following properties are also available, and may be useful
configuration and setup documentation</a> for more information.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.name</code></td>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For yarn, docs/running-on-yarn.md is more suitable for documentation?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Will move it there.

<td>spark_shuffle</td>
<td>
Name of the external shuffle service.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.port</code></td>
<td>7337</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private[yarn] class ExecutorRunnable(
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
}

// Send the start request to the ContainerManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {

override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark2_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark2_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
yarnConfig
Expand All @@ -55,6 +55,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
logInfo("Shuffle service port = " + shuffleServicePort)

Map(
SHUFFLE_SERVICE_NAME.key -> "spark2_shuffle",
SHUFFLE_SERVICE_ENABLED.key -> "true",
SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
MAX_EXECUTOR_FAILURES.key -> "1"
Expand Down