Skip to content

Commit 758aa81

Browse files
new context queue
expiring cache
1 parent 3684853 commit 758aa81

File tree

13 files changed

+500
-454
lines changed

13 files changed

+500
-454
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License, version 2.0
6+
* (GPLv2), as published by the Free Software Foundation, with the
7+
* following additional permissions:
8+
*
9+
* This program is distributed with certain software that is licensed
10+
* under separate terms, as designated in a particular file or component
11+
* or in the license documentation. Without limiting your rights under
12+
* the GPLv2, the authors of this program hereby grant you an additional
13+
* permission to link the program and your derivative works with the
14+
* separately licensed software that they have included with the program.
15+
*
16+
* Without limiting the foregoing grant of rights under the GPLv2 and
17+
* additional permission as to separately licensed software, this
18+
* program is also subject to the Universal FOSS Exception, version 1.0,
19+
* a copy of which can be found along with its FAQ at
20+
* http://oss.oracle.com/licenses/universal-foss-exception.
21+
*
22+
* This program is distributed in the hope that it will be useful, but
23+
* WITHOUT ANY WARRANTY; without even the implied warranty of
24+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
25+
* See the GNU General Public License, version 2.0, for more details.
26+
*
27+
* You should have received a copy of the GNU General Public License
28+
* along with this program. If not, see
29+
* http://www.gnu.org/licenses/gpl-2.0.html.
30+
*/
31+
32+
package com.mysql.cj.util;
33+
34+
import java.util.Map;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicLong;
38+
39+
public class CacheMap<K,V> {
40+
41+
private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
42+
private final long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10);
43+
private AtomicLong cleanupTimeNanos = new AtomicLong(System.nanoTime() + cleanupIntervalNanos);
44+
45+
public CacheMap() {
46+
}
47+
48+
public V get(final K key, final long itemExpirationNano) {
49+
CacheItem<V> cacheItem = cache.computeIfPresent(key,
50+
(kk, vv) -> vv.isExpired()
51+
? null
52+
: new CacheItem<>(vv.item, System.nanoTime() + itemExpirationNano));
53+
return cacheItem == null ? null : cacheItem.item;
54+
}
55+
56+
public V get(final K key, final V defaultItemValue, long itemExpirationNano) {
57+
CacheItem<V> cacheItem = cache.compute(key,
58+
(kk, vv) -> new CacheItem<>(
59+
(vv == null || vv.isExpired() ? defaultItemValue : vv.item),
60+
System.nanoTime() + itemExpirationNano));
61+
return cacheItem.item;
62+
}
63+
64+
public void put(final K key, final V item, long itemExpirationNano) {
65+
cache.put(key, new CacheItem<>(item, System.nanoTime() + itemExpirationNano));
66+
cleanUp();
67+
}
68+
69+
public void putIfAbsent(final K key, final V item, long itemExpirationNano) {
70+
cache.putIfAbsent(key, new CacheItem<>(item, System.nanoTime() + itemExpirationNano));
71+
cleanUp();
72+
}
73+
74+
public void remove(final K key) {
75+
cache.remove(key);
76+
cleanUp();
77+
}
78+
79+
public void clear() {
80+
cache.clear();
81+
}
82+
83+
public int size() { return this.cache.size(); }
84+
85+
private void cleanUp() {
86+
if (this.cleanupTimeNanos.get() < System.nanoTime()) {
87+
this.cleanupTimeNanos = new AtomicLong(System.nanoTime() + cleanupIntervalNanos);
88+
cache.entrySet().forEach(entry -> {
89+
if (entry.getValue() == null || entry.getValue().isExpired()) {
90+
cache.remove(entry.getKey());
91+
}
92+
});
93+
}
94+
}
95+
96+
private static class CacheItem<V> {
97+
final V item;
98+
final long expirationTime;
99+
100+
public CacheItem(V item, long expirationTime) {
101+
this.item = item;
102+
this.expirationTime = expirationTime;
103+
}
104+
105+
boolean isExpired() {
106+
return System.nanoTime() > expirationTime;
107+
}
108+
109+
@Override
110+
public int hashCode() {
111+
final int prime = 31;
112+
int result = 1;
113+
result = prime * result + ((item == null) ? 0 : item.hashCode());
114+
return result;
115+
}
116+
117+
@Override
118+
public boolean equals(Object obj) {
119+
if (this == obj) {
120+
return true;
121+
}
122+
if (obj == null) {
123+
return false;
124+
}
125+
if (getClass() != obj.getClass()) {
126+
return false;
127+
}
128+
CacheItem<?> other = (CacheItem<?>) obj;
129+
if (item == null) {
130+
if (other.item != null) {
131+
return false;
132+
}
133+
} else if (!item.equals(other.item)) {
134+
return false;
135+
}
136+
return true;
137+
}
138+
139+
@Override
140+
public String toString() {
141+
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
142+
}
143+
}
144+
}

src/main/resources/com/mysql/cj/LocalizedErrorMessages.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,6 @@ ConnectionFeatureNotAvailableException.0=Feature not available in this distribut
710710
IllegalArgumentException.NullParameter=Parameter ''{0}'' must not be null.
711711
InvalidLoadBalanceStrategy=Invalid load balancing strategy ''{0}''.
712712
DefaultMonitorService.EmptyNodeKeys=Empty NodeKey set passed into DefaultMonitorService. Set should not be empty.
713-
DefaultMonitorService.NoMonitorForContext=Can't find monitor for context passed into DefaultMonitorService.
714713
DefaultMonitorService.InvalidContext=Invalid context passed into DefaultMonitorService. Could not find any NodeKey from context.
715714
DefaultMonitorService.InvalidNodeKey=Invalid node key passed into DefaultMonitorService. No existing monitor for the given set of node keys.
716715

src/main/user-impl/java/com/mysql/cj/jdbc/ha/ConnectionProxy.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class ConnectionProxy implements ICurrentConnectionProvider, InvocationHa
7474
protected ConnectionPluginManager pluginManager = null;
7575
private HostInfo currentHostInfo;
7676
private JdbcConnection currentConnection;
77+
private Class<?> currentConnectionClass;
7778

7879
public ConnectionProxy(ConnectionUrl connectionUrl) throws SQLException {
7980
this(connectionUrl, null);
@@ -98,6 +99,7 @@ public ConnectionProxy(ConnectionUrl connectionUrl, JdbcConnection connection) t
9899
throws SQLException {
99100
this.currentHostInfo = connectionUrl.getMainHost();
100101
this.currentConnection = connection;
102+
this.currentConnectionClass = connection == null ? null : connection.getClass();
101103

102104
initLogger(connectionUrl);
103105
initSettings(connectionUrl);
@@ -175,36 +177,37 @@ public void setCurrentConnection(JdbcConnection connection, HostInfo info) {
175177
}
176178

177179
this.currentConnection = connection;
180+
this.currentConnectionClass = connection == null ? null : connection.getClass();
178181
this.currentHostInfo = info;
179182
}
180183

181184
@Override
182-
public synchronized Object invoke(Object proxy, Method method, Object[] args)
185+
public Object invoke(Object proxy, Method method, Object[] args)
183186
throws Throwable {
184187
final String methodName = method.getName();
185188

186189
if (isDirectExecute(methodName)) {
187190
return executeMethodDirectly(methodName, args);
188191
}
189192

190-
Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);
191-
192-
try {
193-
Object result = this.pluginManager.execute(
194-
this.currentConnection.getClass(),
195-
methodName,
196-
() -> method.invoke(currentConnection, args),
197-
argsCopy);
198-
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
199-
} catch (Exception e) {
200-
// Check if the captured exception must be wrapped by an unchecked exception.
201-
Class<?>[] declaredExceptions = method.getExceptionTypes();
202-
for (Class<?> declaredException : declaredExceptions) {
203-
if (declaredException.isAssignableFrom(e.getClass())) {
204-
throw e;
193+
synchronized (currentConnection) {
194+
try {
195+
Object result = this.pluginManager.execute(
196+
this.currentConnectionClass,
197+
methodName,
198+
() -> method.invoke(currentConnection, args),
199+
args);
200+
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
201+
} catch (Exception e) {
202+
// Check if the captured exception must be wrapped by an unchecked exception.
203+
Class<?>[] declaredExceptions = method.getExceptionTypes();
204+
for (Class<?> declaredException : declaredExceptions) {
205+
if (declaredException.isAssignableFrom(e.getClass())) {
206+
throw e;
207+
}
205208
}
209+
throw new IllegalStateException(e.getMessage(), e);
206210
}
207-
throw new IllegalStateException(e.getMessage(), e);
208211
}
209212
}
210213

@@ -301,10 +304,12 @@ private boolean isDirectExecute(String methodName) {
301304
* Proxy class to intercept and deal with errors that may occur in any object bound to the current connection.
302305
*/
303306
class JdbcInterfaceProxy implements InvocationHandler {
304-
Object invokeOn;
307+
private final Object invokeOn;
308+
private final Class<?> invokeOnClass;
305309

306310
JdbcInterfaceProxy(Object toInvokeOn) {
307311
this.invokeOn = toInvokeOn;
312+
this.invokeOnClass = toInvokeOn == null ? null : toInvokeOn.getClass();
308313
}
309314

310315
/**
@@ -329,21 +334,19 @@ private Object executeMethodDirectly(String methodName, Object[] args) {
329334
return null;
330335
}
331336

332-
public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
337+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
333338
final String methodName = method.getName();
334339
if (isDirectExecute(methodName)) {
335340
return executeMethodDirectly(methodName, args);
336341
}
337342

338-
Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);
339-
340-
synchronized(ConnectionProxy.this) {
343+
synchronized(this.invokeOn) {
341344
Object result =
342345
ConnectionProxy.this.pluginManager.execute(
343-
this.invokeOn.getClass(),
346+
this.invokeOnClass,
344347
methodName,
345348
() -> method.invoke(this.invokeOn, args),
346-
argsCopy);
349+
args);
347350
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
348351
}
349352
}

src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/DefaultMonitorService.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.mysql.cj.jdbc.JdbcConnection;
3939
import com.mysql.cj.log.Log;
4040

41-
import java.util.Iterator;
41+
import java.util.Collections;
4242
import java.util.Set;
4343
import java.util.concurrent.Executors;
4444

@@ -51,6 +51,8 @@ public class DefaultMonitorService implements IMonitorService {
5151

5252
private final Log logger;
5353
final IMonitorInitializer monitorInitializer;
54+
private Set<String> cachedMonitorNodeKeys = null;
55+
private IMonitor cachedMonitor = null;
5456

5557
public DefaultMonitorService(Log logger) {
5658
this(
@@ -96,11 +98,21 @@ public MonitorConnectionContext startMonitoring(
9698
throw new IllegalArgumentException(warning);
9799
}
98100

99-
final IMonitor monitor = getMonitor(nodeKeys, hostInfo, propertySet);
101+
IMonitor monitor;
102+
if (this.cachedMonitor == null
103+
|| this.cachedMonitorNodeKeys == null
104+
|| !this.cachedMonitorNodeKeys.equals(nodeKeys)) {
105+
106+
monitor = getMonitor(nodeKeys, hostInfo, propertySet);
107+
this.cachedMonitor = monitor;
108+
this.cachedMonitorNodeKeys = Collections.unmodifiableSet(nodeKeys);
109+
} else {
110+
monitor = this.cachedMonitor;
111+
}
100112

101113
final MonitorConnectionContext context = new MonitorConnectionContext(
114+
monitor,
102115
connectionToAbort,
103-
nodeKeys,
104116
logger,
105117
failureDetectionTimeMillis,
106118
failureDetectionIntervalMillis,
@@ -118,20 +130,8 @@ public void stopMonitoring(MonitorConnectionContext context) {
118130
return;
119131
}
120132

121-
context.invalidate();
122-
123-
// Any 1 node is enough to find the monitor containing the context
124-
// All nodes will map to the same monitor
125-
IMonitor monitor;
126-
for (Iterator<String> it = context.getNodeKeys().iterator(); it.hasNext();) {
127-
String nodeKey = it.next();
128-
monitor = this.threadContainer.getMonitor(nodeKey);
129-
if (monitor != null) {
130-
monitor.stopMonitoring(context);
131-
return;
132-
}
133-
}
134-
logger.logTrace(Messages.getString("DefaultMonitorService.NoMonitorForContext"));
133+
IMonitor monitor = context.getMonitor();
134+
monitor.stopMonitoring(context);
135135
}
136136

137137
@Override

0 commit comments

Comments
 (0)