
Deduplication fails when the batch message contains duplicate message and valid message #6273

Closed
codelipenghui opened this issue Feb 8, 2020 · 3 comments · Fixed by #6326
Labels: release/2.5.1, type/bug

Comments

@codelipenghui
Contributor

codelipenghui commented Feb 8, 2020

Describe the bug
This bug was discussed in the Slack channel https://apache-pulsar.slack.com/archives/C5ZSVEN4E/p1581119933163000. The problem occurs when a batch message contains both duplicate data and valid data. If the sequence ID of a message is lower than the highest sequence ID maintained by the broker, the entire batch message containing that message is also considered a duplicate. As a result, valid messages cannot be stored successfully and consumers cannot receive them.
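To make the failure mode concrete, here is a minimal toy model of the broker-side check described above. This is an illustrative sketch, not Pulsar's actual implementation: the class, field, and method names are assumptions, and the model reduces a batch to the list of its per-message sequence IDs.

```python
class DedupBroker:
    """Toy model of broker-side deduplication: only the highest sequence
    ID per producer is tracked, so a whole batch is accepted or rejected
    as one unit. Names and structure are hypothetical."""

    def __init__(self):
        self.highest_sequence_id = -1  # per-producer high-water mark

    def publish_batch(self, seq_ids):
        # The whole batch is rejected if its first sequence ID does not
        # exceed the high-water mark, even though later messages in the
        # batch (e.g. 9 and 10 below) have never been stored.
        if seq_ids[0] <= self.highest_sequence_id:
            return []  # entire batch dropped as a "duplicate"
        self.highest_sequence_id = seq_ids[-1]
        return list(seq_ids)

broker = DedupBroker()
broker.publish_batch([0, 1, 2, 3, 4, 5])    # stored; high-water mark -> 5
broker.publish_batch([6, 8])                # stored; high-water mark -> 8
lost = broker.publish_batch([7, 8, 9, 10])  # dropped: 7 <= 8, so valid 9 and 10 are lost
```

Because only one high-water mark exists, the broker has no way to accept 9 and 10 while rejecting the duplicate 8, which is exactly the data loss this issue reports.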

@devinbost
Contributor

@codelipenghui Do you think there could be a relationship between this issue and #6224?

@codelipenghui codelipenghui added this to the 2.6.0 milestone Feb 11, 2020
@devinbost
Contributor

I'm adding my Slack discussion with @codelipenghui (with his permission) to create a better record of our conversation about this issue.

He documented what happens in this issue:

"For example, we have four batch messages in the producer pending message queue. The messages look like this (I show the sequence IDs for a more straightforward understanding):
0 -> [0 - 5]
1 -> [6, 8]
2 -> [7 - 10]
3 -> [12 - 15]
Batch message-0 has six messages with sequence IDs 0 to 5, batch message-1 has two messages with sequence IDs 6 and 8, batch message-2 has four messages with sequence IDs 7 to 10, and the last batch message-3 has four messages with sequence IDs 12 to 15.
When message-0 is published to the broker, let's look at the state of the producer and the broker:

  • Producer: the last pushed sequence ID is 5, and the last published sequence ID is 0, since the publish response has not yet returned.
  • Broker: the last pushed sequence ID is 5, and the last persisted sequence ID is 0, since the message has not been written to the Bookie.

Then message-1 is published to the broker:

  • Producer: the last pushed sequence ID is 8, and the last published sequence ID is 0, since the publish response has not yet returned.
  • Broker: the last pushed sequence ID is 8, and the last persisted sequence ID is 0, since the message has not been written to the Bookie.

When the producer flushes message-2 to the broker, message-2 has a sequence ID lower than the producer's last pushed sequence ID. So the producer should stop flushing message-3, because it should first get the response for message-2 and re-batch message-2's messages. After re-batching, the producer should retry the re-batched message. The current state of the broker and producer is:

  • Producer: the last pushed sequence ID is 10, and the last published sequence ID is 10.
  • Broker: the last pushed sequence ID is 10, and the last persisted sequence ID is 10.

After message-2 is published, the producer starts flushing message-3, so the retry of the re-batched message can be handled properly.
But in extreme cases, this will introduce a lot of blocking.
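The producer-side flow in the walkthrough above can be traced with a small toy model. The names below (`last_pushed`, `last_published`) are illustrative stand-ins for the state he describes, not Pulsar's actual client fields.

```python
class ToyProducer:
    """Toy model of the producer-side flow described above; the names
    and behavior are simplified assumptions, not Pulsar's real code."""

    def __init__(self):
        self.last_pushed = -1     # highest sequence ID flushed to the broker
        self.last_published = -1  # highest sequence ID acknowledged by the broker

    def flush(self, seq_ids):
        # A batch whose first sequence ID is not above last_pushed may
        # contain duplicates: the producer must stop, wait for outstanding
        # responses, and re-batch instead of flushing the next batch.
        if seq_ids[0] <= self.last_pushed:
            return "wait-and-rebatch"
        self.last_pushed = seq_ids[-1]
        return "pushed"

    def on_ack(self, seq_id):
        self.last_published = max(self.last_published, seq_id)

p = ToyProducer()
p.flush([0, 1, 2, 3, 4, 5])  # "pushed": last_pushed -> 5
p.flush([6, 8])              # "pushed": last_pushed -> 8
p.flush([7, 8, 9, 10])       # "wait-and-rebatch": 7 <= 8, must block and re-batch
```

The "wait-and-rebatch" branch is where the blocking he mentions comes from: message-3 cannot be flushed until message-2's response arrives.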


Regarding:

When the producer flushes message-2 to the broker, message-2 has a sequence ID lower than the producer's last pushed sequence ID. So the producer should stop flushing message-3, because it should first get the response for message-2 and re-batch message-2's messages.

I asked:
"Would that require the producer to wait for the response of message-2 before it could flush message-3? (Moving from asynchronous to synchronous behavior?)"

He replied:
"Yes, because if message-3 is published to the broker, message-2 will be ignored by the broker."

I said:
"Hmm… What if the broker could notice that “message-2 has a sequence ID lower than the last pushed sequence ID of the producer” and send back a response that requires the producer to re-submit message-2 with the correct sequence?
I guess it depends on what we want the user experience to be. Should this situation be considered an error that we’d want the producer’s developer to know about and need to fix? The main broker limitation is that we’re only tracking the last sequence ID, so we really have no way to know what was missing between those sequence IDs. (In the example of message-1, there’s late data that doesn’t appear until message-2.)
In Apache Flink, they provide a mechanism to allow re-ordering of messages (within a time buffer) and de-duplication, but that introduces additional latency that is acceptable for a data processing engine but probably wouldn’t be acceptable for Pulsar."

He replied:
"Yes, you are right; it wouldn't be acceptable for Pulsar.
Maybe we need a PIP to describe the solution; it's a tricky problem. We need to make some trade-offs. Maybe there are better solutions; we need time to think about how to properly handle message deduplication. The current order-dependent deduplication approach brings us many restrictions."

I replied:
"I very much agree.
Instead, we perhaps could implement a sliding-window dictionary of sequence IDs that would keep a fixed number of IDs in memory. Lookups would still be O(1). Perhaps a dynamically adjustable window feature could even be provided at some point for dynamic-length deduplication logic.
We’d need to find out if there are more reasons that only the last Sequence ID is retained.
If they could be worked around, then this could implement a new feature of Pulsar."
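The sliding-window idea could look roughly like this minimal sketch. The class name, capacity parameter, and eviction policy are assumptions for illustration, not an existing Pulsar API.

```python
from collections import deque

class SequenceIdWindow:
    """Keeps the last `capacity` sequence IDs; membership checks stay O(1)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.order = deque()  # insertion order, for evicting the oldest ID
        self.seen = set()     # hash set for O(1) duplicate lookups

    def is_duplicate(self, seq_id):
        return seq_id in self.seen

    def record(self, seq_id):
        if seq_id in self.seen:
            return
        self.order.append(seq_id)
        self.seen.add(seq_id)
        if len(self.order) > self.capacity:
            # Evict the oldest remembered ID to bound memory use.
            self.seen.discard(self.order.popleft())

window = SequenceIdWindow(capacity=4)
for s in (0, 1, 2, 3, 4, 5):
    window.record(s)
# Only 2, 3, 4, 5 remain in the window; 0 and 1 have been evicted.
```

The trade-off is visible in the last comment: a duplicate older than the window (e.g. sequence ID 0 here) would no longer be detected, which is presumably why the window size would need to be tunable.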

He added:
"I thought of an easy way to deal with the current issue; no need to change the current mechanism. Of course, the current mechanism has room for optimization along the lines of your ideas.
The problem is that the producer combines duplicate messages and non-duplicate messages into one batch message. So we can change the current logic: if a message is possibly a duplicate, we can flush it directly, which avoids combining it into a batch. I think this has a small impact on performance and requires no broker code changes."

I asked:
"Would that mean the producer would need to retain state (like a window of Sequence IDs like we were talking about) to be able to identify if duplicate messages are mixed with non-duplicate messages into a batch message?"

He replied:
"Yes, there are two states, named lastSequenceIdPushed and lastSequenceIdPublished. If the sequence ID of the new message is lower than lastSequenceIdPushed but greater than lastSequenceIdPublished, the message may be a duplicate, so we don't combine it into the current batch. If the sequence ID of the new message is lower than lastSequenceIdPublished, it is an absolute duplicate, and we can return it back to the user."
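His two-state rule amounts to a three-way classification of each new message. The sketch below is hypothetical (the function does not exist in Pulsar), and treating equal sequence IDs as duplicates on both boundaries is my assumption about the edge cases.

```python
def classify(seq_id, last_sequence_id_pushed, last_sequence_id_published):
    """Sketch of the decision described above, using the two
    producer-side states lastSequenceIdPushed and lastSequenceIdPublished."""
    if seq_id <= last_sequence_id_published:
        # Already acknowledged by the broker: an absolute duplicate,
        # which can be reported straight back to the user.
        return "definite-duplicate"
    if seq_id <= last_sequence_id_pushed:
        # Pushed but not yet acknowledged: possibly a duplicate, so
        # flush it on its own instead of combining it into the batch.
        return "possible-duplicate"
    return "new"  # safe to add to the current batch

classify(9, last_sequence_id_pushed=8, last_sequence_id_published=6)  # "new"
classify(7, last_sequence_id_pushed=8, last_sequence_id_published=6)  # "possible-duplicate"
classify(5, last_sequence_id_pushed=8, last_sequence_id_published=6)  # "definite-duplicate"
```

This is the essence of the eventual fix: only the "possible-duplicate" case needs special handling (flush alone), so the broker code stays unchanged.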

To better understand the current implementation, he suggested that I investigate these tests:
MessageDuplicationTest and ClientDeduplicationTest

@codelipenghui
Contributor Author

Do you think there could be a relationship between this issue and #6224?

@devinbost It doesn't look related.

@codelipenghui codelipenghui self-assigned this Feb 14, 2020
codelipenghui added a commit that referenced this issue Feb 14, 2020
…ial duplicated messages and non-duplicated messages into a batch. (#6326)

Fixes #6273

Motivation
The main reason for #6273 is combining potentially duplicated messages and non-duplicated messages into a batch. So we need to flush the potentially duplicated messages first and then add the non-duplicated messages to a batch.
The same fix was subsequently picked up elsewhere:

tuteng pushed a commit to AmateurEvents/pulsar that referenced this issue Feb 23, 2020 (apache#6326)
tuteng pushed a commit to AmateurEvents/pulsar that referenced this issue Mar 21, 2020 (apache#6326, cherry picked from commit b898f49)
tuteng pushed a commit that referenced this issue Apr 6, 2020 (#6326, cherry picked from commit b898f49)
tuteng pushed a commit that referenced this issue Apr 13, 2020 (#6326, cherry picked from commit b898f49)
jiazhai pushed a commit to jiazhai/pulsar that referenced this issue May 18, 2020 (apache#6326, cherry picked from commit b898f49)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020 (apache#6326, cherry picked from commit b898f49)