Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public interface HostListProvider {
* determine the host role
*/
HostRole getHostRole(Connection connection) throws SQLException;

HostSpec identifyConnection(Connection connection) throws SQLException;
}
15 changes: 15 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/HostSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HostSpec {
protected Set<String> aliases = ConcurrentHashMap.newKeySet();
protected Set<String> allAliases = ConcurrentHashMap.newKeySet();
protected long weight; // Greater or equal 0. Lesser the weight, the healthier node.
protected String hostId;

public HostSpec(final String host) {
this.host = host;
Expand Down Expand Up @@ -148,6 +149,12 @@ public void removeAlias(final String... alias) {
});
}

public void resetAliases() {
this.aliases.clear();
this.allAliases.clear();
this.allAliases.add(this.asAlias());
}

public String getUrl() {
String url = isPortSpecified() ? host + ":" + port : host;
if (!url.endsWith("/")) {
Expand All @@ -156,6 +163,14 @@ public String getUrl() {
return url;
}

public String getHostId() {
return hostId;
}

public void setHostId(String hostId) {
this.hostId = hostId;
}

public String asAlias() {
return isPortSpecified() ? host + ":" + port : host;
}
Expand Down
4 changes: 4 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ HostSpec getHostSpecByStrategy(HostRole role, String strategy)
Dialect getDialect();

void updateDialect(final @NonNull Connection connection) throws SQLException;

HostSpec identifyConnection(final Connection connection) throws SQLException;

void fillAliases(final Connection connection, final HostSpec hostSpec) throws SQLException;
}
43 changes: 43 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package software.amazon.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -35,6 +37,7 @@
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.dialect.DialectManager;
import software.amazon.jdbc.dialect.DialectProvider;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.exceptions.ExceptionManager;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.util.CacheMap;
Expand Down Expand Up @@ -489,4 +492,44 @@ public void updateDialect(final @NonNull Connection connection) throws SQLExcept
connection);
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
if (!(this.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return null;
}

return this.hostListProvider.identifyConnection(connection);
}

@Override
public void fillAliases(Connection connection, HostSpec hostSpec) throws SQLException {
if (hostSpec == null) {
return;
}

if (!hostSpec.getAliases().isEmpty()) {
LOGGER.finest(() -> Messages.get("PluginServiceImpl.nonEmptyAliases", new Object[] {hostSpec.getAliases()}));
return;
}

hostSpec.addAlias(hostSpec.asAlias());

// Add the host name and port, this host name is usually the internal IP address.
try (final Statement stmt = connection.createStatement()) {
try (final ResultSet rs = stmt.executeQuery(this.getDialect().getHostAliasQuery())) {
while (rs.next()) {
hostSpec.addAlias(rs.getString(1));
}
}
} catch (final SQLException sqlException) {
// log and ignore
LOGGER.finest(() -> Messages.get("PluginServiceImpl.failedToRetrieveHostPort"));
}

// Add the instance endpoint if the current connection is associated with a topology aware database cluster.
final HostSpec host = this.identifyConnection(connection);
if (host != null) {
hostSpec.addAlias(host.asAliases().toArray(new String[] {}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -273,7 +274,7 @@ private ClusterSuggestedResult getSuggestedClusterId(final String url) {
for (final HostSpec host : hosts) {
if (host.getUrl().equals(url)) {
LOGGER.finest(() -> Messages.get("AuroraHostListProvider.suggestedClusterId",
new Object[]{key, url}));
new Object[] {key, url}));
return new ClusterSuggestedResult(key, isPrimaryCluster);
}
}
Expand Down Expand Up @@ -398,8 +399,12 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
// Calculate weight based on node lag in time and CPU utilization.
final long weight = Math.round(nodeLag) * 100L + Math.round(cpuUtilization);

hostName = hostName == null ? "?" : hostName;
final String endpoint = getHostEndpoint(hostName);
return createHost(hostName, isWriter, weight);
}

private HostSpec createHost(String host, final boolean isWriter, final long weight) {
host = host == null ? "?" : host;
final String endpoint = getHostEndpoint(host);
final int port = this.clusterInstanceTemplate.isPortSpecified()
? this.clusterInstanceTemplate.getPort()
: this.initialHostSpec.getPort();
Expand All @@ -410,7 +415,8 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
isWriter ? HostRole.WRITER : HostRole.READER,
HostAvailability.AVAILABLE,
weight);
hostSpec.addAlias(hostName);
hostSpec.addAlias(host);
hostSpec.setHostId(host);
return hostSpec;
}

Expand Down Expand Up @@ -577,6 +583,7 @@ public FetchTopologyResult(final boolean isCachedData, final List<HostSpec> host
}

static class ClusterSuggestedResult {

public String clusterId;
public boolean isPrimaryClusterId;

Expand All @@ -588,18 +595,10 @@ public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClu

@Override
public HostRole getHostRole(Connection conn) throws SQLException {
if (this.topologyAwareDialect == null) {
Dialect dialect = this.hostListProviderService.getDialect();
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
throw new SQLException(
Messages.get("AuroraHostListProvider.invalidDialectForGetHostRole",
new Object[]{dialect}));
}
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
}

try (final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery(this.topologyAwareDialect.getIsReaderQuery())) {
final ResultSet rs = stmt.executeQuery(
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForGetHostRole")
.getIsReaderQuery())) {
if (rs.next()) {
boolean isReader = rs.getBoolean(1);
return isReader ? HostRole.READER : HostRole.WRITER;
Expand All @@ -610,4 +609,44 @@ public HostRole getHostRole(Connection conn) throws SQLException {

throw new SQLException(Messages.get("AuroraHostListProvider.errorGettingHostRole"));
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
try (final Statement stmt = connection.createStatement();
final ResultSet resultSet = stmt.executeQuery(
getTopologyAwareDialect("AuroraHostListProvider.invalidDialectForIdentifyConnection")
.getNodeIdQuery())) {
if (resultSet.next()) {
final String instanceName = resultSet.getString(1);

final List<HostSpec> topology = this.getCachedTopology();

if (topology == null) {
return null;
}
return topology
.stream()
.filter(host -> Objects.equals(instanceName, host.getHostId()))
.findAny()
.orElse(null);
}
} catch (final SQLException e) {
throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"), e);
}

throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyConnection"));
}

private TopologyAwareDatabaseCluster getTopologyAwareDialect(String exceptionMessageIdentifier) throws SQLException {
if (this.topologyAwareDialect == null) {
Dialect dialect = this.hostListProviderService.getDialect();
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
throw new SQLException(
Messages.get(exceptionMessageIdentifier,
new Object[] {dialect}));
}
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
}
return this.topologyAwareDialect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package software.amazon.jdbc.hostlistprovider;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.AwsWrapperProperty;
Expand Down Expand Up @@ -109,4 +112,30 @@ public List<HostSpec> forceRefresh(final Connection connection) throws SQLExcept
public HostRole getHostRole(Connection connection) {
throw new UnsupportedOperationException("ConnectionStringHostListProvider does not support getHostRole");
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
try (final Statement stmt = connection.createStatement();
final ResultSet resultSet = stmt.executeQuery(this.hostListProviderService.getDialect().getHostAliasQuery())) {
if (resultSet.next()) {
final String instance = resultSet.getString(1);

final List<HostSpec> topology = this.refresh(connection);

if (topology == null) {
return null;
}

return topology
.stream()
.filter(host -> Objects.equals(instance, host.getHostId()))
.findAny()
.orElse(null);
}
} catch (final SQLException e) {
throw new SQLException(Messages.get("ConnectionStringHostListProvider.errorIdentifyConnection"), e);
}

throw new SQLException(Messages.get("ConnectionStringHostListProvider.errorIdentifyConnection"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.StringUtils;
import software.amazon.jdbc.util.SubscribedMethodHelper;

public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin {
Expand Down Expand Up @@ -93,8 +91,10 @@ public Connection connectInternal(
: hostSpec;

if (conn != null) {
if (!rdsHelper.isRdsInstance(currentHostSpec.getHost())) {
currentHostSpec.addAlias(getInstanceEndpoint(conn, currentHostSpec));
final RdsUrlType type = this.rdsHelper.identifyRdsType(hostSpec.getHost());
if (type.isRdsCluster()) {
hostSpec.resetAliases();
this.pluginService.fillAliases(conn, hostSpec);
}
}

Expand All @@ -110,16 +110,6 @@ public Connection forceConnect(String driverProtocol, HostSpec hostSpec, Propert
return connectInternal(hostSpec, forceConnectFunc);
}

private String getInstanceEndpointPattern(final String url) {
if (StringUtils.isNullOrEmpty(this.clusterInstanceTemplate)) {
this.clusterInstanceTemplate = AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.getString(this.props) == null
? rdsHelper.getRdsInstanceHostPattern(url)
: AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.getString(this.props);
}

return this.clusterInstanceTemplate;
}

@Override
public <T, E extends Exception> T execute(final Class<T> resultClass, final Class<E> exceptionClass,
final Object methodInvokeOn, final String methodName, final JdbcCallable<T, E> jdbcMethodFunc,
Expand Down Expand Up @@ -150,29 +140,6 @@ public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>>
}

private boolean isRoleChanged(final EnumSet<NodeChangeOptions> changes) {
return changes.contains(NodeChangeOptions.PROMOTED_TO_WRITER)
|| changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
}

public String getInstanceEndpoint(final Connection conn, final HostSpec host) {
String instanceName = "?";

if (!(this.pluginService.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return instanceName;
}
final TopologyAwareDatabaseCluster topologyAwareDialect =
(TopologyAwareDatabaseCluster) this.pluginService.getDialect();

try (final Statement stmt = conn.createStatement();
final ResultSet resultSet = stmt.executeQuery(topologyAwareDialect.getNodeIdQuery())) {
if (resultSet.next()) {
instanceName = resultSet.getString(1);
}
String instanceEndpoint = getInstanceEndpointPattern(host.getHost());
instanceEndpoint = host.isPortSpecified() ? instanceEndpoint + ":" + host.getPort() : instanceEndpoint;
return instanceEndpoint.replace("?", instanceName);
} catch (final SQLException e) {
return instanceName;
}
return changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
}
}
Loading