Skip to content

Commit ce298f5

Browse files
garyrussellartembilan
authored andcommitted
GH-1845: Idle Between Polls Improvements
#1845 - don't idle until partitions are assigned - it caused delayed assignment - use `ListenerUtils.stoppableSleep()` - don't hold up container stop while idle - `ListenerUtils.stoppableSleep()` - reduce sleep time for small intervals to improve resolution
1 parent 8bcb052 commit ce298f5

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,13 +1526,14 @@ private void checkIdle() {
15261526

15271527
private void idleBetweenPollIfNecessary() {
15281528
long idleBetweenPolls = this.containerProperties.getIdleBetweenPolls();
1529-
if (idleBetweenPolls > 0) {
1529+
Collection<TopicPartition> assigned = getAssignedPartitions();
1530+
if (idleBetweenPolls > 0 && assigned != null && assigned.size() > 0) {
15301531
idleBetweenPolls = Math.min(idleBetweenPolls,
15311532
this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll)
15321533
- 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance
15331534
if (idleBetweenPolls > 0) {
15341535
try {
1535-
TimeUnit.MILLISECONDS.sleep(idleBetweenPolls);
1536+
ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this, idleBetweenPolls);
15361537
}
15371538
catch (InterruptedException ex) {
15381539
Thread.currentThread().interrupt();

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ private ListenerUtils() {
4949

5050
private static final ThreadLocal<Boolean> LOG_METADATA_ONLY = new ThreadLocal<>();
5151

52-
private static final int SLEEP_INTERVAL = 100;
52+
private static final int DEFAULT_SLEEP_INTERVAL = 100;
53+
54+
private static final int SMALL_SLEEP_INTERVAL = 10;
55+
56+
private static final long SMALL_INTERVAL_THRESHOLD = 500;
5357

5458
/**
5559
* Determine the type of the listener.
@@ -243,8 +247,9 @@ public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExec
243247
*/
244248
public static void stoppableSleep(MessageListenerContainer container, long interval) throws InterruptedException {
245249
long timeout = System.currentTimeMillis() + interval;
250+
long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL;
246251
do {
247-
Thread.sleep(SLEEP_INTERVAL);
252+
Thread.sleep(sleepInterval);
248253
if (!container.isRunning()) {
249254
break;
250255
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2508,6 +2508,39 @@ public void testInitialSeek() throws Exception {
25082508
container.stop();
25092509
}
25102510

2511+
@SuppressWarnings({ "unchecked", "rawtypes" })
2512+
@Test
2513+
public void testIdleEarlyExit() throws Exception {
2514+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2515+
Consumer<Integer, String> consumer = mock(Consumer.class);
2516+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2517+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2518+
final CountDownLatch latch = new CountDownLatch(1);
2519+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2520+
latch.countDown();
2521+
Thread.sleep(50);
2522+
return emptyRecords;
2523+
});
2524+
ContainerProperties containerProps = new ContainerProperties("foo");
2525+
containerProps.setGroupId("grp");
2526+
containerProps.setAckMode(AckMode.RECORD);
2527+
containerProps.setClientId("clientId");
2528+
containerProps.setMessageListener((MessageListener) r -> { });
2529+
containerProps.setMissingTopicsFatal(false);
2530+
containerProps.setIdleBetweenPolls(60_000L);
2531+
containerProps.setShutdownTimeout(20_000);
2532+
KafkaMessageListenerContainer<Integer, String> container =
2533+
new KafkaMessageListenerContainer<>(cf, containerProps);
2534+
container.start();
2535+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
2536+
new DirectFieldAccessor(container).setPropertyValue("listenerConsumer.assignedPartitions",
2537+
Arrays.asList(new TopicPartition("foo", 0)));
2538+
Thread.sleep(500);
2539+
long t1 = System.currentTimeMillis();
2540+
container.stop();
2541+
assertThat(System.currentTimeMillis() - t1).isLessThan(10_000L);
2542+
}
2543+
25112544
@Test
25122545
public void testExceptionWhenCommitAfterRebalance() throws Exception {
25132546
final CountDownLatch rebalanceLatch = new CountDownLatch(2);

0 commit comments

Comments
 (0)