Commits (23)
b54a0c4  Initial skeleton for Yarn shuffle service (Nov 3, 2014)
43dcb96  First cut integration of shuffle service with Yarn aux service (Nov 3, 2014)
b4b1f0c  4 tabs -> 2 tabs (Nov 3, 2014)
1bf5109  Use the shuffle service port specified through hadoop config (Nov 4, 2014)
ea764e0  Connect to Yarn shuffle service only if it's enabled (Nov 4, 2014)
cd076a4  Require external shuffle service for dynamic allocation (Nov 4, 2014)
804e7ff  Include the Yarn shuffle service jar in the distribution (Nov 4, 2014)
5b419b8  Add missing license header (Nov 4, 2014)
5bf9b7e  Address a few minor comments (Nov 4, 2014)
baff916  Fix tests (Nov 4, 2014)
15a5b37  Fix build for Hadoop 1.x (Nov 4, 2014)
761f58a  Merge branch 'master' of github.com:apache/spark into yarn-shuffle-se… (Nov 4, 2014)
f39daa6  Do not make network-yarn an assembly module (Nov 4, 2014)
f48b20c  Fix tests again (Nov 4, 2014)
9b6e058  Address various feedback (Nov 4, 2014)
5f8a96f  Merge branch 'master' of github.com:apache/spark into yarn-shuffle-se… (Nov 5, 2014)
d1124e4  Add security to shuffle service (INCOMPLETE) (Nov 5, 2014)
7b71d8f  Add detailed java docs + reword a few comments (Nov 5, 2014)
6489db5  Try catch at the right places (Nov 5, 2014)
0eb6233  Merge branch 'master' of github.com:apache/spark into yarn-shuffle-se… (Nov 5, 2014)
1c66046  Remove unused provided dependencies (Nov 5, 2014)
0ee67a2  Minor wording suggestions (Nov 5, 2014)
ef3ddae  Merge branch 'master' of github.com:apache/spark into yarn-shuffle-se… (Nov 5, 2014)
ExecutorAllocationManager.scala
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
verifyBounds()

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,11 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
private val removeThresholdSeconds = conf.getLong(
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

validateSettings()

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

@@ -110,10 +111,11 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private var clock: Clock = new RealClock

/**
* Verify that the lower and upper bounds on the number of executors are valid.
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def verifyBounds(): Unit = {
private def validateSettings(): Unit = {
// Verify that bounds are valid
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
@@ -124,6 +126,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
// Verify that timeouts are positive
if (schedulerBacklogTimeout <= 0) {
throw new SparkException(s"spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
throw new SparkException(
s"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
throw new SparkException(s"spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Verify that external shuffle service is enabled
Contributor: nit: comment is redundant given the code.

if (!conf.getBoolean("spark.shuffle.service.enabled", false)) {
throw new SparkException(s"Dynamic allocation of executors requires the external " +
Contributor: nit: interpolation not necessary (also on next line).

s"shuffle service. You may enable this through spark.shuffle.service.enabled.")
Contributor: At this point, is there any sense in having two separate settings that need to be set in tandem? Couldn't we just base everything on whether dynamic executor allocation is enabled or not?

Contributor: (I guess you might want to allow people to still use the external shuffle service even without dynamic allocation. But we could still be nice and automatically use the external shuffle service when dynamic allocation is enabled, without requiring the user to set both configs.)

Contributor Author: This is tricky, because the user may not have set up an external shuffle service when running Spark with dynamic allocation. If we automatically enable this service for them, they will be confused to see "connection refused" messages when executors try to fetch files from a service that doesn't exist. It's not intuitive that enabling a dynamic scaling feature would cause executors to die because of this. However, if the user explicitly sets spark.shuffle.service.enabled, then he/she likely has a better idea that it's trying to connect to some external service.
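For context, a minimal sketch of the two knobs a user has to pair up under the check added in this diff (spark.dynamicAllocation.enabled is assumed here as the switch that creates the manager; only the min/max and shuffle-service keys appear in the diff itself):

// Sketch only: settings needed for dynamic allocation with the external shuffle service.
// validateSettings() above refuses to start unless spark.shuffle.service.enabled is true.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")    // assumed top-level switch
  .set("spark.dynamicAllocation.minExecutors", "2")  // required, must be >= 0
  .set("spark.dynamicAllocation.maxExecutors", "20") // required, must be >= minExecutors
  .set("spark.shuffle.service.enabled", "true")      // required by the check above
val sc = new SparkContext(conf)                      // master is supplied by spark-submit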

}
}

/**
@@ -254,7 +272,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -329,8 +347,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}

15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -30,6 +30,7 @@ import akka.actor.{ActorSystem, Props}
import sun.nio.ch.DirectBuffer

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
@@ -92,7 +93,19 @@ private[spark] class BlockManager(

private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)

// In Yarn, the shuffle service port may be set through the Hadoop config
private val shuffleServicePortKey = "spark.shuffle.service.port"
private val externalShuffleServicePort = {
Contributor: I would rather not put YARN-related logic directly in the BlockManager itself. Can we extract this type of logic out to a utility function somewhere, like
SparkHadoopUtil.get.getYarnOrSparkConf("spark.shuffle.service.port", "7337").toInt
It doesn't save many characters here, but at least it hides the checking for YARN and constructing a Hadoop Configuration.

val sparkPort = conf.getInt(shuffleServicePortKey, 7337)
if (SparkHadoopUtil.get.isYarnMode) {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
Option(hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort)
Contributor: hadoopConf.getInt(shuffleServicePortKey, sparkPort)?

Contributor Author: Oh cool, I didn't realize Hadoop conf also has that.
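Putting the two suggestions together, a rough sketch of what such a helper could look like (getYarnOrSparkConf is only the name proposed above, not an existing SparkHadoopUtil method; the explicit SparkConf parameter is added here to keep the sketch self-contained):

// Sketch only: prefer the Hadoop config in Yarn mode, fall back to the Spark config,
// then to the given default (the same lookup order as the port logic in this diff).
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

def getYarnOrSparkConf(key: String, default: String, conf: SparkConf): String = {
  if (SparkHadoopUtil.get.isYarnMode) {
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    // Hadoop's Configuration.get(key, default) handles the fallback in one call
    hadoopConf.get(key, conf.get(key, default))
  } else {
    conf.get(key, default)
  }
}

// BlockManager would then reduce to something like:
// val externalShuffleServicePort =
//   getYarnOrSparkConf("spark.shuffle.service.port", "7337", conf).toInt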

} else {
sparkPort
}
}

// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)
1 change: 1 addition & 0 deletions make-distribution.sh
@@ -181,6 +181,7 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI
# Copy jars
cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/"
Contributor: I wonder if it would be nicer to use maven-shade-plugin to create a single jar for the NM aux service. That might make it easier for people to install it.

Contributor Author: Yes, I plan to do that, though in a separate PR. See my comment in andrewor14@f39daa6.

Contributor: @andrewor14 Here is a problem: I use this command line: ./make-distribution.sh -Dhadoop.version=2.3.0-cdh5.0.1 -Dyarn.version=2.3.0-cdh5.0.1 -Phadoop-2.3 -Pyarn -Pnetlib-lgpl, but "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar does not exist.

Contributor Author: Yes I will fix this in a separate PR

Contributor Author: Hey @witgo I just pushed a hot fix. I didn't realize make-distribution.sh fails fast if a copy fails. Thanks for bringing this up.


# Copy example sources (needed for python and SQL)
mkdir -p "$DISTDIR/examples/src/main"
78 changes: 78 additions & 0 deletions network/yarn/pom.xml
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-network-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Yarn Shuffle Service Code</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>network-yarn</sbt.project.name>
</properties>

<dependencies>
<!-- Core dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
New file: YarnShuffleService.java (module network/yarn)
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.yarn;

import java.lang.Override;
import java.nio.ByteBuffer;

import org.apache.spark.network.TransportContext;
Contributor: nit: I think our import order calls for spark to go below hadoop (unless that's different in YARN code)

Contributor Author: Yeah I thought I was in Hadoop land for a second. Will fix

import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.SystemPropertyConfigProvider;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* External shuffle service used by Spark on Yarn.
*/
public class YarnShuffleService extends AuxiliaryService {
private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
private static final JobTokenSecretManager secretManager = new JobTokenSecretManager();

private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing Yarn shuffle service for Spark");
}

/**
* Start the shuffle server with the given configuration.
*/
@Override
protected void serviceInit(Configuration conf) {
try {
int port = conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider());
Contributor: Is the SystemPropertyConfigProvider correct? Perhaps it should be something like this:

private static class HadoopConfigProvider extends ConfigProvider {
  private final Configuration conf;

  public HadoopConfigProvider(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public String get(String name) {
    String value = conf.get(name);
    if (value != null) {
      return value;
    } else {
      throw new NoSuchElementException(name);
    }
  } 
}

Contributor Author: Oop you're right.

RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
transportContext.createServer(port);
Contributor: To try to play nice, wouldn't it be better to implement serviceStop and shut down this server instance there?

Contributor Author: Yeah I'm planning on adding that too

logger.info("Started Yarn shuffle service for Spark on port " + port);
} catch (Exception e) {
logger.error("Exception in starting Yarn shuffle service for Spark", e);
}
}

@Override
public void initializeApplication(ApplicationInitializationContext context) {
ApplicationId appId = context.getApplicationId();
logger.debug("Initializing application " + appId + "!");
}

@Override
public void stopApplication(ApplicationTerminationContext context) {
ApplicationId appId = context.getApplicationId();
Contributor: Entirely unknowledgeable about the shuffle service's inner workings, but is there no state to clean up once an application completes? E.g. if an app is terminated suddenly, how do its shuffle blocks get cleaned up?

Contributor Author: We might need to add some timeout to clean up the files.

Contributor: This is a feature that still needs to be added to the external shuffle service. I made SPARK-4236 to track this.

Contributor: Ah cool.

logger.debug("Stopping application " + appId + "!");
}

@Override
public ByteBuffer getMetaData() {
logger.debug("Getting meta data");
return ByteBuffer.allocate(0);
}

@Override
public void initializeContainer(ContainerInitializationContext context) {
ContainerId containerId = context.getContainerId();
logger.debug("Initializing container " + containerId + "!");
}

@Override
public void stopContainer(ContainerTerminationContext context) {
ContainerId containerId = context.getContainerId();
logger.debug("Stopping container " + containerId + "!");
}
}
1 change: 1 addition & 0 deletions pom.xml
@@ -93,6 +93,7 @@
<module>tools</module>
<module>network/common</module>
<module>network/shuffle</module>
<module>network/yarn</module>
Contributor: BTW, this should probably be gated on some profile (e.g. "-Pyarn"). I don't think this would compile with Hadoop 1.0.4, for example...

Contributor Author: Yeah... good point

<module>streaming</module>
<module>sql/catalyst</module>
<module>sql/core</module>
6 changes: 3 additions & 3 deletions project/SparkBuild.scala
@@ -42,8 +42,8 @@ object BuildCommons {
Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl")
.map(ProjectRef(buildLocation, _))

val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
.map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(assembly, examples, networkYarn) =
Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _))

val tools = ProjectRef(buildLocation, "tools")
// Root project.
@@ -143,7 +143,7 @@ object SparkBuild extends PomBuild {

// TODO: Add Sql to mima checks
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
streamingFlumeSink, networkCommon, networkShuffle).contains(x)).foreach {
streamingFlumeSink, networkCommon, networkShuffle, networkYarn).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}

ExecutorRunnable.scala
@@ -89,6 +89,12 @@ class ExecutorRunnable(

ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

// If external shuffle service is enabled, register with the
Contributor: It might be good to say "register and transfer keys" rather than just register

// Yarn shuffle service already started on the node manager
if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0)))
}

// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx)
}