Skip to content

Commit f0175bc

Browse files
committed
DefaultSubscriptionRegistry uses deep LinkedMultiValueMap copies between accessCache and updateCache
Issue: SPR-13185
1 parent df8e963 commit f0175bc

File tree

1 file changed

+26
-27
lines changed

1 file changed

+26
-27
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@
3838
import org.springframework.messaging.Message;
3939
import org.springframework.messaging.MessageHeaders;
4040
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
41+
import org.springframework.messaging.support.MessageHeaderAccessor;
4142
import org.springframework.util.AntPathMatcher;
4243
import org.springframework.util.Assert;
4344
import org.springframework.util.LinkedMultiValueMap;
4445
import org.springframework.util.MultiValueMap;
4546
import org.springframework.util.PathMatcher;
4647

47-
import static org.springframework.messaging.support.MessageHeaderAccessor.getAccessor;
48-
49-
5048
/**
5149
* Implementation of {@link SubscriptionRegistry} that stores subscriptions
5250
* in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher}
@@ -181,7 +179,7 @@ public void unregisterAllSubscriptions(String sessionId) {
181179
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination,
182180
Message<?> message) {
183181

184-
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
182+
LinkedMultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
185183
if (result != null) {
186184
return filterSubscriptions(result, message);
187185
}
@@ -258,39 +256,39 @@ public String toString() {
258256
private class DestinationCache {
259257

260258
/** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
261-
private final Map<String, MultiValueMap<String, String>> accessCache =
262-
new ConcurrentHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT);
259+
private final Map<String, LinkedMultiValueMap<String, String>> accessCache =
260+
new ConcurrentHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT);
263261

264262
/** Map from destination -> <sessionId, subscriptionId> with locking */
265263
@SuppressWarnings("serial")
266-
private final Map<String, MultiValueMap<String, String>> updateCache =
267-
new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
264+
private final Map<String, LinkedMultiValueMap<String, String>> updateCache =
265+
new LinkedHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
268266
@Override
269-
protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
267+
protected boolean removeEldestEntry(Map.Entry<String, LinkedMultiValueMap<String, String>> eldest) {
270268
return size() > getCacheLimit();
271269
}
272270
};
273271

274272

275-
public MultiValueMap<String, String> getSubscriptions(String destination) {
273+
public LinkedMultiValueMap<String, String> getSubscriptions(String destination) {
276274
return this.accessCache.get(destination);
277275
}
278276

279-
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
277+
public void addSubscriptions(String destination, LinkedMultiValueMap<String, String> subscriptions) {
280278
synchronized (this.updateCache) {
281-
this.updateCache.put(destination, new LinkedMultiValueMap<String, String>(subscriptions));
279+
this.updateCache.put(destination, subscriptions.deepCopy());
282280
this.accessCache.put(destination, subscriptions);
283281
}
284282
}
285283

286284
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
287285
synchronized (this.updateCache) {
288-
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
286+
for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
289287
String cachedDestination = entry.getKey();
290288
if (getPathMatcher().match(destination, cachedDestination)) {
291-
MultiValueMap<String, String> subs = entry.getValue();
289+
LinkedMultiValueMap<String, String> subs = entry.getValue();
292290
subs.add(sessionId, subsId);
293-
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
291+
this.accessCache.put(cachedDestination, subs.deepCopy());
294292
}
295293
}
296294
}
@@ -299,9 +297,9 @@ public void updateAfterNewSubscription(String destination, String sessionId, Str
299297
public void updateAfterRemovedSubscription(String sessionId, String subsId) {
300298
synchronized (this.updateCache) {
301299
Set<String> destinationsToRemove = new HashSet<String>();
302-
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
300+
for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
303301
String destination = entry.getKey();
304-
MultiValueMap<String, String> sessionMap = entry.getValue();
302+
LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
305303
List<String> subscriptions = sessionMap.get(sessionId);
306304
if (subscriptions != null) {
307305
subscriptions.remove(subsId);
@@ -312,7 +310,7 @@ public void updateAfterRemovedSubscription(String sessionId, String subsId) {
312310
destinationsToRemove.add(destination);
313311
}
314312
else {
315-
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
313+
this.accessCache.put(destination, sessionMap.deepCopy());
316314
}
317315
}
318316
}
@@ -326,15 +324,15 @@ public void updateAfterRemovedSubscription(String sessionId, String subsId) {
326324
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
327325
synchronized (this.updateCache) {
328326
Set<String> destinationsToRemove = new HashSet<String>();
329-
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
327+
for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
330328
String destination = entry.getKey();
331-
MultiValueMap<String, String> sessionMap = entry.getValue();
329+
LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
332330
if (sessionMap.remove(info.getSessionId()) != null) {
333331
if (sessionMap.isEmpty()) {
334332
destinationsToRemove.add(destination);
335333
}
336334
else {
337-
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
335+
this.accessCache.put(destination, sessionMap.deepCopy());
338336
}
339337
}
340338
}
@@ -351,6 +349,7 @@ public String toString() {
351349
}
352350
}
353351

352+
354353
/**
355354
* Provide access to session subscriptions by sessionId.
356355
*/
@@ -360,7 +359,6 @@ private static class SessionSubscriptionRegistry {
360359
private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
361360
new ConcurrentHashMap<String, SessionSubscriptionInfo>();
362361

363-
364362
public SessionSubscriptionInfo getSubscriptions(String sessionId) {
365363
return this.sessions.get(sessionId);
366364
}
@@ -394,6 +392,7 @@ public String toString() {
394392
}
395393
}
396394

395+
397396
/**
398397
* Hold subscriptions for a session.
399398
*/
@@ -407,7 +406,6 @@ private static class SessionSubscriptionInfo {
407406

408407
private final Object monitor = new Object();
409408

410-
411409
public SessionSubscriptionInfo(String sessionId) {
412410
Assert.notNull(sessionId, "sessionId must not be null");
413411
this.sessionId = sessionId;
@@ -473,19 +471,18 @@ public String toString() {
473471
}
474472
}
475473

474+
476475
private static class Subscription {
477476

478477
private final String id;
479478

480479
private final Expression selectorExpression;
481480

482-
483481
public Subscription(String id, Expression selector) {
484482
this.id = id;
485483
this.selectorExpression = selector;
486484
}
487485

488-
489486
public String getId() {
490487
return this.id;
491488
}
@@ -496,10 +493,11 @@ public Expression getSelectorExpression() {
496493

497494
@Override
498495
public String toString() {
499-
return "Subscription id='" + this.id;
496+
return "subscription(id=" + this.id + ")";
500497
}
501498
}
502499

500+
503501
private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor {
504502

505503
@Override
@@ -515,7 +513,8 @@ public boolean canRead(EvaluationContext context, Object target, String name) {
515513
@Override
516514
public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
517515
MessageHeaders headers = (MessageHeaders) target;
518-
SimpMessageHeaderAccessor accessor = getAccessor(headers, SimpMessageHeaderAccessor.class);
516+
SimpMessageHeaderAccessor accessor =
517+
MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
519518
Object value;
520519
if ("destination".equalsIgnoreCase(name)) {
521520
value = accessor.getDestination();

0 commit comments

Comments
 (0)