Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Logger;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
Expand All @@ -39,6 +40,8 @@

public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin {

private static final Logger LOGGER = Logger.getLogger(AuroraConnectionTrackerPlugin.class.getName());

static final String METHOD_ABORT = "Connection.abort";
static final String METHOD_CLOSE = "Connection.close";
private static final Set<String> subscribedMethods =
Expand Down Expand Up @@ -94,7 +97,19 @@ public Connection connectInternal(

if (conn != null) {
if (!rdsHelper.isRdsInstance(currentHostSpec.getHost())) {
currentHostSpec.addAlias(getInstanceEndpoint(conn, currentHostSpec));
// Remove pre-existing instance endpoints.
// If the host url is a cluster endpoint, the driver may be adding multiple instance endpoints to different
// instances if we don't remove pre-existing ones.
currentHostSpec.getAliases().forEach(alias -> {
if (this.rdsHelper.isRdsInstance(alias)) {
currentHostSpec.removeAlias(alias);
}
});

final String instanceEndpoint = getInstanceEndpoint(conn, currentHostSpec);
if (!StringUtils.isNullOrEmpty(instanceEndpoint)) {
currentHostSpec.addAlias(instanceEndpoint);
}
}
}

Expand Down Expand Up @@ -124,7 +139,13 @@ private String getInstanceEndpointPattern(final String url) {
public <T, E extends Exception> T execute(final Class<T> resultClass, final Class<E> exceptionClass,
final Object methodInvokeOn, final String methodName, final JdbcCallable<T, E> jdbcMethodFunc,
final Object[] jdbcMethodArgs) throws E {
final HostSpec originalHost = this.pluginService.getCurrentHostSpec();
String originalHost = this.pluginService.getCurrentHostSpec().getUrl();
if (!this.rdsHelper.isRdsInstance(originalHost)) {
originalHost = this.pluginService.getCurrentHostSpec().getAliases().stream()
.filter(this.rdsHelper::isRdsInstance).findAny()
.orElse(null);
}

try {
final T result = jdbcMethodFunc.call();
if ((methodName.equals(METHOD_CLOSE) || methodName.equals(METHOD_ABORT))) {
Expand All @@ -142,6 +163,15 @@ public <T, E extends Exception> T execute(final Class<T> resultClass, final Clas

@Override
public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>> changes) {
final StringBuilder sb = new StringBuilder("Tracker Changes:");
for (final Map.Entry<String, EnumSet<NodeChangeOptions>> change : changes.entrySet()) {
if (sb.length() > 0) {
sb.append("\n");
}
sb.append(String.format("\tHost '%s': %s", change.getKey(), change.getValue()));
}
LOGGER.finest(sb.toString());

for (final String node : changes.keySet()) {
if (isRoleChanged(changes.get(node))) {
tracker.invalidateAllConnections(node);
Expand All @@ -150,15 +180,14 @@ public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>>
}

private boolean isRoleChanged(final EnumSet<NodeChangeOptions> changes) {
return changes.contains(NodeChangeOptions.PROMOTED_TO_WRITER)
|| changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
return changes.contains(NodeChangeOptions.PROMOTED_TO_READER);
}

public String getInstanceEndpoint(final Connection conn, final HostSpec host) {
String instanceName = "?";

if (!(this.pluginService.getDialect() instanceof TopologyAwareDatabaseCluster)) {
return instanceName;
return null;
}
final TopologyAwareDatabaseCluster topologyAwareDialect =
(TopologyAwareDatabaseCluster) this.pluginService.getDialect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,22 @@ public void populateOpenedConnectionQueue(final HostSpec hostSpec, final Connect
}

/**
* Invalidates all opened connections pointing to the same node in a daemon thread. 1
* Invalidates all opened connections pointing to the nodes specified.
*
* @param hostSpec The {@link HostSpec} object containing the url of the node.
* @param node A list of endpoints.
*/
public void invalidateAllConnections(final HostSpec hostSpec) {
invalidateAllConnections(hostSpec.getAliases().toArray(new String[] {}));
}

public void invalidateAllConnections(final String... node) {
final Optional<String> instanceEndpoint = Arrays.stream(node).filter(rdsUtils::isRdsInstance).findFirst();
if (!instanceEndpoint.isPresent()) {
return;
}
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(instanceEndpoint.get());
LOGGER.finest("invalidateAllConnections");
logConnectionQueue(instanceEndpoint.get(), connectionQueue);
invalidateConnections(openedConnections.get(instanceEndpoint.get()));
}

public void invalidateCurrentConnection(final HostSpec hostSpec, final Connection connection) {
final String host = rdsUtils.isRdsInstance(hostSpec.getHost())
? hostSpec.getHost()
: hostSpec.getAliases().stream().filter(rdsUtils::isRdsInstance).findFirst().orElse(null);

public void invalidateCurrentConnection(final String host, final Connection connection) {
if (StringUtils.isNullOrEmpty(host)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class AuroraConnectionTrackerPluginTest {
@Mock JdbcCallable<ResultSet, SQLException> mockSqlFunction;
@Mock JdbcCallable<Void, SQLException> mockCloseOrAbortFunction;
private static final Object[] SQL_ARGS = {"sql"};
@Mock HostSpec mockHostSpec;

private AutoCloseable closeable;

Expand All @@ -79,6 +80,7 @@ void setUp() throws SQLException {
when(mockRdsUtils.getRdsInstanceHostPattern(any(String.class))).thenReturn("?");
when(mockPluginService.getCurrentConnection()).thenReturn(mockConnection);
when(mockPluginService.getDialect()).thenReturn(mockTopologyAwareDialect);
when(mockPluginService.getCurrentHostSpec()).thenReturn(mockHostSpec);
when(((TopologyAwareDatabaseCluster) mockTopologyAwareDialect).getNodeIdQuery()).thenReturn("any");
}

Expand Down Expand Up @@ -215,8 +217,9 @@ public void testTrackNewConnections_nonRdsInstanceUrl_withoutClusterInstanceHost
@Test
public void testInvalidateOpenedConnections() throws SQLException {
final FailoverSQLException expectedException = new FailoverSQLException("reason", "sqlstate");
final HostSpec originalHost = new HostSpec("host");
when(mockPluginService.getCurrentHostSpec()).thenReturn(originalHost);
final String originalHost = "host";
when(mockRdsUtils.isRdsInstance(eq(originalHost))).thenReturn(true);
when(mockHostSpec.getUrl()).thenReturn(originalHost);
doThrow(expectedException).when(mockSqlFunction).call();

final AuroraConnectionTrackerPlugin plugin = new AuroraConnectionTrackerPlugin(
Expand All @@ -242,8 +245,9 @@ public void testInvalidateOpenedConnections() throws SQLException {
@ParameterizedTest
@ValueSource(strings = {AuroraConnectionTrackerPlugin.METHOD_ABORT, AuroraConnectionTrackerPlugin.METHOD_CLOSE})
public void testInvalidateConnectionsOnCloseOrAbort(final String method) throws SQLException {
final HostSpec originalHost = new HostSpec("host");
when(mockPluginService.getCurrentHostSpec()).thenReturn(originalHost);
final String originalHost = "host";
when(mockRdsUtils.isRdsInstance(eq(originalHost))).thenReturn(true);
when(mockHostSpec.getUrl()).thenReturn(originalHost);

final AuroraConnectionTrackerPlugin plugin = new AuroraConnectionTrackerPlugin(
mockPluginService,
Expand Down
2 changes: 2 additions & 0 deletions wrapper/src/test/resources/logging-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tT.%1$tL] [%4$-7s] %2$s%n %5$

software.amazon.jdbc.level=ALL

software.amazon.jdbc.plugin.AuroraConnectionTrackerPlugin.level=FINEST

integration.container.level=FINEST
integration.container.aurora.postgres.AuroraPostgresStaleDnsTest.level=FINEST
integration.container.aurora.postgres.AuroraAdvancedPerformanceTest.level=FINEST
Expand Down