3232import java .util .Properties ;
3333import java .util .Set ;
3434import java .util .UUID ;
35+ import java .util .concurrent .TimeUnit ;
3536import java .util .concurrent .locks .ReentrantLock ;
3637import java .util .logging .Logger ;
3738import org .checkerframework .checker .nullness .qual .NonNull ;
3839import org .checkerframework .checker .nullness .qual .Nullable ;
3940import software .amazon .jdbc .AwsWrapperProperty ;
40- import software .amazon .jdbc .HostListProvider ;
4141import software .amazon .jdbc .HostListProviderService ;
4242import software .amazon .jdbc .HostRole ;
4343import software .amazon .jdbc .HostSpec ;
44+ import software .amazon .jdbc .util .CacheMap ;
4445import software .amazon .jdbc .util .ConnectionUrlParser ;
45- import software .amazon .jdbc .util .ExpiringCache ;
4646import software .amazon .jdbc .util .Messages ;
4747import software .amazon .jdbc .util .RdsUrlType ;
4848import software .amazon .jdbc .util .RdsUtils ;
@@ -86,7 +86,6 @@ public class AuroraHostListProvider implements DynamicHostListProvider {
8686 // filter out nodes that haven't been updated in the last 5 minutes
8787 + "WHERE time_to_sec(timediff(now(), LAST_UPDATE_TIMESTAMP)) <= 300 OR SESSION_ID = 'MASTER_SESSION_ID' "
8888 + "ORDER BY LAST_UPDATE_TIMESTAMP" ;
89- static final int DEFAULT_CACHE_EXPIRE_MS = 5 * 60 * 1000 ; // 5 min
9089 static final String WRITER_SESSION_ID = "MASTER_SESSION_ID" ;
9190 static final String FIELD_SERVER_ID = "SERVER_ID" ;
9291 static final String FIELD_SESSION_ID = "SESSION_ID" ;
@@ -95,18 +94,15 @@ public class AuroraHostListProvider implements DynamicHostListProvider {
9594 private RdsUrlType rdsUrlType ;
9695 private final RdsUtils rdsHelper ;
9796
98- private int refreshRateInMilliseconds = Integer .parseInt (
99- CLUSTER_TOPOLOGY_REFRESH_RATE_MS .defaultValue != null
100- ? CLUSTER_TOPOLOGY_REFRESH_RATE_MS .defaultValue
101- : "30000" );
97+ private long refreshRateNano = CLUSTER_TOPOLOGY_REFRESH_RATE_MS .defaultValue != null
98+ ? TimeUnit .MILLISECONDS .toNanos (Long .parseLong (CLUSTER_TOPOLOGY_REFRESH_RATE_MS .defaultValue ))
99+ : TimeUnit .MILLISECONDS .toNanos (30000 );
102100 private List <HostSpec > hostList = new ArrayList <>();
103101 private List <HostSpec > lastReturnedHostList ;
104102 private List <HostSpec > initialHostList = new ArrayList <>();
105103 private HostSpec initialHostSpec ;
106104
107- protected static final ExpiringCache <String , ClusterTopologyInfo > topologyCache =
108- new ExpiringCache <>(DEFAULT_CACHE_EXPIRE_MS , getOnEvict ());
109- private static final Object cacheLock = new Object ();
105+ public static final CacheMap <String , ClusterTopologyInfo > topologyCache = new CacheMap <>();
110106
111107 private static final String PG_DRIVER_PROTOCOL = "postgresql" ;
112108 private final String retrieveTopologyQuery ;
@@ -152,13 +148,6 @@ public AuroraHostListProvider(
152148 }
153149 }
154150
155- private static ExpiringCache .OnEvictRunnable <ClusterTopologyInfo > getOnEvict () {
156- return (ClusterTopologyInfo evictedEntry ) -> {
157- LOGGER .finest (() -> "Entry with clusterId '" + evictedEntry .clusterId
158- + "' has been evicted from the topology cache." );
159- };
160- }
161-
162151 protected void init () throws SQLException {
163152 if (this .isInitialized ) {
164153 return ;
@@ -182,7 +171,8 @@ protected void init() throws SQLException {
182171
183172 this .clusterId = UUID .randomUUID ().toString ();
184173 this .isPrimaryClusterId = false ;
185- this .refreshRateInMilliseconds = CLUSTER_TOPOLOGY_REFRESH_RATE_MS .getInteger (properties );
174+ this .refreshRateNano =
175+ TimeUnit .MILLISECONDS .toNanos (CLUSTER_TOPOLOGY_REFRESH_RATE_MS .getInteger (properties ));
186176 this .clusterInstanceTemplate =
187177 CLUSTER_INSTANCE_HOST_PATTERN .getString (this .properties ) == null
188178 ? new HostSpec (rdsHelper .getRdsInstanceHostPattern (originalUrl ))
@@ -226,7 +216,7 @@ protected void init() throws SQLException {
226216 /**
227217 * Get cluster topology. It may require an extra call to database to fetch the latest topology. A
228218 * cached copy of topology is returned if it's not yet outdated (controlled by {@link
229- * #refreshRateInMilliseconds }).
219+ * #refreshRateNano }).
230220 *
231221 * @param conn A connection to database to fetch the latest topology, if needed.
232222 * @param forceUpdate If true, it forces a service to ignore cached copy of topology and to fetch
@@ -297,7 +287,7 @@ public FetchTopologyResult getTopology(final Connection conn, final boolean forc
297287 }
298288
299289 private ClusterSuggestedResult getSuggestedClusterId (final String url ) {
300- for (Entry <String , ClusterTopologyInfo > entry : topologyCache .entrySet ()) {
290+ for (Entry <String , ClusterTopologyInfo > entry : topologyCache .getEntries (). entrySet ()) {
301291 final String key = entry .getKey ();
302292 final ClusterTopologyInfo value = entry .getValue ();
303293 if (key .equals (url )) {
@@ -327,24 +317,21 @@ protected void suggestPrimaryCluster(final @NonNull ClusterTopologyInfo primaryC
327317 primaryClusterHostUrls .add (hostSpec .getUrl ());
328318 }
329319
330- for (Entry <String , ClusterTopologyInfo > entry : topologyCache .entrySet ()) {
331- if (entry .getValue ().isPrimaryCluster
332- || !StringUtils .isNullOrEmpty (entry .getValue ().suggestedPrimaryClusterId )
333- || Utils .isNullOrEmpty (entry .getValue ().hosts )) {
320+ for (Entry <String , ClusterTopologyInfo > entry : topologyCache .getEntries ().entrySet ()) {
321+ ClusterTopologyInfo clusterTopologyInfo = entry .getValue ();
322+ if (clusterTopologyInfo .isPrimaryCluster
323+ || !StringUtils .isNullOrEmpty (clusterTopologyInfo .suggestedPrimaryClusterId )
324+ || Utils .isNullOrEmpty (clusterTopologyInfo .hosts )) {
334325 continue ;
335326 }
336327
337328 // The entry is non-primary
338- for (HostSpec host : entry . getValue () .hosts ) {
329+ for (HostSpec host : clusterTopologyInfo .hosts ) {
339330 if (primaryClusterHostUrls .contains (host .getUrl ())) {
340331 // Instance on this cluster matches with one of the instance on primary cluster
341332 // Suggest the primary clusterId to this entry
342- try {
343- topologyCache .getLock ().lock ();
344- entry .getValue ().suggestedPrimaryClusterId = primaryClusterTopologyInfo .clusterId ;
345- } finally {
346- topologyCache .getLock ().unlock ();
347- }
333+ clusterTopologyInfo .suggestedPrimaryClusterId = primaryClusterTopologyInfo .clusterId ;
334+ topologyCache .put (entry .getKey (), clusterTopologyInfo , this .refreshRateNano );
348335 break ;
349336 }
350337 }
@@ -354,7 +341,7 @@ protected void suggestPrimaryCluster(final @NonNull ClusterTopologyInfo primaryC
354341 private boolean refreshNeeded (final ClusterTopologyInfo info ) {
355342 final Instant lastUpdateTime = info .lastUpdated ;
356343 return info .hosts .isEmpty ()
357- || Duration .between (lastUpdateTime , Instant .now ()).toMillis () > refreshRateInMilliseconds ;
344+ || Duration .between (lastUpdateTime , Instant .now ()).toMillis () > refreshRateNano ;
358345 }
359346
360347 /**
@@ -476,54 +463,37 @@ private ClusterTopologyInfo updateCache(
476463
477464 if (clusterTopologyInfo == null ) {
478465 // Add a new entry to the cache
479- topologyCache .put (latestTopologyInfo .clusterId , latestTopologyInfo );
466+ topologyCache .put (latestTopologyInfo .clusterId , latestTopologyInfo , this . refreshRateNano );
480467 return latestTopologyInfo ;
481-
482468 }
483469
484470 if (clusterTopologyInfo .clusterId .equals (latestTopologyInfo .clusterId )) {
485471 // Updating the same item in the cache
486- try {
487- topologyCache .getLock ().lock ();
488-
489- clusterTopologyInfo .hosts = latestTopologyInfo .hosts ;
490- clusterTopologyInfo .lastUpdated = Instant .now ();
491-
492- // It's not necessary, but it forces the cache to check for entries to evict
493- topologyCache .put (clusterTopologyInfo .clusterId , clusterTopologyInfo );
472+ clusterTopologyInfo .hosts = latestTopologyInfo .hosts ;
473+ clusterTopologyInfo .lastUpdated = Instant .now ();
494474
495- } finally {
496- topologyCache .getLock ().unlock ();
497- }
475+ // It's not necessary, but it forces the cache to check for entries to evict
476+ topologyCache .put (clusterTopologyInfo .clusterId , clusterTopologyInfo , this .refreshRateNano );
498477 return clusterTopologyInfo ;
499-
500478 }
501479
502480 // Update existing entry in the cache
503481 // This instance of AuroraHostListProvider accepts suggested clusterId
504482 // and effectively needs to update the primary entry
505-
506483 ClusterTopologyInfo primaryClusterTopologyInfo = topologyCache .get (this .clusterId );
507-
508484 if (primaryClusterTopologyInfo != null ) {
509- try {
510- topologyCache .getLock ().lock ();
511-
512- primaryClusterTopologyInfo .hosts = latestTopologyInfo .hosts ;
513- primaryClusterTopologyInfo .lastUpdated = Instant .now ();
485+ primaryClusterTopologyInfo .hosts = latestTopologyInfo .hosts ;
486+ primaryClusterTopologyInfo .lastUpdated = Instant .now ();
514487
515- // It's not necessary, but it forces the cache to check for entries to evict
516- topologyCache .put (primaryClusterTopologyInfo .clusterId , primaryClusterTopologyInfo );
517-
518- } finally {
519- topologyCache .getLock ().unlock ();
520- }
488+ // It's not necessary, but it forces the cache to check for entries to evict
489+ topologyCache .put (primaryClusterTopologyInfo .clusterId , primaryClusterTopologyInfo ,
490+ this .refreshRateNano );
521491 return primaryClusterTopologyInfo ;
522492 }
523493
524494 // That's suspicious path. Primary entry doesn't exist.
525495 // Let's create it.
526- topologyCache .put (latestTopologyInfo .clusterId , latestTopologyInfo );
496+ topologyCache .put (latestTopologyInfo .clusterId , latestTopologyInfo , this . refreshRateNano );
527497 return latestTopologyInfo ;
528498 }
529499
@@ -538,35 +508,18 @@ private ClusterTopologyInfo updateCache(
538508 return info == null || refreshNeeded (info ) ? null : info .hosts ;
539509 }
540510
541- /**
542- * Check if cached topology belongs to multi-writer cluster.
543- *
544- * @return True, if it's multi-writer cluster.
545- */
546- public boolean isMultiWriterCluster () {
547- synchronized (cacheLock ) {
548- ClusterTopologyInfo clusterTopologyInfo = topologyCache .get (this .clusterId );
549- return (clusterTopologyInfo != null
550- && clusterTopologyInfo .isMultiWriterCluster );
551- }
552- }
553-
554511 /**
555512 * Clear topology cache for all clusters.
556513 */
557514 public void clearAll () {
558- synchronized (cacheLock ) {
559- topologyCache .clear ();
560- }
515+ topologyCache .clear ();
561516 }
562517
563518 /**
564519 * Clear topology cache for the current cluster.
565520 */
566521 public void clear () {
567- synchronized (cacheLock ) {
568- topologyCache .remove (this .clusterId );
569- }
522+ topologyCache .remove (this .clusterId );
570523 }
571524
572525 @ Override
@@ -649,7 +602,7 @@ private void validateHostPatternSetting(String hostPattern) {
649602 public static void logCache () {
650603 LOGGER .finest (() -> {
651604 StringBuilder sb = new StringBuilder ();
652- final Set <Entry <String , ClusterTopologyInfo >> cacheEntries = topologyCache .entrySet ();
605+ final Set <Entry <String , ClusterTopologyInfo >> cacheEntries = topologyCache .getEntries (). entrySet ();
653606
654607 if (cacheEntries .isEmpty ()) {
655608 sb .append ("Cache is empty." );
0 commit comments