diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index fb5c457fcc874..919b811f002c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -20,6 +20,8 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty; +import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION; +import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE; import io.netty.buffer.ByteBuf; import io.prometheus.client.Gauge; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -46,10 +49,12 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.compaction.Compactor; import org.checkerframework.checker.nullness.qual.Nullable; @@ -69,10 +74,13 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen private final LongAdder filterRejectedMsgs = new LongAdder(); private final LongAdder filterRescheduledMsgs = new LongAdder(); + protected final ConcurrentLongLongPairHashMap delayedMessageMarkDeleteMap; + protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) { super(subscription); this.serviceConfig = serviceConfig; this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled(); + this.delayedMessageMarkDeleteMap = ConcurrentLongLongPairHashMap.newBuilder().autoShrink(true).build(); } @@ -221,6 +229,47 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i entries.set(i, null); entry.release(); continue; + } else if (delayedMessageMarkDeleteMap.containsKey(entry.getLedgerId(), entry.getEntryId())) { + // The delayed message is marked for delete. + ConcurrentLongLongPairHashMap.LongPair markMessageId = delayedMessageMarkDeleteMap + .get(entry.getLedgerId(), entry.getEntryId()); + List deleteDelayedMessageList = new ArrayList<>(); + deleteDelayedMessageList.add(entry.getPosition()); + deleteDelayedMessageList.add(PositionFactory.create(markMessageId.first, markMessageId.second)); + + delayedMessageMarkDeleteMap.remove(entry.getLedgerId(), entry.getEntryId()); + individualAcknowledgeMessageIfNeeded(deleteDelayedMessageList, Collections.emptyMap()); + entries.set(i, null); + entry.release(); + continue; + } + + List propertiesList = msgMetadata.getPropertiesList(); + if (!propertiesList.isEmpty()) { + Map propertiesMap = propertiesList.stream() + .filter(p -> p.getKey().equals(DELAY_CANCELED_MESSAGE_POSITION) + || p.getKey().equals(IS_MARK_DELETE_DELAY_MESSAGE)) + .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, + (oldValue, newValue) -> newValue)); + + if (propertiesMap.containsKey(IS_MARK_DELETE_DELAY_MESSAGE)) { + if (propertiesMap.containsKey(DELAY_CANCELED_MESSAGE_POSITION)) { + String[] data = propertiesMap.get(DELAY_CANCELED_MESSAGE_POSITION).split(":"); + long ledgerId = Long.parseLong(data[0]); + long entryId = Long.parseLong(data[1]); + delayedMessageMarkDeleteMap.put(ledgerId, entryId, + entry.getLedgerId(), entry.getEntryId()); + entries.set(i, null); + entry.release(); + continue; + } else { + individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()), + Collections.emptyMap()); + entries.set(i, null); + entry.release(); + continue; + } + } } if (hasFilter) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index e47857e8ec60f..df676a79e11d7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION; +import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -32,6 +34,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; @@ -43,6 +47,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -686,4 +691,65 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception { + maxDeliveryDelayInMillis + " milliseconds"); } } + + @Test + public void testDelayedMessageCancel() throws Exception { + String topic = BrokerTestUtil.newUniqueName("testDelayedMessageCancel"); + CountDownLatch latch = new CountDownLatch(9); + Set receivedMessages = ConcurrentHashMap.newKeySet(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .messageListener((Consumer c, Message msg) -> { + receivedMessages.add(msg.getValue()); + c.acknowledgeAsync(msg); + latch.countDown(); + }) + .subscribe(); + + final long tickTime = 1000L; + + admin.topicPolicies().setDelayedDeliveryPolicy(topic, + DelayedDeliveryPolicies.builder() + .active(true) + .tickTime(tickTime) + .maxDeliveryDelayInMillis(10000) + .build()); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 10; i++) { + final int n = i; + final long currentTime = System.currentTimeMillis(); + final long deliverAtTime = currentTime + 5000L; + producer.newMessage() + .key(String.valueOf(i)) + .value("msg-" + i) + .deliverAt(deliverAtTime) + .sendAsync().whenComplete((id, ex) -> { + if (n == 0) { + MessageIdAdv messageIdAdv = (MessageIdAdv) id; + String deleteDelayedMessageId = messageIdAdv.getLedgerId() + ":" + messageIdAdv.getEntryId(); + producer.newMessage() + .key(String.valueOf(n)) + .value("msg-0-mark") + .deliverAt(deliverAtTime - 2 * tickTime) + .property(IS_MARK_DELETE_DELAY_MESSAGE, "true") + .property(DELAY_CANCELED_MESSAGE_POSITION, deleteDelayedMessageId) + .sendAsync(); + } + }); + } + producer.flush(); + + assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were received in time"); + assertFalse(receivedMessages.contains("msg-0") || receivedMessages.contains("msg-0-mark"), + "msg-0 and msg-0-mark should have been cancelled but was received"); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java index ab71f2a43e5d4..944a35a779566 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java @@ -24,6 +24,8 @@ public class Constants { public static final String GLOBAL_CLUSTER = "global"; + public static final String DELAY_CANCELED_MESSAGE_POSITION = "delayCanceledMsgPosition"; + public static final String IS_MARK_DELETE_DELAY_MESSAGE = "isMarkDeleteDelayMessage"; private Constants() {} }