Skip to content

Commit 0fefabb

Browse files
committed
Use double check in synchronization.
1 parent 41dfcb2 commit 0fefabb

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,26 +129,28 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
129129

130130
int clientIndex = rand.nextInt(numConnectionsPerPeer);
131131
TransportClient cachedClient = clientPool.clients[clientIndex];
132-
if (cachedClient != null) {
133-
if (cachedClient.isActive()) {
134-
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
135-
return cachedClient;
136-
} else {
137-
logger.info("Found inactive connection to {}, closing it.", address);
138-
clientPool.clients[clientIndex] = null; // Remove inactive clients.
139-
}
132+
133+
if (cachedClient != null && cachedClient.isActive()) {
134+
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
135+
return cachedClient;
140136
}
141137

142138
// If we reach here, we don't have an existing connection open. Let's create a new one.
143-
// Multiple threads might race here to create new connections. Let's keep only one of them
144-
// active at anytime.
139+
// Multiple threads might race here to create new connections. Keep only one of them active.
145140
synchronized (clientPool.locks[clientIndex]) {
146-
if (clientPool.clients[clientIndex] == null || !clientPool.clients[clientIndex].isActive()) {
147-
clientPool.clients[clientIndex] = createClient(address);
141+
cachedClient = clientPool.clients[clientIndex];
142+
143+
if (cachedClient != null) {
144+
if (cachedClient.isActive()) {
145+
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
146+
return cachedClient;
147+
} else {
148+
logger.info("Found inactive connection to {}, creating a new one.", address);
149+
}
148150
}
151+
clientPool.clients[clientIndex] = createClient(address);
152+
return clientPool.clients[clientIndex];
149153
}
150-
151-
return clientPool.clients[clientIndex];
152154
}
153155

154156
/** Create a completely new {@link TransportClient} to the remote address. */
@@ -203,7 +205,7 @@ public void initChannel(SocketChannel ch) {
203205
long postBootstrap = System.nanoTime();
204206

205207
logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
206-
address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
208+
address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
207209

208210
return client;
209211
}

0 commit comments

Comments
 (0)