Skip to content

Commit 43dcb96

Browse files
author
Andrew Or
committed
First cut integration of shuffle service with Yarn aux service
1 parent b54a0c4 commit 43dcb96

File tree

2 files changed

+41
-16
lines changed

2 files changed

+41
-16
lines changed
Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11

22
package org.apache.spark.network.yarn;
33

4+
import java.lang.Override;
45
import java.nio.ByteBuffer;
5-
import java.sql.Timestamp;
6-
import java.util.Date;
76

7+
import org.apache.spark.network.TransportContext;
8+
import org.apache.spark.network.server.RpcHandler;
9+
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
10+
import org.apache.spark.network.util.TransportConf;
11+
import org.apache.spark.network.util.SystemPropertyConfigProvider;
12+
13+
import org.apache.hadoop.conf.Configuration;
814
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
915
import org.apache.hadoop.yarn.api.records.ApplicationId;
1016
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -13,51 +19,68 @@
1319
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
1420
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
1521
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
1624

1725
/**
1826
* External shuffle service used by Spark on Yarn.
1927
*/
2028
public class YarnShuffleService extends AuxiliaryService {
21-
29+
private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
2230
private static final JobTokenSecretManager secretManager = new JobTokenSecretManager();
2331

32+
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
33+
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
34+
2435
public YarnShuffleService() {
25-
super("sparkshuffleservice");
26-
log("--- [ Welcome to YarnShuffleService v0.1 ] ---");
36+
super("spark_shuffle");
37+
logger.info("Initializing Yarn shuffle service for Spark");
38+
}
39+
40+
/**
41+
* Start the shuffle server with the given configuration.
42+
*/
43+
@Override
44+
protected void serviceInit(Configuration conf) {
45+
try {
46+
int port = conf.getInt(
47+
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
48+
TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider());
49+
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
50+
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
51+
transportContext.createServer(port);
52+
} catch (Exception e) {
53+
logger.error("Exception in starting Yarn shuffle service for Spark!", e);
54+
}
2755
}
2856

2957
@Override
3058
public void initializeApplication(ApplicationInitializationContext context) {
3159
ApplicationId appId = context.getApplicationId();
32-
log("Initializing application " + appId + "!");
60+
logger.debug("Initializing application " + appId + "!");
3361
}
3462

3563
@Override
3664
public void stopApplication(ApplicationTerminationContext context) {
3765
ApplicationId appId = context.getApplicationId();
38-
log("Stopping application " + appId + "!");
66+
logger.debug("Stopping application " + appId + "!");
3967
}
4068

4169
@Override
4270
public ByteBuffer getMetaData() {
43-
log("Getting meta data");
44-
return ByteBuffer.wrap("".getBytes());
71+
logger.debug("Getting meta data");
72+
return ByteBuffer.allocate(0);
4573
}
4674

4775
@Override
4876
public void initializeContainer(ContainerInitializationContext context) {
4977
ContainerId containerId = context.getContainerId();
50-
log("Initializing container " + containerId + "!");
78+
logger.debug("Initializing container " + containerId + "!");
5179
}
5280

5381
@Override
5482
public void stopContainer(ContainerTerminationContext context) {
5583
ContainerId containerId = context.getContainerId();
56-
log("Stopping container " + containerId + "!");
57-
}
58-
59-
private void log(String msg) {
60-
Timestamp timestamp = new Timestamp(new Date().getTime());
61-
System.out.println("* org.apache.spark.YarnShuffleService " + timestamp + ": " + msg);
84+
logger.debug("Stopping container " + containerId + "!");
6285
}
6386
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class ExecutorRunnable(
8989

9090
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
9191

92+
ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0)))
93+
9294
// Send the start request to the ContainerManager
9395
nmClient.startContainer(container, ctx)
9496
}

0 commit comments

Comments
 (0)