Skip to content

Commit 036a92c

Browse files
authored
refactor: host alias logic in wrapper (#431)
1. After failover Aurora Connection Tracker incorrectly invalidates the newly promoted writer 2. After failover Aurora Connection Tracker incorrectly invalidates a random reader that is neither the old or new writer. 3. If the initial connection is established using an cluster endpoint, EFM plugin uses the cluster endpoint as the monitoring endpoint
1 parent 30c0db2 commit 036a92c

21 files changed

+469
-311
lines changed

wrapper/src/main/java/software/amazon/jdbc/Driver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public Connection connect(final String url, final Properties info) throws SQLExc
9191
return null;
9292
}
9393

94+
LOGGER.finest("Opening connection to " + url);
95+
9496
final String driverUrl = url.replaceFirst(PROTOCOL_PREFIX, "jdbc:");
9597
final java.sql.Driver driver = DriverManager.getDriver(driverUrl);
9698

wrapper/src/main/java/software/amazon/jdbc/HostListProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,6 @@ public interface HostListProvider {
3939
* determine the host role
4040
*/
4141
HostRole getHostRole(Connection connection) throws SQLException;
42+
43+
HostSpec identifyConnection(Connection connection) throws SQLException;
4244
}

wrapper/src/main/java/software/amazon/jdbc/HostSpec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class HostSpec {
3838
protected Set<String> aliases = ConcurrentHashMap.newKeySet();
3939
protected Set<String> allAliases = ConcurrentHashMap.newKeySet();
4040
protected long weight; // Greater or equal 0. Lesser the weight, the healthier node.
41+
protected String hostId;
4142

4243
public HostSpec(final String host) {
4344
this.host = host;
@@ -148,6 +149,12 @@ public void removeAlias(final String... alias) {
148149
});
149150
}
150151

152+
public void resetAliases() {
153+
this.aliases.clear();
154+
this.allAliases.clear();
155+
this.allAliases.add(this.asAlias());
156+
}
157+
151158
public String getUrl() {
152159
String url = isPortSpecified() ? host + ":" + port : host;
153160
if (!url.endsWith("/")) {
@@ -156,6 +163,14 @@ public String getUrl() {
156163
return url;
157164
}
158165

166+
public String getHostId() {
167+
return hostId;
168+
}
169+
170+
public void setHostId(String hostId) {
171+
this.hostId = hostId;
172+
}
173+
159174
public String asAlias() {
160175
return isPortSpecified() ? host + ":" + port : host;
161176
}

wrapper/src/main/java/software/amazon/jdbc/PluginService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,8 @@ HostSpec getHostSpecByStrategy(HostRole role, String strategy)
151151
Dialect getDialect();
152152

153153
void updateDialect(final @NonNull Connection connection) throws SQLException;
154+
155+
HostSpec identifyConnection(final Connection connection) throws SQLException;
156+
157+
void fillAliases(final Connection connection, final HostSpec hostSpec) throws SQLException;
154158
}

wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package software.amazon.jdbc;
1818

1919
import java.sql.Connection;
20+
import java.sql.ResultSet;
2021
import java.sql.SQLException;
22+
import java.sql.Statement;
2123
import java.util.ArrayList;
2224
import java.util.EnumSet;
2325
import java.util.HashMap;
2426
import java.util.List;
2527
import java.util.Map;
2628
import java.util.Map.Entry;
29+
import java.util.Objects;
2730
import java.util.Properties;
2831
import java.util.Set;
2932
import java.util.concurrent.TimeUnit;
@@ -35,6 +38,7 @@
3538
import software.amazon.jdbc.dialect.Dialect;
3639
import software.amazon.jdbc.dialect.DialectManager;
3740
import software.amazon.jdbc.dialect.DialectProvider;
41+
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
3842
import software.amazon.jdbc.exceptions.ExceptionManager;
3943
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
4044
import software.amazon.jdbc.util.CacheMap;
@@ -334,7 +338,7 @@ public HostListProvider getHostListProvider() {
334338
@Override
335339
public void refreshHostList() throws SQLException {
336340
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh();
337-
if (updatedHostList != null) {
341+
if (!Objects.equals(updatedHostList, this.hosts)) {
338342
updateHostAvailability(updatedHostList);
339343
setNodeList(this.hosts, updatedHostList);
340344
}
@@ -343,7 +347,7 @@ public void refreshHostList() throws SQLException {
343347
@Override
344348
public void refreshHostList(final Connection connection) throws SQLException {
345349
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh(connection);
346-
if (updatedHostList != null) {
350+
if (!Objects.equals(updatedHostList, this.hosts)) {
347351
updateHostAvailability(updatedHostList);
348352
setNodeList(this.hosts, updatedHostList);
349353
}
@@ -489,4 +493,44 @@ public void updateDialect(final @NonNull Connection connection) throws SQLExcept
489493
connection);
490494
}
491495

496+
@Override
497+
public HostSpec identifyConnection(Connection connection) throws SQLException {
498+
if (!(this.getDialect() instanceof TopologyAwareDatabaseCluster)) {
499+
return null;
500+
}
501+
502+
return this.hostListProvider.identifyConnection(connection);
503+
}
504+
505+
@Override
506+
public void fillAliases(Connection connection, HostSpec hostSpec) throws SQLException {
507+
if (hostSpec == null) {
508+
return;
509+
}
510+
511+
if (!hostSpec.getAliases().isEmpty()) {
512+
LOGGER.finest(() -> Messages.get("PluginServiceImpl.nonEmptyAliases", new Object[] {hostSpec.getAliases()}));
513+
return;
514+
}
515+
516+
hostSpec.addAlias(hostSpec.asAlias());
517+
518+
// Add the host name and port, this host name is usually the internal IP address.
519+
try (final Statement stmt = connection.createStatement()) {
520+
try (final ResultSet rs = stmt.executeQuery(this.getDialect().getHostAliasQuery())) {
521+
while (rs.next()) {
522+
hostSpec.addAlias(rs.getString(1));
523+
}
524+
}
525+
} catch (final SQLException sqlException) {
526+
// log and ignore
527+
LOGGER.finest(() -> Messages.get("PluginServiceImpl.failedToRetrieveHostPort"));
528+
}
529+
530+
// Add the instance endpoint if the current connection is associated with a topology aware database cluster.
531+
final HostSpec host = this.identifyConnection(connection);
532+
if (host != null) {
533+
hostSpec.addAlias(host.asAliases().toArray(new String[] {}));
534+
}
535+
}
492536
}

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/AuroraHostListProvider.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.HashSet;
2828
import java.util.List;
2929
import java.util.Map.Entry;
30+
import java.util.Objects;
3031
import java.util.Properties;
3132
import java.util.Set;
3233
import java.util.UUID;
@@ -85,7 +86,6 @@ public class AuroraHostListProvider implements DynamicHostListProvider {
8586
: TimeUnit.MILLISECONDS.toNanos(30000);
8687
private final long suggestedClusterIdRefreshRateNano = TimeUnit.MINUTES.toNanos(10);
8788
private List<HostSpec> hostList = new ArrayList<>();
88-
private List<HostSpec> lastReturnedHostList;
8989
private List<HostSpec> initialHostList = new ArrayList<>();
9090
private HostSpec initialHostSpec;
9191

@@ -273,7 +273,7 @@ private ClusterSuggestedResult getSuggestedClusterId(final String url) {
273273
for (final HostSpec host : hosts) {
274274
if (host.getUrl().equals(url)) {
275275
LOGGER.finest(() -> Messages.get("AuroraHostListProvider.suggestedClusterId",
276-
new Object[]{key, url}));
276+
new Object[] {key, url}));
277277
return new ClusterSuggestedResult(key, isPrimaryCluster);
278278
}
279279
}
@@ -398,8 +398,12 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
398398
// Calculate weight based on node lag in time and CPU utilization.
399399
final long weight = Math.round(nodeLag) * 100L + Math.round(cpuUtilization);
400400

401-
hostName = hostName == null ? "?" : hostName;
402-
final String endpoint = getHostEndpoint(hostName);
401+
return createHost(hostName, isWriter, weight);
402+
}
403+
404+
private HostSpec createHost(String host, final boolean isWriter, final long weight) {
405+
host = host == null ? "?" : host;
406+
final String endpoint = getHostEndpoint(host);
403407
final int port = this.clusterInstanceTemplate.isPortSpecified()
404408
? this.clusterInstanceTemplate.getPort()
405409
: this.initialHostSpec.getPort();
@@ -410,7 +414,8 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
410414
isWriter ? HostRole.WRITER : HostRole.READER,
411415
HostAvailability.AVAILABLE,
412416
weight);
413-
hostSpec.addAlias(hostName);
417+
hostSpec.addAlias(host);
418+
hostSpec.setHostId(host);
414419
return hostSpec;
415420
}
416421

@@ -466,12 +471,7 @@ public List<HostSpec> refresh(final Connection connection) throws SQLException {
466471
final FetchTopologyResult results = getTopology(currentConnection, false);
467472
LOGGER.finest(() -> Utils.logTopology(results.hosts));
468473

469-
if (results.isCachedData && this.lastReturnedHostList == results.hosts) {
470-
return null; // no topology update
471-
}
472-
473474
this.hostList = results.hosts;
474-
this.lastReturnedHostList = this.hostList;
475475
return Collections.unmodifiableList(hostList);
476476
}
477477

@@ -490,7 +490,6 @@ public List<HostSpec> forceRefresh(final Connection connection) throws SQLExcept
490490
final FetchTopologyResult results = getTopology(currentConnection, true);
491491
LOGGER.finest(() -> Utils.logTopology(results.hosts));
492492
this.hostList = results.hosts;
493-
this.lastReturnedHostList = this.hostList;
494493
return Collections.unmodifiableList(this.hostList);
495494
}
496495

@@ -577,6 +576,7 @@ public FetchTopologyResult(final boolean isCachedData, final List<HostSpec> host
577576
}
578577

579578
static class ClusterSuggestedResult {
579+
580580
public String clusterId;
581581
public boolean isPrimaryClusterId;
582582

@@ -588,18 +588,10 @@ public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClu
588588

589589
@Override
590590
public HostRole getHostRole(Connection conn) throws SQLException {
591-
if (this.topologyAwareDialect == null) {
592-
Dialect dialect = this.hostListProviderService.getDialect();
593-
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
594-
throw new SQLException(
595-
Messages.get("AuroraHostListProvider.invalidDialectForGetHostRole",
596-
new Object[]{dialect}));
597-
}
598-
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
599-
}
600-
601591
try (final Statement stmt = conn.createStatement();
602-
final ResultSet rs = stmt.executeQuery(this.topologyAwareDialect.getIsReaderQuery())) {
592+
final ResultSet rs = stmt.executeQuery(
593+
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForGetHostRole")
594+
.getIsReaderQuery())) {
603595
if (rs.next()) {
604596
boolean isReader = rs.getBoolean(1);
605597
return isReader ? HostRole.READER : HostRole.WRITER;
@@ -610,4 +602,45 @@ public HostRole getHostRole(Connection conn) throws SQLException {
610602

611603
throw new SQLException(Messages.get("AuroraHostListProvider.errorGettingHostRole"));
612604
}
605+
606+
@Override
607+
public HostSpec identifyConnection(Connection connection) throws SQLException {
608+
try (final Statement stmt = connection.createStatement();
609+
final ResultSet resultSet = stmt.executeQuery(
610+
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForIdentifyConnection")
611+
.getNodeIdQuery())) {
612+
if (resultSet.next()) {
613+
final String instanceName = resultSet.getString(1);
614+
615+
final List<HostSpec> topology = this.refresh();
616+
617+
if (topology == null) {
618+
return null;
619+
}
620+
621+
return topology
622+
.stream()
623+
.filter(host -> Objects.equals(instanceName, host.getHostId()))
624+
.findAny()
625+
.orElse(null);
626+
}
627+
} catch (final SQLException e) {
628+
throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"), e);
629+
}
630+
631+
throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"));
632+
}
633+
634+
private TopologyAwareDatabaseCluster getTopologyAwareDialect(String exceptionMessageIdentifier) throws SQLException {
635+
if (this.topologyAwareDialect == null) {
636+
Dialect dialect = this.hostListProviderService.getDialect();
637+
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
638+
throw new SQLException(
639+
Messages.get(exceptionMessageIdentifier,
640+
new Object[] {dialect}));
641+
}
642+
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
643+
}
644+
return this.topologyAwareDialect;
645+
}
613646
}

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/ConnectionStringHostListProvider.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package software.amazon.jdbc.hostlistprovider;
1818

1919
import java.sql.Connection;
20+
import java.sql.ResultSet;
2021
import java.sql.SQLException;
22+
import java.sql.Statement;
2123
import java.util.ArrayList;
2224
import java.util.Collections;
2325
import java.util.List;
26+
import java.util.Objects;
2427
import java.util.Properties;
2528
import org.checkerframework.checker.nullness.qual.NonNull;
2629
import software.amazon.jdbc.AwsWrapperProperty;
@@ -109,4 +112,30 @@ public List<HostSpec> forceRefresh(final Connection connection) throws SQLExcept
109112
public HostRole getHostRole(Connection connection) {
110113
throw new UnsupportedOperationException("ConnectionStringHostListProvider does not support getHostRole");
111114
}
115+
116+
@Override
117+
public HostSpec identifyConnection(Connection connection) throws SQLException {
118+
try (final Statement stmt = connection.createStatement();
119+
final ResultSet resultSet = stmt.executeQuery(this.hostListProviderService.getDialect().getHostAliasQuery())) {
120+
if (resultSet.next()) {
121+
final String instance = resultSet.getString(1);
122+
123+
final List<HostSpec> topology = this.refresh(connection);
124+
125+
if (topology == null) {
126+
return null;
127+
}
128+
129+
return topology
130+
.stream()
131+
.filter(host -> Objects.equals(instance, host.getHostId()))
132+
.findAny()
133+
.orElse(null);
134+
}
135+
} catch (final SQLException e) {
136+
throw new SQLException(Messages.get("ConnectionStringHostListProvider.errorIdentifyConnection"), e);
137+
}
138+
139+
throw new SQLException(Messages.get("ConnectionStringHostListProvider.errorIdentifyConnection"));
140+
}
112141
}

0 commit comments

Comments
 (0)