Skip to content

Commit a6b95f1

Browse files
committed
Address comments
1 parent 785bbde commit a6b95f1

File tree

5 files changed

+20
-15
lines changed

5 files changed

+20
-15
lines changed

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.network.netty
1919

20-
import org.apache.spark.network.util.TransportConf
21-
2220
import scala.collection.JavaConversions._
2321
import scala.concurrent.{Future, Promise}
2422

@@ -27,7 +25,7 @@ import org.apache.spark.network._
2725
import org.apache.spark.network.buffer.ManagedBuffer
2826
import org.apache.spark.network.client.{TransportClientBootstrap, RpcResponseCallback, TransportClientFactory}
2927
import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
30-
import org.apache.spark.network.sasl.{SecretKeyHolder, SaslRpcHandler, SaslBootstrap}
28+
import org.apache.spark.network.sasl.{SaslRpcHandler, SaslClientBootstrap}
3129
import org.apache.spark.network.server._
3230
import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher}
3331
import org.apache.spark.serializer.JavaSerializer
@@ -56,7 +54,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
5654
(nettyRpcHandler, None)
5755
} else {
5856
(new SaslRpcHandler(nettyRpcHandler, securityManager),
59-
Some(new SaslBootstrap(transportConf, conf.getAppId, securityManager)))
57+
Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager)))
6058
}
6159
}
6260
transportContext = new TransportContext(transportConf, rpcHandler)

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void initChannel(SocketChannel ch) {
130130
});
131131

132132
// Connect to the remote server
133+
long preConnect = System.currentTimeMillis();
133134
ChannelFuture cf = bootstrap.connect(address);
134135
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
135136
throw new RuntimeException(
@@ -142,25 +143,30 @@ public void initChannel(SocketChannel ch) {
142143
assert client != null : "Channel future completed successfully with null client";
143144

144145
// Execute any client bootstraps synchronously before marking the Client as successful.
146+
long preBootstrap = System.currentTimeMillis();
145147
logger.debug("Connection to {} successful, running bootstraps...", address);
146148
try {
147149
for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
148150
clientBootstrap.doBootstrap(client);
149151
}
150-
} catch (Exception e) { // catch Exception as the bootstrap may be written in Scala
151-
logger.error("Exception while bootstrapping client", e);
152+
} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
153+
long bootstrapTime = System.currentTimeMillis() - preBootstrap;
154+
logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e);
152155
client.close();
153156
throw Throwables.propagate(e);
154157
}
158+
long postBootstrap = System.currentTimeMillis();
155159

156-
logger.debug("Successfully executed {} bootstraps for {}", clientBootstraps.size(), address);
157160
// Successful connection & bootstrap -- in the event that two threads raced to create a client,
158161
// use the first one that was put into the connectionPool and close the one we made here.
159162
TransportClient oldClient = connectionPool.putIfAbsent(address, client);
160163
if (oldClient == null) {
164+
logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
165+
address, postBootstrap - preConnect, postBootstrap - preBootstrap);
161166
return client;
162167
} else {
163-
logger.debug("Two clients were created concurrently, second one will be disposed.");
168+
logger.debug("Two clients were created concurrently after {} ms, second will be disposed.",
169+
postBootstrap - preConnect);
164170
client.close();
165171
return oldClient;
166172
}

network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslBootstrap.java renamed to network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030
* Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The
3131
* server should be setup with a {@link SaslRpcHandler} with matching keys for the given appId.
3232
*/
33-
public class SaslBootstrap implements TransportClientBootstrap {
34-
private final Logger logger = LoggerFactory.getLogger(SaslBootstrap.class);
33+
public class SaslClientBootstrap implements TransportClientBootstrap {
34+
private final Logger logger = LoggerFactory.getLogger(SaslClientBootstrap.class);
3535

3636
private final TransportConf conf;
3737
private final String appId;
3838
private final SecretKeyHolder secretKeyHolder;
3939

40-
public SaslBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder) {
40+
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder) {
4141
this.conf = conf;
4242
this.appId = appId;
4343
this.secretKeyHolder = secretKeyHolder;
@@ -48,6 +48,7 @@ public SaslBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKey
4848
* challenge-response tokens until we either successfully authenticate or throw an exception
4949
* due to mismatch.
5050
*/
51+
@Override
5152
public void doBootstrap(TransportClient client) {
5253
SparkSaslClient saslClient = new SparkSaslClient(appId, secretKeyHolder);
5354
try {

network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
public class SaslRpcHandler extends RpcHandler {
4444
private final Logger logger = LoggerFactory.getLogger(SaslRpcHandler.class);
4545

46-
/** RpcHandler we will delegate for authenticated connections. */
46+
/** RpcHandler we will delegate to for authenticated connections. */
4747
private final RpcHandler delegate;
4848

4949
/** Class which provides secret keys which are shared by server and client on a per-app basis. */

network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void afterEach() {
9696
public void testGoodClient() {
9797
clientFactory = context.createClientFactory(
9898
Lists.<TransportClientBootstrap>newArrayList(
99-
new SaslBootstrap(conf, "app-id", new TestSecretKeyHolder("good-key"))));
99+
new SaslClientBootstrap(conf, "app-id", new TestSecretKeyHolder("good-key"))));
100100

101101
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
102102
String msg = "Hello, World!";
@@ -108,7 +108,7 @@ public void testGoodClient() {
108108
public void testBadClient() {
109109
clientFactory = context.createClientFactory(
110110
Lists.<TransportClientBootstrap>newArrayList(
111-
new SaslBootstrap(conf, "app-id", new TestSecretKeyHolder("bad-key"))));
111+
new SaslClientBootstrap(conf, "app-id", new TestSecretKeyHolder("bad-key"))));
112112

113113
try {
114114
// Bootstrap should fail on startup.
@@ -146,7 +146,7 @@ public void testNoSaslServer() {
146146
TransportContext context = new TransportContext(conf, handler);
147147
clientFactory = context.createClientFactory(
148148
Lists.<TransportClientBootstrap>newArrayList(
149-
new SaslBootstrap(conf, "app-id", new TestSecretKeyHolder("key"))));
149+
new SaslClientBootstrap(conf, "app-id", new TestSecretKeyHolder("key"))));
150150
TransportServer server = context.createServer();
151151
try {
152152
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());

0 commit comments

Comments
 (0)