Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ In addition to the parameters that you can configure for the underlying driver,
| `failoverTimeoutMs` | Integer | No | Maximum allowed time in milliseconds to attempt reconnecting to a new writer or reader instance after a cluster failover is initiated. | `300000` |
| `clusterTopologyHighRefreshRateMs` | Integer | No | Interval of time in milliseconds to wait between attempts to update cluster topology after the writer has come back online following a failover event. It corresponds to the increased monitoring rate described earlier. Usually, the topology monitoring component uses this increased monitoring rate for 30s after a new writer was detected. | `100` |
| `failoverReaderHostSelectorStrategy` | String | No | Strategy used to select a reader node during failover. For more information on the available reader selection strategies, see this [table](../ReaderSelectionStrategies.md). | `random` |
| `clusterId` | String | If using multiple database clusters, yes; otherwise, no | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. This parameter is optional and defaults to `1`. When supporting multiple database clusters, this parameter becomes mandatory. Each connection string must include the `clusterId` parameter with a value that can be any number or string. However, all connection strings associated with the same database cluster must use identical `clusterId` values, while connection strings belonging to different database clusters must specify distinct values. Examples of value: `1`, `2`, `1234`, `abc-1`, `abc-2`. | `1` |
| `clusterId` | String | No | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. | None |
| `telemetryFailoverAdditionalTopTrace` | Boolean | No | Allows the driver to produce an additional telemetry span associated with failover. Such span helps to facilitate telemetry analysis in AWS CloudWatch. | `false` |
| `skipFailoverOnInterruptedThread` | Boolean | No | Enable to skip failover if the current thread is interrupted. This may leave the Connection in an invalid state so the Connection should be disposed. | `false` |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public class RdsHostListProvider implements DynamicHostListProvider {
+ "after which it will be updated during the next interaction with the connection.");

public static final AwsWrapperProperty CLUSTER_ID = new AwsWrapperProperty(
"clusterId", "1",
"clusterId", "",
"A unique identifier for the cluster. "
+ "Connections with the same cluster id share a cluster topology cache. "
+ "If unspecified, a cluster id is '1'.");
+ "If unspecified, a cluster id is automatically created for AWS RDS clusters.");

public static final AwsWrapperProperty CLUSTER_INSTANCE_HOST_PATTERN =
new AwsWrapperProperty(
Expand All @@ -90,8 +90,11 @@ public class RdsHostListProvider implements DynamicHostListProvider {
protected static final RdsUtils rdsHelper = new RdsUtils();
protected static final ConnectionUrlParser connectionUrlParser = new ConnectionUrlParser();
protected static final int defaultTopologyQueryTimeoutMs = 5000;
protected final FullServicesContainer servicesContainer;
protected static final long suggestedClusterIdRefreshRateNano = TimeUnit.MINUTES.toNanos(10);
protected static final CacheMap<String, String> suggestedPrimaryClusterIdCache = new CacheMap<>();
protected static final CacheMap<String, Boolean> primaryClusterIdCache = new CacheMap<>();

protected final FullServicesContainer servicesContainer;
protected final HostListProviderService hostListProviderService;
protected final String originalUrl;
protected final String topologyQuery;
Expand All @@ -109,6 +112,10 @@ public class RdsHostListProvider implements DynamicHostListProvider {
protected String clusterId;
protected HostSpec clusterInstanceTemplate;

// A primary clusterId is a clusterId that is based off of a cluster endpoint URL
// (rather than a GUID or a value provided by the user).
protected boolean isPrimaryClusterId;

protected volatile boolean isInitialized = false;

protected Properties properties;
Expand Down Expand Up @@ -155,7 +162,8 @@ protected void init() throws SQLException {
this.initialHostSpec = this.initialHostList.get(0);
this.hostListProviderService.setInitialConnectionHostSpec(this.initialHostSpec);

this.clusterId = CLUSTER_ID.getString(this.properties);
this.clusterId = UUID.randomUUID().toString();
this.isPrimaryClusterId = false;
this.refreshRateNano =
TimeUnit.MILLISECONDS.toNanos(CLUSTER_TOPOLOGY_REFRESH_RATE_MS.getInteger(properties));

Expand All @@ -176,8 +184,34 @@ protected void init() throws SQLException {
validateHostPatternSetting(this.clusterInstanceTemplate.getHost());

this.rdsUrlType = rdsHelper.identifyRdsType(this.initialHostSpec.getHost());
this.isInitialized = true;

final String clusterIdSetting = CLUSTER_ID.getString(this.properties);
if (!StringUtils.isNullOrEmpty(clusterIdSetting)) {
this.clusterId = clusterIdSetting;
} else if (rdsUrlType == RdsUrlType.RDS_PROXY) {
// Each proxy is associated with a single cluster, so it's safe to use RDS Proxy Url as cluster
// identification
this.clusterId = this.initialHostSpec.getUrl();
} else if (rdsUrlType.isRds()) {
final ClusterSuggestedResult clusterSuggestedResult =
getSuggestedClusterId(this.initialHostSpec.getHostAndPort());
if (clusterSuggestedResult != null && !StringUtils.isNullOrEmpty(clusterSuggestedResult.clusterId)) {
this.clusterId = clusterSuggestedResult.clusterId;
this.isPrimaryClusterId = clusterSuggestedResult.isPrimaryClusterId;
} else {
final String clusterRdsHostUrl =
rdsHelper.getRdsClusterHostUrl(this.initialHostSpec.getHost());
if (!StringUtils.isNullOrEmpty(clusterRdsHostUrl)) {
this.clusterId = this.clusterInstanceTemplate.isPortSpecified()
? String.format("%s:%s", clusterRdsHostUrl, this.clusterInstanceTemplate.getPort())
: clusterRdsHostUrl;
this.isPrimaryClusterId = true;
primaryClusterIdCache.put(this.clusterId, true, suggestedClusterIdRefreshRateNano);
}
}
}

this.isInitialized = true;
} finally {
lock.unlock();
}
Expand All @@ -198,8 +232,25 @@ protected void init() throws SQLException {
protected FetchTopologyResult getTopology(final Connection conn, final boolean forceUpdate) throws SQLException {
init();

final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(this.clusterId);

// Change clusterId by accepting a suggested one
if (!StringUtils.isNullOrEmpty(suggestedPrimaryClusterId)
&& !this.clusterId.equals(suggestedPrimaryClusterId)) {

final String oldClusterId = this.clusterId;
this.clusterId = suggestedPrimaryClusterId;
this.isPrimaryClusterId = true;
this.clusterIdChanged(oldClusterId);
}

final List<HostSpec> storedHosts = this.getStoredTopology();

// This clusterId is a primary one and is about to create a new entry in the cache.
// When a primary entry is created it needs to be suggested for other (non-primary) entries.
// Remember a flag to do suggestion after cache is updated.
final boolean needToSuggest = storedHosts == null && this.isPrimaryClusterId;

if (storedHosts == null || forceUpdate) {

// need to re-fetch topology
Expand All @@ -215,6 +266,9 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f

if (!Utils.isNullOrEmpty(hosts)) {
this.servicesContainer.getStorageService().set(this.clusterId, new Topology(hosts));
if (needToSuggest) {
this.suggestPrimaryCluster(hosts);
}
return new FetchTopologyResult(false, hosts);
}
}
Expand All @@ -227,6 +281,78 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f
}
}

protected void clusterIdChanged(final String oldClusterId) throws SQLException {
// do nothing
}

protected ClusterSuggestedResult getSuggestedClusterId(final String url) {
Map<String, Topology> entries = this.servicesContainer.getStorageService().getEntries(Topology.class);
if (entries == null) {
return null;
}

for (final Entry<String, Topology> entry : entries.entrySet()) {
final String key = entry.getKey(); // clusterId
final List<HostSpec> hosts = entry.getValue().getHosts();
final boolean isPrimaryCluster = primaryClusterIdCache.get(key, false,
suggestedClusterIdRefreshRateNano);
if (key.equals(url)) {
return new ClusterSuggestedResult(url, isPrimaryCluster);
}
if (hosts == null) {
continue;
}
for (final HostSpec host : hosts) {
if (host.getHostAndPort().equals(url)) {
LOGGER.finest(() -> Messages.get("RdsHostListProvider.suggestedClusterId",
new Object[] {key, url}));
return new ClusterSuggestedResult(key, isPrimaryCluster);
}
}
}
return null;
}

protected void suggestPrimaryCluster(final @NonNull List<HostSpec> primaryClusterHosts) {
if (Utils.isNullOrEmpty(primaryClusterHosts)) {
return;
}

final Set<String> primaryClusterHostUrls = new HashSet<>();
for (final HostSpec hostSpec : primaryClusterHosts) {
primaryClusterHostUrls.add(hostSpec.getUrl());
}

Map<String, Topology> entries = this.servicesContainer.getStorageService().getEntries(Topology.class);
if (entries == null) {
return;
}

for (final Entry<String, Topology> entry : entries.entrySet()) {
final String clusterId = entry.getKey();
final List<HostSpec> clusterHosts = entry.getValue().getHosts();
final boolean isPrimaryCluster = primaryClusterIdCache.get(clusterId, false,
suggestedClusterIdRefreshRateNano);
final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(clusterId);
if (isPrimaryCluster
|| !StringUtils.isNullOrEmpty(suggestedPrimaryClusterId)
|| Utils.isNullOrEmpty(clusterHosts)) {
continue;
}

// The entry is non-primary
for (final HostSpec host : clusterHosts) {
if (primaryClusterHostUrls.contains(host.getUrl())) {
// Instance on this cluster matches with one of the instance on primary cluster
// Suggest the primary clusterId to this entry
suggestedPrimaryClusterIdCache.put(clusterId, this.clusterId,
suggestedClusterIdRefreshRateNano);
break;
}
}
}
}

/**
* Obtain a cluster topology from database.
*
Expand Down Expand Up @@ -388,8 +514,8 @@ protected String getHostEndpoint(final String nodeName) {
* Clear topology cache for all clusters.
*/
public static void clearAll() {
// nothing to clear
// TODO: consider to remove
primaryClusterIdCache.clear();
suggestedPrimaryClusterIdCache.clear();
}

/**
Expand Down Expand Up @@ -548,4 +674,15 @@ public String getClusterId() throws UnsupportedOperationException, SQLException
init();
return this.clusterId;
}

public static class ClusterSuggestedResult {

public String clusterId;
public boolean isPrimaryClusterId;

public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClusterId) {
this.clusterId = clusterId;
this.isPrimaryClusterId = isPrimaryClusterId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface ClusterTopologyMonitor extends Monitor {

boolean canDispose();

void setClusterId(final String clusterId);

List<HostSpec> forceRefresh(final boolean writerImportant, final long timeoutMs)
throws SQLException, TimeoutException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public boolean canDispose() {
return true;
}

@Override
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

@Override
public List<HostSpec> forceRefresh(final boolean shouldVerifyWriter, final long timeoutMs)
throws SQLException, TimeoutException {
Expand Down
Loading
Loading