Skip to content

Commit

Permalink
Support Consumers Set Custom Retry Delay (apache#6449)
Browse files Browse the repository at this point in the history
<!--
### Contribution Checklist
  
  - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
    Skip *Issue XYZ* if there is no associated github issue for this pull request.
    Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
  
  - Each pull request should address only one issue, not mix up code from multiple issues.
  
  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**
-->


Master Issue: apache#6448

### Motivation


For many online business systems, various exceptions usually occur in business logic processing, so the message needs to be re-consumed, but users hope that this delay time can be controlled flexibly. The current user's processing method is usually to send this message to a special retry topic, because production can specify any delay, so consumers subscribe the business topic and retry topic at the same time. I think this logic can be supported by pulsar itself, making it easier for users to use, and it looks like this is a very common requirement.

### Modifications

This change can be supported on the client side,  need to add a set of interfaces to org.apache.pulsar.client.api.Consumer
```java
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);
```
DeadLetterPolicy add retry topic
```java
public class DeadLetterPolicy {

    /**
     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
     */
    private int maxRedeliverCount;

    /**
     * Name of the retry topic where the failing messages will be sent.
     */
    private String retryLetterTopic;

    /**
     * Name of the dead topic where the failing messages will be sent.
     */
    private String deadLetterTopic;

}

```
org.apache.pulsar.client.impl.ConsumerImpl add a retry producer
```java
  private volatile Producer<T> deadLetterProducer;

  private volatile Producer<T> retryLetterProducer;
```
Can specify whether to enable retry when creating a consumer,default unenable
```java
    @OverRide
    public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
        conf.setRetryEnable(retryEnable);
        return this;
    }
```
  • Loading branch information
liudezhi2098 authored and huangdx0726 committed Aug 24, 2020
1 parent fc18579 commit f49cc31
Show file tree
Hide file tree
Showing 12 changed files with 734 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotNull;

public class RetryTopicTest extends ProducerConsumerBase {

private static final Logger log = LoggerFactory.getLogger(RetryTopicTest.class);

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testRetryTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";

final int maxRedeliveryCount = 2;

final int sendMessages = 100;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}

producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
newPulsarClient.close();
}

/**
* The test is disabled {@link https://github.com/apache/pulsar/issues/2647}.
* @throws Exception
*/
@Test
public void testRetryTopicWithMultiTopic() throws Exception {
final String topic1 = "persistent://my-property/my-ns/retry-topic-1";
final String topic2 = "persistent://my-property/my-ns/retry-topic-2";

final int maxRedeliveryCount = 2;

int sendMessages = 100;

// subscribe to the original topics before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic1, topic2)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

// subscribe to the DLQ topics before consuming original topics
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic(topic1)
.create();

Producer<byte[]> producer2 = pulsarClient.newProducer(Schema.BYTES)
.topic(topic2)
.create();

for (int i = 0; i < sendMessages; i++) {
producer1.send(String.format("Hello Pulsar [%d]", i).getBytes());
producer2.send(String.format("Hello Pulsar [%d]", i).getBytes());
}

sendMessages = sendMessages * 2;

producer1.close();
producer2.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {} - total = {}",
message.getMessageId(), new String(message.getData()), ++totalReceived);
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic1, topic2)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
}

@Test
public void testRetryTopicByCustomTopicName() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
final int maxRedeliveryCount = 2;
final int sendMessages = 100;

// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
deadLetterConsumer.close();
consumer.close();
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);
newPulsarClient.close();
newPulsarClient1.close();
checkConsumer.close();
}

}
Loading

0 comments on commit f49cc31

Please sign in to comment.