Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.exceptions.ExceptionManager;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.util.ExpiringCache;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.Messages;

public class PluginServiceImpl implements PluginService, CanReleaseResources,
HostListProviderService, PluginManagerService {

private static final Logger LOGGER = Logger.getLogger(PluginServiceImpl.class.getName());
private static final int DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_MS = 5 * 60 * 1000; // 5 min
protected static final long DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_NANO = TimeUnit.MINUTES.toNanos(5);

protected static final ExpiringCache<String, HostAvailability> hostAvailabilityExpiringCache = new ExpiringCache<>(
DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_MS);
protected static final CacheMap<String, HostAvailability> hostAvailabilityExpiringCache = new CacheMap<>();
protected final ConnectionPluginManager pluginManager;
private final Properties props;
private final String originalUrl;
Expand Down Expand Up @@ -258,7 +258,8 @@ public void setAvailability(final @NonNull Set<String> hostAliases, final @NonNu
for (final HostSpec host : hostsToChange) {
final HostAvailability currentAvailability = host.getAvailability();
host.setAvailability(availability);
hostAvailabilityExpiringCache.put(host.getUrl(), availability);
hostAvailabilityExpiringCache.put(host.getUrl(), availability,
DEFAULT_HOST_AVAILABILITY_CACHE_EXPIRE_NANO);
if (currentAvailability != availability) {
final EnumSet<NodeChangeOptions> hostChanges;
if (availability == HostAvailability.AVAILABLE) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,6 +40,7 @@
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SubscribedMethodHelper;

/**
* Monitor the server while the connection is executing methods for more sophisticated failure
Expand Down Expand Up @@ -77,16 +77,13 @@ public class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin
"Number of failed connection checks before considering database node unhealthy.");

private static final Set<String> subscribedMethods =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("*")));
Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("*")));

private static final String MYSQL_RETRIEVE_HOST_PORT_SQL =
"SELECT CONCAT(@@hostname, ':', @@port)";
private static final String PG_RETRIEVE_HOST_PORT_SQL =
"SELECT CONCAT(inet_server_addr(), ':', inet_server_port())";

private static final List<String> METHODS_TO_SKIP_MONITORING =
Arrays.asList(".get", ".abort", ".close", ".next", ".create");

protected @NonNull Properties properties;
private final @NonNull Supplier<MonitorService> monitorServiceSupplier;
private final @NonNull PluginService pluginService;
Expand All @@ -111,10 +108,13 @@ public HostMonitoringConnectionPlugin(
final @NonNull Supplier<MonitorService> monitorServiceSupplier) {
if (pluginService == null) {
throw new IllegalArgumentException("pluginService");
} else if (properties == null) {
}
if (properties == null) {
throw new IllegalArgumentException("properties");
}

if (monitorServiceSupplier == null) {
throw new IllegalArgumentException("monitorServiceSupplier");
}
this.pluginService = pluginService;
this.properties = properties;
this.monitorServiceSupplier = monitorServiceSupplier;
Expand Down Expand Up @@ -142,7 +142,7 @@ public <T, E extends Exception> T execute(
// update config settings since they may change
final boolean isEnabled = FAILURE_DETECTION_ENABLED.getBoolean(this.properties);

if (!isEnabled || !this.doesNeedMonitoring(methodName)) {
if (!isEnabled || !SubscribedMethodHelper.NETWORK_BOUND_METHODS.contains(methodName)) {
return jdbcMethodFunc.call();
}

Expand Down Expand Up @@ -179,34 +179,36 @@ public <T, E extends Exception> T execute(

} finally {
if (monitorContext != null) {
this.monitorService.stopMonitoring(monitorContext);

final boolean isConnectionClosed;
try {
isConnectionClosed = this.pluginService.getCurrentConnection().isClosed();
} catch (final SQLException e) {
throw castException(exceptionClass, e);
}

if (monitorContext.isNodeUnhealthy()) {

this.pluginService.setAvailability(this.nodeKeys, HostAvailability.NOT_AVAILABLE);

if (!isConnectionClosed) {
abortConnection();
throw castException(
exceptionClass,
new SQLException(
Messages.get(
"HostMonitoringConnectionPlugin.unavailableNode",
new Object[] {this.pluginService.getCurrentHostSpec().asAlias()})));
synchronized (monitorContext) {
this.monitorService.stopMonitoring(monitorContext);

if (monitorContext.isNodeUnhealthy()) {
this.pluginService.setAvailability(this.nodeKeys, HostAvailability.NOT_AVAILABLE);

final boolean isConnectionClosed;
try {
isConnectionClosed = this.pluginService.getCurrentConnection().isClosed();
} catch (final SQLException e) {
throw castException(exceptionClass, e);
}

if (!isConnectionClosed) {
abortConnection();
throw castException(
exceptionClass,
new SQLException(
Messages.get(
"HostMonitoringConnectionPlugin.unavailableNode",
new Object[] {this.pluginService.getCurrentHostSpec().asAlias()})));
}
}
}

LOGGER.finest(
() -> Messages.get(
"HostMonitoringConnectionPlugin.monitoringDeactivated",
new Object[] {methodName}));
}
LOGGER.finest(
() -> Messages.get(
"HostMonitoringConnectionPlugin.monitoringDeactivated",
new Object[] {methodName}));
}

return result;
Expand All @@ -229,24 +231,6 @@ void abortConnection() {
}
}

/**
* Checks whether the JDBC method passed to this connection plugin requires monitoring.
*
* @param methodName Name of the JDBC method.
* @return true if the method requires monitoring; false otherwise.
*/
protected boolean doesNeedMonitoring(final String methodName) {

for (final String method : METHODS_TO_SKIP_MONITORING) {
if (methodName.contains(method)) {
return false;
}
}

// Monitor all the other methods
return true;
}

private void initMonitorService() {
if (this.monitorService == null) {
this.monitorService = this.monitorServiceSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import software.amazon.jdbc.util.Messages;

Expand All @@ -38,51 +33,44 @@ public class MonitorConnectionContext {
private final long failureDetectionIntervalMillis;
private final long failureDetectionTimeMillis;
private final long failureDetectionCount;

private final Set<String> hostAliases; // Variable is never written, so it does not need to be thread-safe
private final Connection connectionToAbort;
private final Monitor monitor;

private final AtomicBoolean activeContext = new AtomicBoolean(true);
private final AtomicBoolean nodeUnhealthy = new AtomicBoolean();
private final AtomicLong startMonitorTimeNano = new AtomicLong();
private volatile boolean activeContext = true;
private volatile boolean nodeUnhealthy = false;
private long startMonitorTimeNano;
private long expectedActiveMonitoringStartTimeNano;
private long invalidNodeStartTimeNano; // Only accessed by monitor thread
private long failureCount; // Only accessed by monitor thread

/**
* Constructor.
*
* @param monitor A reference to a monitor object.
* @param connectionToAbort A reference to the connection associated with this context that will
* be aborted in case of server failure.
* @param hostAliases All valid references to the server.
* @param failureDetectionTimeMillis Grace period after which node monitoring starts.
* @param failureDetectionIntervalMillis Interval between each failed connection check.
* @param failureDetectionCount Number of failed connection checks before considering database
* node as unhealthy.
*/
public MonitorConnectionContext(
Monitor monitor,
Connection connectionToAbort,
Set<String> hostAliases,
long failureDetectionTimeMillis,
long failureDetectionIntervalMillis,
long failureDetectionCount) {
this.monitor = monitor;
this.connectionToAbort = connectionToAbort;
// Variable is never written, so it does not need to be thread-safe
this.hostAliases = new HashSet<>(hostAliases);
this.failureDetectionTimeMillis = failureDetectionTimeMillis;
this.failureDetectionIntervalMillis = failureDetectionIntervalMillis;
this.failureDetectionCount = failureDetectionCount;
}

void setStartMonitorTimeNano(long startMonitorTimeNano) {
this.startMonitorTimeNano.set(startMonitorTimeNano);
}

Set<String> getHostAliases() {
return Collections.unmodifiableSet(this.hostAliases);
}

public long getFailureDetectionTimeMillis() {
return failureDetectionTimeMillis;
this.startMonitorTimeNano = startMonitorTimeNano;
this.expectedActiveMonitoringStartTimeNano = startMonitorTimeNano
+ TimeUnit.MILLISECONDS.toNanos(this.failureDetectionTimeMillis);
}

public long getFailureDetectionIntervalMillis() {
Expand All @@ -97,6 +85,14 @@ public long getFailureCount() {
return this.failureCount;
}

public long getExpectedActiveMonitoringStartTimeNano() {
return this.expectedActiveMonitoringStartTimeNano;
}

public Monitor getMonitor() {
return this.monitor;
}

void setFailureCount(long failureCount) {
this.failureCount = failureCount;
}
Expand All @@ -113,28 +109,28 @@ boolean isInvalidNodeStartTimeDefined() {
return this.invalidNodeStartTimeNano > 0;
}

public long getInvalidNodeStartTimeNano() {
long getInvalidNodeStartTimeNano() {
return this.invalidNodeStartTimeNano;
}

public boolean isNodeUnhealthy() {
return this.nodeUnhealthy.get();
return this.nodeUnhealthy;
}

void setNodeUnhealthy(boolean nodeUnhealthy) {
this.nodeUnhealthy.set(nodeUnhealthy);
this.nodeUnhealthy = nodeUnhealthy;
}

public boolean isActiveContext() {
return this.activeContext.get();
return this.activeContext;
}

public void invalidate() {
this.activeContext.set(false);
public void setInactive() {
this.activeContext = false;
}

synchronized void abortConnection() {
if (this.connectionToAbort == null || !this.activeContext.get()) {
void abortConnection() {
if (this.connectionToAbort == null || !this.activeContext) {
return;
}

Expand All @@ -153,19 +149,25 @@ synchronized void abortConnection() {
* Update whether the connection is still valid if the total elapsed time has passed the grace
* period.
*
* @param hostName A node name for logging purposes.
* @param statusCheckStartTimeNano The time when connection status check started in nanos.
* @param statusCheckEndTimeNano The time when connection status check ended in nanos.
* @param isValid Whether the connection is valid.
*/
public void updateConnectionStatus(long statusCheckStartTimeNano, long statusCheckEndTimeNano, boolean isValid) {
if (!this.activeContext.get()) {
public void updateConnectionStatus(
String hostName,
long statusCheckStartTimeNano,
long statusCheckEndTimeNano,
boolean isValid) {

if (!this.activeContext) {
return;
}

final long totalElapsedTimeNano = statusCheckEndTimeNano - this.startMonitorTimeNano.get();
final long totalElapsedTimeNano = statusCheckEndTimeNano - this.startMonitorTimeNano;

if (totalElapsedTimeNano > TimeUnit.MILLISECONDS.toNanos(this.failureDetectionTimeMillis)) {
this.setConnectionValid(isValid, statusCheckStartTimeNano, statusCheckEndTimeNano);
this.setConnectionValid(hostName, isValid, statusCheckStartTimeNano, statusCheckEndTimeNano);
}
}

Expand All @@ -181,11 +183,16 @@ public void updateConnectionStatus(long statusCheckStartTimeNano, long statusChe
* <li>{@code failureDetectionCount}
* </ul>
*
* @param hostName A node name for logging purposes.
* @param connectionValid Boolean indicating whether the server is still responsive.
* @param statusCheckStartNano The time when connection status check started in nanos.
* @param statusCheckEndNano The time when connection status check ended in nanos.
*/
void setConnectionValid(boolean connectionValid, long statusCheckStartNano, long statusCheckEndNano) {
void setConnectionValid(
String hostName,
boolean connectionValid,
long statusCheckStartNano,
long statusCheckEndNano) {

if (!connectionValid) {
this.failureCount++;
Expand All @@ -200,7 +207,7 @@ void setConnectionValid(boolean connectionValid, long statusCheckStartNano, long
* Math.max(0, this.getFailureDetectionCount());

if (invalidNodeDurationNano >= TimeUnit.MILLISECONDS.toNanos(maxInvalidNodeDurationMillis)) {
LOGGER.fine(() -> Messages.get("MonitorConnectionContext.hostDead", new Object[] {hostAliases}));
LOGGER.fine(() -> Messages.get("MonitorConnectionContext.hostDead", new Object[] {hostName}));
this.setNodeUnhealthy(true);
this.abortConnection();
return;
Expand All @@ -209,7 +216,7 @@ void setConnectionValid(boolean connectionValid, long statusCheckStartNano, long
LOGGER.finest(
() -> Messages.get(
"MonitorConnectionContext.hostNotResponding",
new Object[] {hostAliases, this.getFailureCount()}));
new Object[] {hostName, this.getFailureCount()}));
return;
}

Expand All @@ -219,6 +226,6 @@ void setConnectionValid(boolean connectionValid, long statusCheckStartNano, long

LOGGER.finest(
() -> Messages.get("MonitorConnectionContext.hostAlive",
new Object[] {hostAliases}));
new Object[] {hostName}));
}
}
Loading