Skip to content

Commit e2feed4

Browse files
committed
Move "handlers" field to AbstractSubscribableChannel
Move the management of subscribers to the abstract parent class where it belongs.
1 parent 4e933b4 commit e2feed4

File tree

4 files changed

+34
-67
lines changed

4 files changed

+34
-67
lines changed

spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
import org.springframework.messaging.MessageHandler;
2020
import org.springframework.messaging.SubscribableChannel;
2121

22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.concurrent.CopyOnWriteArraySet;
26+
2227
/**
2328
* Abstract base class for {@link SubscribableChannel} implementations.
2429
*
@@ -27,9 +32,20 @@
2732
*/
2833
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel {
2934

35+
private final Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
36+
37+
38+
public Set<MessageHandler> getSubscribers() {
39+
return Collections.<MessageHandler>unmodifiableSet(this.handlers);
40+
}
41+
42+
public boolean hasSubscription(MessageHandler handler) {
43+
return this.handlers.contains(handler);
44+
}
45+
3046
@Override
31-
public final boolean subscribe(MessageHandler handler) {
32-
boolean result = subscribeInternal(handler);
47+
public boolean subscribe(MessageHandler handler) {
48+
boolean result = this.handlers.add(handler);
3349
if (result) {
3450
if (logger.isDebugEnabled()) {
3551
logger.debug("[" + getBeanName() + "] subscribed " + handler);
@@ -39,8 +55,8 @@ public final boolean subscribe(MessageHandler handler) {
3955
}
4056

4157
@Override
42-
public final boolean unsubscribe(MessageHandler handler) {
43-
boolean result = unsubscribeInternal(handler);
58+
public boolean unsubscribe(MessageHandler handler) {
59+
boolean result = this.handlers.remove(handler);
4460
if (result) {
4561
if (logger.isDebugEnabled()) {
4662
logger.debug("[" + getBeanName() + "] unsubscribed " + handler);
@@ -49,20 +65,4 @@ public final boolean unsubscribe(MessageHandler handler) {
4965
return result;
5066
}
5167

52-
53-
/**
54-
* Whether the given {@link MessageHandler} is already subscribed.
55-
*/
56-
public abstract boolean hasSubscription(MessageHandler handler);
57-
58-
/**
59-
* Subscribe the given {@link MessageHandler}.
60-
*/
61-
protected abstract boolean subscribeInternal(MessageHandler handler);
62-
63-
/**
64-
* Unsubscribe the given {@link MessageHandler}.
65-
*/
66-
protected abstract boolean unsubscribeInternal(MessageHandler handler);
67-
6868
}

spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
3535

3636
private final Executor executor;
3737

38-
private final Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
39-
4038

4139
/**
4240
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent
@@ -61,14 +59,9 @@ public Executor getExecutor() {
6159
return this.executor;
6260
}
6361

64-
@Override
65-
public boolean hasSubscription(MessageHandler handler) {
66-
return this.handlers.contains(handler);
67-
}
68-
6962
@Override
7063
public boolean sendInternal(final Message<?> message, long timeout) {
71-
for (final MessageHandler handler : this.handlers) {
64+
for (final MessageHandler handler : getSubscribers()) {
7265
if (this.executor == null) {
7366
handler.handleMessage(message);
7467
}
@@ -84,14 +77,4 @@ public void run() {
8477
return true;
8578
}
8679

87-
@Override
88-
public boolean subscribeInternal(MessageHandler handler) {
89-
return this.handlers.add(handler);
90-
}
91-
92-
@Override
93-
public boolean unsubscribeInternal(MessageHandler handler) {
94-
return this.handlers.remove(handler);
95-
}
96-
9780
}

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Iterator;
2121
import java.util.List;
22+
import java.util.Set;
2223

2324
import org.hamcrest.Matchers;
2425
import org.junit.Before;
@@ -93,7 +94,7 @@ public void setupOnce() {
9394
public void clientInboundChannel() {
9495

9596
TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class);
96-
List<MessageHandler> handlers = channel.handlers;
97+
Set<MessageHandler> handlers = channel.getSubscribers();
9798

9899
assertEquals(3, handlers.size());
99100
assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class)));
@@ -104,12 +105,12 @@ public void clientInboundChannel() {
104105
@Test
105106
public void clientInboundChannelWithBrokerRelay() {
106107
TestChannel channel = this.brokerRelayContext.getBean("clientInboundChannel", TestChannel.class);
107-
List<MessageHandler> values = channel.handlers;
108+
Set<MessageHandler> handlers = channel.getSubscribers();
108109

109-
assertEquals(3, values.size());
110-
assertTrue(values.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class)));
111-
assertTrue(values.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
112-
assertTrue(values.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class)));
110+
assertEquals(3, handlers.size());
111+
assertTrue(handlers.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class)));
112+
assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
113+
assertTrue(handlers.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class)));
113114
}
114115

115116
@Test
@@ -197,7 +198,7 @@ public void clientOutboundChannelCustomized() {
197198
@Test
198199
public void brokerChannel() {
199200
TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class);
200-
List<MessageHandler> handlers = channel.handlers;
201+
Set<MessageHandler> handlers = channel.getSubscribers();
201202

202203
assertEquals(2, handlers.size());
203204
assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class)));
@@ -207,7 +208,7 @@ public void brokerChannel() {
207208
@Test
208209
public void brokerChannelWithBrokerRelay() {
209210
TestChannel channel = this.brokerRelayContext.getBean("brokerChannel", TestChannel.class);
210-
List<MessageHandler> handlers = channel.handlers;
211+
Set<MessageHandler> handlers = channel.getSubscribers();
211212

212213
assertEquals(2, handlers.size());
213214
assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
@@ -451,17 +452,8 @@ protected void configureMessageBroker(MessageBrokerRegistry registry) {
451452

452453
private static class TestChannel extends ExecutorSubscribableChannel {
453454

454-
private final List<MessageHandler> handlers = new ArrayList<>();
455-
456455
private final List<Message<?>> messages = new ArrayList<>();
457456

458-
459-
@Override
460-
public boolean subscribeInternal(MessageHandler handler) {
461-
this.handlers.add(handler);
462-
return super.subscribeInternal(handler);
463-
}
464-
465457
@Override
466458
public boolean sendInternal(Message<?> message, long timeout) {
467459
this.messages.add(message);

spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.Set;
2223

2324
import org.junit.Before;
2425
import org.junit.Test;
@@ -95,10 +96,10 @@ public void clientInboundChannelSendMessage() throws Exception {
9596
@Test
9697
public void clientOutboundChannelChannel() {
9798
TestChannel channel = this.config.getBean("clientOutboundChannel", TestChannel.class);
98-
List<MessageHandler> values = channel.handlers;
99+
Set<MessageHandler> handlers = channel.getSubscribers();
99100

100-
assertEquals(1, values.size());
101-
assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler);
101+
assertEquals(1, handlers.size());
102+
assertTrue(handlers.iterator().next() instanceof SubProtocolWebSocketHandler);
102103
}
103104

104105

@@ -155,17 +156,8 @@ public AbstractSubscribableChannel brokerChannel() {
155156

156157
private static class TestChannel extends ExecutorSubscribableChannel {
157158

158-
private final List<MessageHandler> handlers = new ArrayList<>();
159-
160159
private final List<Message<?>> messages = new ArrayList<>();
161160

162-
163-
@Override
164-
public boolean subscribeInternal(MessageHandler handler) {
165-
this.handlers.add(handler);
166-
return super.subscribeInternal(handler);
167-
}
168-
169161
@Override
170162
public boolean sendInternal(Message<?> message, long timeout) {
171163
this.messages.add(message);

0 commit comments

Comments
 (0)