Skip to content

Commit

Permalink
[fix][client] Release the orphan producers after the primary consumer…
Browse files Browse the repository at this point in the history
… is closed (#19858)

Motivation: The producers ["retryLetterProducer", "deadLetterProducer"] will be auto-created by consumers if enabled `DLQ`, but these producers will not close after consumers are closed.

Modifications: Auto close "retryLetterProducer" and "deadLetterProducer" after the primary consumer is closed
(cherry picked from commit 94ae340)
  • Loading branch information
poorbarcode committed May 6, 2023
1 parent 22a0c6d commit a0ff522
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand Down Expand Up @@ -516,4 +517,51 @@ public void testRetryTopicException() throws Exception {
consumer.close();
}


@Test(timeOut = 30000L)
public void testRetryProducerWillCloseByConsumer() throws Exception {
final String topicName = "persistent://my-property/my-ns/tp_" + UUID.randomUUID().toString();
final String subscriptionName = "sub1";
final String topicRetry = topicName + "-" + subscriptionName + "-RETRY";
final String topicDLQ = topicName + "-" + subscriptionName + "-DLQ";

// Trigger the DLQ and retry topic creation.
final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(topicDLQ).maxRedeliverCount(2).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();
// send messages.
for (int i = 0; i < 5; i++) {
producer.newMessage()
.value("msg-" + i)
.sendAsync();
}
producer.flush();
for (int i = 0; i < 20; i++) {
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
} else {
break;
}
}

consumer.close();
producer.close();
admin.topics().delete(topicName, false);

// Verify: "retryLetterProducer" and "deadLetterProducer" will be closed by "consumer.close()", so these two
// topics can be deleted successfully.
admin.topics().delete(topicRetry, false);
admin.topics().delete(topicDLQ, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,23 @@ public CompletableFuture<Void> closeAsync() {
});
}

return closeFuture;
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
}));
}
if (deadLetterProducer != null) {
closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing deadLetterProducer of consumer", ex);
}
}));
}
return FutureUtil.waitForAll(closeFutures);
}

private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) {
Expand Down

0 comments on commit a0ff522

Please sign in to comment.