Skip to content

Commit cde9125

Browse files
artembilanspring-builds
authored andcommitted
GH-2920: SMLC: Interrupt ML on shutdown
Fixes: #2920 When the `SimpleMessageListenerContainer.shutdownTimeout` period is over, there is no reason to keep any related threads active, sometimes blocking the whole application stop. * Introduce a `List SimpleMessageListenerContainer.processorThreadsToInterrupt` to keep track of the scheduled `AsyncMessageProcessingConsumer`. * Interrupt those threads when `this.cancellationLock.await()` is not successful in the `SimpleMessageListenerContainer.shutdownAndWaitOrCallback()` (cherry picked from commit e1081de)
1 parent a85931f commit cde9125

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
112112

113113
private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<>();
114114

115+
private final List<Thread> processorThreadsToInterrupt = new ArrayList<>();
116+
115117
private long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
116118

117119
private long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
@@ -694,6 +696,7 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
694696
else {
695697
logger.info("Workers not finished.");
696698
if (isForceCloseChannel() || this.stopNow.get()) {
699+
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
697700
canceledConsumers.forEach(consumer -> {
698701
if (logger.isWarnEnabled()) {
699702
logger.warn("Closing channel for unresponsive consumer: " + consumer);
@@ -1320,6 +1323,7 @@ public void run() { // NOSONAR - line count
13201323

13211324
try {
13221325
initialize();
1326+
SimpleMessageListenerContainer.this.processorThreadsToInterrupt.add(Thread.currentThread());
13231327
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
13241328
mainLoop();
13251329
}
@@ -1398,6 +1402,7 @@ public void run() { // NOSONAR - line count
13981402
}
13991403
}
14001404
finally {
1405+
SimpleMessageListenerContainer.this.processorThreadsToInterrupt.remove(Thread.currentThread());
14011406
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
14021407
if (getTransactionManager() != null) {
14031408
ConsumerChannelRegistry.unRegisterConsumerChannel();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.rabbitmq.client.AMQP.BasicProperties;
4141
import com.rabbitmq.client.Channel;
4242
import com.rabbitmq.client.Consumer;
43+
import com.rabbitmq.client.DefaultConsumer;
4344
import com.rabbitmq.client.Envelope;
4445
import com.rabbitmq.client.PossibleAuthenticationFailureException;
4546
import org.aopalliance.intercept.MethodInterceptor;
@@ -849,6 +850,50 @@ public void testBatchReceiveTimedOut() throws Exception {
849850
verifyNoMoreInteractions(listener);
850851
}
851852

853+
@Test
854+
@SuppressWarnings("unchecked")
855+
void listenerIsInterruptedOnUnsuccessfulShutdown() throws Exception {
856+
ConnectionFactory connectionFactory = mock();
857+
Connection connection = mock();
858+
Channel channel = mock();
859+
given(connectionFactory.createConnection()).willReturn(connection);
860+
given(connection.createChannel(false)).willReturn(channel);
861+
willAnswer(invocation -> "1")
862+
.given(channel)
863+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
864+
any(Consumer.class));
865+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
866+
container.setQueueNames("some_queue");
867+
868+
CountDownLatch handlingLatch = new CountDownLatch(1);
869+
CountDownLatch interruptedLatch = new CountDownLatch(1);
870+
container.setMessageListener(message -> {
871+
handlingLatch.countDown();
872+
try {
873+
Thread.sleep(2000L);
874+
}
875+
catch (InterruptedException e) {
876+
interruptedLatch.countDown();
877+
}
878+
});
879+
container.setShutdownTimeout(200L);
880+
container.start();
881+
882+
Set<BlockingQueueConsumer> consumers = TestUtils.getPropertyValue(container, "consumers", Set.class);
883+
BlockingQueueConsumer blockingQueueConsumer = consumers.iterator().next();
884+
885+
Map<String, DefaultConsumer> internalConsumers =
886+
TestUtils.getPropertyValue(blockingQueueConsumer, "consumers", Map.class);
887+
internalConsumers.values().iterator().next()
888+
.handleDelivery("1", new Envelope(1, false, "", ""), new BasicProperties(), new byte[] {1});
889+
890+
assertThat(handlingLatch.await(10, TimeUnit.SECONDS)).isTrue();
891+
892+
container.stop();
893+
894+
assertThat(interruptedLatch.await(10, TimeUnit.SECONDS)).isTrue();
895+
}
896+
852897
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
853898
final boolean cancel, final CountDownLatch latch) {
854899
return invocation -> {

0 commit comments

Comments
 (0)