Skip to content

Commit 4886edd

Browse files
committed
Send STOMP ERROR if external broker not available
Issue: SPR-12820
1 parent 1803978 commit 4886edd

File tree

2 files changed

+71
-8
lines changed

2 files changed

+71
-8
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.springframework.messaging.Message;
2727
import org.springframework.messaging.MessageChannel;
2828
import org.springframework.messaging.MessageDeliveryException;
29+
import org.springframework.messaging.MessageHeaders;
2930
import org.springframework.messaging.SubscribableChannel;
3031
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
3132
import org.springframework.messaging.simp.SimpMessageType;
@@ -399,12 +400,21 @@ protected void handleMessageInternal(Message<?> message) {
399400
throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
400401
"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
401402
}
402-
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
403-
if (logger.isErrorEnabled() && SimpMessageType.CONNECT.equals(messageType)) {
404-
logger.error("Broker not active. Ignoring " + message);
403+
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
404+
if (handler != null) {
405+
handler.sendStompErrorFrameToClient("Broker not available.");
406+
handler.clearConnection();
405407
}
406-
else if (logger.isDebugEnabled()) {
407-
logger.debug("Broker not active. Ignoring " + message);
408+
else {
409+
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
410+
if (getHeaderInitializer() != null) {
411+
getHeaderInitializer().initHeaders(accessor);
412+
}
413+
accessor.setSessionId(sessionId);
414+
accessor.setUser(SimpMessageHeaderAccessor.getUser(message.getHeaders()));
415+
accessor.setMessage("Broker not available.");
416+
MessageHeaders headers = accessor.getMessageHeaders();
417+
getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
408418
}
409419
return;
410420
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -146,7 +146,7 @@ public void testDestinationExcluded() throws Exception {
146146
}
147147

148148
@Test
149-
public void testOutboundMessage() throws Exception {
149+
public void testOutboundMessageIsEnriched() throws Exception {
150150

151151
this.brokerRelay.start();
152152

@@ -170,6 +170,59 @@ public void testOutboundMessage() throws Exception {
170170
assertEquals("joe", actualHeaders.getUser().getName());
171171
}
172172

173+
// SPR-12820
174+
175+
@Test
176+
public void testConnectWhenBrokerNotAvailable() throws Exception {
177+
178+
this.brokerRelay.start();
179+
this.brokerRelay.stopInternal();
180+
181+
String sessionId = "sess1";
182+
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
183+
headers.setSessionId(sessionId);
184+
headers.setUser(new TestPrincipal("joe"));
185+
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
186+
187+
Message<byte[]> actual = this.outboundChannel.getMessages().get(0);
188+
StompHeaderAccessor actualHeaders = StompHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
189+
assertEquals(StompCommand.ERROR, actualHeaders.getCommand());
190+
assertEquals(sessionId, actualHeaders.getSessionId());
191+
assertEquals("joe", actualHeaders.getUser().getName());
192+
assertEquals("Broker not available.", actualHeaders.getMessage());
193+
}
194+
195+
@Test
196+
public void testSendAfterBrokerUnavailable() throws Exception {
197+
198+
this.brokerRelay.start();
199+
200+
String sessionId = "sess1";
201+
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
202+
headers.setSessionId(sessionId);
203+
headers.setUser(new TestPrincipal("joe"));
204+
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
205+
206+
assertEquals(2, this.brokerRelay.getConnectionCount());
207+
208+
this.brokerRelay.stopInternal();
209+
210+
headers = StompHeaderAccessor.create(StompCommand.SEND);
211+
headers.setSessionId(sessionId);
212+
headers.setUser(new TestPrincipal("joe"));
213+
headers.setDestination("/foo");
214+
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
215+
216+
assertEquals(1, this.brokerRelay.getConnectionCount());
217+
218+
Message<byte[]> actual = this.outboundChannel.getMessages().get(0);
219+
StompHeaderAccessor actualHeaders = StompHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
220+
assertEquals(StompCommand.ERROR, actualHeaders.getCommand());
221+
assertEquals(sessionId, actualHeaders.getSessionId());
222+
assertEquals("joe", actualHeaders.getUser().getName());
223+
assertEquals("Broker not available.", actualHeaders.getMessage());
224+
}
225+
173226

174227
private static ListenableFutureTask<Void> getVoidFuture() {
175228
ListenableFutureTask<Void> futureTask = new ListenableFutureTask<>(new Callable<Void>() {

0 commit comments

Comments
 (0)