Skip to content

Commit 58ac007

Browse files
artembilanspring-builds
authored andcommitted
GH-3166: SMLC: synchronize on processorThreadsToInterrupt
Fixes: #3166 The `processorThreadsToInterrupt` is iterated in the `shutdownAndWaitOrCallback()`, and apparently one of the processors has finished successfully removing itself from the `processorThreadsToInterrupt` list. This leads to the `ConcurrentModificationException` on the mentioned above iteration. * Fix `SimpleMessageListenerContainer` making the `processorThreadsToInterrupt` as a `Collections.synchronizedList()` * Wrap `processorThreadsToInterrupt` iteration in the `shutdownAndWaitOrCallback()` with a `synchronized (this.processorThreadsToInterrupt)` (cherry picked from commit 6dc4457)
1 parent cde9125 commit 58ac007

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.HashSet;
2324
import java.util.Iterator;
2425
import java.util.List;
@@ -112,7 +113,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
112113

113114
private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<>();
114115

115-
private final List<Thread> processorThreadsToInterrupt = new ArrayList<>();
116+
private final List<Thread> processorThreadsToInterrupt = Collections.synchronizedList(new ArrayList<>());
116117

117118
private long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
118119

@@ -696,7 +697,9 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
696697
else {
697698
logger.info("Workers not finished.");
698699
if (isForceCloseChannel() || this.stopNow.get()) {
699-
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
700+
synchronized (this.processorThreadsToInterrupt) {
701+
this.processorThreadsToInterrupt.forEach(Thread::interrupt);
702+
}
700703
canceledConsumers.forEach(consumer -> {
701704
if (logger.isWarnEnabled()) {
702705
logger.warn("Closing channel for unresponsive consumer: " + consumer);

0 commit comments

Comments
 (0)