Skip to content

Commit

Permalink
[fix][broker] Fix compaction/replication data loss when expire messag…
Browse files Browse the repository at this point in the history
…es (apache#21865)

(cherry picked from commit d9029c6)
  • Loading branch information
coderzc authored and mukesh-ctds committed Mar 6, 2024
1 parent 0012ad3 commit 8c6ecc7
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
Expand Down Expand Up @@ -2097,10 +2098,9 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
final List<CompletableFuture<Void>> futures =
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
new ArrayList<>((int) topic.getReplicators().size()
+ (int) topic.getSubscriptions().size());
subNames.addAll(topic.getReplicators().keys());
subNames.addAll(topic.getSubscriptions().keys());
new ArrayList<>((int) topic.getSubscriptions().size());
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,6 @@ protected boolean replicateEntries(List<Entry> entries) {
continue;
}

if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0 /* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
replicatorId, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void findEntryComplete(Position position, Object ctx) {
if (position != null) {
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback,
cursor.getNumberOfEntriesInBacklog(false));
if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
subscription.updateLastMarkDeleteAdvancedTimestamp();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ private PersistentSubscription createPersistentSubscription(String subscriptionN
}
}

private static boolean isCompactionSubscription(String subscriptionName) {
public static boolean isCompactionSubscription(String subscriptionName) {
return COMPACTION_SUBSCRIPTION.equals(subscriptionName);
}

Expand Down Expand Up @@ -1696,11 +1696,11 @@ private CompletableFuture<Void> checkShadowReplication() {
public void checkMessageExpiry() {
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTtlInSeconds != 0) {
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
shadowReplicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
subscriptions.forEach((__, sub) -> {
if (!isCompactionSubscription(sub.getName())) {
sub.expireMessages(messageTtlInSeconds);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ protected boolean replicateEntries(List<Entry> entries) {
continue;
}

if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0 /* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
replicatorId, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
Expand Down Expand Up @@ -1803,4 +1804,74 @@ public void testReplicatorProducerNotExceed() throws Exception {

Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
}

@Test
public void testReplicatorWithTTL() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatorWithTTL ---");

final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
final TopicName topic = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testReplicatorWithTTL"));
admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
admin1.topics().createNonPartitionedTopic(topic.toString());
admin1.topicPolicies().setMessageTTL(topic.toString(), 1);

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

@Cleanup
Producer<byte[]> persistentProducer1 = client1.newProducer().topic(topic.toString()).create();
persistentProducer1.send("V1".getBytes());

waitReplicateFinish(topic, admin1);

PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
// Pause replicator
persistentReplicator.disconnect();
});

persistentProducer1.send("V2".getBytes());
persistentProducer1.send("V3".getBytes());

Thread.sleep(1000);

admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1);

persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
persistentReplicator.startProducer();
});

waitReplicateFinish(topic, admin1);

persistentProducer1.send("V4".getBytes());

waitReplicateFinish(topic, admin1);

@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

@Cleanup
Consumer<byte[]> consumer = client2.newConsumer().topic(topic.toString())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("sub").subscribe();

List<String> result = new ArrayList<>();
while (true) {
Message<byte[]> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}
result.add(new String(receive.getValue()));
}

assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2120,4 +2120,68 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
() -> pulsarTestContext.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
}

@Test
public void testCompactionWithTTL() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testCompactionWithTTL";
String subName = "sub";
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
.subscribe().close();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K2").value("V2").send();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

producer.newMessage().key("K1").value("V3").send();
producer.newMessage().key("K2").value("V4").send();

Thread.sleep(1000);

// expire messages
admin.topics().expireMessagesForAllSubscriptions(topicName, 1);

// trim the topic
admin.topics().unload(topicName);

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
assertEquals(internalStats.numberOfEntries, 4);
});

producer.newMessage().key("K3").value("V5").send();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
.subscribe();

List<String> result = new ArrayList<>();
while (true) {
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}

result.add(receive.getValue());
}

Assert.assertEquals(result, List.of("V3", "V4", "V5"));
}
}

0 comments on commit 8c6ecc7

Please sign in to comment.