Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the behavior of delayed message in Key_Shared mode #20233

Merged
merged 1 commit into from
May 9, 2023

Conversation

codelipenghui
Copy link
Contributor

Fixes #8703

Motivation

Fixed the unexpected consumer stuck issue of Key_Shared subscription by delayed messages.
The issue can be reproduced by the test provided in #8703.

Background

  • Suppose we have 10 messages 0,1,2,3,4,5,6,7,8,9, and 2 consumers, consumer-A and consumer-B
  • Only consumer-A connected to the topic and read 5 messages from the topic, 0,1,2,3,4, but haven’t acknowledged
  • Now, consumer-B joined, it will try to read messages from 5, but due to 0,1,2,3,4 are not being acknowledged, consumer-B can’t process new messages at this point because it might break the order(1 and 5 are from the same message key, we don’t know which one will be processed first in 2 instances)
  • So, the solution to guarantee the ordering dispatch is to pause the message dispatch for consumer-B until all the messages before message 5 are acked.

The expected behavior

if there are delayed messages in [0, 4] and [5,9]. All the messages from [5, 9] can be dispatched to consumer-B until all the messages from [0, 4] are acked. If there are messages with 1 hour delay in [0,4], consumer-B will be stuck for 1 hour. But maybe there are messages only with 10 sec delay in [5, 9]. This is still a very strange behavior for users, but it is expected for the current implementation

The unexpected behavior

The consumers can not able to receive all the messages even if all the delayed messages are ready to deliver to consumers. The newly added test can reproduce the issue. The logs of the test will help us to understand the issue.

  1. The producer published 40 delayed messages
2023-05-06T01:25:02,935 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:0:-1
2023-05-06T01:25:02,940 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:1:-1
2023-05-06T01:25:02,944 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:2:-1
2023-05-06T01:25:02,950 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:3:-1
2023-05-06T01:25:02,954 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:4:-1
2023-05-06T01:25:02,965 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:5:-1
2023-05-06T01:25:02,969 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:6:-1
2023-05-06T01:25:02,972 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:7:-1
2023-05-06T01:25:02,975 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:8:-1
2023-05-06T01:25:02,979 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:9:-1
2023-05-06T01:25:02,983 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:10:-1
2023-05-06T01:25:02,986 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:11:-1
2023-05-06T01:25:02,989 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:12:-1
2023-05-06T01:25:02,992 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:13:-1
2023-05-06T01:25:02,995 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:14:-1
2023-05-06T01:25:02,998 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:15:-1
2023-05-06T01:25:03,001 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:16:-1
2023-05-06T01:25:03,004 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:17:-1
2023-05-06T01:25:03,007 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:18:-1
2023-05-06T01:25:03,010 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:19:-1
2023-05-06T01:25:03,012 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:20:-1
2023-05-06T01:25:03,016 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:21:-1
2023-05-06T01:25:03,019 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:22:-1
2023-05-06T01:25:03,022 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:23:-1
2023-05-06T01:25:03,025 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:24:-1
2023-05-06T01:25:03,028 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:25:-1
2023-05-06T01:25:03,032 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:26:-1
2023-05-06T01:25:03,035 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:27:-1
2023-05-06T01:25:03,037 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:28:-1
2023-05-06T01:25:03,040 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:29:-1
2023-05-06T01:25:03,043 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:30:-1
2023-05-06T01:25:03,046 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:31:-1
2023-05-06T01:25:03,050 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:32:-1
2023-05-06T01:25:03,054 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:33:-1
2023-05-06T01:25:03,057 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:34:-1
2023-05-06T01:25:03,060 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:35:-1
2023-05-06T01:25:03,063 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:36:-1
2023-05-06T01:25:03,067 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:37:-1
2023-05-06T01:25:03,070 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:38:-1
2023-05-06T01:25:03,073 - INFO  - [main:KeySharedSubscriptionTest@1586] - Published delayed message :3:39:-
  1. The producer publish 40 messages without delay
2023-05-06T01:25:03,077 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:40:-1
2023-05-06T01:25:03,080 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:41:-1
2023-05-06T01:25:03,083 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:42:-1
2023-05-06T01:25:03,087 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:43:-1
2023-05-06T01:25:03,090 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:44:-1
2023-05-06T01:25:03,097 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:45:-1
2023-05-06T01:25:03,101 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:46:-1
2023-05-06T01:25:03,104 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:47:-1
2023-05-06T01:25:03,107 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:48:-1
2023-05-06T01:25:03,110 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:49:-1
2023-05-06T01:25:03,114 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:50:-1
2023-05-06T01:25:03,118 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:51:-1
2023-05-06T01:25:03,121 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:52:-1
2023-05-06T01:25:03,124 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:53:-1
2023-05-06T01:25:03,127 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:54:-1
2023-05-06T01:25:03,129 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:55:-1
2023-05-06T01:25:03,132 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:56:-1
2023-05-06T01:25:03,136 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:57:-1
2023-05-06T01:25:03,139 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:58:-1
2023-05-06T01:25:03,141 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:59:-1
2023-05-06T01:25:03,144 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:60:-1
2023-05-06T01:25:03,147 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:61:-1
2023-05-06T01:25:03,150 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:62:-1
2023-05-06T01:25:03,153 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:63:-1
2023-05-06T01:25:03,156 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:64:-1
2023-05-06T01:25:03,158 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:65:-1
2023-05-06T01:25:03,161 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:66:-1
2023-05-06T01:25:03,164 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:67:-1
2023-05-06T01:25:03,167 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:68:-1
2023-05-06T01:25:03,170 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:69:-1
2023-05-06T01:25:03,173 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:70:-1
2023-05-06T01:25:03,175 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:71:-1
2023-05-06T01:25:03,178 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:72:-1
2023-05-06T01:25:03,181 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:73:-1
2023-05-06T01:25:03,184 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:74:-1
2023-05-06T01:25:03,186 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:75:-1
2023-05-06T01:25:03,189 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:76:-1
2023-05-06T01:25:03,192 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:77:-1
2023-05-06T01:25:03,194 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:78:-1
2023-05-06T01:25:03,197 - INFO  - [main:KeySharedSubscriptionTest@1594] - Published message :3:79:-1
  1. The first consumer received messages
2023-05-06T01:25:03,225 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 0, 3:40:-1
2023-05-06T01:25:03,227 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 1, 3:41:-1
2023-05-06T01:25:03,228 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 2, 3:42:-1
2023-05-06T01:25:03,228 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 3, 3:43:-1
2023-05-06T01:25:03,228 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 4, 3:44:-1
2023-05-06T01:25:03,228 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 5, 3:45:-1
2023-05-06T01:25:03,228 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 6, 3:46:-1
2023-05-06T01:25:03,229 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 7, 3:47:-1
2023-05-06T01:25:03,229 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 8, 3:48:-1
2023-05-06T01:25:03,229 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 9, 3:49:-1
2023-05-06T01:25:03,255 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 11, 3:51:-1
2023-05-06T01:25:03,255 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 13, 3:53:-1
2023-05-06T01:25:03,255 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 15, 3:55:-1
2023-05-06T01:25:03,255 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 17, 3:57:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 18, 3:58:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 22, 3:62:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 26, 3:66:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 32, 3:72:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 37, 3:77:-1
2023-05-06T01:25:03,256 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 38, 3:78:-1
2023-05-06T01:25:03,266 - INFO  - [main:KeySharedSubscriptionTest@1608] - c1 message: 39, 3:79:-1

But the delayed messages are delivered to the first consumer when the test fails. The receive timeout is 30 seconds, and the messages are with 10 seconds. So the delayed messages for the first consumer should be delivered.

The root cause

The broker first replays messages from the replay queue without looking at the delayed tracker.

protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
NavigableSet<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
return Collections.emptyNavigableSet();
}
}

So, if all the messages in the relay queue are for the second consumer, the dispatch will run into an infinite loop(it also should be fixed, but not in this PR) because the messages are not allowed to be delivered at that time due to the first consumer hadn't acked the messages before the second consumer joined.

Modifications

Change the getMessagesToReplayNow method to dispatch messages based on the message ID (FIFO). So that the first consumer can receive the delayed messages, and then the second consumer can start to consume messages after the first consumer acked the delayed messages.

Verifying this change

New test is added

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codelipenghui codelipenghui self-assigned this May 5, 2023
@codelipenghui codelipenghui added this to the 3.1.0 milestone May 5, 2023
@codelipenghui codelipenghui added the type/bug The PR fixed a bug or issue reported a bug label May 5, 2023
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label May 5, 2023
@codelipenghui codelipenghui added release/2.10.5 release/2.11.2 release/3.0.1 release/2.9.6 and removed doc-required Your PR changes impact docs and you will update later. labels May 5, 2023
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label May 5, 2023
Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

By the way, pulsar didn't support delayed messages in Key_Shared mode before, right?

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

@poorbarcode The document said

Only shared and key-shared subscriptions support delayed message delivery. In other subscriptions, delayed messages are dispatched immediately.

The key-shared subscription can allow out-of-order delivery, or we can say a loose ordering guarantee. So it still makes sense to let them work together.

But we should have clear documentation for the expected behavior of the consumption of the delayed message under a key-shared subscription.

@codecov-commenter
Copy link

Codecov Report

Merging #20233 (f4040d0) into master (0dd238a) will increase coverage by 38.44%.
The diff coverage is 80.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #20233       +/-   ##
=============================================
+ Coverage     34.48%   72.93%   +38.44%     
- Complexity    12537    31964    +19427     
=============================================
  Files          1614     1868      +254     
  Lines        126170   138591    +12421     
  Branches      13771    15246     +1475     
=============================================
+ Hits          43509   101076    +57567     
+ Misses        77053    29470    -47583     
- Partials       5608     8045     +2437     
Flag Coverage Δ
inttests 24.11% <60.00%> (-0.02%) ⬇️
systests 24.80% <40.00%> (?)
unittests 72.22% <80.00%> (+39.11%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...mon/policies/data/stats/SubscriptionStatsImpl.java 69.56% <50.00%> (-0.35%) ⬇️
...sistent/PersistentDispatcherMultipleConsumers.java 76.63% <100.00%> (+20.26%) ⬆️

... and 1510 files with indirect coverage changes

Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work

@asafm
Copy link
Contributor

asafm commented Jun 11, 2023

@poorbarcode Did you end up documenting the change of behavior?

@codelipenghui codelipenghui deleted the penghui/fix-8703 branch June 13, 2023 08:53
@shihaojun
Copy link

Could we removeConsumersFromRecentJoinedConsumers to fix it?
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers#removeConsumersFromRecentJoinedConsumers
Change this function,remove the recentlyJoinedConsumers when all the consumer's pendingAcks min PositionImpl is greater than the recentlyJoinedConsumer PositionImpl
So the consumer could start to consume messages after that

@michaeljmarshall
Copy link
Member

As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label

Technoboy- added a commit to Technoboy-/pulsar that referenced this pull request Jun 29, 2023
@Anonymitaet
Copy link
Member

Doc is WIP, waiting for review comments from @codelipenghui and @coderzc on the following plans:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Delayed messages + Key_Shared causes blocked consumers