Skip to content

Commit 2997d86

Browse files
committed
refactor: store monitoring endpoint
1 parent 829ebcf commit 2997d86

File tree

2 files changed

+51
-36
lines changed

2 files changed

+51
-36
lines changed

wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ public class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin
8383
protected @NonNull Properties properties;
8484
private final @NonNull Supplier<MonitorService> monitorServiceSupplier;
8585
private final @NonNull PluginService pluginService;
86-
private final @NonNull Set<String> nodeKeys = ConcurrentHashMap.newKeySet(); // Shared with monitor thread
8786
private MonitorService monitorService;
88-
private RdsUtils rdsHelper;
87+
private final RdsUtils rdsHelper;
88+
private HostSpec monitoringHostSpec;
8989

9090
/**
9191
* Initialize the node monitoring plugin.
@@ -155,34 +155,17 @@ public <T, E extends Exception> T execute(
155155
T result;
156156
MonitorConnectionContext monitorContext = null;
157157

158-
HostSpec monitoringHostSpec = this.pluginService.getCurrentHostSpec();
159-
final RdsUrlType rdsUrlType = this.rdsHelper.identifyRdsType(monitoringHostSpec.getUrl());
160-
161-
try {
162-
if (rdsUrlType.isRdsCluster()) {
163-
monitoringHostSpec = this.pluginService.identifyConnection(this.pluginService.getCurrentConnection());
164-
monitoringHostSpec.resetAliases();
165-
this.pluginService.fillAliases(this.pluginService.getCurrentConnection(), monitoringHostSpec);
166-
}
167-
} catch (SQLException e) {
168-
// Log and ignore
169-
LOGGER.finest(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", new Object[] {e}));
170-
}
171-
172158
try {
173159
LOGGER.finest(
174160
() -> Messages.get(
175161
"HostMonitoringConnectionPlugin.activatedMonitoring",
176162
new Object[] {methodName}));
177163

178-
this.nodeKeys.clear();
179-
this.nodeKeys.addAll(this.pluginService.getCurrentHostSpec().asAliases());
180-
181164
monitorContext =
182165
this.monitorService.startMonitoring(
183166
this.pluginService.getCurrentConnection(), // abort this connection if needed
184-
this.nodeKeys,
185-
monitoringHostSpec,
167+
this.getMonitoringHostSpec().asAliases(),
168+
this.getMonitoringHostSpec(),
186169
this.properties,
187170
failureDetectionTimeMillis,
188171
failureDetectionIntervalMillis,
@@ -196,7 +179,7 @@ public <T, E extends Exception> T execute(
196179
this.monitorService.stopMonitoring(monitorContext);
197180

198181
if (monitorContext.isNodeUnhealthy()) {
199-
this.pluginService.setAvailability(this.nodeKeys, HostAvailability.NOT_AVAILABLE);
182+
this.pluginService.setAvailability(this.getMonitoringHostSpec().asAliases(), HostAvailability.NOT_AVAILABLE);
200183

201184
final boolean isConnectionClosed;
202185
try {
@@ -262,16 +245,16 @@ public void releaseResources() {
262245

263246
@Override
264247
public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet<NodeChangeOptions> changes) {
265-
266248
if (changes.contains(NodeChangeOptions.WENT_DOWN)
267249
|| changes.contains(NodeChangeOptions.NODE_DELETED)) {
268-
if (!this.nodeKeys.isEmpty()) {
269-
this.monitorService.stopMonitoringForAllConnections(this.nodeKeys);
250+
if (!this.getMonitoringHostSpec().asAliases().isEmpty()) {
251+
this.monitorService.stopMonitoringForAllConnections(this.getMonitoringHostSpec().asAliases());
270252
}
271-
this.nodeKeys.clear();
272-
this.nodeKeys.addAll(this.pluginService.getCurrentHostSpec().getAliases());
273253
}
274254

255+
// Reset monitoring HostSpec since the associated connection has changed.
256+
this.monitoringHostSpec = null;
257+
275258
return OldConnectionSuggestedAction.NO_OPINION;
276259
}
277260

@@ -291,8 +274,11 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec,
291274
final Connection conn = connectFunc.call();
292275

293276
if (conn != null) {
294-
hostSpec.resetAliases();
295-
this.pluginService.fillAliases(conn, hostSpec);
277+
final RdsUrlType type = this.rdsHelper.identifyRdsType(hostSpec.getHost());
278+
if (type.isRdsCluster()) {
279+
hostSpec.resetAliases();
280+
this.pluginService.fillAliases(conn, hostSpec);
281+
}
296282
}
297283

298284
return conn;
@@ -308,4 +294,24 @@ public Connection forceConnect(
308294
throws SQLException {
309295
return connectInternal(driverProtocol, hostSpec, forceConnectFunc);
310296
}
297+
298+
public HostSpec getMonitoringHostSpec() {
299+
if (this.monitoringHostSpec == null) {
300+
this.monitoringHostSpec = this.pluginService.getCurrentHostSpec();
301+
final RdsUrlType rdsUrlType = this.rdsHelper.identifyRdsType(monitoringHostSpec.getUrl());
302+
303+
try {
304+
if (rdsUrlType.isRdsCluster()) {
305+
LOGGER.finest("Monitoring HostSpec is associated with a cluster endpoint, "
306+
+ "plugin needs to identify the cluster connection.");
307+
this.monitoringHostSpec = this.pluginService.identifyConnection(this.pluginService.getCurrentConnection());
308+
this.pluginService.fillAliases(this.pluginService.getCurrentConnection(), monitoringHostSpec);
309+
}
310+
} catch (SQLException e) {
311+
// Log and ignore
312+
LOGGER.finest(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", new Object[] {e}));
313+
}
314+
}
315+
return this.monitoringHostSpec;
316+
}
311317
}

wrapper/src/test/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPluginTest.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class HostMonitoringConnectionPluginTest {
8181
@Captor ArgumentCaptor<String> stringArgumentCaptor;
8282
Properties properties = new Properties();
8383
@Mock HostSpec hostSpec;
84+
@Mock HostSpec hostSpec2;
8485
@Mock Supplier<MonitorService> supplier;
8586
@Mock RdsUtils rdsUtils;
8687
@Mock MonitorConnectionContext context;
@@ -137,6 +138,9 @@ void initDefaultMockReturns() throws Exception {
137138
when(hostSpec.getHost()).thenReturn("host");
138139
when(hostSpec.getHost()).thenReturn("port");
139140
when(hostSpec.getAliases()).thenReturn(new HashSet<>(Collections.singletonList("host:port")));
141+
when(hostSpec2.getHost()).thenReturn("host");
142+
when(hostSpec2.getHost()).thenReturn("port");
143+
when(hostSpec2.getAliases()).thenReturn(new HashSet<>(Collections.singletonList("host:port")));
140144
when(connection.createStatement()).thenReturn(statement);
141145
when(statement.executeQuery(any())).thenReturn(resultSet);
142146
when(rdsUtils.identifyRdsType(any())).thenReturn(RdsUrlType.RDS_INSTANCE);
@@ -293,16 +297,21 @@ void test_notifyConnectionChanged_nodeWentDown(final NodeChangeOptions option) t
293297
sqlFunction,
294298
EMPTY_ARGS);
295299

296-
final Set<String> aliases = new HashSet<>(Arrays.asList("alias1", "alias2"));
297-
when(hostSpec.getAliases()).thenReturn(aliases);
298-
assertEquals(OldConnectionSuggestedAction.NO_OPINION, plugin.notifyConnectionChanged(EnumSet.of(option)));
299-
300-
// NodeKeys should be empty at first
301-
verify(monitorService, never()).stopMonitoringForAllConnections(any());
300+
final Set<String> aliases1 = new HashSet<>(Arrays.asList("alias1", "alias2"));
301+
final Set<String> aliases2 = new HashSet<>(Arrays.asList("alias3", "alias4"));
302+
when(hostSpec.asAliases()).thenReturn(aliases1);
303+
when(hostSpec2.asAliases()).thenReturn(aliases2);
304+
when(pluginService.getCurrentHostSpec()).thenReturn(hostSpec);
302305

303306
assertEquals(OldConnectionSuggestedAction.NO_OPINION, plugin.notifyConnectionChanged(EnumSet.of(option)));
304307
// NodeKeys should contain {"alias1", "alias2"}
305-
verify(monitorService).stopMonitoringForAllConnections(aliases);
308+
verify(monitorService).stopMonitoringForAllConnections(aliases1);
309+
310+
when(pluginService.getCurrentHostSpec()).thenReturn(hostSpec2);
311+
assertEquals(OldConnectionSuggestedAction.NO_OPINION, plugin.notifyConnectionChanged(EnumSet.of(option)));
312+
// NotifyConnectionChanged should reset the monitoringHostSpec.
313+
// NodeKeys should contain {"alias3", "alias4"}
314+
verify(monitorService).stopMonitoringForAllConnections(aliases2);
306315
}
307316

308317
@Test

0 commit comments

Comments
 (0)