1818package org .apache .spark .network ;
1919
2020import java .io .IOException ;
21- import java .util .concurrent .TimeoutException ;
21+ import java .util .Collections ;
22+ import java .util .HashSet ;
23+ import java .util .NoSuchElementException ;
24+ import java .util .Set ;
25+ import java .util .concurrent .atomic .AtomicInteger ;
2226
2327import org .junit .After ;
2428import org .junit .Before ;
3236import org .apache .spark .network .server .NoOpRpcHandler ;
3337import org .apache .spark .network .server .RpcHandler ;
3438import org .apache .spark .network .server .TransportServer ;
39+ import org .apache .spark .network .util .ConfigProvider ;
3540import org .apache .spark .network .util .JavaUtils ;
3641import org .apache .spark .network .util .SystemPropertyConfigProvider ;
3742import org .apache .spark .network .util .TransportConf ;
@@ -57,16 +62,113 @@ public void tearDown() {
5762 JavaUtils .closeQuietly (server2 );
5863 }
5964
65+ /**
66+ * Request a bunch of clients to a single server to test
67+ * we create up to maxConnections of clients.
68+ */
69+ private void testClientReuse (final int maxConnections ) throws IOException {
70+ TransportConf conf = new TransportConf (new ConfigProvider () {
71+ @ Override
72+ public String get (String name ) {
73+ if (name .equals ("spark.shuffle.io.numConnectionsPerPeer" )) {
74+ return Integer .toString (maxConnections );
75+ } else {
76+ throw new NoSuchElementException ();
77+ }
78+ }
79+ });
80+
81+ RpcHandler rpcHandler = new NoOpRpcHandler ();
82+ TransportContext context = new TransportContext (conf , rpcHandler );
83+ TransportClientFactory factory = context .createClientFactory ();
84+ HashSet <TransportClient > clients = new HashSet <TransportClient >();
85+ for (int i = 0 ; i < maxConnections * 10 ; i ++) {
86+ TransportClient client = factory .createClient (TestUtils .getLocalHost (), server1 .getPort ());
87+ assert (client .isActive ());
88+ clients .add (client );
89+ }
90+
91+ assert (clients .size () == maxConnections );
92+ }
93+
94+ /**
95+ * Request a bunch of clients to a single server to test
96+ * we create up to maxConnections of clients. This is a parallel
97+ * version of testClientReuse.
98+ */
99+ private void testClientReuseConcurrent (final int maxConnections )
100+ throws IOException , InterruptedException {
101+ TransportConf conf = new TransportConf (new ConfigProvider () {
102+ @ Override
103+ public String get (String name ) {
104+ if (name .equals ("spark.shuffle.io.numConnectionsPerPeer" )) {
105+ return Integer .toString (maxConnections );
106+ } else {
107+ throw new NoSuchElementException ();
108+ }
109+ }
110+ });
111+
112+ RpcHandler rpcHandler = new NoOpRpcHandler ();
113+ TransportContext context = new TransportContext (conf , rpcHandler );
114+ final TransportClientFactory factory = context .createClientFactory ();
115+ final Set <TransportClient > clients = Collections .synchronizedSet (
116+ new HashSet <TransportClient >());
117+
118+ final AtomicInteger failed = new AtomicInteger ();
119+ Thread [] attempts = new Thread [maxConnections * 10 ];
120+
121+ // Launch a bunch of threads to create new clients.
122+ for (int i = 0 ; i < attempts .length ; i ++) {
123+ attempts [i ] = new Thread () {
124+ @ Override
125+ public void run () {
126+ try {
127+ TransportClient client =
128+ factory .createClient (TestUtils .getLocalHost (), server1 .getPort ());
129+ assert (client .isActive ());
130+ clients .add (client );
131+ } catch (IOException e ) {
132+ failed .incrementAndGet ();
133+ }
134+ }
135+ };
136+ attempts [i ].run ();
137+ }
138+
139+ // Wait until all the threads complete.
140+ for (int i = 0 ; i < attempts .length ; i ++) {
141+ attempts [i ].join ();
142+ }
143+
144+ assert (failed .get () == 0 );
145+ assert (clients .size () == maxConnections );
146+ }
147+
148+ @ Test
149+ public void reuseClientsUpToConfigVariable () throws IOException {
150+ testClientReuse (1 );
151+ testClientReuse (2 );
152+ testClientReuse (3 );
153+ testClientReuse (4 );
154+ }
155+
60156 @ Test
61- public void createAndReuseBlockClients () throws IOException {
157+ public void reuseClientsUpToConfigVariableConcurrent () throws Exception {
158+ testClientReuseConcurrent (1 );
159+ testClientReuseConcurrent (2 );
160+ testClientReuseConcurrent (3 );
161+ testClientReuseConcurrent (4 );
162+ }
163+
164+ @ Test
165+ public void returnDifferentClientsForDifferentServers () throws IOException {
62166 TransportClientFactory factory = context .createClientFactory ();
63167 TransportClient c1 = factory .createClient (TestUtils .getLocalHost (), server1 .getPort ());
64- TransportClient c2 = factory .createClient (TestUtils .getLocalHost (), server1 .getPort ());
65- TransportClient c3 = factory .createClient (TestUtils .getLocalHost (), server2 .getPort ());
168+ TransportClient c2 = factory .createClient (TestUtils .getLocalHost (), server2 .getPort ());
66169 assertTrue (c1 .isActive ());
67- assertTrue (c3 .isActive ());
68- assertTrue (c1 == c2 );
69- assertTrue (c1 != c3 );
170+ assertTrue (c2 .isActive ());
171+ assertTrue (c1 != c2 );
70172 factory .close ();
71173 }
72174
0 commit comments