From 7945b4b23a51286464e248f6b785809973284efa Mon Sep 17 00:00:00 2001 From: Karen Chen <64801825+karenc-bq@users.noreply.github.com> Date: Tue, 29 Jul 2025 14:45:42 -0700 Subject: [PATCH] Revert "remove suggested clusterId functionality (#1476)" This reverts commit 3b32ac23 --- .../using-plugins/UsingTheFailover2Plugin.md | 2 +- .../hostlistprovider/RdsHostListProvider.java | 151 +++++++++++++- .../monitoring/ClusterTopologyMonitor.java | 2 + .../ClusterTopologyMonitorImpl.java | 5 + .../MonitoringRdsHostListProvider.java | 30 +++ .../RdsHostListProviderTest.java | 185 +++++++++++++++++- .../RdsMultiAzDbClusterListProviderTest.java | 171 +++++++++++++++- 7 files changed, 525 insertions(+), 21 deletions(-) diff --git a/docs/using-the-jdbc-driver/using-plugins/UsingTheFailover2Plugin.md b/docs/using-the-jdbc-driver/using-plugins/UsingTheFailover2Plugin.md index c94f1204c..87b3a25c8 100644 --- a/docs/using-the-jdbc-driver/using-plugins/UsingTheFailover2Plugin.md +++ b/docs/using-the-jdbc-driver/using-plugins/UsingTheFailover2Plugin.md @@ -62,7 +62,7 @@ In addition to the parameters that you can configure for the underlying driver, | `failoverTimeoutMs` | Integer | No | Maximum allowed time in milliseconds to attempt reconnecting to a new writer or reader instance after a cluster failover is initiated. | `300000` | | `clusterTopologyHighRefreshRateMs` | Integer | No | Interval of time in milliseconds to wait between attempts to update cluster topology after the writer has come back online following a failover event. It corresponds to the increased monitoring rate described earlier. Usually, the topology monitoring component uses this increased monitoring rate for 30s after a new writer was detected. | `100` | | `failoverReaderHostSelectorStrategy` | String | No | Strategy used to select a reader node during failover. For more information on the available reader selection strategies, see this [table](../ReaderSelectionStrategies.md). | `random` | -| `clusterId` | String | If using multiple database clusters, yes; otherwise, no | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. This parameter is optional and defaults to `1`. When supporting multiple database clusters, this parameter becomes mandatory. Each connection string must include the `clusterId` parameter with a value that can be any number or string. However, all connection strings associated with the same database cluster must use identical `clusterId` values, while connection strings belonging to different database clusters must specify distinct values. Examples of value: `1`, `2`, `1234`, `abc-1`, `abc-2`. | `1` | +| `clusterId` | String | No | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. | None | | `telemetryFailoverAdditionalTopTrace` | Boolean | No | Allows the driver to produce an additional telemetry span associated with failover. Such span helps to facilitate telemetry analysis in AWS CloudWatch. | `false` | | `skipFailoverOnInterruptedThread` | Boolean | No | Enable to skip failover if the current thread is interrupted. This may leave the Connection in an invalid state so the Connection should be disposed. | `false` | diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java index 05036eda8..e1ef01aec 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java @@ -72,10 +72,10 @@ public class RdsHostListProvider implements DynamicHostListProvider { + "after which it will be updated during the next interaction with the connection."); public static final AwsWrapperProperty CLUSTER_ID = new AwsWrapperProperty( - "clusterId", "1", + "clusterId", "", "A unique identifier for the cluster. " + "Connections with the same cluster id share a cluster topology cache. " - + "If unspecified, a cluster id is '1'."); + + "If unspecified, a cluster id is automatically created for AWS RDS clusters."); public static final AwsWrapperProperty CLUSTER_INSTANCE_HOST_PATTERN = new AwsWrapperProperty( @@ -90,8 +90,11 @@ public class RdsHostListProvider implements DynamicHostListProvider { protected static final RdsUtils rdsHelper = new RdsUtils(); protected static final ConnectionUrlParser connectionUrlParser = new ConnectionUrlParser(); protected static final int defaultTopologyQueryTimeoutMs = 5000; - protected final FullServicesContainer servicesContainer; + protected static final long suggestedClusterIdRefreshRateNano = TimeUnit.MINUTES.toNanos(10); + protected static final CacheMap suggestedPrimaryClusterIdCache = new CacheMap<>(); + protected static final CacheMap primaryClusterIdCache = new CacheMap<>(); + protected final FullServicesContainer servicesContainer; protected final HostListProviderService hostListProviderService; protected final String originalUrl; protected final String topologyQuery; @@ -109,6 +112,10 @@ public class RdsHostListProvider implements DynamicHostListProvider { protected String clusterId; protected HostSpec clusterInstanceTemplate; + // A primary clusterId is a clusterId that is based off of a cluster endpoint URL + // (rather than a GUID or a value provided by the user). + protected boolean isPrimaryClusterId; + protected volatile boolean isInitialized = false; protected Properties properties; @@ -155,7 +162,8 @@ protected void init() throws SQLException { this.initialHostSpec = this.initialHostList.get(0); this.hostListProviderService.setInitialConnectionHostSpec(this.initialHostSpec); - this.clusterId = CLUSTER_ID.getString(this.properties); + this.clusterId = UUID.randomUUID().toString(); + this.isPrimaryClusterId = false; this.refreshRateNano = TimeUnit.MILLISECONDS.toNanos(CLUSTER_TOPOLOGY_REFRESH_RATE_MS.getInteger(properties)); @@ -176,8 +184,34 @@ protected void init() throws SQLException { validateHostPatternSetting(this.clusterInstanceTemplate.getHost()); this.rdsUrlType = rdsHelper.identifyRdsType(this.initialHostSpec.getHost()); - this.isInitialized = true; + final String clusterIdSetting = CLUSTER_ID.getString(this.properties); + if (!StringUtils.isNullOrEmpty(clusterIdSetting)) { + this.clusterId = clusterIdSetting; + } else if (rdsUrlType == RdsUrlType.RDS_PROXY) { + // Each proxy is associated with a single cluster, so it's safe to use RDS Proxy Url as cluster + // identification + this.clusterId = this.initialHostSpec.getUrl(); + } else if (rdsUrlType.isRds()) { + final ClusterSuggestedResult clusterSuggestedResult = + getSuggestedClusterId(this.initialHostSpec.getHostAndPort()); + if (clusterSuggestedResult != null && !StringUtils.isNullOrEmpty(clusterSuggestedResult.clusterId)) { + this.clusterId = clusterSuggestedResult.clusterId; + this.isPrimaryClusterId = clusterSuggestedResult.isPrimaryClusterId; + } else { + final String clusterRdsHostUrl = + rdsHelper.getRdsClusterHostUrl(this.initialHostSpec.getHost()); + if (!StringUtils.isNullOrEmpty(clusterRdsHostUrl)) { + this.clusterId = this.clusterInstanceTemplate.isPortSpecified() + ? String.format("%s:%s", clusterRdsHostUrl, this.clusterInstanceTemplate.getPort()) + : clusterRdsHostUrl; + this.isPrimaryClusterId = true; + primaryClusterIdCache.put(this.clusterId, true, suggestedClusterIdRefreshRateNano); + } + } + } + + this.isInitialized = true; } finally { lock.unlock(); } @@ -198,8 +232,25 @@ protected void init() throws SQLException { protected FetchTopologyResult getTopology(final Connection conn, final boolean forceUpdate) throws SQLException { init(); + final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(this.clusterId); + + // Change clusterId by accepting a suggested one + if (!StringUtils.isNullOrEmpty(suggestedPrimaryClusterId) + && !this.clusterId.equals(suggestedPrimaryClusterId)) { + + final String oldClusterId = this.clusterId; + this.clusterId = suggestedPrimaryClusterId; + this.isPrimaryClusterId = true; + this.clusterIdChanged(oldClusterId); + } + final List storedHosts = this.getStoredTopology(); + // This clusterId is a primary one and is about to create a new entry in the cache. + // When a primary entry is created it needs to be suggested for other (non-primary) entries. + // Remember a flag to do suggestion after cache is updated. + final boolean needToSuggest = storedHosts == null && this.isPrimaryClusterId; + if (storedHosts == null || forceUpdate) { // need to re-fetch topology @@ -215,6 +266,9 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f if (!Utils.isNullOrEmpty(hosts)) { this.servicesContainer.getStorageService().set(this.clusterId, new Topology(hosts)); + if (needToSuggest) { + this.suggestPrimaryCluster(hosts); + } return new FetchTopologyResult(false, hosts); } } @@ -227,6 +281,78 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f } } + protected void clusterIdChanged(final String oldClusterId) throws SQLException { + // do nothing + } + + protected ClusterSuggestedResult getSuggestedClusterId(final String url) { + Map entries = this.servicesContainer.getStorageService().getEntries(Topology.class); + if (entries == null) { + return null; + } + + for (final Entry entry : entries.entrySet()) { + final String key = entry.getKey(); // clusterId + final List hosts = entry.getValue().getHosts(); + final boolean isPrimaryCluster = primaryClusterIdCache.get(key, false, + suggestedClusterIdRefreshRateNano); + if (key.equals(url)) { + return new ClusterSuggestedResult(url, isPrimaryCluster); + } + if (hosts == null) { + continue; + } + for (final HostSpec host : hosts) { + if (host.getHostAndPort().equals(url)) { + LOGGER.finest(() -> Messages.get("RdsHostListProvider.suggestedClusterId", + new Object[] {key, url})); + return new ClusterSuggestedResult(key, isPrimaryCluster); + } + } + } + return null; + } + + protected void suggestPrimaryCluster(final @NonNull List primaryClusterHosts) { + if (Utils.isNullOrEmpty(primaryClusterHosts)) { + return; + } + + final Set primaryClusterHostUrls = new HashSet<>(); + for (final HostSpec hostSpec : primaryClusterHosts) { + primaryClusterHostUrls.add(hostSpec.getUrl()); + } + + Map entries = this.servicesContainer.getStorageService().getEntries(Topology.class); + if (entries == null) { + return; + } + + for (final Entry entry : entries.entrySet()) { + final String clusterId = entry.getKey(); + final List clusterHosts = entry.getValue().getHosts(); + final boolean isPrimaryCluster = primaryClusterIdCache.get(clusterId, false, + suggestedClusterIdRefreshRateNano); + final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(clusterId); + if (isPrimaryCluster + || !StringUtils.isNullOrEmpty(suggestedPrimaryClusterId) + || Utils.isNullOrEmpty(clusterHosts)) { + continue; + } + + // The entry is non-primary + for (final HostSpec host : clusterHosts) { + if (primaryClusterHostUrls.contains(host.getUrl())) { + // Instance on this cluster matches with one of the instance on primary cluster + // Suggest the primary clusterId to this entry + suggestedPrimaryClusterIdCache.put(clusterId, this.clusterId, + suggestedClusterIdRefreshRateNano); + break; + } + } + } + } + /** * Obtain a cluster topology from database. * @@ -388,8 +514,8 @@ protected String getHostEndpoint(final String nodeName) { * Clear topology cache for all clusters. */ public static void clearAll() { - // nothing to clear - // TODO: consider to remove + primaryClusterIdCache.clear(); + suggestedPrimaryClusterIdCache.clear(); } /** @@ -548,4 +674,15 @@ public String getClusterId() throws UnsupportedOperationException, SQLException init(); return this.clusterId; } + + public static class ClusterSuggestedResult { + + public String clusterId; + public boolean isPrimaryClusterId; + + public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClusterId) { + this.clusterId = clusterId; + this.isPrimaryClusterId = isPrimaryClusterId; + } + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitor.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitor.java index 3ec9a9a87..73ea62399 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitor.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitor.java @@ -28,6 +28,8 @@ public interface ClusterTopologyMonitor extends Monitor { boolean canDispose(); + void setClusterId(final String clusterId); + List forceRefresh(final boolean writerImportant, final long timeoutMs) throws SQLException, TimeoutException; diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java index 488c65cc1..08630097d 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java @@ -163,6 +163,11 @@ public boolean canDispose() { return true; } + @Override + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + @Override public List forceRefresh(final boolean shouldVerifyWriter, final long timeoutMs) throws SQLException, TimeoutException { diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/MonitoringRdsHostListProvider.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/MonitoringRdsHostListProvider.java index ecd82658f..29aae6ddd 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/MonitoringRdsHostListProvider.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/MonitoringRdsHostListProvider.java @@ -122,6 +122,36 @@ protected List queryForTopology(final Connection conn) throws SQLExcep } } + @Override + protected void clusterIdChanged(final String oldClusterId) throws SQLException { + MonitorService monitorService = this.servicesContainer.getMonitorService(); + final ClusterTopologyMonitorImpl existingMonitor = + monitorService.get(ClusterTopologyMonitorImpl.class, oldClusterId); + if (existingMonitor != null) { + this.servicesContainer.getMonitorService().runIfAbsent( + ClusterTopologyMonitorImpl.class, + this.clusterId, + this.servicesContainer.getStorageService(), + this.pluginService.getTelemetryFactory(), + this.originalUrl, + this.pluginService.getDriverProtocol(), + this.pluginService.getTargetDriverDialect(), + this.pluginService.getDialect(), + this.properties, + (connectionService, pluginService) -> existingMonitor); + assert monitorService.get(ClusterTopologyMonitorImpl.class, this.clusterId) == existingMonitor; + existingMonitor.setClusterId(this.clusterId); + monitorService.remove(ClusterTopologyMonitorImpl.class, oldClusterId); + } + + final StorageService storageService = this.servicesContainer.getStorageService(); + final Topology existingTopology = storageService.get(Topology.class, oldClusterId); + final List existingHosts = existingTopology == null ? null : existingTopology.getHosts(); + if (existingHosts != null) { + storageService.set(this.clusterId, new Topology(existingHosts)); + } + } + @Override public List forceRefresh(final boolean shouldVerifyWriter, final long timeoutMs) throws SQLException, TimeoutException { diff --git a/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsHostListProviderTest.java b/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsHostListProviderTest.java index f3fc8540d..797d151be 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsHostListProviderTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsHostListProviderTest.java @@ -17,16 +17,20 @@ package software.amazon.jdbc.hostlistprovider; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -230,7 +234,7 @@ void testGetCachedTopology_returnStoredTopology() throws SQLException { } @Test - void testTopologyCache() throws SQLException { + void testTopologyCache_NoSuggestedClusterId() throws SQLException { RdsHostListProvider.clearAll(); RdsHostListProvider provider1 = Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-a.domain.com/")); @@ -252,7 +256,8 @@ void testTopologyCache() throws SQLException { assertEquals(topologyClusterA, topologyProvider1); RdsHostListProvider provider2 = Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-b.domain.com/")); - assertNotNull(provider2.getStoredTopology()); + provider2.init(); + assertNull(provider2.getStoredTopology()); final List topologyClusterB = Arrays.asList( new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) @@ -263,15 +268,162 @@ void testTopologyCache() throws SQLException { .host("instance-b-3.domain.com").port(HostSpec.NO_PORT).role(HostRole.READER).build()); doReturn(topologyClusterB).when(provider2).queryForTopology(any(Connection.class)); - List topologyProvider2 = provider2.refresh(mock(Connection.class)); - assertNotEquals(topologyClusterB, topologyProvider2); - - topologyProvider2 = provider2.forceRefresh(mock(Connection.class)); + final List topologyProvider2 = provider2.refresh(mock(Connection.class)); assertEquals(topologyClusterB, topologyProvider2); + assertEquals(2, storageService.size(Topology.class)); + } + + @Test + void testTopologyCache_SuggestedClusterIdForRds() throws SQLException { + RdsHostListProvider.clearAll(); + + RdsHostListProvider provider1 = + Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doReturn(topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + final List topologyProvider1 = provider1.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + RdsHostListProvider provider2 = + Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + final List topologyProvider2 = provider2.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + assertEquals(1, storageService.size(Topology.class)); } + @Test + void testTopologyCache_SuggestedClusterIdForInstance() throws SQLException { + RdsHostListProvider.clearAll(); + + RdsHostListProvider provider1 = + Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doReturn(topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + final List topologyProvider1 = provider1.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + RdsHostListProvider provider2 = + Mockito.spy(getRdsHostListProvider("jdbc:something://instance-a-3.xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + final List topologyProvider2 = provider2.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + + assertEquals(1, storageService.size(Topology.class)); + } + + @Test + void testTopologyCache_AcceptSuggestion() throws SQLException { + RdsHostListProvider.clearAll(); + + RdsHostListProvider provider1 = + Mockito.spy(getRdsHostListProvider("jdbc:something://instance-a-2.xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doAnswer(a -> topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + List topologyProvider1 = provider1.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + // RdsHostListProvider.logCache(); + + RdsHostListProvider provider2 = + Mockito.spy(getRdsHostListProvider("jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + doAnswer(a -> topologyClusterA).when(provider2).queryForTopology(any(Connection.class)); + + final List topologyProvider2 = provider2.refresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + + assertNotEquals(provider1.clusterId, provider2.clusterId); + assertFalse(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + assertEquals(2, storageService.size(Topology.class)); + assertEquals("cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com", + RdsHostListProvider.suggestedPrimaryClusterIdCache.get(provider1.clusterId)); + + // RdsHostListProvider.logCache(); + + topologyProvider1 = provider1.forceRefresh(mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + // RdsHostListProvider.logCache(); + } + @Test void testIdentifyConnectionWithInvalidNodeIdQuery() throws SQLException { rdsHostListProvider = Mockito.spy(getRdsHostListProvider("jdbc:someprotocol://url")); @@ -453,4 +605,25 @@ void testGetTopology_returnsLatestWriter() throws SQLException { assertEquals(expectedWriterHost.getHost(), result.hosts.get(0).getHost()); } + + @Test + void testClusterUrlUsedAsDefaultClusterId() throws SQLException { + String readerClusterUrl = "mycluster.cluster-ro-XYZ.us-east-1.rds.amazonaws.com"; + String expectedClusterId = "mycluster.cluster-XYZ.us-east-1.rds.amazonaws.com:1234"; + String connectionString = "jdbc:someprotocol://" + readerClusterUrl + ":1234/test"; + RdsHostListProvider provider1 = Mockito.spy(getRdsHostListProvider(connectionString)); + assertEquals(expectedClusterId, provider1.getClusterId()); + + List mockTopology = + Collections.singletonList(new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("host").build()); + doReturn(mockTopology).when(provider1).queryForTopology(any(Connection.class)); + provider1.refresh(); + assertEquals(mockTopology, provider1.getStoredTopology()); + verify(provider1, times(1)).queryForTopology(mockConnection); + + RdsHostListProvider provider2 = Mockito.spy(getRdsHostListProvider(connectionString)); + assertEquals(expectedClusterId, provider2.getClusterId()); + assertEquals(mockTopology, provider2.getStoredTopology()); + verify(provider2, never()).queryForTopology(mockConnection); + } } diff --git a/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProviderTest.java b/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProviderTest.java index 6cfe0321a..df6d6ee50 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProviderTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProviderTest.java @@ -17,13 +17,16 @@ package software.amazon.jdbc.hostlistprovider; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -115,7 +118,7 @@ private RdsMultiAzDbClusterListProvider getRdsMazDbClusterHostListProvider(Strin "fang", "li"); provider.init(); - // provider.clusterId = "1"; + // provider.clusterId = "cluster-id"; return provider; } @@ -202,7 +205,7 @@ void testGetCachedTopology_returnCachedTopology() throws SQLException { } @Test - void testTopologyCache() throws SQLException { + void testTopologyCache_NoSuggestedClusterId() throws SQLException { RdsMultiAzDbClusterListProvider.clearAll(); RdsMultiAzDbClusterListProvider provider1 = @@ -226,7 +229,8 @@ void testTopologyCache() throws SQLException { RdsMultiAzDbClusterListProvider provider2 = Mockito.spy(getRdsMazDbClusterHostListProvider("jdbc:something://cluster-b.domain.com/")); - assertNotNull(provider2.getStoredTopology()); + provider2.init(); + assertNull(provider2.getStoredTopology()); final List topologyClusterB = Arrays.asList( new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) @@ -237,15 +241,168 @@ void testTopologyCache() throws SQLException { .host("instance-b-3.domain.com").port(HostSpec.NO_PORT).role(HostRole.READER).build()); doReturn(topologyClusterB).when(provider2).queryForTopology(any(Connection.class)); - List topologyProvider2 = provider2.refresh(Mockito.mock(Connection.class)); - assertNotEquals(topologyClusterB, topologyProvider2); - - topologyProvider2 = provider2.forceRefresh(Mockito.mock(Connection.class)); + final List topologyProvider2 = provider2.refresh(Mockito.mock(Connection.class)); assertEquals(topologyClusterB, topologyProvider2); + assertEquals(2, storageService.size(Topology.class)); + } + + @Test + void testTopologyCache_SuggestedClusterIdForRds() throws SQLException { + RdsMultiAzDbClusterListProvider.clearAll(); + + RdsMultiAzDbClusterListProvider provider1 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doReturn(topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + final List topologyProvider1 = provider1.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + RdsMultiAzDbClusterListProvider provider2 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + final List topologyProvider2 = provider2.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + + assertEquals(1, storageService.size(Topology.class)); + } + + @Test + void testTopologyCache_SuggestedClusterIdForInstance() throws SQLException { + RdsMultiAzDbClusterListProvider.clearAll(); + + RdsMultiAzDbClusterListProvider provider1 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doReturn(topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + final List topologyProvider1 = provider1.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + RdsMultiAzDbClusterListProvider provider2 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://instance-a-3.xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + final List topologyProvider2 = provider2.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + assertEquals(1, storageService.size(Topology.class)); } + @Test + void testTopologyCache_AcceptSuggestion() throws SQLException { + RdsMultiAzDbClusterListProvider.clearAll(); + + RdsMultiAzDbClusterListProvider provider1 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://instance-a-2.xyz.us-east-2.rds.amazonaws.com/")); + provider1.init(); + final List topologyClusterA = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-1.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.WRITER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-2.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("instance-a-3.xyz.us-east-2.rds.amazonaws.com") + .port(HostSpec.NO_PORT) + .role(HostRole.READER) + .build()); + + doAnswer(a -> topologyClusterA).when(provider1).queryForTopology(any(Connection.class)); + + assertEquals(0, storageService.size(Topology.class)); + + List topologyProvider1 = provider1.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + + // RdsMultiAzDbClusterListProvider.logCache(); + + RdsMultiAzDbClusterListProvider provider2 = + Mockito.spy(getRdsMazDbClusterHostListProvider( + "jdbc:something://cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com/")); + provider2.init(); + + doAnswer(a -> topologyClusterA).when(provider2).queryForTopology(any(Connection.class)); + + final List topologyProvider2 = provider2.refresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider2); + + assertNotEquals(provider1.clusterId, provider2.clusterId); + assertFalse(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + assertEquals(2, storageService.size(Topology.class)); + assertEquals("cluster-a.cluster-xyz.us-east-2.rds.amazonaws.com", + RdsMultiAzDbClusterListProvider.suggestedPrimaryClusterIdCache.get(provider1.clusterId)); + + // RdsMultiAzDbClusterListProvider.logCache(); + + topologyProvider1 = provider1.forceRefresh(Mockito.mock(Connection.class)); + assertEquals(topologyClusterA, topologyProvider1); + assertEquals(provider1.clusterId, provider2.clusterId); + assertTrue(provider1.isPrimaryClusterId); + assertTrue(provider2.isPrimaryClusterId); + + // RdsMultiAzDbClusterListProvider.logCache(); + } + @Test void testIdentifyConnectionWithInvalidNodeIdQuery() throws SQLException { rdsMazDbClusterHostListProvider = Mockito.spy(getRdsMazDbClusterHostListProvider("jdbc:someprotocol://url"));