3737import java .util .concurrent .atomic .AtomicBoolean ;
3838import java .util .concurrent .atomic .AtomicLong ;
3939import java .util .concurrent .atomic .AtomicReference ;
40+ import java .util .concurrent .locks .ReentrantLock ;
4041import java .util .logging .Level ;
4142import java .util .logging .Logger ;
4243import java .util .stream .Collectors ;
@@ -98,7 +99,9 @@ public class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
9899 protected final Object topologyUpdated = new Object ();
99100 protected final AtomicBoolean requestToUpdateTopology = new AtomicBoolean (false );
100101 protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong (-1 );
101- protected final ConcurrentHashMap <String /* host */ , Thread > nodeThreads = new ConcurrentHashMap <>();
// Hosts for which a node monitoring worker has already been submitted.
// Keyed by host name; the stored value is always true.
102+ protected final ConcurrentHashMap <String , Boolean > submittedNodes = new ConcurrentHashMap <>();
// Executor that runs the node monitoring workers. Assigned by createNodeExecutorService()
// and reset to null by shutdownNodeExecutorService().
103+ protected ExecutorService nodeExecutorService = null ;
// Held while nodeExecutorService is being created or shut down.
104+ protected final ReentrantLock nodeExecutorLock = new ReentrantLock ();
// Set to true by close() and run() right before the node executor is shut down.
102105 protected final AtomicBoolean nodeThreadsStop = new AtomicBoolean (false );
103106 protected final AtomicReference <Connection > nodeThreadsWriterConnection = new AtomicReference <>(null );
104107 protected final AtomicReference <HostSpec > nodeThreadsWriterHostSpec = new AtomicReference <>(null );
@@ -266,6 +269,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
266269 public void close () throws Exception {
267270 this .stop .set (true );
268271 this .nodeThreadsStop .set (true );
272+ this .shutdownNodeExecutorService ();
269273
270274 // It breaks the waiting/sleeping cycles in the monitoring thread
271275 synchronized (this .requestToUpdateTopology ) {
@@ -277,8 +281,6 @@ public void close() throws Exception {
277281 if (!this .monitorExecutor .awaitTermination (30 , TimeUnit .SECONDS )) {
278282 this .monitorExecutor .shutdownNow ();
279283 }
280-
281- this .nodeThreads .clear ();
282284 }
283285
284286 @ Override
@@ -292,7 +294,7 @@ public void run() {
292294
293295 if (this .isInPanicMode ()) {
294296
295- if (this .nodeThreads .isEmpty ()) {
297+ if (this .submittedNodes .isEmpty ()) {
296298 LOGGER .finest (Messages .get ("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads" ));
297299
298300 // start node threads
@@ -308,15 +310,19 @@ public void run() {
308310 hosts = this .openAnyConnectionAndUpdateTopology ();
309311 }
310312
313+ this .shutdownNodeExecutorService ();
314+ this .createNodeExecutorService ();
315+
311316 if (hosts != null && !this .isVerifiedWriterConnection ) {
312317 for (HostSpec hostSpec : hosts ) {
313- this .nodeThreads .computeIfAbsent (hostSpec .getHost (),
318+ this .submittedNodes .computeIfAbsent (hostSpec .getHost (),
314319 (key ) -> {
315- final Thread thread = this .getNodeMonitoringThread ( hostSpec , this . writerHostSpec . get ());
316- thread . start ( );
317- return thread ;
320+ this .nodeExecutorService . submit (
321+ this . getNodeMonitoringWorker ( hostSpec , this . writerHostSpec . get ()) );
322+ return true ;
318323 });
319324 }
325+ // It's not possible to call shutdown() on this.nodeExecutorService since more nodes may be added later.
320326 }
321327 // otherwise let's try it again the next round
322328
@@ -345,10 +351,8 @@ public void run() {
345351 }
346352
347353 this .nodeThreadsStop .set (true );
348- for (Thread thread : this .nodeThreads .values ()) {
349- thread .interrupt ();
350- }
351- this .nodeThreads .clear ();
354+ this .shutdownNodeExecutorService ();
355+ this .submittedNodes .clear ();
352356
353357 continue ;
354358
@@ -357,13 +361,14 @@ public void run() {
357361 List <HostSpec > hosts = this .nodeThreadsLatestTopology .get ();
358362 if (hosts != null && !this .nodeThreadsStop .get ()) {
359363 for (HostSpec hostSpec : hosts ) {
360- this .nodeThreads .computeIfAbsent (hostSpec .getHost (),
364+ this .submittedNodes .computeIfAbsent (hostSpec .getHost (),
361365 (key ) -> {
362- final Thread thread = this .getNodeMonitoringThread ( hostSpec , this . writerHostSpec . get ());
363- thread . start ( );
364- return thread ;
366+ this .nodeExecutorService . submit (
367+ this . getNodeMonitoringWorker ( hostSpec , this . writerHostSpec . get ()) );
368+ return true ;
365369 });
366370 }
371+ // It's not possible to call shutdown() on this.nodeExecutorService since more nodes may be added later.
367372 }
368373 }
369374 }
@@ -373,12 +378,9 @@ public void run() {
373378 } else {
374379 // regular mode (not panic mode)
375380
376- if (!this .nodeThreads .isEmpty ()) {
377- // stop node threads
378- for (Thread thread : this .nodeThreads .values ()) {
379- thread .interrupt ();
380- }
381- this .nodeThreads .clear ();
381+ if (!this .submittedNodes .isEmpty ()) {
382+ this .shutdownNodeExecutorService ();
383+ this .submittedNodes .clear ();
382384 }
383385
384386 final List <HostSpec > hosts = this .fetchTopologyAndUpdateCache (this .monitoringConnection .get ());
@@ -427,6 +429,7 @@ public void run() {
427429
428430 } finally {
429431 this .stop .set (true );
432+ this .shutdownNodeExecutorService ();
430433
431434 final Connection conn = this .monitoringConnection .get ();
432435 this .monitoringConnection .set (null );
@@ -438,13 +441,58 @@ public void run() {
438441 }
439442 }
440443
444+ protected void shutdownNodeExecutorService () {
445+ if (this .nodeExecutorService != null ) {
446+
447+ this .nodeExecutorLock .lock ();
448+ try {
449+
450+ if (this .nodeExecutorService == null ) {
451+ return ;
452+ }
453+
454+ if (!this .nodeExecutorService .isShutdown ()) {
455+ this .nodeExecutorService .shutdown ();
456+ }
457+
458+ try {
459+ if (!this .nodeExecutorService .awaitTermination (30 , TimeUnit .SECONDS )) {
460+ this .nodeExecutorService .shutdownNow ();
461+ }
462+ } catch (InterruptedException e ) {
463+ // do nothing
464+ }
465+
466+ this .nodeExecutorService = null ;
467+ } finally {
468+ this .nodeExecutorLock .unlock ();
469+ }
470+ }
471+ }
472+
473+ protected void createNodeExecutorService () {
474+ this .nodeExecutorLock .lock ();
475+ try {
476+ this .nodeExecutorService = Executors .newCachedThreadPool (runnableTarget -> {
477+ final Thread monitoringThread = new Thread (runnableTarget );
478+ monitoringThread .setDaemon (true );
479+ if (!StringUtils .isNullOrEmpty (monitoringThread .getName ())) {
480+ monitoringThread .setName (monitoringThread .getName () + "-nm" );
481+ }
482+ return monitoringThread ;
483+ });
484+ } finally {
485+ this .nodeExecutorLock .unlock ();
486+ }
487+ }
488+
441489 protected boolean isInPanicMode () {
442490 return this .monitoringConnection .get () == null
443491 || !this .isVerifiedWriterConnection ;
444492 }
445493
/**
 * Creates the monitoring task for a single host.
 * The removed (-) variant returned a Thread that the caller started itself; the added (+)
 * variant returns a Runnable that run() submits to nodeExecutorService.
 *
 * @param hostSpec the host the task is created for
 * @param writerHostSpec the writer host passed in by the caller; may be null
 * @return a new task bound to this monitor and the given hosts
 */
446- protected Thread getNodeMonitoringThread (final HostSpec hostSpec , final @ Nullable HostSpec writerHostSpec ) {
447- return new NodeMonitoringThread (this , hostSpec , writerHostSpec );
494+ protected Runnable getNodeMonitoringWorker (final HostSpec hostSpec , final @ Nullable HostSpec writerHostSpec ) {
495+ return new NodeMonitoringWorker (this , hostSpec , writerHostSpec );
448496 }
449497
450498 protected List <HostSpec > openAnyConnectionAndUpdateTopology () {
@@ -536,7 +584,7 @@ protected String getNodeId(final Connection connection) {
536584 }
537585
/**
 * Convenience overload that delegates to closeConnection(Connection, boolean).
 * The added (+) line passes true for unstableConnection, where the removed (-) line
 * passed false.
 *
 * @param connection the connection handed to the two-argument overload; may be null
 */
538586 protected void closeConnection (final @ Nullable Connection connection ) {
539- this .closeConnection (connection , false );
587+ this .closeConnection (connection , true );
540588 }
541589
542590 protected void closeConnection (final @ Nullable Connection connection , final boolean unstableConnection ) {
@@ -756,27 +804,23 @@ protected String getHostEndpoint(final String nodeName) {
756804 return host .replace ("?" , nodeName );
757805 }
758806
759- private static class NodeMonitoringThread extends Thread {
807+ private static class NodeMonitoringWorker implements Runnable {
760808
761- private static final Logger LOGGER = Logger .getLogger (NodeMonitoringThread .class .getName ());
809+ private static final Logger LOGGER = Logger .getLogger (NodeMonitoringWorker .class .getName ());
762810
763811 protected final ClusterTopologyMonitorImpl monitor ;
764812 protected final HostSpec hostSpec ;
765813 protected final @ Nullable HostSpec writerHostSpec ;
766814 protected boolean writerChanged = false ;
767815
768- public NodeMonitoringThread (
816+ public NodeMonitoringWorker (
769817 final ClusterTopologyMonitorImpl monitor ,
770818 final HostSpec hostSpec ,
771819 final @ Nullable HostSpec writerHostSpec
772820 ) {
773821 this .monitor = monitor ;
774822 this .hostSpec = hostSpec ;
775823 this .writerHostSpec = writerHostSpec ;
776-
777- if (!StringUtils .isNullOrEmpty (this .getName ())) {
778- this .setName (this .getName () + "-nm" );
779- }
780824 }
781825
782826 @ Override
@@ -837,7 +881,7 @@ public void run() {
837881 this .monitor .topologyMap .get (this .monitor .clusterId )));
838882 }
839883
840- // Setting the connection to null here prevents the finally block
884+ // Setting the connection to null here prevents the finally block
841885 // from closing nodeThreadsWriterConnection.
842886 connection = null ;
843887 return ;
0 commit comments