Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions wrapper/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ tasks.register<Test>("test-all-mysql-aurora") {
systemProperty("test-no-performance", "true")
systemProperty("test-no-pg-driver", "true")
systemProperty("test-no-pg-engine", "true")
systemProperty("test-no-mariadb-driver", "true")
systemProperty("test-no-mariadb-engine", "true")
systemProperty("test-no-iam", "true")
systemProperty("test-no-hikari", "true")
systemProperty("test-no-secrets-manager", "true")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public interface HostListProvider {
* determine the host role
*/
HostRole getHostRole(Connection connection) throws SQLException;

HostSpec identifyConnection(Connection connection) throws SQLException;
}
14 changes: 14 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/HostSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HostSpec {
protected Set<String> aliases = ConcurrentHashMap.newKeySet();
protected Set<String> allAliases = ConcurrentHashMap.newKeySet();
protected long weight; // Greater or equal 0. Lesser the weight, the healthier node.
protected String hostId;

public HostSpec(final String host) {
this.host = host;
Expand Down Expand Up @@ -148,6 +149,11 @@ public void removeAlias(final String... alias) {
});
}

public void resetAliases() {
this.aliases.clear();
this.allAliases.clear();
}

public String getUrl() {
String url = isPortSpecified() ? host + ":" + port : host;
if (!url.endsWith("/")) {
Expand All @@ -156,6 +162,14 @@ public String getUrl() {
return url;
}

public String getHostId() {
return hostId;
}

public void setHostId(String hostId) {
this.hostId = hostId;
}

public String asAlias() {
return isPortSpecified() ? host + ":" + port : host;
}
Expand Down
4 changes: 4 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ HostSpec getHostSpecByStrategy(HostRole role, String strategy)
Dialect getDialect();

void updateDialect(final @NonNull Connection connection) throws SQLException;

HostSpec identifyConnection(final Connection connection) throws SQLException;

void fillAliases(final Connection connection, final HostSpec hostSpec) throws SQLException;
}
39 changes: 39 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package software.amazon.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -35,6 +37,7 @@
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.dialect.DialectManager;
import software.amazon.jdbc.dialect.DialectProvider;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.exceptions.ExceptionManager;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.util.CacheMap;
Expand Down Expand Up @@ -489,4 +492,40 @@ public void updateDialect(final @NonNull Connection connection) throws SQLExcept
connection);
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
if (!(this.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return null;
}

return this.hostListProvider.identifyConnection(connection);
}

@Override
public void fillAliases(Connection connection, HostSpec hostSpec) throws SQLException {
if (!hostSpec.getAliases().isEmpty()) {
LOGGER.finest(() -> Messages.get("PluginServiceImpl.nonEmptyAliases", new Object[] {hostSpec.getAliases()}));
return;
}

hostSpec.addAlias(hostSpec.asAlias());

// Add the host name and port, this host name is usually the internal IP address.
try (final Statement stmt = connection.createStatement()) {
try (final ResultSet rs = stmt.executeQuery(this.getDialect().getHostAliasQuery())) {
while (rs.next()) {
hostSpec.addAlias(rs.getString(1));
}
}
} catch (final SQLException sqlException) {
// log and ignore
LOGGER.finest(() -> Messages.get("PluginServiceImpl.failedToRetrieveHostPort"));
}

// Add the instance endpoint if the current connection is associated with a topology aware database cluster.
final HostSpec host = this.identifyConnection(connection);
if (host != this.currentHostSpec) {
hostSpec.addAlias(host.asAlias());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -273,7 +274,7 @@ private ClusterSuggestedResult getSuggestedClusterId(final String url) {
for (final HostSpec host : hosts) {
if (host.getUrl().equals(url)) {
LOGGER.finest(() -> Messages.get("AuroraHostListProvider.suggestedClusterId",
new Object[]{key, url}));
new Object[] {key, url}));
return new ClusterSuggestedResult(key, isPrimaryCluster);
}
}
Expand Down Expand Up @@ -398,8 +399,12 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
// Calculate weight based on node lag in time and CPU utilization.
final long weight = Math.round(nodeLag) * 100L + Math.round(cpuUtilization);

hostName = hostName == null ? "?" : hostName;
final String endpoint = getHostEndpoint(hostName);
return createHost(hostName, isWriter, weight);
}

private HostSpec createHost(String host, final boolean isWriter, final long weight) {
host = host == null ? "?" : host;
final String endpoint = getHostEndpoint(host);
final int port = this.clusterInstanceTemplate.isPortSpecified()
? this.clusterInstanceTemplate.getPort()
: this.initialHostSpec.getPort();
Expand All @@ -410,7 +415,8 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
isWriter ? HostRole.WRITER : HostRole.READER,
HostAvailability.AVAILABLE,
weight);
hostSpec.addAlias(hostName);
hostSpec.addAlias(host);
hostSpec.setHostId(host);
return hostSpec;
}

Expand Down Expand Up @@ -577,6 +583,7 @@ public FetchTopologyResult(final boolean isCachedData, final List<HostSpec> host
}

static class ClusterSuggestedResult {

public String clusterId;
public boolean isPrimaryClusterId;

Expand All @@ -588,18 +595,10 @@ public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClu

@Override
public HostRole getHostRole(Connection conn) throws SQLException {
if (this.topologyAwareDialect == null) {
Dialect dialect = this.hostListProviderService.getDialect();
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
throw new SQLException(
Messages.get("AuroraHostListProvider.invalidDialectForGetHostRole",
new Object[]{dialect}));
}
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
}

try (final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery(this.topologyAwareDialect.getIsReaderQuery())) {
final ResultSet rs = stmt.executeQuery(
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForGetHostRole")
.getIsReaderQuery())) {
if (rs.next()) {
boolean isReader = rs.getBoolean(1);
return isReader ? HostRole.READER : HostRole.WRITER;
Expand All @@ -610,4 +609,46 @@ public HostRole getHostRole(Connection conn) throws SQLException {

throw new SQLException(Messages.get("AuroraHostListProvider.errorGettingHostRole"));
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
try (final Statement stmt = connection.createStatement();
final ResultSet resultSet = stmt.executeQuery(
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForIdentifyConnection")
.getNodeIdQuery())) {
if (resultSet.next()) {
final String instanceName = resultSet.getString(1);

final List<HostSpec> topology = this.refresh(connection);

if (topology == null) {
final HostSpec host = new HostSpec(getHostEndpoint(instanceName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refresh topology with provided connection?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null;

host.setHostId(instanceName);
return host;
}
return topology
.stream()
.filter(host -> Objects.equals(instanceName, host.getHostId()))
.findAny()
.orElse(null);
}
} catch (final SQLException e) {
throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"), e);
}

throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"));
}

private TopologyAwareDatabaseCluster getTopologyAwareDialect(String exceptionMessageIdentifier) throws SQLException {
if (this.topologyAwareDialect == null) {
Dialect dialect = this.hostListProviderService.getDialect();
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
throw new SQLException(
Messages.get(exceptionMessageIdentifier,
new Object[] {dialect}));
}
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
}
return this.topologyAwareDialect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ public List<HostSpec> forceRefresh(final Connection connection) throws SQLExcept
public HostRole getHostRole(Connection connection) {
throw new UnsupportedOperationException("ConnectionStringHostListProvider does not support getHostRole");
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
return this.hostList.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we need to get sql from dialect, fetch node id, try to find it in topology.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.StringUtils;
import software.amazon.jdbc.util.SubscribedMethodHelper;

public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin {
Expand Down Expand Up @@ -93,8 +91,10 @@ public Connection connectInternal(
: hostSpec;

if (conn != null) {
if (!rdsHelper.isRdsInstance(currentHostSpec.getHost())) {
currentHostSpec.addAlias(getInstanceEndpoint(conn, currentHostSpec));
final RdsUrlType type = this.rdsHelper.identifyRdsType(hostSpec.getHost());
if (type.isRdsCluster()) {
hostSpec.resetAliases();
this.pluginService.fillAliases(conn, hostSpec);
}
}

Expand All @@ -110,16 +110,6 @@ public Connection forceConnect(String driverProtocol, HostSpec hostSpec, Propert
return connectInternal(hostSpec, forceConnectFunc);
}

private String getInstanceEndpointPattern(final String url) {
if (StringUtils.isNullOrEmpty(this.clusterInstanceTemplate)) {
this.clusterInstanceTemplate = AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.getString(this.props) == null
? rdsHelper.getRdsInstanceHostPattern(url)
: AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.getString(this.props);
}

return this.clusterInstanceTemplate;
}

@Override
public <T, E extends Exception> T execute(final Class<T> resultClass, final Class<E> exceptionClass,
final Object methodInvokeOn, final String methodName, final JdbcCallable<T, E> jdbcMethodFunc,
Expand Down Expand Up @@ -150,29 +140,6 @@ public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>>
}

private boolean isRoleChanged(final EnumSet<NodeChangeOptions> changes) {
return changes.contains(NodeChangeOptions.PROMOTED_TO_WRITER)
|| changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
}

public String getInstanceEndpoint(final Connection conn, final HostSpec host) {
String instanceName = "?";

if (!(this.pluginService.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return instanceName;
}
final TopologyAwareDatabaseCluster topologyAwareDialect =
(TopologyAwareDatabaseCluster) this.pluginService.getDialect();

try (final Statement stmt = conn.createStatement();
final ResultSet resultSet = stmt.executeQuery(topologyAwareDialect.getNodeIdQuery())) {
if (resultSet.next()) {
instanceName = resultSet.getString(1);
}
String instanceEndpoint = getInstanceEndpointPattern(host.getHost());
instanceEndpoint = host.isPortSpecified() ? instanceEndpoint + ":" + host.getPort() : instanceEndpoint;
return instanceEndpoint.replace("?", instanceName);
} catch (final SQLException e) {
return instanceName;
}
return changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
}
}
Loading