Skip to content

Commit 0459ead

Browse files
limitless - address PR comments...fix multi cluster lock scenario in limitless monitor service
1 parent 783e709 commit 0459ead

File tree

4 files changed

+42
-31
lines changed

4 files changed

+42
-31
lines changed

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import software.amazon.jdbc.hostavailability.HostAvailability;
3939
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
4040
import software.amazon.jdbc.util.Messages;
41+
import software.amazon.jdbc.util.Utils;
4142
import software.amazon.jdbc.wrapper.HighestWeightHostSelector;
4243

4344
public class LimitlessConnectionPlugin extends AbstractConnectionPlugin {
@@ -154,7 +155,7 @@ private Connection connectInternalWithDialect(
154155
this.pluginService.getHostListProvider().getClusterId(), props);
155156

156157
Connection conn = null;
157-
if (limitlessRouters.isEmpty()) {
158+
if (Utils.isNullOrEmpty(limitlessRouters)) {
158159
conn = connectFunc.call();
159160
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.limitlessRouterCacheEmpty"));
160161
final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO.getBoolean(props);
@@ -169,7 +170,11 @@ private Connection connectInternalWithDialect(
169170
if (limitlessRouters.contains(hostSpec)) {
170171
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.connectWithHost", new Object[] {hostSpec.getHost()}));
171172
if (conn == null || conn.isClosed()) {
172-
conn = connectFunc.call();
173+
try {
174+
conn = connectFunc.call();
175+
} catch (final SQLException e) {
176+
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
177+
}
173178
}
174179
return conn;
175180
}
@@ -184,7 +189,10 @@ private Connection connectInternalWithDialect(
184189
new Object[] {selectedHostSpec.getHost()}));
185190
} catch (SQLException e) {
186191
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.errorSelectingRouter", new Object[] {e.getMessage()}));
187-
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec, e);
192+
if (conn == null || conn.isClosed()) {
193+
conn = connectFunc.call();
194+
}
195+
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
188196
}
189197

190198
try {
@@ -194,8 +202,11 @@ private Connection connectInternalWithDialect(
194202
"LimitlessConnectionPlugin.failedToConnectToHost",
195203
new Object[] {selectedHostSpec.getHost()}));
196204
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
205+
if (conn == null || conn.isClosed()) {
206+
conn = connectFunc.call();
207+
}
197208
// Retry connect prioritising healthiest router for best chance of connection over load-balancing with round-robin
198-
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec, e);
209+
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
199210
}
200211
}
201212

@@ -222,7 +233,7 @@ private Connection connectInternalWithoutDialect(
222233

223234
List<HostSpec> limitlessRouters = this.limitlessRouterService.getLimitlessRouters(
224235
this.pluginService.getHostListProvider().getClusterId(), props);
225-
if (limitlessRouters.isEmpty()) {
236+
if (Utils.isNullOrEmpty(limitlessRouters)) {
226237
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.limitlessRouterCacheEmpty"));
227238
final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO.getBoolean(props);
228239
if (waitForRouterInfo) {
@@ -240,7 +251,7 @@ private void initLimitlessRouterMonitorService() {
240251
}
241252

242253
private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limitlessRouters, final Properties props,
243-
final Connection conn, final HostSpec hostSpec, final SQLException originalException) throws SQLException {
254+
final Connection conn, final HostSpec hostSpec) throws SQLException {
244255

245256
List<HostSpec> currentRouters = limitlessRouters;
246257
int retryCount = 0;
@@ -252,9 +263,8 @@ private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limit
252263
if (currentRouters == null
253264
|| currentRouters.isEmpty()
254265
|| currentRouters.stream().noneMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
255-
throw new SQLException(
256-
Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"),
257-
originalException);
266+
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"));
267+
return conn;
258268
}
259269
}
260270

@@ -266,21 +276,25 @@ private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limit
266276
LOGGER.finest(Messages.get(
267277
"LimitlessConnectionPlugin.selectedHostForRetry",
268278
new Object[] {selectedHostSpec.getHost()}));
269-
} catch (UnsupportedOperationException e) {
279+
} catch (final UnsupportedOperationException e) {
270280
LOGGER.severe(Messages.get("LimitlessConnectionPlugin.incorrectConfiguration"));
271281
throw e;
282+
} catch (final SQLException e) {
283+
// error from host selector
284+
continue;
272285
}
273286

274287
try {
275288
return pluginService.connect(selectedHostSpec, props);
276-
} catch (SQLException e) {
289+
} catch (final SQLException e) {
277290
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
278291
LOGGER.finest(Messages.get(
279292
"LimitlessConnectionPlugin.failedToConnectToHost",
280293
new Object[] {selectedHostSpec.getHost()}));
281294
}
282295
}
283-
throw new SQLException(Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"), originalException);
296+
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.maxRetriesExceeded"));
297+
return conn;
284298
}
285299

286300
private List<HostSpec> synchronouslyGetLimitlessRoutersWithRetry(

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818

1919
import java.sql.Connection;
2020
import java.sql.SQLException;
21-
import java.util.Collections;
2221
import java.util.List;
22+
import java.util.Map;
2323
import java.util.Properties;
24+
import java.util.concurrent.ConcurrentHashMap;
2425
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.locks.ReentrantLock;
2627
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -37,7 +38,7 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService {
3738
"600000", // 10min
3839
"Interval in milliseconds for an Limitless router monitor to be considered inactive and to be disposed.");
3940
protected static final long CACHE_CLEANUP_NANO = TimeUnit.MINUTES.toNanos(1);
40-
protected static final ReentrantLock forceGetLimitlessRoutersLock = new ReentrantLock();
41+
protected static final Map<String, ReentrantLock> forceGetLimitlessRoutersLockMap = new ConcurrentHashMap<>();
4142
protected final PluginService pluginService;
4243
protected final LimitlessQueryHelper queryHelper;
4344
protected final LimitlessRouterMonitorInitializer limitlessRouterMonitorInitializer;
@@ -80,11 +81,7 @@ public LimitlessRouterServiceImpl(
8081
public List<HostSpec> getLimitlessRouters(final String clusterId, final Properties props) throws SQLException {
8182
final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos(
8283
MONITOR_DISPOSAL_TIME_MS.getLong(props));
83-
final List<HostSpec> limitlessRouters = limitlessRouterCache.get(clusterId, cacheExpirationNano);
84-
if (limitlessRouters == null) {
85-
return Collections.emptyList();
86-
}
87-
return limitlessRouters;
84+
return limitlessRouterCache.get(clusterId, cacheExpirationNano);
8885
}
8986

9087
@Override
@@ -93,7 +90,11 @@ public List<HostSpec> forceGetLimitlessRoutersWithConn(
9390
final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos(
9491
MONITOR_DISPOSAL_TIME_MS.getLong(props));
9592

96-
forceGetLimitlessRoutersLock.lock();
93+
final ReentrantLock lock = forceGetLimitlessRoutersLockMap.computeIfAbsent(
94+
this.pluginService.getHostListProvider().getClusterId(),
95+
(key) -> new ReentrantLock()
96+
);
97+
lock.lock();
9798
try {
9899
final List<HostSpec> limitlessRouters =
99100
limitlessRouterCache.get(this.pluginService.getHostListProvider().getClusterId(), cacheExpirationNano);
@@ -109,7 +110,7 @@ public List<HostSpec> forceGetLimitlessRoutersWithConn(
109110
newLimitlessRouters, LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props));
110111
return newLimitlessRouters;
111112
} finally {
112-
forceGetLimitlessRoutersLock.unlock();
113+
lock.unlock();
113114
}
114115
}
115116

wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,18 @@ LimitlessConnectionPlugin.incorrectConfiguration=Limitless Connection Plugin is
193193
LimitlessConnectionPlugin.interruptedThread=Thread was interrupted.
194194
LimitlessConnectionPlugin.limitlessRouterCacheEmpty=Limitless Router cache is empty. This normal during application start up when the cache is not yet populated.
195195
LimitlessConnectionPlugin.maxRetriesExceeded=Max number of connection retries has been exceeded.
196-
LimitlessConnectionPlugin.noRoutersAvailable=There are no transaction routers available.
197-
LimitlessConnectionPlugin.noRoutersAvailableForRetry=There are no transaction routers available for connection retry.
196+
LimitlessConnectionPlugin.noRoutersAvailable=No transaction routers available.
197+
LimitlessConnectionPlugin.noRoutersAvailableForRetry=No transaction routers available for connection retry. Retrying with original connection.
198198
LimitlessConnectionPlugin.selectedHost=Host {0} has been selected.
199199
LimitlessConnectionPlugin.selectedHostForRetry=Host {0} has been selected for connection retry.
200200
LimitlessConnectionPlugin.synchronouslyGetLimitlessRouters=Fetching Limitless Routers synchronously.
201201
LimitlessConnectionPlugin.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
202202

203-
# Limitless Router Service Impl
204-
LimitlessRouterServiceImpl.nulLimitlessRouterMonitor=LimitlessRouterMonitor with cluster ID {0} is null.
205-
206203
# Limitless Query Helper
207204
LimitlessQueryHelper.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
208205

209206
# Limitless Router Monitor
210207
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
211-
LimitlessRouterMonitor.forceGetLimitlessRouters=Fetching Limitless Transaction Routers synchronously.
212-
LimitlessRouterMonitor.forceGetLimitlessRoutersFailed=Failed to fetch Limitless Routers due to closed connection.
213208
LimitlessRouterMonitor.interruptedExceptionDuringMonitoring=Limitless Router Monitoring thread for node {0} was interrupted.
214209
LimitlessRouterMonitor.invalidQuery=Limitless Connection Plugin has encountered an error obtaining Limitless Router endpoints. Please ensure that you are connecting to an Aurora Limitless Database Shard Group Endpoint URL.
215210
LimitlessRouterMonitor.invalidRouterLoad=Invalid load metric value of ''{1}''from the transaction router query aurora_limitless_router_endpoints() for transaction router ''{0}''. The load metric value must be a decimal value between 0 and 1. Host weight be assigned a default weight of 1.

wrapper/src/test/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPluginTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,10 @@ void testConnect_givenMaxRetries_throwsSqlException() throws SQLException {
302302
final int maxRetries = 7;
303303
propsWithMaxRetries.setProperty(LimitlessConnectionPlugin.MAX_RETRIES.name, String.valueOf(maxRetries));
304304

305-
assertThrows(
306-
SQLException.class,
307-
() -> plugin.connect(DRIVER_PROTOCOL, INPUT_HOST_SPEC, propsWithMaxRetries, true, mockConnectFuncLambda));
305+
306+
final Connection actualConn = plugin
307+
.connect(DRIVER_PROTOCOL, INPUT_HOST_SPEC, propsWithMaxRetries, true, mockConnectFuncLambda);
308+
assertEquals(mockConnection, actualConn);
308309

309310
verify(mockLimitlessRouterService, times(1)).startMonitoring(INPUT_HOST_SPEC,
310311
props, Integer.parseInt(LimitlessConnectionPlugin.INTERVAL_MILLIS.defaultValue));

0 commit comments

Comments
 (0)