Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.RoundRobinHostSelector;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.PropertyUtils;
import software.amazon.jdbc.util.SlidingExpirationCacheWithCleanupThread;
Expand Down Expand Up @@ -104,6 +103,15 @@
@Override
public void close() throws Exception {
this.stopped.set(true);
try {
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
this.monitoringConn.close();
}
} catch (final SQLException ex) {
// ignore
}

this.monitoringConn = null;

// Waiting for 5s gives a thread enough time to exit monitoring loop and close database connection.
if (!this.threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
Expand All @@ -120,46 +128,65 @@
"LimitlessRouterMonitor.running",
new Object[] {this.hostSpec.getHost()}));

while (!this.stopped.get()) {
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());
try {
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
continue;
}
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
this.hostSpec.getPort());

limitlessRouterCache.put(
this.limitlessRouterCacheKey,
newLimitlessRouters,
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));

LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
} catch (final InterruptedException exception) {
LOGGER.finest(
() -> Messages.get(
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(
Level.FINEST,
Messages.get(
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
try {
while (!this.stopped.get()) {
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());
try {
this.openConnection();
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
continue;
}
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
this.hostSpec.getPort());

limitlessRouterCache.put(
this.limitlessRouterCacheKey,
newLimitlessRouters,
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));

LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
} catch (final Exception ex) {
if (telemetryContext != null) {
telemetryContext.setException(ex);
telemetryContext.setSuccess(false);
}
throw ex;
} finally {
if (telemetryContext != null) {

Check warning on line 158 in wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java

View workflow job for this annotation

GitHub Actions / qodana

Constant values

Condition `telemetryContext != null` is always `true`
telemetryContext.closeContext();
}
}
} finally {
if (telemetryContext != null) {
telemetryContext.closeContext();
}
} catch (final InterruptedException exception) {
LOGGER.finest(
() -> Messages.get(
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(
Level.FINEST,
Messages.get(
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
} finally {
this.stopped.set(true);
try {
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
this.monitoringConn.close();
}
} catch (final SQLException ex) {
// ignore
}
this.monitoringConn = null;
}

}

private void openConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService {
try {
limitlessRouterMonitor.close();
} catch (Exception e) {
// ignore
LOGGER.warning(Messages.get("LimitlessRouterServiceImpl.errorClosingMonitor",
new Object[]{e.getMessage()}));
}
},
CACHE_CLEANUP_NANO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,20 @@ LimitlessConnectionPlugin.unsupportedDialectOrDatabase=Unsupported dialect ''{0}
LimitlessQueryHelper.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.

# Limitless Router Monitor
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
LimitlessRouterMonitor.interruptedExceptionDuringMonitoring=Limitless Router Monitoring thread for node {0} was interrupted.
LimitlessRouterMonitor.invalidQuery=Limitless Connection Plugin has encountered an error obtaining Limitless Router endpoints. Please ensure that you are connecting to an Aurora Limitless Database Shard Group Endpoint URL.
LimitlessRouterMonitor.invalidRouterLoad=Invalid load metric value of ''{1}''from the transaction router query aurora_limitless_router_endpoints() for transaction router ''{0}''. The load metric value must be a decimal value between 0 and 1. Host weight be assigned a default weight of 1.
LimitlessRouterMonitor.getNetworkTimeoutError=An error occurred while getting the connection network timeout: {0}
LimitlessRouterMonitor.openingConnection=Opening Limitless Router Monitor connection to ''{0}''.
LimitlessRouterMonitor.openedConnection=Opened Limitless Router Monitor connection: {0}.
LimitlessRouterMonitor.running=Limitless Router Monitor thread running on node {0}.
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0].
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0}.

# Limitless Router Service
LimitlessRouterServiceImpl.connectWithHost=Connecting to host {0}.
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor. {0}
LimitlessRouterServiceImpl.errorClosingMonitor=An error occurred while closing Limitless Router Monitor: {0}
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor: {0}
LimitlessRouterServiceImpl.failedToConnectToHost=Failed to connect to host {0}.
LimitlessRouterServiceImpl.fetchedEmptyRouterList=Empty router list was fetched.
LimitlessRouterServiceImpl.getLimitlessRoutersException=Exception encountered getting Limitless Routers. {0}
Expand Down
Loading