Skip to content

Commit

Permalink
Fix message redelivery for zero queue consumer while using async api …
Browse files Browse the repository at this point in the history
…to receive messages (#6090)

Fix message redelivery for zero queue consumer while using async api to receive messages
  • Loading branch information
codelipenghui authored Jan 20, 2020
1 parent f28c32f commit d5fca06
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import static org.testng.Assert.assertEquals;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -376,4 +379,40 @@ public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception {
consumer.close();
producer.close();
}

@Test
public void testZeroQueueSizeMessageRedeliveryForAsyncReceive() throws PulsarClientException, ExecutionException, InterruptedException {
final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive";
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.receiverQueueSize(0)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

final int messages = 10;
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < messages; i++) {
producer.send(i);
}

Set<Integer> receivedMessages = new HashSet<>();
List<CompletableFuture<Message<Integer>>> futures = new ArrayList<>(20);
for (int i = 0; i < messages * 2; i++) {
futures.add(consumer.receiveAsync());
}
for (CompletableFuture<Message<Integer>> future : futures) {
receivedMessages.add(future.get().getValue());
}

Assert.assertEquals(receivedMessages.size(), messages);

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception

if (conf.getReceiverQueueSize() == 0) {
// call interceptor and complete received callback
trackMessage(message);
interceptAndComplete(message, receivedFuture);
return;
}
Expand Down

0 comments on commit d5fca06

Please sign in to comment.