From c20071bab452643e82059af2c1d67d3db46e5ef0 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 14 Feb 2020 16:05:58 +0800 Subject: [PATCH] Should flush the last potential duplicated since can't combine potential duplicated messages and non-duplicated messages into a batch. (#6326) Fixes #6273 Motivation The main reason for #6273 is combining potential duplicated messages and non-duplicated messages into a batch. So need to flush the potential duplicated message first and then add the non-duplicated messages to a batch. (cherry picked from commit b898f49a9ebd6d1fdc602201987c182ba6ad8d2b) --- .../client/api/ClientDeduplicationTest.java | 12 ++++++---- .../impl/BatchMessageContainerImpl.java | 3 ++- .../pulsar/client/impl/ProducerImpl.java | 24 ++++++++++++------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 97e7869a8bb33..09bfb79ccdcd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -182,11 +182,12 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except producer.flush(); // Repeat the messages and verify they're not received by consumer - producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); - producer.newMessage().value("my-message-2".getBytes()).sequenceId(4).sendAsync(); - producer.close(); + producer.newMessage().value("my-message-0".getBytes()).sequenceId(2).sendAsync(); + producer.newMessage().value("my-message-1".getBytes()).sequenceId(4).sendAsync(); + producer.newMessage().value("my-message-3".getBytes()).sequenceId(6).sendAsync(); + producer.flush(); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { Message msg = consumer.receive(); assertEquals(new String(msg.getData()), "my-message-" + i); consumer.acknowledge(msg); @@ -196,11 +197,12 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except Message msg = consumer.receive(1, TimeUnit.SECONDS); assertNull(msg); + producer.close(); // Kill and restart broker restartBroker(); producer = producerBuilder.create(); - assertEquals(producer.getLastSequenceId(), 5L); + assertEquals(producer.getLastSequenceId(), 6L); // Repeat the messages and verify they're not received by consumer producer.newMessage().value("my-message-1".getBytes()).sequenceId(2).sendAsync(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index d4772cef1b388..bcbd1c351f177 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -69,6 +69,7 @@ public boolean add(MessageImpl msg, SendCallback callback) { if (++numMessagesInBatch == 1) { // some properties are common amongst the different messages in the batch, hence we just pick it up from // the first message + messageMetadata.setSequenceId(msg.getSequenceId()); lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT @@ -87,7 +88,7 @@ public boolean add(MessageImpl msg, SendCallback callback) { messageMetadata.setSequenceId(lowestSequenceId); } highestSequenceId = msg.getSequenceId(); - producer.lastSequenceIdPushed = msg.getSequenceId(); + producer.lastSequenceIdPushed = Math.max(producer.lastSequenceIdPushed, msg.getSequenceId()); return isBatchFull(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 7805fa8ddfb17..776300deaec0a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -70,7 +70,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; @@ -114,6 +113,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private volatile long lastSequenceIdPublished; protected volatile long lastSequenceIdPushed; + private volatile boolean isLastSequenceIdPotentialDuplicated; private MessageCrypto msgCrypto = null; @@ -397,6 +397,7 @@ public void sendAsync(Message message, SendCallback callback) { // should trigger complete the batch message, new message will add to a new batch and new batch // sequence id use the new message, so that broker can handle the message duplication if (sequenceId <= lastSequenceIdPushed) { + isLastSequenceIdPotentialDuplicated = true; if (sequenceId <= lastSequenceIdPublished) { log.warn("Message with sequence id {} is definitely a duplicate", sequenceId); } else { @@ -405,14 +406,21 @@ public void sendAsync(Message message, SendCallback callback) { } doBatchSendAndAdd(msg, callback, payload); } else { - // handle boundary cases where message being added would exceed - // batch size and/or max message size - boolean isBatchFull = batchMessageContainer.add(msg, callback); - lastSendFuture = callback.getFuture(); - payload.release(); - if (isBatchFull) { - batchMessageAndSend(); + // Should flush the last potential duplicated since can't combine potential duplicated messages + // and non-duplicated messages into a batch. + if (isLastSequenceIdPotentialDuplicated) { + doBatchSendAndAdd(msg, callback, payload); + } else { + // handle boundary cases where message being added would exceed + // batch size and/or max message size + boolean isBatchFull = batchMessageContainer.add(msg, callback); + lastSendFuture = callback.getFuture(); + payload.release(); + if (isBatchFull) { + batchMessageAndSend(); + } } + isLastSequenceIdPotentialDuplicated = false; } } else { doBatchSendAndAdd(msg, callback, payload);