Skip to content

Commit ca9beea

Browse files
committed
DefaultSubscriptionRegistry uses deep LinkedMultiValueMap copies between accessCache and updateCache
Also backported CopyOnWriteArraySet use from 4.2, for defensive iteration over registered subscriptions. Issue: SPR-13185
1 parent 3eb54cc commit ca9beea

File tree

2 files changed

+73
-61
lines changed

2 files changed

+73
-61
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import java.util.Collection;
2020
import java.util.HashSet;
2121
import java.util.LinkedHashMap;
22+
import java.util.LinkedList;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Set;
2526
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.ConcurrentMap;
28+
import java.util.concurrent.CopyOnWriteArraySet;
2729

2830
import org.springframework.messaging.Message;
2931
import org.springframework.util.AntPathMatcher;
@@ -32,7 +34,6 @@
3234
import org.springframework.util.MultiValueMap;
3335
import org.springframework.util.PathMatcher;
3436

35-
3637
/**
3738
* A default, simple in-memory implementation of {@link SubscriptionRegistry}.
3839
*
@@ -166,7 +167,7 @@ public MultiValueMap<String, String> getSubscriptions(String destination) {
166167

167168
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
168169
synchronized (this.updateCache) {
169-
this.updateCache.put(destination, new LinkedMultiValueMap<String, String>(subscriptions));
170+
this.updateCache.put(destination, deepCopy(subscriptions));
170171
this.accessCache.put(destination, subscriptions);
171172
}
172173
}
@@ -178,7 +179,7 @@ public void updateAfterNewSubscription(String destination, String sessionId, Str
178179
if (getPathMatcher().match(destination, cachedDestination)) {
179180
MultiValueMap<String, String> subs = entry.getValue();
180181
subs.add(sessionId, subsId);
181-
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
182+
this.accessCache.put(cachedDestination, deepCopy(subs));
182183
}
183184
}
184185
}
@@ -200,7 +201,7 @@ public void updateAfterRemovedSubscription(String sessionId, String subsId) {
200201
destinationsToRemove.add(destination);
201202
}
202203
else {
203-
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
204+
this.accessCache.put(destination, deepCopy(sessionMap));
204205
}
205206
}
206207
}
@@ -222,7 +223,7 @@ public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
222223
destinationsToRemove.add(destination);
223224
}
224225
else {
225-
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
226+
this.accessCache.put(destination, deepCopy(sessionMap));
226227
}
227228
}
228229
}
@@ -233,12 +234,21 @@ public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
233234
}
234235
}
235236

237+
private <K, V> LinkedMultiValueMap<K, V> deepCopy(Map<K, List<V>> map) {
238+
LinkedMultiValueMap<K, V> copy = new LinkedMultiValueMap<K, V>();
239+
for (Map.Entry<K, List<V>> entry : map.entrySet()) {
240+
copy.put(entry.getKey(), new LinkedList<V>(entry.getValue()));
241+
}
242+
return copy;
243+
}
244+
236245
@Override
237246
public String toString() {
238247
return "cache[" + this.accessCache.size() + " destination(s)]";
239248
}
240249
}
241250

251+
242252
/**
243253
* Provide access to session subscriptions by sessionId.
244254
*/
@@ -276,10 +286,11 @@ public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
276286

277287
@Override
278288
public String toString() {
279-
return "registry[" + sessions.size() + " sessions]";
289+
return "registry[" + this.sessions.size() + " sessions]";
280290
}
281291
}
282292

293+
283294
/**
284295
* Hold subscriptions for a session.
285296
*/
@@ -292,7 +303,6 @@ private static class SessionSubscriptionInfo {
292303

293304
private final Object monitor = new Object();
294305

295-
296306
public SessionSubscriptionInfo(String sessionId) {
297307
Assert.notNull(sessionId, "sessionId must not be null");
298308
this.sessionId = sessionId;
@@ -316,7 +326,7 @@ public void addSubscription(String destination, String subscriptionId) {
316326
synchronized (this.monitor) {
317327
subs = this.subscriptions.get(destination);
318328
if (subs == null) {
319-
subs = new HashSet<String>(4);
329+
subs = new CopyOnWriteArraySet<String>();
320330
this.subscriptions.put(destination, subs);
321331
}
322332
}

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 55 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,19 @@
3434

3535

3636
/**
37-
* Test fixture for {@link org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry}.
37+
* Test fixture for
38+
* {@link org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry}.
3839
*
3940
* @author Rossen Stoyanchev
4041
* @author Sebastien Deleuze
4142
*/
4243
public class DefaultSubscriptionRegistryTests {
4344

44-
private DefaultSubscriptionRegistry registry;
45-
46-
47-
@Before
48-
public void setup() {
49-
this.registry = new DefaultSubscriptionRegistry();
50-
}
45+
private final DefaultSubscriptionRegistry registry = new DefaultSubscriptionRegistry();
5146

5247

5348
@Test
5449
public void registerSubscriptionInvalidInput() {
55-
5650
String sessId = "sess01";
5751
String subsId = "subs01";
5852
String dest = "/foo";
@@ -69,7 +63,6 @@ public void registerSubscriptionInvalidInput() {
6963

7064
@Test
7165
public void registerSubscription() {
72-
7366
String sessId = "sess01";
7467
String subsId = "subs01";
7568
String dest = "/foo";
@@ -83,7 +76,6 @@ public void registerSubscription() {
8376

8477
@Test
8578
public void registerSubscriptionOneSession() {
86-
8779
String sessId = "sess01";
8880
List<String> subscriptionIds = Arrays.asList("subs01", "subs02", "subs03");
8981
String dest = "/foo";
@@ -93,14 +85,12 @@ public void registerSubscriptionOneSession() {
9385
}
9486

9587
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
96-
9788
assertEquals("Expected one element " + actual, 1, actual.size());
9889
assertEquals(subscriptionIds, sort(actual.get(sessId)));
9990
}
10091

10192
@Test
10293
public void registerSubscriptionMultipleSessions() {
103-
10494
List<String> sessIds = Arrays.asList("sess01", "sess02", "sess03");
10595
List<String> subscriptionIds = Arrays.asList("subs01", "subs02", "subs03");
10696
String dest = "/foo";
@@ -112,7 +102,6 @@ public void registerSubscriptionMultipleSessions() {
112102
}
113103

114104
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
115-
116105
assertEquals("Expected three elements " + actual, 3, actual.size());
117106
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(0))));
118107
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(1))));
@@ -121,22 +110,18 @@ public void registerSubscriptionMultipleSessions() {
121110

122111
@Test
123112
public void registerSubscriptionWithDestinationPattern() {
124-
125113
String sessId = "sess01";
126114
String subsId = "subs01";
127115
String destPattern = "/topic/PRICE.STOCK.*.IBM";
128116
String dest = "/topic/PRICE.STOCK.NASDAQ.IBM";
129-
130117
this.registry.registerSubscription(subscribeMessage(sessId, subsId, destPattern));
131-
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
132118

119+
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
133120
assertEquals("Expected one element " + actual, 1, actual.size());
134121
assertEquals(Arrays.asList(subsId), actual.get(sessId));
135122
}
136123

137-
// SPR-11657
138-
139-
@Test
124+
@Test // SPR-11657
140125
public void registerSubscriptionsWithSimpleAndPatternDestinations() {
141126

142127
String sess1 = "sess01";
@@ -148,51 +133,55 @@ public void registerSubscriptionsWithSimpleAndPatternDestinations() {
148133

149134
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
150135
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
136+
151137
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
152138
assertEquals(1, actual.size());
153139
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
154140

155141
this.registry.registerSubscription(subscribeMessage(sess2, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
156142
this.registry.registerSubscription(subscribeMessage(sess2, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
157143
this.registry.registerSubscription(subscribeMessage(sess2, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
144+
158145
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
159146
assertEquals(2, actual.size());
160147
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
161148
assertEquals(Arrays.asList(subs1), actual.get(sess2));
162149

163150
this.registry.unregisterAllSubscriptions(sess1);
151+
164152
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
165153
assertEquals(1, actual.size());
166154
assertEquals(Arrays.asList(subs1), actual.get(sess2));
167155

168156
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
169157
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
158+
170159
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
171160
assertEquals(2, actual.size());
172161
assertEquals(Arrays.asList(subs1, subs2), actual.get(sess1));
173162
assertEquals(Arrays.asList(subs1), actual.get(sess2));
174163

175164
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
165+
176166
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
177167
assertEquals(2, actual.size());
178168
assertEquals(Arrays.asList(subs1), actual.get(sess1));
179169
assertEquals(Arrays.asList(subs1), actual.get(sess2));
180170

181171
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
172+
182173
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
183174
assertEquals(1, actual.size());
184175
assertEquals(Arrays.asList(subs1), actual.get(sess2));
185176

186177
this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1));
178+
187179
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
188180
assertEquals(0, actual.size());
189181
}
190182

191-
// SPR-11755
192-
193-
@Test
183+
@Test // SPR-11755
194184
public void registerAndUnregisterMultipleDestinations() {
195-
196185
String sess1 = "sess01";
197186
String sess2 = "sess02";
198187

@@ -219,13 +208,13 @@ public void registerAndUnregisterMultipleDestinations() {
219208
this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
220209
this.registry.registerSubscription(subscribeMessage(sess1, subs4, "/topic/PRICE.STOCK.NYSE.IBM"));
221210
this.registry.registerSubscription(subscribeMessage(sess2, subs5, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
211+
222212
this.registry.unregisterAllSubscriptions(sess1);
223213
this.registry.unregisterAllSubscriptions(sess2);
224214
}
225215

226216
@Test
227217
public void registerSubscriptionWithDestinationPatternRegex() {
228-
229218
String sessId = "sess01";
230219
String subsId = "subs01";
231220
String destPattern = "/topic/PRICE.STOCK.*.{ticker:(IBM|MSFT)}";
@@ -249,9 +238,29 @@ public void registerSubscriptionWithDestinationPatternRegex() {
249238
assertEquals("Expected no elements " + actual, 0, actual.size());
250239
}
251240

241+
@Test // SPR-11931
242+
public void registerTwiceAndUnregisterSubscriptions() {
243+
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));
244+
this.registry.registerSubscription(subscribeMessage("sess01", "subs02", "/foo"));
245+
246+
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/foo"));
247+
assertEquals("Expected 1 element", 1, actual.size());
248+
assertEquals(Arrays.asList("subs01", "subs02"), actual.get("sess01"));
249+
250+
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs01"));
251+
252+
actual = this.registry.findSubscriptions(message("/foo"));
253+
assertEquals("Expected 1 element", 1, actual.size());
254+
assertEquals(Arrays.asList("subs02"), actual.get("sess01"));
255+
256+
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs02"));
257+
258+
actual = this.registry.findSubscriptions(message("/foo"));
259+
assertEquals("Expected no element", 0, actual.size());
260+
}
261+
252262
@Test
253263
public void unregisterSubscription() {
254-
255264
List<String> sessIds = Arrays.asList("sess01", "sess02", "sess03");
256265
List<String> subscriptionIds = Arrays.asList("subs01", "subs02", "subs03");
257266
String dest = "/foo";
@@ -267,36 +276,13 @@ public void unregisterSubscription() {
267276
this.registry.unregisterSubscription(unsubscribeMessage(sessIds.get(0), subscriptionIds.get(2)));
268277

269278
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
270-
271279
assertEquals("Expected two elements: " + actual, 2, actual.size());
272280
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(1))));
273281
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(2))));
274282
}
275283

276-
// SPR-11931
277-
278-
@Test
279-
public void registerTwiceAndUnregisterSubscriptions() {
280-
281-
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));
282-
this.registry.registerSubscription(subscribeMessage("sess01", "subs02", "/foo"));
283-
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/foo"));
284-
assertEquals("Expected 1 element", 1, actual.size());
285-
assertEquals(Arrays.asList("subs01", "subs02"), actual.get("sess01"));
286-
287-
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs01"));
288-
actual = this.registry.findSubscriptions(message("/foo"));
289-
assertEquals("Expected 1 element", 1, actual.size());
290-
assertEquals(Arrays.asList("subs02"), actual.get("sess01"));
291-
292-
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs02"));
293-
actual = this.registry.findSubscriptions(message("/foo"));
294-
assertEquals("Expected no element", 0, actual.size());
295-
}
296-
297284
@Test
298285
public void unregisterAllSubscriptions() {
299-
300286
List<String> sessIds = Arrays.asList("sess01", "sess02", "sess03");
301287
List<String> subscriptionIds = Arrays.asList("subs01", "subs02", "subs03");
302288
String dest = "/foo";
@@ -311,7 +297,6 @@ public void unregisterAllSubscriptions() {
311297
this.registry.unregisterAllSubscriptions(sessIds.get(1));
312298

313299
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
314-
315300
assertEquals("Expected one element: " + actual, 1, actual.size());
316301
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(2))));
317302
}
@@ -328,9 +313,7 @@ public void findSubscriptionsNoMatches() {
328313
assertEquals("Expected no elements " + actual, 0, actual.size());
329314
}
330315

331-
// SPR-12665
332-
333-
@Test
316+
@Test // SPR-12665
334317
public void findSubscriptionsReturnsMapSafeToIterate() throws Exception {
335318
this.registry.registerSubscription(subscribeMessage("sess1", "1", "/foo"));
336319
this.registry.registerSubscription(subscribeMessage("sess2", "1", "/foo"));
@@ -346,6 +329,25 @@ public void findSubscriptionsReturnsMapSafeToIterate() throws Exception {
346329
// no ConcurrentModificationException
347330
}
348331

332+
@Test // SPR-13185
333+
public void findSubscriptionsReturnsMapSafeToIterateIncludingValues() throws Exception {
334+
this.registry.registerSubscription(subscribeMessage("sess1", "1", "/foo"));
335+
this.registry.registerSubscription(subscribeMessage("sess1", "2", "/foo"));
336+
337+
MultiValueMap<String, String> allSubscriptions = this.registry.findSubscriptions(message("/foo"));
338+
assertNotNull(allSubscriptions);
339+
assertEquals(1, allSubscriptions.size());
340+
341+
Iterator<String> iteratorValues = allSubscriptions.get("sess1").iterator();
342+
iteratorValues.next();
343+
344+
this.registry.unregisterSubscription(unsubscribeMessage("sess1", "2"));
345+
346+
iteratorValues.next();
347+
// no ConcurrentModificationException
348+
}
349+
350+
349351
private Message<?> subscribeMessage(String sessionId, String subscriptionId, String destination) {
350352
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);
351353
headers.setSessionId(sessionId);

0 commit comments

Comments
 (0)