Skip to content

Commit d7af82e

Browse files
grssamlhotari
authored andcommitted
[improve][admin] PIP-369 Introduce unload flag in ns-isolation-policy set call (apache#23120)
Co-authored-by: Zixuan Liu <[email protected]> (cherry picked from commit 8da3bf8)
1 parent 0bbfc75 commit d7af82e

File tree

8 files changed

+324
-27
lines changed

8 files changed

+324
-27
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

+51-3
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import io.swagger.annotations.ExampleProperty;
2828
import java.util.ArrayList;
2929
import java.util.Collections;
30+
import java.util.HashSet;
3031
import java.util.LinkedHashSet;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Objects;
3435
import java.util.Optional;
36+
import java.util.Set;
3537
import java.util.concurrent.CompletableFuture;
3638
import java.util.regex.Pattern;
3739
import java.util.stream.Collectors;
@@ -64,6 +66,7 @@
6466
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
6567
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
6668
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
69+
import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
6770
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
6871
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
6972
import org.apache.pulsar.common.util.FutureUtil;
@@ -681,10 +684,13 @@ public void setNamespaceIsolationPolicy(
681684
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
682685
.thenApply(__ -> new NamespaceIsolationPolicies()))
683686
).thenCompose(nsIsolationPolicies -> {
687+
NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
688+
.getPolicies().getOrDefault(policyName, null);
684689
nsIsolationPolicies.setPolicy(policyName, policyData);
685690
return namespaceIsolationPolicies()
686-
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
687-
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
691+
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies())
692+
.thenApply(__ -> oldPolicy);
693+
}).thenCompose(oldPolicy -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
688694
.thenAccept(__ -> {
689695
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
690696
clientAppId(), cluster, policyName);
@@ -719,7 +725,13 @@ public void setNamespaceIsolationPolicy(
719725
* Get matched namespaces; call unload for each namespaces.
720726
*/
721727
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
722-
NamespaceIsolationDataImpl policyData) {
728+
NamespaceIsolationDataImpl policyData,
729+
NamespaceIsolationDataImpl oldPolicy) {
730+
// exit early if none of the namespaces need to be unloaded
731+
if (NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
732+
return CompletableFuture.completedFuture(null);
733+
}
734+
723735
PulsarAdmin adminClient;
724736
try {
725737
adminClient = pulsar().getAdminClient();
@@ -728,6 +740,7 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
728740
}
729741
// compile regex patterns once
730742
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
743+
// TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
731744
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
732745
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
733746
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
@@ -753,6 +766,41 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
753766
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
754767
return CompletableFuture.completedFuture(null);
755768
}
769+
// If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
770+
// actually have been changed.
771+
772+
log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
773+
if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
774+
// We also compare that the previous primary broker list is same as current, in case all namespaces need
775+
// to be placed again anyway.
776+
if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) {
777+
// list is same, so we continue finding the changed namespaces.
778+
779+
// We create a union regex list contains old + new regexes
780+
Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
781+
combinedNamespaces.addAll(policyData.getNamespaces());
782+
// We create a intersection of the old and new regexes. These won't need to be unloaded
783+
Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
784+
commonNamespaces.retainAll(policyData.getNamespaces());
785+
786+
log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces);
787+
788+
// Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
789+
combinedNamespaces.removeAll(commonNamespaces);
790+
791+
log.debug("changed regexes: {}", commonNamespaces);
792+
793+
// Now we further filter the filtered namespaces based on this combinedNamespaces set
794+
shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
795+
.filter(name -> combinedNamespaces.stream()
796+
.map(Pattern::compile)
797+
.anyMatch(pattern -> pattern.matcher(name).matches())
798+
).toList();
799+
800+
}
801+
}
802+
// unload type is either null or not in (changed, none), so we proceed to unload all namespaces
803+
// TODO - default in 4.x should become `changed`
756804
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
757805
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
758806
.collect(Collectors.toList());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

+187-21
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.util.concurrent.TimeUnit.MINUTES;
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
24+
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
2425
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.Mockito.doAnswer;
2627
import static org.mockito.Mockito.spy;
@@ -52,6 +53,7 @@
5253
import java.util.TreeSet;
5354
import java.util.UUID;
5455
import java.util.concurrent.CompletableFuture;
56+
import java.util.concurrent.ExecutionException;
5557
import java.util.concurrent.TimeUnit;
5658
import java.util.concurrent.atomic.AtomicInteger;
5759
import javax.ws.rs.NotAcceptableException;
@@ -108,27 +110,7 @@
108110
import org.apache.pulsar.common.naming.NamespaceName;
109111
import org.apache.pulsar.common.naming.TopicDomain;
110112
import org.apache.pulsar.common.naming.TopicName;
111-
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
112-
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
113-
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
114-
import org.apache.pulsar.common.policies.data.BacklogQuota;
115-
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
116-
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
117-
import org.apache.pulsar.common.policies.data.BundlesData;
118-
import org.apache.pulsar.common.policies.data.ClusterData;
119-
import org.apache.pulsar.common.policies.data.ConsumerStats;
120-
import org.apache.pulsar.common.policies.data.EntryFilters;
121-
import org.apache.pulsar.common.policies.data.FailureDomain;
122-
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
123-
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
124-
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
125-
import org.apache.pulsar.common.policies.data.PersistencePolicies;
126-
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
127-
import org.apache.pulsar.common.policies.data.RetentionPolicies;
128-
import org.apache.pulsar.common.policies.data.SubscriptionStats;
129-
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
130-
import org.apache.pulsar.common.policies.data.TopicStats;
131-
import org.apache.pulsar.common.policies.data.TopicType;
113+
import org.apache.pulsar.common.policies.data.*;
132114
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
133115
import org.apache.pulsar.common.protocol.schema.SchemaData;
134116
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -3494,4 +3476,188 @@ public void testGetStatsIfPartitionNotExists() throws Exception {
34943476
// cleanup.
34953477
admin.topics().deletePartitionedTopic(partitionedTp);
34963478
}
3479+
3480+
private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String> namespaces,
3481+
List<String> primaryBrokers
3482+
) {
3483+
// setup ns-isolation-policy in both the clusters.
3484+
Map<String, String> parameters1 = new HashMap<>();
3485+
parameters1.put("min_limit", "1");
3486+
parameters1.put("usage_threshold", "100");
3487+
List<String> nsRegexList = new ArrayList<>(namespaces);
3488+
3489+
return NamespaceIsolationData.builder()
3490+
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
3491+
.namespaces(nsRegexList)
3492+
.primary(primaryBrokers)
3493+
.secondary(Collections.singletonList(""))
3494+
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
3495+
.policyType(AutoFailoverPolicyType.min_available)
3496+
.parameters(parameters1)
3497+
.build())
3498+
.unloadScope(scope)
3499+
.build();
3500+
}
3501+
3502+
private boolean allTopicsUnloaded(List<String> topics) {
3503+
for (String topic : topics) {
3504+
if (pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
3505+
return false;
3506+
}
3507+
}
3508+
return true;
3509+
}
3510+
3511+
private void loadTopics(List<String> topics) throws PulsarClientException, ExecutionException, InterruptedException {
3512+
// create a topic by creating a producer so that the topic is present on the broker
3513+
for (String topic : topics) {
3514+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
3515+
producer.close();
3516+
pulsar.getBrokerService().getTopicIfExists(topic).get();
3517+
}
3518+
3519+
// All namespaces are loaded onto broker. Assert that
3520+
for (String topic : topics) {
3521+
assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
3522+
}
3523+
}
3524+
3525+
/**
3526+
* Validates that the namespace isolation policy set and update is unloading only the relevant namespaces based on
3527+
* the unload scope provided.
3528+
*
3529+
* @param topicType persistent or non persistent.
3530+
* @param policyName policy name.
3531+
* @param nsPrefix unique namespace prefix.
3532+
* @param totalNamespaces total namespaces to create. Only the end part. Each namespace also gets a topic t1.
3533+
* @param initialScope unload scope while creating the policy.
3534+
* @param initialNamespaceRegex namespace regex while creating the policy.
3535+
* @param initialLoadedNS expected namespaces to be still loaded after the policy create call. Remaining namespaces
3536+
* will be asserted to be unloaded within 20 seconds.
3537+
* @param updatedScope unload scope while updating the policy.
3538+
* @param updatedNamespaceRegex namespace regex while updating the policy.
3539+
* @param updatedLoadedNS expected namespaces to be loaded after policy update call. Remaining namespaces will be
3540+
* asserted to be unloaded within 20 seconds.
3541+
* @throws PulsarAdminException
3542+
* @throws PulsarClientException
3543+
* @throws ExecutionException
3544+
* @throws InterruptedException
3545+
*/
3546+
private void testIsolationPolicyUnloadsNSWithScope(String topicType, String policyName, String nsPrefix,
3547+
List<String> totalNamespaces,
3548+
NamespaceIsolationPolicyUnloadScope initialScope,
3549+
List<String> initialNamespaceRegex, List<String> initialLoadedNS,
3550+
NamespaceIsolationPolicyUnloadScope updatedScope,
3551+
List<String> updatedNamespaceRegex, List<String> updatedLoadedNS,
3552+
List<String> updatedBrokerRegex)
3553+
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException {
3554+
3555+
// Create all namespaces
3556+
List<String> allTopics = new ArrayList<>();
3557+
for (String namespacePart: totalNamespaces) {
3558+
admin.namespaces().createNamespace(nsPrefix + namespacePart, Set.of("test"));
3559+
allTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3560+
}
3561+
// Load all topics so that they are present. Assume topic t1 under each namespace
3562+
loadTopics(allTopics);
3563+
3564+
// Create the policy
3565+
NamespaceIsolationData nsPolicyData1 = createPolicyData(
3566+
initialScope, initialNamespaceRegex, Collections.singletonList(".*")
3567+
);
3568+
admin.clusters().createNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
3569+
3570+
List<String> initialLoadedTopics = new ArrayList<>();
3571+
for (String namespacePart: initialLoadedNS) {
3572+
initialLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3573+
}
3574+
3575+
List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
3576+
initialUnloadedTopics.removeAll(initialLoadedTopics);
3577+
3578+
// Assert that all topics (and thus ns) not under initialLoadedNS namespaces are unloaded
3579+
if (initialUnloadedTopics.isEmpty()) {
3580+
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
3581+
TimeUnit.SECONDS.sleep(5);
3582+
} else {
3583+
Awaitility.await()
3584+
.atMost(10, TimeUnit.SECONDS)
3585+
.until(() -> allTopicsUnloaded(initialUnloadedTopics));
3586+
}
3587+
// Assert that all topics under initialLoadedNS are still present
3588+
initialLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
3589+
3590+
// Load the topics again
3591+
loadTopics(allTopics);
3592+
3593+
// Update policy using updatedScope with updated namespace regex
3594+
nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex, updatedBrokerRegex);
3595+
admin.clusters().updateNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
3596+
3597+
List<String> updatedLoadedTopics = new ArrayList<>();
3598+
for (String namespacePart : updatedLoadedNS) {
3599+
updatedLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3600+
}
3601+
3602+
List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
3603+
updatedUnloadedTopics.removeAll(updatedLoadedTopics);
3604+
3605+
// Assert that all topics (and thus ns) not under updatedLoadedNS namespaces are unloaded
3606+
if (updatedUnloadedTopics.isEmpty()) {
3607+
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
3608+
TimeUnit.SECONDS.sleep(5);
3609+
} else {
3610+
Awaitility.await()
3611+
.atMost(10, TimeUnit.SECONDS)
3612+
.until(() -> allTopicsUnloaded(updatedUnloadedTopics));
3613+
}
3614+
// Assert that all topics under updatedLoadedNS are still present
3615+
updatedLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
3616+
3617+
}
3618+
3619+
@Test(dataProvider = "topicType")
3620+
public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) throws Exception {
3621+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3622+
testIsolationPolicyUnloadsNSWithScope(
3623+
topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3624+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3625+
all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
3626+
Collections.singletonList(".*")
3627+
);
3628+
}
3629+
3630+
@Test(dataProvider = "topicType")
3631+
public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
3632+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3633+
testIsolationPolicyUnloadsNSWithScope(
3634+
topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3635+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3636+
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
3637+
Collections.singletonList(".*")
3638+
);
3639+
}
3640+
3641+
@Test(dataProvider = "topicType")
3642+
public void testIsolationPolicyUnloadsNSWithNoneScope(final String topicType) throws Exception {
3643+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3644+
testIsolationPolicyUnloadsNSWithScope(
3645+
topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3646+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3647+
none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2", "c1"),
3648+
Collections.singletonList(".*")
3649+
);
3650+
}
3651+
3652+
@Test(dataProvider = "topicType")
3653+
public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String topicType) throws Exception {
3654+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3655+
// As per changed flag, only c1 should unload, but due to primary change, both a* and c* will.
3656+
testIsolationPolicyUnloadsNSWithScope(
3657+
topicType, "policy-primary-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3658+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3659+
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
3660+
List.of(".*", "broker.*")
3661+
);
3662+
}
34973663
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public interface NamespaceIsolationData {
3131

3232
AutoFailoverPolicyData getAutoFailoverPolicy();
3333

34+
NamespaceIsolationPolicyUnloadScope getUnloadScope();
35+
3436
void validate();
3537

3638
interface Builder {
@@ -42,6 +44,8 @@ interface Builder {
4244

4345
Builder autoFailoverPolicy(AutoFailoverPolicyData autoFailoverPolicyData);
4446

47+
Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);
48+
4549
NamespaceIsolationData build();
4650
}
4751

0 commit comments

Comments
 (0)