Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public interface HostListProvider {
* determine the host role
*/
HostRole getHostRole(Connection connection) throws SQLException;

HostSpec identifyConnection(Connection connection) throws SQLException;
}
14 changes: 14 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/HostSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HostSpec {
protected Set<String> aliases = ConcurrentHashMap.newKeySet();
protected Set<String> allAliases = ConcurrentHashMap.newKeySet();
protected long weight; // Greater or equal 0. Lesser the weight, the healthier node.
protected String hostId;

public HostSpec(final String host) {
this.host = host;
Expand Down Expand Up @@ -148,6 +149,11 @@ public void removeAlias(final String... alias) {
});
}

public void resetAliases() {
this.aliases.clear();
this.allAliases.clear();
}

public String getUrl() {
String url = isPortSpecified() ? host + ":" + port : host;
if (!url.endsWith("/")) {
Expand All @@ -156,6 +162,14 @@ public String getUrl() {
return url;
}

public String getHostId() {
return hostId;
}

public void setHostId(String hostId) {
this.hostId = hostId;
}

public String asAlias() {
return isPortSpecified() ? host + ":" + port : host;
}
Expand Down
4 changes: 4 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ HostSpec getHostSpecByStrategy(HostRole role, String strategy)
Dialect getDialect();

void updateDialect(final @NonNull Connection connection) throws SQLException;

HostSpec identifyConnection(final Connection connection) throws SQLException;

void fillAliases(final Connection connection, final HostSpec hostSpec) throws SQLException;
}
37 changes: 37 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package software.amazon.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -35,6 +37,7 @@
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.dialect.DialectManager;
import software.amazon.jdbc.dialect.DialectProvider;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.exceptions.ExceptionManager;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.util.CacheMap;
Expand Down Expand Up @@ -489,4 +492,38 @@ public void updateDialect(final @NonNull Connection connection) throws SQLExcept
connection);
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
if (!(this.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return this.getCurrentHostSpec();
}

return this.hostListProvider.identifyConnection(connection);
}

@Override
public void fillAliases(Connection connection, HostSpec hostSpec) throws SQLException {
if (!hostSpec.getAliases().isEmpty()) {
LOGGER.finest(() -> Messages.get("PluginServiceImpl.nonEmptyAliases", new Object[] {hostSpec.getAliases()}));
return;
}

hostSpec.addAlias(hostSpec.asAlias());

try (final Statement stmt = connection.createStatement()) {
try (final ResultSet rs = stmt.executeQuery(this.getDialect().getHostAliasQuery())) {
while (rs.next()) {
hostSpec.addAlias(rs.getString(1));
}
}
} catch (final SQLException sqlException) {
// log and ignore
LOGGER.finest(() -> Messages.get("PluginServiceImpl.failedToRetrieveHostPort"));
}

final HostSpec host = this.identifyConnection(connection);
if (host != this.currentHostSpec) {
hostSpec.addAlias(host.asAlias());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
Expand All @@ -42,6 +43,7 @@
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.dialect.TopologyAwareDatabaseCluster;
import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingSQLException;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.ConnectionUrlParser;
import software.amazon.jdbc.util.Messages;
Expand Down Expand Up @@ -273,7 +275,7 @@ private ClusterSuggestedResult getSuggestedClusterId(final String url) {
for (final HostSpec host : hosts) {
if (host.getUrl().equals(url)) {
LOGGER.finest(() -> Messages.get("AuroraHostListProvider.suggestedClusterId",
new Object[]{key, url}));
new Object[] {key, url}));
return new ClusterSuggestedResult(key, isPrimaryCluster);
}
}
Expand Down Expand Up @@ -398,8 +400,12 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
// Calculate weight based on node lag in time and CPU utilization.
final long weight = Math.round(nodeLag) * 100L + Math.round(cpuUtilization);

hostName = hostName == null ? "?" : hostName;
final String endpoint = getHostEndpoint(hostName);
return createHost(hostName, isWriter, weight);
}

private HostSpec createHost(String host, final boolean isWriter, final long weight) {
host = host == null ? "?" : host;
final String endpoint = getHostEndpoint(host);
final int port = this.clusterInstanceTemplate.isPortSpecified()
? this.clusterInstanceTemplate.getPort()
: this.initialHostSpec.getPort();
Expand All @@ -410,7 +416,8 @@ private HostSpec createHost(final ResultSet resultSet) throws SQLException {
isWriter ? HostRole.WRITER : HostRole.READER,
HostAvailability.AVAILABLE,
weight);
hostSpec.addAlias(hostName);
hostSpec.addAlias(host);
hostSpec.setHostId(host);
return hostSpec;
}

Expand Down Expand Up @@ -577,6 +584,7 @@ public FetchTopologyResult(final boolean isCachedData, final List<HostSpec> host
}

static class ClusterSuggestedResult {

public String clusterId;
public boolean isPrimaryClusterId;

Expand All @@ -593,13 +601,13 @@ public HostRole getHostRole(Connection conn) throws SQLException {
if (!(dialect instanceof TopologyAwareDatabaseCluster)) {
throw new SQLException(
Messages.get("AuroraHostListProvider.invalidDialectForGetHostRole",
new Object[]{dialect}));
new Object[] {dialect}));
}
this.topologyAwareDialect = (TopologyAwareDatabaseCluster) this.hostListProviderService.getDialect();
}

try (final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery(this.topologyAwareDialect.getIsReaderQuery())) {
final ResultSet rs = stmt.executeQuery(this.topologyAwareDialect.getIsReaderQuery())) {
if (rs.next()) {
boolean isReader = rs.getBoolean(1);
return isReader ? HostRole.READER : HostRole.WRITER;
Expand All @@ -610,4 +618,33 @@ public HostRole getHostRole(Connection conn) throws SQLException {

throw new SQLException(Messages.get("AuroraHostListProvider.errorGettingHostRole"));
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
try (final Statement stmt = connection.createStatement();
final ResultSet resultSet = stmt.executeQuery(topologyAwareDialect.getNodeIdQuery())) {
if (resultSet.next()) {
final String instanceName = resultSet.getString(1);
if (this.getCachedTopology() == null) {
final HostSpec host = new HostSpec(getHostEndpoint(instanceName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refresh topology with provided connection?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null;

host.setHostId(instanceName);
return host;
}

return this.getCachedTopology()
.stream()
.filter(host -> Objects.equals(instanceName, host.getHostId()))
.findAny()
.orElseGet(() -> {
final HostSpec host = new HostSpec(getHostEndpoint(instanceName));
host.setHostId(instanceName);
return host;
});
}
} catch (final SQLException e) {
throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyingConnection"), e);
}

throw new SQLException(Messages.get("AuroraHostListProvider.errorIdentifyingConnection"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ public List<HostSpec> forceRefresh(final Connection connection) throws SQLExcept
public HostRole getHostRole(Connection connection) {
throw new UnsupportedOperationException("ConnectionStringHostListProvider does not support getHostRole");
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
return this.hostList.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we need to get sql from dialect, fetch node id, try to find it in topology.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ public Connection connectInternal(
: hostSpec;

if (conn != null) {
if (!rdsHelper.isRdsInstance(currentHostSpec.getHost())) {
currentHostSpec.addAlias(getInstanceEndpoint(conn, currentHostSpec));
}
this.pluginService.fillAliases(conn, hostSpec);
}

tracker.populateOpenedConnectionQueue(currentHostSpec, conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,32 +241,6 @@ public void releaseResources() {
this.monitorService = null;
}

/**
* Generate a set of node keys representing the node to monitor.
*
* @param driverProtocol Driver protocol for provided connection
* @param connection the connection to a specific node.
* @param hostSpec host details to add node keys to
*/
private void generateHostAliases(
final @NonNull String driverProtocol,
final @NonNull Connection connection,
final @NonNull HostSpec hostSpec) {

hostSpec.addAlias(hostSpec.asAlias());

try (final Statement stmt = connection.createStatement()) {
try (final ResultSet rs = stmt.executeQuery(this.pluginService.getDialect().getHostAliasQuery())) {
while (rs.next()) {
hostSpec.addAlias(rs.getString(1));
}
}
} catch (final SQLException sqlException) {
// log and ignore
LOGGER.finest(() -> Messages.get("HostMonitoringConnectionPlugin.failedToRetrieveHostPort"));
}
}

@Override
public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet<NodeChangeOptions> changes) {

Expand Down Expand Up @@ -298,7 +272,8 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec,
final Connection conn = connectFunc.call();

if (conn != null) {
generateHostAliases(driverProtocol, conn, hostSpec);
hostSpec.resetAliases();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to reset? Some other plugin might already filled aliases for this host spec.

this.pluginService.fillAliases(conn, hostSpec);
}

return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package software.amazon.jdbc.plugin.efm;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
Expand All @@ -42,6 +43,7 @@ public class MonitorServiceImpl implements MonitorService {
"60000",
"Interval in milliseconds for a monitor to be considered inactive and to be disposed.");

private final PluginService pluginService;
private MonitorThreadContainer threadContainer;

final MonitorInitializer monitorInitializer;
Expand All @@ -50,6 +52,7 @@ public class MonitorServiceImpl implements MonitorService {

public MonitorServiceImpl(final @NonNull PluginService pluginService) {
this(
pluginService,
(hostSpec, properties, monitorService) ->
new MonitorImpl(
pluginService,
Expand All @@ -67,9 +70,11 @@ public MonitorServiceImpl(final @NonNull PluginService pluginService) {
}

MonitorServiceImpl(
final PluginService pluginService,
final MonitorInitializer monitorInitializer,
final ExecutorServiceInitializer executorServiceInitializer) {

this.pluginService = pluginService;
this.monitorInitializer = monitorInitializer;
this.threadContainer = MonitorThreadContainer.getInstance(executorServiceInitializer);
}
Expand All @@ -89,7 +94,12 @@ public MonitorConnectionContext startMonitoring(
() -> Messages.get(
"MonitorServiceImpl.emptyAliasSet",
new Object[] {hostSpec}));
hostSpec.addAlias(hostSpec.asAlias());
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this entire try-catch block. We just rely on provided nodeKeys, and if they empty then we raise an exception.

this.pluginService.fillAliases(connectionToAbort, hostSpec);
} catch (SQLException e) {
// Log and ignore the error.
LOGGER.finest(Messages.get("MonitorServiceImpl.errorPopulatingAliases", new Object[] {e}));
}
}

final Monitor monitor;
Expand Down
9 changes: 6 additions & 3 deletions wrapper/src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ AuroraHostListProvider.invalidQuery=Error obtaining host list. Provided database
AuroraHostListProvider.invalidDialect=Expecting a dialect that supports a cluster topology.
AuroraHostListProvider.invalidDialectForGetHostRole=An Aurora dialect is required to analyze a host's role. The detected dialect was ''{0}''
AuroraHostListProvider.errorGettingHostRole=An error occurred while obtaining the connected host's role. This could occur if the connection is broken or if you are not connected to an Aurora database.
AuroraHostListProvider.errorIdentifyingConnection=An error occurred while obtaining the connection's host ID.

# AWS Credentials Manager
AwsCredentialsManager.nullProvider=The configured AwsCredentialsProvider was null. If you have configured the AwsCredentialsManager to use a custom AwsCredentialsProviderHandler, please ensure the handler does not return null.
Expand Down Expand Up @@ -139,7 +140,6 @@ Failover.noOperationsAfterConnectionClosed=No operations allowed after connectio
HostMonitoringConnectionPlugin.activatedMonitoring=Executing method ''{0}'', monitoring is activated.
HostMonitoringConnectionPlugin.monitoringDeactivated=Monitoring deactivated for method ''{0}''.
HostMonitoringConnectionPlugin.unavailableNode=Node ''{0}'' is unavailable.
HostMonitoringConnectionPlugin.failedToRetrieveHostPort=Could not retrieve Host:Port for connection.
HostMonitoringConnectionPlugin.unsupportedDriverProtocol=Driver protocol ''{0}'' is not supported.

# IAM Auth Connection Plugin
Expand All @@ -164,15 +164,18 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.
MonitorImpl.contextNullWarning=Parameter 'context' should not be null.

# Monitor Service Impl
MonitorServiceImpl.nullMonitorParam=Parameter monitor' should not be null.
MonitorServiceImpl.emptyAliasSet=Empty alias set passed for {0}. Set should not be empty.
MonitorServiceImpl.nullMonitorParam=Parameter 'monitor' should not be null.
MonitorServiceImpl.emptyAliasSet=Empty alias set passed for ''{0}''. Set should not be empty.
MonitorServiceImpl.errorPopulatingAliases=Error occurred while populating aliases: ''{0}''.

# Plugin Service Impl
PluginServiceImpl.hostListEmpty=Current host list is empty.
PluginServiceImpl.releaseResources=Releasing resources.
PluginServiceImpl.hostListException=Exception while getting a host list.
PluginServiceImpl.hostAliasNotFound=Can''t find any host by the following aliases: ''{0}''.
PluginServiceImpl.hostsChangelistEmpty=There are no changes in the hosts' availability.
PluginServiceImpl.failedToRetrieveHostPort=Could not retrieve Host:Port for connection.
PluginServiceImpl.nonEmptyAliases=fillAliases called when HostSpec already contains the following aliases: ''{0}''. Please reset HostSpec aliases before calling this method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Please reset HostSpec aliases before calling this method."

I think it may mislead developers.
"Please reset HostSpec aliases if you need to re-fill them again." ?


# Property Utils
PropertyUtils.setMethodDoesNotExistOnTarget=Set method for property ''{0}'' does not exist on target ''{1}''.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ public Dialect getDialect() {
}

public void updateDialect(final @NonNull Connection connection) throws SQLException { }

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
return null;
}

@Override
public void fillAliases(Connection connection, HostSpec hostSpec) throws SQLException {

}
}

public static class TestConnection implements Connection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,5 +523,10 @@ public List<HostSpec> forceRefresh(Connection connection) {
public HostRole getHostRole(Connection conn) {
return HostRole.WRITER;
}

@Override
public HostSpec identifyConnection(Connection connection) throws SQLException {
return new HostSpec("foo");
}
}
}