Skip to content

Commit

Permalink
[fix][broker]Fix lookupService.getTopicsUnderNamespace can not work w…
Browse files Browse the repository at this point in the history
…ith a quote pattern (apache#23014)

(cherry picked from commit 7c0e827)
  • Loading branch information
poorbarcode committed Jul 11, 2024
1 parent 98d4a53 commit c0029d7
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.common.collect.Lists;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -1113,4 +1116,57 @@ public void testTopicDeletion() throws Exception {
assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
}

@Test(dataProvider = "partitioned")
public void testPatternQuote(boolean partitioned) throws Exception {
final NamespaceName namespace = NamespaceName.get("public/default");
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
final LookupService lookup = client.getLookup();
List<String> expectedRes = new ArrayList<>();
if (partitioned) {
admin.topics().createPartitionedTopic(topicName, 2);
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
Collections.sort(expectedRes);
} else {
admin.topics().createNonPartitionedTopic(topicName);
expectedRes.add(topicName);
}

// Verify 1: "java.util.regex.Pattern.quote".
String pattern1 = java.util.regex.Pattern.quote(topicName);
List<String> res1 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern1, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res1);
assertEquals(res1, expectedRes);

// Verify 2: "com.google.re2j.Pattern.quote"
String pattern2 = com.google.re2j.Pattern.quote(topicName);
List<String> res2 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern2, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res2);
assertEquals(res2, expectedRes);

// Verify 3: "java.util.regex.Pattern.quote" & "^$"
String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
List<String> res3 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern3, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res3);
assertEquals(res3, expectedRes);

// Verify 4: "com.google.re2j.Pattern.quote" & "^$"
String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
List<String> res4 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern4, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res4);
assertEquals(res4, expectedRes);

// cleanup.
if (partitioned) {
admin.topics().deletePartitionedTopic(topicName, false);
} else {
admin.topics().delete(topicName, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.topics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
Expand All @@ -28,6 +29,7 @@
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;

@UtilityClass
Expand Down Expand Up @@ -82,15 +84,23 @@ public static Set<String> minus(Collection<String> list1, Collection<String> lis
return s1;
}

private static String removeTopicDomainScheme(String originalRegexp) {
@VisibleForTesting
static String removeTopicDomainScheme(String originalRegexp) {
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
return originalRegexp;
}
String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
if (originalRegexp.contains("^")) {
return String.format("^%s", removedTopicDomain);
String[] parts = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
String prefix = parts[0];
String removedTopicDomain = parts[1];
if (prefix.equals(TopicDomain.persistent.value()) || prefix.equals(TopicDomain.non_persistent.value())) {
prefix = "";
} else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
prefix = prefix.substring(0, prefix.length() - TopicDomain.non_persistent.value().length());
} else if (prefix.endsWith(TopicDomain.persistent.value())){
prefix = prefix.substring(0, prefix.length() - TopicDomain.persistent.value().length());
} else {
return removedTopicDomain;
throw new IllegalArgumentException("Does not support topic domain: " + prefix);
}
return String.format("%s%s", prefix, removedTopicDomain);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TopicListTest {

Expand Down Expand Up @@ -107,5 +108,60 @@ public void testCalculateHash() {

}


@Test
public void testRemoveTopicDomainScheme() {
// persistent.
final String tpName1 = "persistent://public/default/tp";
String res1 = TopicList.removeTopicDomainScheme(tpName1);
assertEquals(res1, "public/default/tp");

// non-persistent
final String tpName2 = "non-persistent://public/default/tp";
String res2 = TopicList.removeTopicDomainScheme(tpName2);
assertEquals(res2, "public/default/tp");

// without topic domain.
final String tpName3 = "public/default/tp";
String res3 = TopicList.removeTopicDomainScheme(tpName3);
assertEquals(res3, "public/default/tp");

// persistent & "java.util.regex.Pattern.quote".
final String tpName4 = java.util.regex.Pattern.quote(tpName1);
String res4 = TopicList.removeTopicDomainScheme(tpName4);
assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));

// persistent & "java.util.regex.Pattern.quote" & "^$".
final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + "$";
String res5 = TopicList.removeTopicDomainScheme(tpName5);
assertEquals(res5, "^" + java.util.regex.Pattern.quote("public/default/tp") + "$");

// persistent & "com.google.re2j.Pattern.quote".
final String tpName6 = Pattern.quote(tpName1);
String res6 = TopicList.removeTopicDomainScheme(tpName6);
assertEquals(res6, Pattern.quote("public/default/tp"));

// non-persistent & "java.util.regex.Pattern.quote".
final String tpName7 = java.util.regex.Pattern.quote(tpName2);
String res7 = TopicList.removeTopicDomainScheme(tpName7);
assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));

// non-persistent & "com.google.re2j.Pattern.quote".
final String tpName8 = Pattern.quote(tpName2);
String res8 = TopicList.removeTopicDomainScheme(tpName8);
assertEquals(res8, Pattern.quote("public/default/tp"));

// non-persistent & "com.google.re2j.Pattern.quote" & "^$".
final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
String res9 = TopicList.removeTopicDomainScheme(tpName9);
assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");

// wrong topic domain.
final String tpName10 = "xx://public/default/tp";
try {
TopicList.removeTopicDomainScheme(tpName10);
fail("Does not support the topic domain xx");
} catch (Exception ex) {
// expected error.
}
}
}

0 comments on commit c0029d7

Please sign in to comment.