Skip to content

Commit 4f67a4c

Browse files
fix: LimitlessRouterMonitor to close connection properly
1 parent f4fb2b5 commit 4f67a4c

File tree

3 files changed

+67
-41
lines changed

3 files changed

+67
-41
lines changed

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.checkerframework.checker.nullness.qual.NonNull;
3030
import software.amazon.jdbc.HostSpec;
3131
import software.amazon.jdbc.PluginService;
32-
import software.amazon.jdbc.RoundRobinHostSelector;
3332
import software.amazon.jdbc.util.Messages;
3433
import software.amazon.jdbc.util.PropertyUtils;
3534
import software.amazon.jdbc.util.SlidingExpirationCacheWithCleanupThread;
@@ -104,6 +103,13 @@ public AtomicBoolean isStopped() {
104103
@Override
105104
public void close() throws Exception {
106105
this.stopped.set(true);
106+
try {
107+
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
108+
this.monitoringConn.close();
109+
}
110+
} catch (final SQLException ex) {
111+
// ignore
112+
}
107113

108114
// Waiting for 5s gives a thread enough time to exit monitoring loop and close database connection.
109115
if (!this.threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -120,46 +126,64 @@ public void run() {
120126
"LimitlessRouterMonitor.running",
121127
new Object[] {this.hostSpec.getHost()}));
122128

123-
while (!this.stopped.get()) {
124-
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
125-
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
126-
telemetryContext.setAttribute("url", hostSpec.getUrl());
127-
try {
128-
this.openConnection();
129-
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
130-
continue;
131-
}
132-
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
133-
this.hostSpec.getPort());
134-
135-
limitlessRouterCache.put(
136-
this.limitlessRouterCacheKey,
137-
newLimitlessRouters,
138-
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));
139-
140-
LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
141-
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
142-
} catch (final InterruptedException exception) {
143-
LOGGER.finest(
144-
() -> Messages.get(
145-
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
146-
new Object[] {this.hostSpec.getHost()}));
147-
} catch (final Exception ex) {
148-
// this should not be reached; log and exit thread
149-
if (LOGGER.isLoggable(Level.FINEST)) {
150-
LOGGER.log(
151-
Level.FINEST,
152-
Messages.get(
153-
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
154-
new Object[] {this.hostSpec.getHost()}),
155-
ex); // We want to print full trace stack of the exception.
129+
try {
130+
while (!this.stopped.get()) {
131+
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
132+
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
133+
telemetryContext.setAttribute("url", hostSpec.getUrl());
134+
try {
135+
this.openConnection();
136+
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
137+
continue;
138+
}
139+
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
140+
this.hostSpec.getPort());
141+
142+
limitlessRouterCache.put(
143+
this.limitlessRouterCacheKey,
144+
newLimitlessRouters,
145+
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));
146+
147+
LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
148+
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
149+
} catch (final Exception ex) {
150+
if (telemetryContext != null) {
151+
telemetryContext.setException(ex);
152+
telemetryContext.setSuccess(false);
153+
}
154+
throw ex;
155+
} finally {
156+
if (telemetryContext != null) {
157+
telemetryContext.closeContext();
158+
}
156159
}
157-
} finally {
158-
if (telemetryContext != null) {
159-
telemetryContext.closeContext();
160+
}
161+
} catch (final InterruptedException exception) {
162+
LOGGER.finest(
163+
() -> Messages.get(
164+
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
165+
new Object[] {this.hostSpec.getHost()}));
166+
} catch (final Exception ex) {
167+
// this should not be reached; log and exit thread
168+
if (LOGGER.isLoggable(Level.FINEST)) {
169+
LOGGER.log(
170+
Level.FINEST,
171+
Messages.get(
172+
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
173+
new Object[] {this.hostSpec.getHost()}),
174+
ex); // We want to print full trace stack of the exception.
175+
}
176+
} finally {
177+
this.stopped.set(true);
178+
try {
179+
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
180+
this.monitoringConn.close();
160181
}
182+
} catch (final SQLException ex) {
183+
// ignore
161184
}
162185
}
186+
163187
}
164188

165189
private void openConnection() throws SQLException {

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService {
5757
try {
5858
limitlessRouterMonitor.close();
5959
} catch (Exception e) {
60-
// ignore
60+
LOGGER.finest(Messages.get("LimitlessRouterServiceImpl.errorClosingMonitor",
61+
new Object[]{e.getMessage()}));
6162
}
6263
},
6364
CACHE_CLEANUP_NANO

wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,19 +239,20 @@ LimitlessConnectionPlugin.unsupportedDialectOrDatabase=Unsupported dialect ''{0}
239239
LimitlessQueryHelper.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
240240

241241
# Limitless Router Monitor
242-
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
242+
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
243243
LimitlessRouterMonitor.interruptedExceptionDuringMonitoring=Limitless Router Monitoring thread for node {0} was interrupted.
244244
LimitlessRouterMonitor.invalidQuery=Limitless Connection Plugin has encountered an error obtaining Limitless Router endpoints. Please ensure that you are connecting to an Aurora Limitless Database Shard Group Endpoint URL.
245245
LimitlessRouterMonitor.invalidRouterLoad=Invalid load metric value of ''{1}''from the transaction router query aurora_limitless_router_endpoints() for transaction router ''{0}''. The load metric value must be a decimal value between 0 and 1. Host weight be assigned a default weight of 1.
246246
LimitlessRouterMonitor.getNetworkTimeoutError=An error occurred while getting the connection network timeout: {0}
247247
LimitlessRouterMonitor.openingConnection=Opening Limitless Router Monitor connection to ''{0}''.
248248
LimitlessRouterMonitor.openedConnection=Opened Limitless Router Monitor connection: {0}.
249249
LimitlessRouterMonitor.running=Limitless Router Monitor thread running on node {0}.
250-
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0].
250+
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0}.
251251

252252
# Limitless Router Service
253253
LimitlessRouterServiceImpl.connectWithHost=Connecting to host {0}.
254-
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor. {0}
254+
LimitlessRouterServiceImpl.errorClosingMonitor=An error occurred while closing Limitless Router Monitor: {0}
255+
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor: {0}
255256
LimitlessRouterServiceImpl.failedToConnectToHost=Failed to connect to host {0}.
256257
LimitlessRouterServiceImpl.fetchedEmptyRouterList=Empty router list was fetched.
257258
LimitlessRouterServiceImpl.getLimitlessRoutersException=Exception encountered getting Limitless Routers. {0}

0 commit comments

Comments
 (0)