Skip to content

Commit

Permalink
Flush the last potentially duplicated message first, since potentially duplicated messages and non-duplicated messages cannot be combined into a batch. (apache#6326)
Browse files Browse the repository at this point in the history

Fixes apache#6273

Motivation
The root cause of apache#6273 is that potentially duplicated messages and non-duplicated messages were being combined into a single batch. The fix is to flush the potentially duplicated message first, and only then add the non-duplicated messages to a batch.
  • Loading branch information
codelipenghui authored Feb 14, 2020
1 parent 2e1c74a commit b898f49
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except
producer.flush();

// Repeat the messages and verify they're not received by consumer
producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync();
producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync();
producer.close();
producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync();
producer.newMessage().value("my-message-1".getBytes()).sequenceId(4).sendAsync();
producer.newMessage().value("my-message-3".getBytes()).sequenceId(6).sendAsync();
producer.flush();

for (int i = 0; i < 3; i++) {
for (int i = 0; i < 4; i++) {
Message<byte[]> msg = consumer.receive();
assertEquals(new String(msg.getData()), "my-message-" + i);
consumer.acknowledge(msg);
Expand All @@ -196,11 +197,12 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
assertNull(msg);

producer.close();
// Kill and restart broker
restartBroker();

producer = producerBuilder.create();
assertEquals(producer.getLastSequenceId(), 5L);
assertEquals(producer.getLastSequenceId(), 6L);

// Repeat the messages and verify they're not received by consumer
producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (++numMessagesInBatch == 1) {
// some properties are common amongst the different messages in the batch, hence we just pick it up from
// the first message
messageMetadata.setSequenceId(msg.getSequenceId());
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
Expand All @@ -87,7 +88,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
producer.lastSequenceIdPushed = msg.getSequenceId();
producer.lastSequenceIdPushed = Math.max(producer.lastSequenceIdPushed, msg.getSequenceId());

return isBatchFull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
Expand Down Expand Up @@ -114,6 +113,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private volatile long lastSequenceIdPublished;
protected volatile long lastSequenceIdPushed;
private volatile boolean isLastSequenceIdPotentialDuplicated;

private MessageCrypto msgCrypto = null;

Expand Down Expand Up @@ -397,6 +397,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
// should trigger complete the batch message, new message will add to a new batch and new batch
// sequence id use the new message, so that broker can handle the message duplication
if (sequenceId <= lastSequenceIdPushed) {
isLastSequenceIdPotentialDuplicated = true;
if (sequenceId <= lastSequenceIdPublished) {
log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
} else {
Expand All @@ -405,14 +406,21 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
doBatchSendAndAdd(msg, callback, payload);
} else {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
batchMessageAndSend();
// Flush the last potentially duplicated message first: potentially duplicated messages
// and non-duplicated messages cannot be combined into the same batch.
if (isLastSequenceIdPotentialDuplicated) {
doBatchSendAndAdd(msg, callback, payload);
} else {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
batchMessageAndSend();
}
}
isLastSequenceIdPotentialDuplicated = false;
}
} else {
doBatchSendAndAdd(msg, callback, payload);
Expand Down

0 comments on commit b898f49

Please sign in to comment.