Skip to content

Commit 6489db5

Browse files
author
Andrew Or
committed
Try catch at the right places
On service init, we want to fail fast if the server fails to start for any reason (e.g. port bind exception). However, once the NM has been running for a while, it should be super robust so we need to add try catches around the application initialization / stopping code.
1 parent 7b71d8f commit 6489db5

File tree

1 file changed

+39
-31
lines changed

1 file changed

+39
-31
lines changed

network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -95,67 +95,75 @@ private boolean isAuthenticationEnabled() {
9595
*/
9696
@Override
9797
protected void serviceInit(Configuration conf) {
98-
try {
99-
// If authentication is enabled, set up the shuffle server to use a
100-
// special RPC handler that filters out unauthenticated fetch requests
101-
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
102-
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
103-
if (authEnabled) {
104-
secretManager = new ShuffleSecretManager();
105-
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
106-
}
107-
108-
int port = conf.getInt(
109-
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
110-
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
111-
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
112-
shuffleServer = transportContext.createServer(port);
113-
String authEnabledString = authEnabled ? "enabled" : "not enabled";
114-
logger.info("Started YARN shuffle service for Spark on port {}. " +
115-
"Authentication is {}.", port, authEnabledString);
116-
} catch (Exception e) {
117-
logger.error("Exception in starting YARN shuffle service for Spark", e);
98+
// If authentication is enabled, set up the shuffle server to use a
99+
// special RPC handler that filters out unauthenticated fetch requests
100+
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
101+
RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
102+
if (authEnabled) {
103+
secretManager = new ShuffleSecretManager();
104+
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
118105
}
106+
107+
int port = conf.getInt(
108+
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
109+
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
110+
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
111+
shuffleServer = transportContext.createServer(port);
112+
String authEnabledString = authEnabled ? "enabled" : "not enabled";
113+
logger.info("Started YARN shuffle service for Spark on port {}. " +
114+
"Authentication is {}.", port, authEnabledString);
119115
}
120116

121117
@Override
122118
public void initializeApplication(ApplicationInitializationContext context) {
123119
String appId = context.getApplicationId().toString();
124-
ByteBuffer shuffleSecret = context.getApplicationDataForService();
125-
logger.debug("Initializing application {}", appId);
126-
if (isAuthenticationEnabled()) {
127-
secretManager.registerApp(appId, shuffleSecret);
120+
try {
121+
ByteBuffer shuffleSecret = context.getApplicationDataForService();
122+
logger.info("Initializing application {}", appId);
123+
if (isAuthenticationEnabled()) {
124+
secretManager.registerApp(appId, shuffleSecret);
125+
}
126+
} catch (Exception e) {
127+
logger.error("Exception when initializing application {}", appId, e);
128128
}
129129
}
130130

131131
@Override
132132
public void stopApplication(ApplicationTerminationContext context) {
133133
String appId = context.getApplicationId().toString();
134-
logger.debug("Stopping application {}", appId);
135-
if (isAuthenticationEnabled()) {
136-
secretManager.unregisterApp(appId);
134+
try {
135+
logger.info("Stopping application {}", appId);
136+
if (isAuthenticationEnabled()) {
137+
secretManager.unregisterApp(appId);
138+
}
139+
} catch (Exception e) {
140+
logger.error("Exception when stopping application {}", appId, e);
137141
}
138142
}
139143

140144
@Override
141145
public void initializeContainer(ContainerInitializationContext context) {
142146
ContainerId containerId = context.getContainerId();
143-
logger.debug("Initializing container {}", containerId);
147+
logger.info("Initializing container {}", containerId);
144148
}
145149

146150
@Override
147151
public void stopContainer(ContainerTerminationContext context) {
148152
ContainerId containerId = context.getContainerId();
149-
logger.debug("Stopping container {}", containerId);
153+
logger.info("Stopping container {}", containerId);
150154
}
151155

152156
/**
153157
* Close the shuffle server to clean up any associated state.
154158
*/
155159
@Override
156160
protected void serviceStop() {
157-
if (shuffleServer != null) {
158-
shuffleServer.close();
161+
try {
162+
if (shuffleServer != null) {
163+
shuffleServer.close();
164+
}
165+
} catch (Exception e) {
166+
logger.error("Exception when stopping service", e);
159167
}
160168
}
161169

0 commit comments

Comments
 (0)