Closed
Description
Version 2.7 introduced this utility method so that error handlers with a BackOff respond promptly to a container.stop() operation; previously, the stop() was delayed until the back-off sleep completed.
spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
Lines 237 to 253 in 8bcb052
    /**
     * Sleep for the desired timeout, as long as the container continues to run.
     * @param container the container.
     * @param interval the timeout.
     * @throws InterruptedException if the thread is interrupted.
     * @since 2.7
     */
    public static void stoppableSleep(MessageListenerContainer container, long interval) throws InterruptedException {
        long timeout = System.currentTimeMillis() + interval;
        do {
            Thread.sleep(SLEEP_INTERVAL);
            if (!container.isRunning()) {
                break;
            }
        }
        while (System.currentTimeMillis() < timeout);
    }
This can also be used with idleBetweenPolls to avoid delaying the stop() there too.
Lines 1533 to 1541 in 8bcb052
    if (idleBetweenPolls > 0) {
        try {
            TimeUnit.MILLISECONDS.sleep(idleBetweenPolls);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Consumer Thread [" + this + "] has been interrupted", ex);
        }
    }
Enhance the method to use a regular sleep() if the idle interval is less than 500ms.
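One way the proposal could look is sketched below. This is not the actual spring-kafka change: the class name StoppableSleepSketch, the SMALL_INTERVAL_THRESHOLD constant, and the idleBetweenPolls() helper are hypothetical, a BooleanSupplier stands in for MessageListenerContainer.isRunning() so the example compiles on its own, and the 100 ms value for SLEEP_INTERVAL is an assumption (the constant's value is not shown in the snippet above).

```java
import java.util.function.BooleanSupplier;

final class StoppableSleepSketch {

    // Assumed value; the real SLEEP_INTERVAL is not shown in the issue.
    static final long SLEEP_INTERVAL = 100L;

    // Threshold taken from the issue text ("less than 500ms"); the name is hypothetical.
    static final long SMALL_INTERVAL_THRESHOLD = 500L;

    private StoppableSleepSketch() {
    }

    // Sleeps for the interval, returning early once running.getAsBoolean() is false.
    // The BooleanSupplier stands in for MessageListenerContainer.isRunning().
    static void stoppableSleep(BooleanSupplier running, long interval) throws InterruptedException {
        if (interval < SMALL_INTERVAL_THRESHOLD) {
            // Short waits use one regular sleep; a stop() is delayed by at most the interval.
            if (interval > 0) {
                Thread.sleep(interval);
            }
            return;
        }
        long timeout = System.currentTimeMillis() + interval;
        do {
            Thread.sleep(SLEEP_INTERVAL);
            if (!running.getAsBoolean()) {
                break;
            }
        }
        while (System.currentTimeMillis() < timeout);
    }

    // The idleBetweenPolls block from the issue, using the stoppable sleep instead of
    // TimeUnit.MILLISECONDS.sleep().
    static void idleBetweenPolls(BooleanSupplier running, long idleBetweenPolls) {
        if (idleBetweenPolls > 0) {
            try {
                stoppableSleep(running, idleBetweenPolls);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Consumer thread has been interrupted", ex);
            }
        }
    }
}
```

With this shape, a short idle interval such as 50 ms costs one plain sleep rather than being rounded up to a full SLEEP_INTERVAL step, while a long interval is still cut short within roughly one step of the container stopping.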