-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-4740] Create multiple concurrent connections between two peer nodes in Netty. #3625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 5 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
4f21673
[SPARK-4740] Create multiple concurrent connections between two peer …
rxin 3e1306c
Minor style fix.
rxin 9076b4a
Fixed two NPEs.
rxin 41dfcb2
Added test case.
rxin 0fefabb
Use double check in synchronization.
rxin f33c72b
Code review feedback.
rxin ad4241a
Updated javadoc.
rxin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import java.net.InetSocketAddress; | ||
| import java.net.SocketAddress; | ||
| import java.util.List; | ||
| import java.util.Random; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
|
|
@@ -56,12 +57,31 @@ | |
| * TransportClient, all given {@link TransportClientBootstrap}s will be run. | ||
| */ | ||
| public class TransportClientFactory implements Closeable { | ||
|
|
||
| /** A simple data structure to track the pool of clients between two peer nodes. */ | ||
| private class ClientPool { | ||
| TransportClient[] clients; | ||
| Object[] locks; | ||
|
|
||
| public ClientPool() { | ||
| clients = new TransportClient[numConnectionsPerPeer]; | ||
| locks = new Object[numConnectionsPerPeer]; | ||
| for (int i = 0; i < numConnectionsPerPeer; i++) { | ||
| locks[i] = new Object(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class); | ||
|
|
||
| private final TransportContext context; | ||
| private final TransportConf conf; | ||
| private final List<TransportClientBootstrap> clientBootstraps; | ||
| private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool; | ||
| private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool; | ||
|
|
||
| /** Random number generator for picking connections between peers. */ | ||
| private final Random rand; | ||
| private final int numConnectionsPerPeer; | ||
|
|
||
| private final Class<? extends Channel> socketChannelClass; | ||
| private EventLoopGroup workerGroup; | ||
|
|
@@ -73,7 +93,9 @@ public TransportClientFactory( | |
| this.context = Preconditions.checkNotNull(context); | ||
| this.conf = context.getConf(); | ||
| this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); | ||
| this.connectionPool = new ConcurrentHashMap<SocketAddress, TransportClient>(); | ||
| this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>(); | ||
| this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); | ||
| this.rand = new Random(); | ||
|
|
||
| IOMode ioMode = IOMode.valueOf(conf.ioMode()); | ||
| this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); | ||
|
|
@@ -97,23 +119,48 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO | |
| // Get connection from the connection pool first. | ||
| // If it is not found or not active, create a new one. | ||
| final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); | ||
| TransportClient cachedClient = connectionPool.get(address); | ||
| if (cachedClient != null) { | ||
| if (cachedClient.isActive()) { | ||
| logger.trace("Returning cached connection to {}: {}", address, cachedClient); | ||
| return cachedClient; | ||
| } else { | ||
| logger.info("Found inactive connection to {}, closing it.", address); | ||
| connectionPool.remove(address, cachedClient); // Remove inactive clients. | ||
|
|
||
| // Create the ClientPool if we don't have it yet. | ||
| ClientPool clientPool = connectionPool.get(address); | ||
| if (clientPool == null) { | ||
| connectionPool.putIfAbsent(address, new ClientPool()); | ||
| clientPool = connectionPool.get(address); | ||
| } | ||
|
|
||
| int clientIndex = rand.nextInt(numConnectionsPerPeer); | ||
| TransportClient cachedClient = clientPool.clients[clientIndex]; | ||
|
|
||
| if (cachedClient != null && cachedClient.isActive()) { | ||
| logger.trace("Returning cached connection to {}: {}", address, cachedClient); | ||
| return cachedClient; | ||
| } | ||
|
|
||
| // If we reach here, we don't have an existing connection open. Let's create a new one. | ||
| // Multiple threads might race here to create new connections. Keep only one of them active. | ||
| synchronized (clientPool.locks[clientIndex]) { | ||
| cachedClient = clientPool.clients[clientIndex]; | ||
|
|
||
| if (cachedClient != null) { | ||
| if (cachedClient.isActive()) { | ||
| logger.trace("Returning cached connection to {}: {}", address, cachedClient); | ||
| return cachedClient; | ||
| } else { | ||
| logger.info("Found inactive connection to {}, creating a new one.", address); | ||
| } | ||
| } | ||
| clientPool.clients[clientIndex] = createClient(address); | ||
| return clientPool.clients[clientIndex]; | ||
| } | ||
| } | ||
|
|
||
| /** Create a completely new {@link TransportClient} to the remote address. */ | ||
| private TransportClient createClient(InetSocketAddress address) throws IOException { | ||
| logger.debug("Creating new connection to " + address); | ||
|
|
||
| Bootstrap bootstrap = new Bootstrap(); | ||
| bootstrap.group(workerGroup) | ||
| .channel(socketChannelClass) | ||
| // Disable Nagle's Algorithm since we don't want packets to wait | ||
| // Disable Nagle's Algorithm since we don't want packets to wait | ||
| .option(ChannelOption.TCP_NODELAY, true) | ||
| .option(ChannelOption.SO_KEEPALIVE, true) | ||
| .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) | ||
|
|
@@ -130,7 +177,7 @@ public void initChannel(SocketChannel ch) { | |
| }); | ||
|
|
||
| // Connect to the remote server | ||
| long preConnect = System.currentTimeMillis(); | ||
| long preConnect = System.nanoTime(); | ||
| ChannelFuture cf = bootstrap.connect(address); | ||
| if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) { | ||
| throw new IOException( | ||
|
|
@@ -143,43 +190,41 @@ public void initChannel(SocketChannel ch) { | |
| assert client != null : "Channel future completed successfully with null client"; | ||
|
|
||
| // Execute any client bootstraps synchronously before marking the Client as successful. | ||
| long preBootstrap = System.currentTimeMillis(); | ||
| long preBootstrap = System.nanoTime(); | ||
| logger.debug("Connection to {} successful, running bootstraps...", address); | ||
| try { | ||
| for (TransportClientBootstrap clientBootstrap : clientBootstraps) { | ||
| clientBootstrap.doBootstrap(client); | ||
| } | ||
| } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala | ||
| long bootstrapTime = System.currentTimeMillis() - preBootstrap; | ||
| logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e); | ||
| long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000; | ||
| logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e); | ||
| client.close(); | ||
| throw Throwables.propagate(e); | ||
| } | ||
| long postBootstrap = System.currentTimeMillis(); | ||
|
|
||
| // Successful connection & bootstrap -- in the event that two threads raced to create a client, | ||
| // use the first one that was put into the connectionPool and close the one we made here. | ||
| TransportClient oldClient = connectionPool.putIfAbsent(address, client); | ||
| if (oldClient == null) { | ||
| logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", | ||
| address, postBootstrap - preConnect, postBootstrap - preBootstrap); | ||
| return client; | ||
| } else { | ||
| logger.debug("Two clients were created concurrently after {} ms, second will be disposed.", | ||
| postBootstrap - preConnect); | ||
| client.close(); | ||
| return oldClient; | ||
| } | ||
| long postBootstrap = System.nanoTime(); | ||
|
|
||
| logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", | ||
| address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); | ||
|
|
||
| return client; | ||
| } | ||
|
|
||
| /** Close all connections in the connection pool, and shutdown the worker thread pool. */ | ||
| @Override | ||
| public void close() { | ||
| for (TransportClient client : connectionPool.values()) { | ||
| try { | ||
| client.close(); | ||
| } catch (RuntimeException e) { | ||
| logger.warn("Ignoring exception during close", e); | ||
| // Go through all clients and close them if they are active. | ||
| for (ClientPool clientPool : connectionPool.values()) { | ||
| for (int i = 0; i < clientPool.clients.length; i++) { | ||
| TransportClient client = clientPool.clients[i]; | ||
| if (client != null) { | ||
| clientPool.clients[i] = null; | ||
| try { | ||
| client.close(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Q: Can we use JavaUtils.closeQuietly(client) here? |
||
| } catch (RuntimeException e) { | ||
| logger.warn("Ignoring exception during close", e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| connectionPool.clear(); | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,11 @@ | |
| package org.apache.spark.network; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.NoSuchElementException; | ||
| import java.util.Set; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import org.junit.After; | ||
| import org.junit.Before; | ||
|
|
@@ -32,6 +36,7 @@ | |
| import org.apache.spark.network.server.NoOpRpcHandler; | ||
| import org.apache.spark.network.server.RpcHandler; | ||
| import org.apache.spark.network.server.TransportServer; | ||
| import org.apache.spark.network.util.ConfigProvider; | ||
| import org.apache.spark.network.util.JavaUtils; | ||
| import org.apache.spark.network.util.SystemPropertyConfigProvider; | ||
| import org.apache.spark.network.util.TransportConf; | ||
|
|
@@ -57,16 +62,113 @@ public void tearDown() { | |
| JavaUtils.closeQuietly(server2); | ||
| } | ||
|
|
||
| /** | ||
| * Request a bunch of clients to a single server to test | ||
| * we create up to maxConnections of clients. | ||
| */ | ||
| private void testClientReuse(final int maxConnections) throws IOException { | ||
| TransportConf conf = new TransportConf(new ConfigProvider() { | ||
| @Override | ||
| public String get(String name) { | ||
| if (name.equals("spark.shuffle.io.numConnectionsPerPeer")) { | ||
| return Integer.toString(maxConnections); | ||
| } else { | ||
| throw new NoSuchElementException(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| RpcHandler rpcHandler = new NoOpRpcHandler(); | ||
| TransportContext context = new TransportContext(conf, rpcHandler); | ||
| TransportClientFactory factory = context.createClientFactory(); | ||
| HashSet<TransportClient> clients = new HashSet<TransportClient>(); | ||
| for (int i = 0; i < maxConnections * 10; i++) { | ||
| TransportClient client = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| assert(client.isActive()); | ||
| clients.add(client); | ||
| } | ||
|
|
||
| assert(clients.size() == maxConnections); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems like we should close the clients afterwards. |
||
| } | ||
|
|
||
| /** | ||
| * Request a bunch of clients to a single server to test | ||
| * we create up to maxConnections of clients. This is a parallel | ||
| * version of testClientReuse. | ||
| */ | ||
| private void testClientReuseConcurrent(final int maxConnections) | ||
| throws IOException, InterruptedException { | ||
| TransportConf conf = new TransportConf(new ConfigProvider() { | ||
| @Override | ||
| public String get(String name) { | ||
| if (name.equals("spark.shuffle.io.numConnectionsPerPeer")) { | ||
| return Integer.toString(maxConnections); | ||
| } else { | ||
| throw new NoSuchElementException(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| RpcHandler rpcHandler = new NoOpRpcHandler(); | ||
| TransportContext context = new TransportContext(conf, rpcHandler); | ||
| final TransportClientFactory factory = context.createClientFactory(); | ||
| final Set<TransportClient> clients = Collections.synchronizedSet( | ||
| new HashSet<TransportClient>()); | ||
|
|
||
| final AtomicInteger failed = new AtomicInteger(); | ||
| Thread[] attempts = new Thread[maxConnections * 10]; | ||
|
|
||
| // Launch a bunch of threads to create new clients. | ||
| for (int i = 0; i < attempts.length; i++) { | ||
| attempts[i] = new Thread() { | ||
| @Override | ||
| public void run() { | ||
| try { | ||
| TransportClient client = | ||
| factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| assert (client.isActive()); | ||
| clients.add(client); | ||
| } catch (IOException e) { | ||
| failed.incrementAndGet(); | ||
| } | ||
| } | ||
| }; | ||
| attempts[i].run(); | ||
| } | ||
|
|
||
| // Wait until all the threads complete. | ||
| for (int i = 0; i < attempts.length; i++) { | ||
| attempts[i].join(); | ||
| } | ||
|
|
||
| assert(failed.get() == 0); | ||
| assert(clients.size() == maxConnections); | ||
| } | ||
|
|
||
| @Test | ||
| public void reuseClientsUpToConfigVariable() throws IOException { | ||
| // Exercise the sequential reuse check for pool sizes 1 through 4, in ascending order. | ||
| for (int poolSize = 1; poolSize <= 4; poolSize++) { | ||
| testClientReuse(poolSize); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void createAndReuseBlockClients() throws IOException { | ||
| public void reuseClientsUpToConfigVariableConcurrent() throws Exception { | ||
| testClientReuseConcurrent(1); | ||
| testClientReuseConcurrent(2); | ||
| testClientReuseConcurrent(3); | ||
| testClientReuseConcurrent(4); | ||
| } | ||
|
|
||
| @Test | ||
| public void returnDifferentClientsForDifferentServers() throws IOException { | ||
| TransportClientFactory factory = context.createClientFactory(); | ||
| TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| TransportClient c3 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); | ||
| TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); | ||
| assertTrue(c1.isActive()); | ||
| assertTrue(c3.isActive()); | ||
| assertTrue(c1 == c2); | ||
| assertTrue(c1 != c3); | ||
| assertTrue(c2.isActive()); | ||
| assertTrue(c1 != c2); | ||
| factory.close(); | ||
| } | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make this a private static class if we make this a constructor parameter.