Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Consumers Set Custom Retry Delay #6449

Merged

Conversation

liudezhi2098
Copy link
Contributor

@liudezhi2098 liudezhi2098 commented Mar 1, 2020

Master Issue: #6448

Motivation

For many online business systems, various exceptions usually occur in business logic processing, so the message needs to be re-consumed, but users hope that this delay time can be controlled flexibly. The current user's processing method is usually to send this message to a special retry topic, because production can specify any delay, so consumers subscribe the business topic and retry topic at the same time. I think this logic can be supported by pulsar itself, making it easier for users to use, and it looks like this is a very common requirement.

Modifications

This change can be supported on the client side, need to add a set of interfaces to org.apache.pulsar.client.api.Consumer

void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);

DeadLetterPolicy add retry topic

public class DeadLetterPolicy {

    /**
     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
     */
    private int maxRedeliverCount;

    /**
     * Name of the retry topic where the failing messages will be sent.
     */
    private String retryLetterTopic;

    /**
     * Name of the dead topic where the failing messages will be sent.
     */
    private String deadLetterTopic;

}

org.apache.pulsar.client.impl.ConsumerImpl add a retry producer

  private volatile Producer<T> deadLetterProducer;

  private volatile Producer<T> retryLetterProducer;

Can specify whether to enable retry when creating a consumer,default unenable

    @Override
    public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
        conf.setRetryEnable(retryEnable);
        return this;
    }

@jiazhai
Copy link
Member

jiazhai commented Mar 3, 2020

@liudezhi2098 Thanks for the great work. Would you please help add some tests for it?

@jiazhai
Copy link
Member

jiazhai commented Mar 30, 2020

/pulsarbot run-failure-checks

@jiazhai jiazhai added the doc-required Your PR changes impact docs and you will update later. label Mar 30, 2020
@jiazhai
Copy link
Member

jiazhai commented Apr 2, 2020

/pulsarbot run-failure-checks

@jiazhai jiazhai added this to the 2.6.0 milestone Apr 2, 2020
@liudezhi2098 liudezhi2098 force-pushed the support_consumers_set_custom_retry_delay branch from 045771b to 68b425d Compare April 5, 2020 04:09
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@sijie sijie merged commit 30e762e into apache:master Apr 6, 2020
Huanli-Meng added a commit to Huanli-Meng/pulsar that referenced this pull request Jun 3, 2020
@Anonymitaet Anonymitaet removed the doc-required Your PR changes impact docs and you will update later. label Jun 8, 2020
@Anonymitaet
Copy link
Member

Doc has been added #7151

codelipenghui pushed a commit that referenced this pull request Jun 16, 2020
### Motivation
This PR is to update doc for the PR: #6449

Doc contents have been approved by the previous PR  #7151. 
Because there are two many conflicts in that PR. So I close the original PR and re-create this PR.


### Modifications
Add retry letter topic in concept> messaging doc.
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));

if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
processPossibleToDLQ((MessageIdImpl)messageId);
Copy link
Contributor

@Lanayx Lanayx Jul 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does anyone know what is the point of this line? The logic of sending message to dead letter queue is described within more than twenty lines below as well as message acknowledgment, why is it done twice?

@Lanayx
Copy link
Contributor

Lanayx commented Jul 17, 2020

@liudezhi2098 Hi, could you please comment on my question above?

huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
<!--
### Contribution Checklist
  
  - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
    Skip *Issue XYZ* if there is no associated github issue for this pull request.
    Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
  
  - Each pull request should address only one issue, not mix up code from multiple issues.
  
  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**
-->


Master Issue: apache#6448

### Motivation


For many online business systems, various exceptions usually occur in business logic processing, so the message needs to be re-consumed, but users hope that this delay time can be controlled flexibly. The current user's processing method is usually to send this message to a special retry topic, because production can specify any delay, so consumers subscribe the business topic and retry topic at the same time. I think this logic can be supported by pulsar itself, making it easier for users to use, and it looks like this is a very common requirement.

### Modifications

This change can be supported on the client side,  need to add a set of interfaces to org.apache.pulsar.client.api.Consumer
```java
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);
```
DeadLetterPolicy add retry topic
```java
public class DeadLetterPolicy {

    /**
     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
     */
    private int maxRedeliverCount;

    /**
     * Name of the retry topic where the failing messages will be sent.
     */
    private String retryLetterTopic;

    /**
     * Name of the dead topic where the failing messages will be sent.
     */
    private String deadLetterTopic;

}

```
org.apache.pulsar.client.impl.ConsumerImpl add a retry producer
```java
  private volatile Producer<T> deadLetterProducer;

  private volatile Producer<T> retryLetterProducer;
```
Can specify whether to enable retry when creating a consumer,default unenable
```java
    @OverRide
    public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
        conf.setRetryEnable(retryEnable);
        return this;
    }
```
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
### Motivation
This PR is to update doc for the PR: apache#6449

Doc contents have been approved by the previous PR  apache#7151. 
Because there are two many conflicts in that PR. So I close the original PR and re-create this PR.


### Modifications
Add retry letter topic in concept> messaging doc.
wolfstudy pushed a commit to apache/pulsar-client-go that referenced this pull request Sep 9, 2020
### Motivation

Follow [pulsar#6449](apache/pulsar#6449) to support retry letter topic in go client

### Modifications

- Add `retryRouter` for sending reconsume messages to retry letter topic
- Add `ReconsumeLater(msg Message, delay time.Duration)` to Consumer interface
- Add configureable retry letter topic name in `DLQPolicy`
    ```go
	type DLQPolicy struct {
		// ...
		// Name of the topic where the retry messages will be sent.
		RetryLetterTopic string
	}
	```
    enable it explicitly while creating consumer, default unenable
   
     ```go
    type ConsumerOptions struct {
	    // ...
		// Auto retry send messages to default filled DLQPolicy topics
		RetryEnable bool
	}
    ```
- Add 2 `TestRLQ*`  test cases
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants