@@ -98,7 +98,8 @@ public class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
9898 protected final Object topologyUpdated = new Object ();
9999 protected final AtomicBoolean requestToUpdateTopology = new AtomicBoolean (false );
100100 protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong (-1 );
101- protected final ConcurrentHashMap <String /* host */ , Thread > nodeThreads = new ConcurrentHashMap <>();
101+ protected final ConcurrentHashMap <String , Boolean > submittedNodes = new ConcurrentHashMap <>();
102+ protected ExecutorService nodeExecutorService = null ;
102103 protected final AtomicBoolean nodeThreadsStop = new AtomicBoolean (false );
103104 protected final AtomicReference <Connection > nodeThreadsWriterConnection = new AtomicReference <>(null );
104105 protected final AtomicReference <HostSpec > nodeThreadsWriterHostSpec = new AtomicReference <>(null );
@@ -277,8 +278,6 @@ public void close() throws Exception {
277278 if (!this .monitorExecutor .awaitTermination (30 , TimeUnit .SECONDS )) {
278279 this .monitorExecutor .shutdownNow ();
279280 }
280-
281- this .nodeThreads .clear ();
282281 }
283282
284283 @ Override
@@ -292,7 +291,7 @@ public void run() {
292291
293292 if (this .isInPanicMode ()) {
294293
295- if (this .nodeThreads .isEmpty ()) {
294+ if (this .submittedNodes .isEmpty ()) {
296295 LOGGER .finest (Messages .get ("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads" ));
297296
298297 // start node threads
@@ -308,15 +307,19 @@ public void run() {
308307 hosts = this .openAnyConnectionAndUpdateTopology ();
309308 }
310309
310+ this .shutdownNodeExecutorService ();
311+ this .nodeExecutorService = this .createNodeExecutorService ();
312+
311313 if (hosts != null && !this .isVerifiedWriterConnection ) {
312314 for (HostSpec hostSpec : hosts ) {
313- this .nodeThreads .computeIfAbsent (hostSpec .getHost (),
315+ this .submittedNodes .computeIfAbsent (hostSpec .getHost (),
314316 (key ) -> {
315- final Thread thread = this .getNodeMonitoringThread ( hostSpec , this . writerHostSpec . get ());
316- thread . start ( );
317- return thread ;
317+ this .nodeExecutorService . submit (
318+ this . getNodeMonitoringWorker ( hostSpec , this . writerHostSpec . get ()) );
319+ return true ;
318320 });
319321 }
322+ // It's not possible to call shutdown() on this.nodeExecutorService since more nodes may be added later.
320323 }
321324 // otherwise let's try it again the next round
322325
@@ -345,10 +348,8 @@ public void run() {
345348 }
346349
347350 this .nodeThreadsStop .set (true );
348- for (Thread thread : this .nodeThreads .values ()) {
349- thread .interrupt ();
350- }
351- this .nodeThreads .clear ();
351+ this .shutdownNodeExecutorService ();
352+ this .submittedNodes .clear ();
352353
353354 continue ;
354355
@@ -357,13 +358,14 @@ public void run() {
357358 List <HostSpec > hosts = this .nodeThreadsLatestTopology .get ();
358359 if (hosts != null && !this .nodeThreadsStop .get ()) {
359360 for (HostSpec hostSpec : hosts ) {
360- this .nodeThreads .computeIfAbsent (hostSpec .getHost (),
361+ this .submittedNodes .computeIfAbsent (hostSpec .getHost (),
361362 (key ) -> {
362- final Thread thread = this .getNodeMonitoringThread ( hostSpec , this . writerHostSpec . get ());
363- thread . start ( );
364- return thread ;
363+ this .nodeExecutorService . submit (
364+ this . getNodeMonitoringWorker ( hostSpec , this . writerHostSpec . get ()) );
365+ return true ;
365366 });
366367 }
368+ // It's not possible to call shutdown() on this.nodeExecutorService since more nodes may be added later.
367369 }
368370 }
369371 }
@@ -373,12 +375,9 @@ public void run() {
373375 } else {
374376 // regular mode (not panic mode)
375377
376- if (!this .nodeThreads .isEmpty ()) {
377- // stop node threads
378- for (Thread thread : this .nodeThreads .values ()) {
379- thread .interrupt ();
380- }
381- this .nodeThreads .clear ();
378+ if (!this .submittedNodes .isEmpty ()) {
379+ this .shutdownNodeExecutorService ();
380+ this .submittedNodes .clear ();
382381 }
383382
384383 final List <HostSpec > hosts = this .fetchTopologyAndUpdateCache (this .monitoringConnection .get ());
@@ -427,6 +426,7 @@ public void run() {
427426
428427 } finally {
429428 this .stop .set (true );
429+ this .shutdownNodeExecutorService ();
430430
431431 final Connection conn = this .monitoringConnection .get ();
432432 this .monitoringConnection .set (null );
@@ -438,13 +438,43 @@ public void run() {
438438 }
439439 }
440440
441+ protected void shutdownNodeExecutorService () {
442+ if (this .nodeExecutorService != null ) {
443+
444+ if (!this .nodeExecutorService .isShutdown ()) {
445+ this .nodeExecutorService .shutdown ();
446+ }
447+
448+ try {
449+ if (!this .nodeExecutorService .awaitTermination (30 , TimeUnit .SECONDS )) {
450+ this .nodeExecutorService .shutdownNow ();
451+ }
452+ } catch (InterruptedException e ) {
453+ // do nothing
454+ }
455+
456+ this .nodeExecutorService = null ;
457+ }
458+ }
459+
460+ protected ExecutorService createNodeExecutorService () {
461+ return Executors .newCachedThreadPool (runnableTarget -> {
462+ final Thread monitoringThread = new Thread (runnableTarget );
463+ monitoringThread .setDaemon (true );
464+ if (!StringUtils .isNullOrEmpty (monitoringThread .getName ())) {
465+ monitoringThread .setName (monitoringThread .getName () + "-nm" );
466+ }
467+ return monitoringThread ;
468+ });
469+ }
470+
441471 protected boolean isInPanicMode () {
442472 return this .monitoringConnection .get () == null
443473 || !this .isVerifiedWriterConnection ;
444474 }
445475
446- protected Thread getNodeMonitoringThread (final HostSpec hostSpec , final @ Nullable HostSpec writerHostSpec ) {
447- return new NodeMonitoringThread (this , hostSpec , writerHostSpec );
476+ protected Runnable getNodeMonitoringWorker (final HostSpec hostSpec , final @ Nullable HostSpec writerHostSpec ) {
477+ return new NodeMonitoringWorker (this , hostSpec , writerHostSpec );
448478 }
449479
450480 protected List <HostSpec > openAnyConnectionAndUpdateTopology () {
@@ -536,7 +566,7 @@ protected String getNodeId(final Connection connection) {
536566 }
537567
538568 protected void closeConnection (final @ Nullable Connection connection ) {
539- this .closeConnection (connection , false );
569+ this .closeConnection (connection , true );
540570 }
541571
542572 protected void closeConnection (final @ Nullable Connection connection , final boolean unstableConnection ) {
@@ -756,27 +786,23 @@ protected String getHostEndpoint(final String nodeName) {
756786 return host .replace ("?" , nodeName );
757787 }
758788
759- private static class NodeMonitoringThread extends Thread {
789+ private static class NodeMonitoringWorker implements Runnable {
760790
761- private static final Logger LOGGER = Logger .getLogger (NodeMonitoringThread .class .getName ());
791+ private static final Logger LOGGER = Logger .getLogger (NodeMonitoringWorker .class .getName ());
762792
763793 protected final ClusterTopologyMonitorImpl monitor ;
764794 protected final HostSpec hostSpec ;
765795 protected final @ Nullable HostSpec writerHostSpec ;
766796 protected boolean writerChanged = false ;
767797
768- public NodeMonitoringThread (
798+ public NodeMonitoringWorker (
769799 final ClusterTopologyMonitorImpl monitor ,
770800 final HostSpec hostSpec ,
771801 final @ Nullable HostSpec writerHostSpec
772802 ) {
773803 this .monitor = monitor ;
774804 this .hostSpec = hostSpec ;
775805 this .writerHostSpec = writerHostSpec ;
776-
777- if (!StringUtils .isNullOrEmpty (this .getName ())) {
778- this .setName (this .getName () + "-nm" );
779- }
780806 }
781807
782808 @ Override
@@ -837,7 +863,7 @@ public void run() {
837863 this .monitor .topologyMap .get (this .monitor .clusterId )));
838864 }
839865
840- // Setting the connection to null here prevents the finally block
866+ // Setting the connection to null here prevents the finally block
841867 // from closing nodeThreadsWriterConnection.
842868 connection = null ;
843869 return ;
0 commit comments