Skip to content

Commit

Permalink
Fix messages in TopicPolicies will never be cleaned up (#11928)
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Sep 20, 2021
1 parent fccc1cf commit 93e2db0
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
Expand Down Expand Up @@ -95,19 +96,10 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
if (ex != null) {
result.completeExceptionally(ex);
} else {
writer.writeAsync(
PulsarEvent.builder()
.actionType(actionType)
.eventType(EventType.TOPIC_POLICY)
.topicPoliciesEvent(
TopicPoliciesEvent.builder()
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
.policies(policies)
.build())
.build()).whenComplete(((messageId, e) -> {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
Expand All @@ -133,6 +125,21 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
return result;
}

private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) {
return PulsarEvent.builder()
.actionType(actionType)
.eventType(EventType.TOPIC_POLICY)
.topicPoliciesEvent(
TopicPoliciesEvent.builder()
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
.policies(policies)
.build())
.build();
}

private void notifyListener(Message<PulsarEvent> msg) {
if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
return;
Expand Down Expand Up @@ -316,6 +323,11 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
}

private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
policiesCache.remove(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
return;
}
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
TopicName topicName =
Expand All @@ -331,7 +343,19 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
policiesCache.put(topicName, event.getPolicies());
break;
case DELETE:
// Since PR #11928, this branch is no longer needed.
// However, due to compatibility, it is temporarily retained here
// and can be deleted in the future.
policiesCache.remove(topicName);
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
.whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
}
})));
break;
case NONE:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
Expand Down Expand Up @@ -88,6 +89,18 @@ public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
return producer.newMessage().key(getEventKey(event)).value(event).sendAsync();
}

@Override
public MessageId delete(PulsarEvent event) throws PulsarClientException {
validateActionType(event);
return producer.newMessage().key(getEventKey(event)).value(null).send();
}

@Override
public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
validateActionType(event);
return producer.newMessage().key(getEventKey(event)).value(null).sendAsync();
}

private String getEventKey(PulsarEvent event) {
return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
event.getTopicPoliciesEvent().getTenant(),
Expand Down Expand Up @@ -115,6 +128,12 @@ public SystemTopicClient<PulsarEvent> getSystemTopicClient() {
}
}

private static void validateActionType(PulsarEvent event) {
if (event == null || !ActionType.DELETE.equals(event.getActionType())) {
throw new UnsupportedOperationException("The only supported ActionType is DELETE");
}
}

private static class TopicPolicyReader implements Reader<PulsarEvent> {

private final org.apache.pulsar.client.api.Reader<PulsarEvent> reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -61,6 +62,7 @@
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -2519,6 +2521,83 @@ public void testPolicyIsDeleteTogetherManually() throws Exception {
.isNull());
}

@Test
public void testPoliciesCanBeDeletedWithTopic() throws Exception {
final String topic = testTopic + UUID.randomUUID();
final String topic2 = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
pulsarClient.newProducer().topic(topic2).create().close();

Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull();
});
// Init Topic Policies. Send 4 messages in a row, there should be only 2 messages left after compression
admin.topics().setMaxConsumersPerSubscription(topic, 1);
admin.topics().setMaxConsumersPerSubscription(topic2, 2);
admin.topics().setMaxConsumersPerSubscription(topic, 3);
admin.topics().setMaxConsumersPerSubscription(topic2, 4);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
});
String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
// Trigger compaction and make sure it is finished.
persistentTopic.triggerCompaction();
Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
field.setAccessible(true);
CompletableFuture<Long> future = (CompletableFuture<Long>)field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(future.isDone()));

Consumer consumer = pulsarClient.newConsumer()
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
int count = 0;
while (true) {
Message message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
count++;
consumer.acknowledge(message);
} else {
break;
}
}
consumer.close();
assertEquals(count, 2);

// Delete topic, there should be only 1 message left after compression
admin.topics().delete(topic, true);

Awaitility.await().untilAsserted(() ->
assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))));
persistentTopic.triggerCompaction();
field = PersistentTopic.class.getDeclaredField("currentCompaction");
field.setAccessible(true);
CompletableFuture<Long> future2 = (CompletableFuture<Long>)field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(future2.isDone()));

consumer = pulsarClient.newConsumer()
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
count = 0;
while (true) {
Message message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
count++;
consumer.acknowledge(message);
} else {
break;
}
}
consumer.close();
assertEquals(count, 1);

}

@Test
public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down

0 comments on commit 93e2db0

Please sign in to comment.