@@ -2464,7 +2464,6 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
 	private void pauseForNackSleep() {
 		if (this.nackSleep > 0) {
 			this.nackWake = System.currentTimeMillis() + this.nackSleep;
-			this.nackSleep = -1;
 			Set<TopicPartition> alreadyPaused = this.consumer.paused();
 			Collection<TopicPartition> assigned = getAssignedPartitions();
 			if (assigned != null) {
@@ -2484,6 +2483,7 @@ private void pauseForNackSleep() {
 					this.consumer.resume(nowPaused);
 				}
 			}
+		this.nackSleep = -1;
 	}
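For context on why the line moves: pauseForNackSleep() only enters its body when the requested sleep is positive, so with the reset inside the if block a nack(0) left nackSleep at 0 and the container kept treating subsequent records as nacked. Moving the reset after the block clears the flag regardless of the sleep value. A minimal sketch of the fixed flow (simplified, not the actual container code; it assumes nack(sleep) stores the requested sleep in nackSleep and that a non-negative nackSleep signals a pending nack):

// Simplified sketch of the fixed control flow (assumptions noted above).
private void pauseForNackSleep() {
	if (this.nackSleep > 0) {
		// positive sleep: pause the assigned partitions until nackWake
	}
	this.nackSleep = -1; // always clear the pending nack, even for nack(0)
}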

@@ -0,0 +1,265 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.lang.Nullable;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
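* Tests nack() with a zero sleep time: the first record, a middle record, and
* the last record of a poll are nacked and redelivered.
*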
* @author Gary Russell
* @since 2.8.5
*
*/
@SpringJUnitConfig
@DirtiesContext
public class ManualNackRecordZeroSleepTests {

@SuppressWarnings("rawtypes")
@Autowired
private Consumer consumer;
@Autowired
private Config config;

@Autowired
private KafkaListenerEndpointRegistry registry;

@SuppressWarnings({ "unchecked" })
@Test
public void zeroSleepNackFirstLastAndMiddleRecords() throws Exception {
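// Expected flow, derived from the mock consumer in Config: poll 1 delivers
// foo (partition 0, offset 0; nacked); poll 2 redelivers foo, then delivers
// bar, baz, and qux (partition 1, offset 1; nacked); poll 3 redelivers qux,
// then delivers fiz and buz (partition 2, offset 1; nacked); poll 4
// redelivers buz, for nine deliveries in total.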
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
this.registry.stop();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
HashMap<TopicPartition, OffsetAndMetadata> commit1 = new HashMap<>();
commit1.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
commit1.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
HashMap<TopicPartition, OffsetAndMetadata> commit2 = new HashMap<>();
commit2.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
commit2.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
HashMap<TopicPartition, OffsetAndMetadata> commit3 = new HashMap<>();
commit3.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
inOrder.verify(this.consumer).commitSync(commit1, Duration.ofSeconds(60));
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.consumer).commitSync(commit2, Duration.ofSeconds(60));
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 1L);
inOrder.verify(this.consumer).commitSync(commit3, Duration.ofSeconds(60));
assertThat(this.config.count).isEqualTo(9);
assertThat(this.config.contents.toArray()).isEqualTo(new String[]
{ "foo", "foo", "bar", "baz", "qux", "qux", "fiz", "buz", "buz"});
}

@Configuration
@EnableKafka
public static class Config {

final List<String> contents = new ArrayList<>();

final CountDownLatch pollLatch = new CountDownLatch(5);

final CountDownLatch deliveryLatch = new CountDownLatch(9);

final CountDownLatch closeLatch = new CountDownLatch(1);

final CountDownLatch commitLatch = new CountDownLatch(4);

volatile int count;

@KafkaListener(topics = "foo", groupId = "grp")
public void foo(String in, Acknowledgment ack) {
this.contents.add(in);
this.deliveryLatch.countDown();
++this.count;
if (this.contents.size() == 1 || this.count == 5 || this.count == 8) {
// nack the first record, the last record, and partition 1 offset 1, each on first delivery
ack.nack(0);
}
else {
ack.acknowledge();
}
}

@SuppressWarnings({ "rawtypes" })
@Bean
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Consumer consumer() {
final Consumer consumer = mock(Consumer.class);
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
willAnswer(i -> {
((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
Collections.singletonList(topicPartition1));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
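// Three staged poll results: records1 is the full initial batch (two records
// on each of three partitions); records2 and records3 are the smaller
// batches expected after the qux and buz nacks seek the partitions back.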
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
records1.put(topicPartition0, Arrays.asList(
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
new RecordHeaders(), Optional.empty())));
records1.put(topicPartition1, Arrays.asList(
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
new RecordHeaders(), Optional.empty())));
records1.put(topicPartition2, Arrays.asList(
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
new RecordHeaders(), Optional.empty())));
Map<TopicPartition, List<ConsumerRecord>> records2 = new LinkedHashMap<>(records1);
records2.remove(topicPartition0);
records2.put(topicPartition1, Arrays.asList(
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
new RecordHeaders(), Optional.empty())));
Map<TopicPartition, List<ConsumerRecord>> records3 = new LinkedHashMap<>();
records3.put(topicPartition2, Arrays.asList(
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
new RecordHeaders(), Optional.empty())));
final AtomicInteger which = new AtomicInteger();
final AtomicBoolean paused = new AtomicBoolean();
willAnswer(i -> {
if (paused.get()) {
Thread.sleep(10);
return ConsumerRecords.empty();
}
this.pollLatch.countDown();
switch (which.getAndIncrement()) {
case 0:
case 1:
return new ConsumerRecords(records1);
case 2:
return new ConsumerRecords(records2);
case 3:
return new ConsumerRecords(records3);
default:
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords(Collections.emptyMap());
}
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
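// The pause()/resume() stubs flip a flag that makes poll() return empty
// batches while the mock is "paused".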
willAnswer(i -> Collections.emptySet()).given(consumer).paused();
willAnswer(i -> {
paused.set(true);
return null;
}).given(consumer).pause(any());
willAnswer(i -> {
paused.set(false);
return null;
}).given(consumer).resume(any());
willAnswer(i -> {
this.commitLatch.countDown();
return null;
}).given(consumer).commitSync(anyMap(), any());
willAnswer(i -> {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
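// Acknowledgment (and thus nack()) is only available with the MANUAL or MANUAL_IMMEDIATE ack modes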
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.setRecordInterceptor(new RecordInterceptor() {
[Review comment, Member]
Do we really need this interceptor to be copied into this test's scope?
Back-porting it yesterday for another test class was painful: some API changes in the Kafka client.

Thanks

[Reply, Contributor (Author)]
No; just a copy/paste artifact; will remove.

@Override
@Nullable
public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) {
return new ConsumerRecord(record.topic(), record.partition(), record.offset(), 0L,
TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), record.headers(),
Optional.empty());
}

});
return factory;
}

}

}