Skip to content

Commit

Permalink
[fix][client] Copy orderingKey to retry letter topic and DLQ messages…
Browse files Browse the repository at this point in the history
… and fix bug in copying (apache#23182)

Fixes apache#23173
Fixes apache#23181

See apache#23173 and apache#23181

- copy ordering key to messages sent to retry letter topic and DLQ topic

(cherry picked from commit 67fc5b9)
(cherry picked from commit c83428f)
  • Loading branch information
lhotari authored and nikhil-ctds committed Aug 16, 2024
1 parent 0ceef2a commit b9e08b6
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,66 @@ public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
consumer.close();
}

@Test
public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";

final int maxRedeliveryCount = 1;

final int sendMessages = 100;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

byte[] key = new byte[]{1, 2, 3, 4};
for (int i = 0; i < sendMessages; i++) {
producer.newMessage()
.orderingKey(key)
.value(String.format("Hello Pulsar [%d]", i).getBytes())
.send();
}

producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
assertEquals(message.getOrderingKey(), key);
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();
}

@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ public void testAutoConsumeSchemaRetryLetter() throws Exception {
public void testRetryTopicProperties() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";

byte[] key = "key".getBytes();
byte[] orderingKey = "orderingKey".getBytes();

final int maxRedeliveryCount = 3;

final int sendMessages = 10;
Expand Down Expand Up @@ -285,7 +288,11 @@ public void testRetryTopicProperties() throws Exception {

Set<String> originMessageIds = new HashSet<>();
for (int i = 0; i < sendMessages; i++) {
MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
MessageId msgId = producer.newMessage()
.value(String.format("Hello Pulsar [%d]", i).getBytes())
.keyBytes(key)
.orderingKey(orderingKey)
.send();
originMessageIds.add(msgId.toString());
}

Expand All @@ -298,6 +305,10 @@ public void testRetryTopicProperties() throws Exception {
if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
assertTrue(message.hasKey());
assertEquals(message.getKeyBytes(), key);
assertTrue(message.hasOrderingKey());
assertEquals(message.getOrderingKey(), orderingKey);
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
Expand All @@ -317,6 +328,10 @@ public void testRetryTopicProperties() throws Exception {
if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
assertTrue(message.hasKey());
assertEquals(message.getKeyBytes(), key);
assertTrue(message.hasOrderingKey());
assertEquals(message.getOrderingKey(), orderingKey);
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
deadLetterConsumer.acknowledge(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,17 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
}
}

private static void copyMessageKeyIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) {
private static void copyMessageKeysIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) {
if (message.hasKey()) {
if (message.hasBase64EncodedKey()) {
typedMessageBuilderNew.keyBytes(message.getKeyBytes());
} else {
typedMessageBuilderNew.key(message.getKey());
}
}
if (message.hasOrderingKey()) {
typedMessageBuilderNew.orderingKey(message.getOrderingKey());
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -660,6 +663,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
Expand All @@ -686,7 +690,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
Expand Down Expand Up @@ -2135,7 +2139,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(messageId);
Expand Down

0 comments on commit b9e08b6

Please sign in to comment.