Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -109,7 +109,7 @@ public abstract class AbstractMessageListenerContainer<K, V>

private ApplicationContext applicationContext;

private final Set<TopicPartition> pauseRequestedPartitions;
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();

/**
* Construct an instance with the provided factory and properties.
Expand Down Expand Up @@ -159,8 +159,6 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
if (this.containerProperties.getConsumerRebalanceListener() == null) {
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
}

this.pauseRequestedPartitions = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -263,23 +261,17 @@ protected boolean isPaused() {

@Override
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
synchronized (this.pauseRequestedPartitions) {
return this.pauseRequestedPartitions.contains(topicPartition);
}
return this.pauseRequestedPartitions.contains(topicPartition);
}

@Override
public void pausePartition(TopicPartition topicPartition) {
synchronized (this.pauseRequestedPartitions) {
this.pauseRequestedPartitions.add(topicPartition);
}
this.pauseRequestedPartitions.add(topicPartition);
}

@Override
public void resumePartition(TopicPartition topicPartition) {
synchronized (this.pauseRequestedPartitions) {
this.pauseRequestedPartitions.remove(topicPartition);
}
this.pauseRequestedPartitions.remove(topicPartition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,9 @@ protected void pollAndInvoke() {
}
debugRecords(records);
resumeConsumerIfNeccessary();
resumePartitionsIfNecessary();
if (!this.consumerPaused) {
resumePartitionsIfNecessary();
}

invokeIfHaveRecords(records);
}
Expand Down Expand Up @@ -1522,7 +1524,8 @@ private void doResumeConsumerIfNeccessary() {
}
if (this.consumerPaused && !isPaused() && !this.pausedForAsyncAcks) {
this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
Set<TopicPartition> paused = this.consumer.paused();
Collection<TopicPartition> paused = new LinkedList<>(this.consumer.paused());
paused.removeAll(this.pausedPartitions);
this.consumer.resume(paused);
this.consumerPaused = false;
publishConsumerResumedEvent(paused);
Expand All @@ -1531,8 +1534,7 @@ private void doResumeConsumerIfNeccessary() {

private void pausePartitionsIfNecessary() {
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
List<TopicPartition> partitionsToPause = this
.assignedPartitions
List<TopicPartition> partitionsToPause = getAssignedPartitions()
.stream()
.filter(tp -> isPartitionPauseRequested(tp)
&& !pausedConsumerPartitions.contains(tp))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2553,14 +2554,6 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
AtomicBoolean first = new AtomicBoolean(true);
AtomicBoolean rebalance = new AtomicBoolean(true);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
if (rebalance.getAndSet(false)) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
}
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
final CountDownLatch seekLatch = new CountDownLatch(7);
willAnswer(i -> {
seekLatch.countDown();
Expand All @@ -2569,17 +2562,32 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
given(consumer.assignment()).willReturn(records.keySet());
final CountDownLatch pauseLatch1 = new CountDownLatch(2); // consumer, event publisher
final CountDownLatch pauseLatch2 = new CountDownLatch(2); // consumer, consumer
Set<TopicPartition> pausedParts = new HashSet<>();
willAnswer(i -> {
pauseLatch1.countDown();
pauseLatch2.countDown();
pausedParts.addAll(i.getArgument(0));
return null;
}).given(consumer).pause(records.keySet());
given(consumer.paused()).willReturn(records.keySet());
given(consumer.paused()).willReturn(pausedParts);
CountDownLatch pollWhilePausedLatch = new CountDownLatch(2);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
if (pauseLatch1.getCount() == 0) {
pollWhilePausedLatch.countDown();
}
if (rebalance.getAndSet(false)) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
}
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
final CountDownLatch resumeLatch = new CountDownLatch(2);
willAnswer(i -> {
resumeLatch.countDown();
pausedParts.removeAll(i.getArgument(0));
return null;
}).given(consumer).resume(records.keySet());
}).given(consumer).resume(any());
willAnswer(invoc -> {
rebal.set(invoc.getArgument(1));
return null;
Expand Down Expand Up @@ -2671,6 +2679,8 @@ else if (e instanceof ConsumerStoppedEvent) {
assertThat(container.isPaused()).isTrue();
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.isContainerPaused()).isTrue();
assertThat(pollWhilePausedLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer, never()).resume(any());
rebalance.set(true); // force a re-pause
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
container.resume();
Expand All @@ -2680,6 +2690,59 @@ else if (e instanceof ConsumerStoppedEvent) {
verify(consumer, times(6)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
}

@SuppressWarnings({ "unchecked" })
@Test
public void dontResumePausedPartition() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
given(consumer.assignment()).willReturn(Set.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
Set<TopicPartition> pausedParts = new HashSet<>();
willAnswer(i -> {
pausedParts.addAll(i.getArgument(0));
pauseLatch1.countDown();
pauseLatch2.countDown();
return null;
}).given(consumer).pause(any());
given(consumer.paused()).willReturn(pausedParts);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
return emptyRecords;
});
final CountDownLatch resumeLatch = new CountDownLatch(1);
willAnswer(i -> {
resumeLatch.countDown();
pausedParts.removeAll(i.getArgument(0));
return null;
}).given(consumer).resume(any());
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0),
new TopicPartitionOffset("foo", 1));
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) rec -> { });
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
InOrder inOrder = inOrder(consumer);
container.pausePartition(new TopicPartition("foo", 1));
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(pausedParts).hasSize(1);
container.pause();
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(pausedParts).hasSize(2);
container.resume();
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(pausedParts).hasSize(1);
container.stop();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInitialSeek() throws Exception {
Expand Down