Skip to content

Commit 8da3bf8

Browse files
grssamnodece
andauthored
[improve][admin] PIP-369 Introduce unload flag in ns-isolation-policy set call (apache#23120)
Co-authored-by: Zixuan Liu <[email protected]>
1 parent 3a59e4c commit 8da3bf8

File tree

8 files changed

+324
-27
lines changed

8 files changed

+324
-27
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

+51-3
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import io.swagger.annotations.ExampleProperty;
2828
import java.util.ArrayList;
2929
import java.util.Collections;
30+
import java.util.HashSet;
3031
import java.util.LinkedHashSet;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Objects;
3435
import java.util.Optional;
36+
import java.util.Set;
3537
import java.util.concurrent.CompletableFuture;
3638
import java.util.regex.Pattern;
3739
import java.util.stream.Collectors;
@@ -65,6 +67,7 @@
6567
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
6668
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
6769
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
70+
import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
6871
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
6972
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
7073
import org.apache.pulsar.common.util.FutureUtil;
@@ -721,10 +724,13 @@ public void setNamespaceIsolationPolicy(
721724
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
722725
.thenApply(__ -> new NamespaceIsolationPolicies()))
723726
).thenCompose(nsIsolationPolicies -> {
727+
NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
728+
.getPolicies().getOrDefault(policyName, null);
724729
nsIsolationPolicies.setPolicy(policyName, policyData);
725730
return namespaceIsolationPolicies()
726-
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
727-
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
731+
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies())
732+
.thenApply(__ -> oldPolicy);
733+
}).thenCompose(oldPolicy -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
728734
.thenAccept(__ -> {
729735
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
730736
clientAppId(), cluster, policyName);
@@ -759,7 +765,13 @@ public void setNamespaceIsolationPolicy(
759765
* Get matched namespaces; call unload for each namespaces.
760766
*/
761767
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
762-
NamespaceIsolationDataImpl policyData) {
768+
NamespaceIsolationDataImpl policyData,
769+
NamespaceIsolationDataImpl oldPolicy) {
770+
// exit early if none of the namespaces need to be unloaded
771+
if (NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
772+
return CompletableFuture.completedFuture(null);
773+
}
774+
763775
PulsarAdmin adminClient;
764776
try {
765777
adminClient = pulsar().getAdminClient();
@@ -768,6 +780,7 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
768780
}
769781
// compile regex patterns once
770782
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
783+
// TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
771784
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
772785
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
773786
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
@@ -793,6 +806,41 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
793806
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
794807
return CompletableFuture.completedFuture(null);
795808
}
809+
// If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
810+
// actually have been changed.
811+
812+
log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
813+
if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
814+
// We also compare that the previous primary broker list is same as current, in case all namespaces need
815+
// to be placed again anyway.
816+
if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) {
817+
// list is same, so we continue finding the changed namespaces.
818+
819+
// We create a union regex list contains old + new regexes
820+
Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
821+
combinedNamespaces.addAll(policyData.getNamespaces());
822+
// We create a intersection of the old and new regexes. These won't need to be unloaded
823+
Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
824+
commonNamespaces.retainAll(policyData.getNamespaces());
825+
826+
log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces);
827+
828+
// Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
829+
combinedNamespaces.removeAll(commonNamespaces);
830+
831+
log.debug("changed regexes: {}", commonNamespaces);
832+
833+
// Now we further filter the filtered namespaces based on this combinedNamespaces set
834+
shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
835+
.filter(name -> combinedNamespaces.stream()
836+
.map(Pattern::compile)
837+
.anyMatch(pattern -> pattern.matcher(name).matches())
838+
).toList();
839+
840+
}
841+
}
842+
// unload type is either null or not in (changed, none), so we proceed to unload all namespaces
843+
// TODO - default in 4.x should become `changed`
796844
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
797845
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
798846
.collect(Collectors.toList());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

+187-21
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
2424
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
25+
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
2526
import static org.mockito.ArgumentMatchers.any;
2627
import static org.mockito.Mockito.doAnswer;
2728
import static org.mockito.Mockito.spy;
@@ -53,6 +54,7 @@
5354
import java.util.TreeSet;
5455
import java.util.UUID;
5556
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.ExecutionException;
5658
import java.util.concurrent.TimeUnit;
5759
import java.util.concurrent.atomic.AtomicInteger;
5860
import javax.ws.rs.NotAcceptableException;
@@ -109,27 +111,7 @@
109111
import org.apache.pulsar.common.naming.NamespaceName;
110112
import org.apache.pulsar.common.naming.TopicDomain;
111113
import org.apache.pulsar.common.naming.TopicName;
112-
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
113-
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
114-
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
115-
import org.apache.pulsar.common.policies.data.BacklogQuota;
116-
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
117-
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
118-
import org.apache.pulsar.common.policies.data.BundlesData;
119-
import org.apache.pulsar.common.policies.data.ClusterData;
120-
import org.apache.pulsar.common.policies.data.ConsumerStats;
121-
import org.apache.pulsar.common.policies.data.EntryFilters;
122-
import org.apache.pulsar.common.policies.data.FailureDomain;
123-
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
124-
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
125-
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
126-
import org.apache.pulsar.common.policies.data.PersistencePolicies;
127-
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
128-
import org.apache.pulsar.common.policies.data.RetentionPolicies;
129-
import org.apache.pulsar.common.policies.data.SubscriptionStats;
130-
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
131-
import org.apache.pulsar.common.policies.data.TopicStats;
132-
import org.apache.pulsar.common.policies.data.TopicType;
114+
import org.apache.pulsar.common.policies.data.*;
133115
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
134116
import org.apache.pulsar.common.protocol.schema.SchemaData;
135117
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -3496,4 +3478,188 @@ public void testGetStatsIfPartitionNotExists() throws Exception {
34963478
// cleanup.
34973479
admin.topics().deletePartitionedTopic(partitionedTp);
34983480
}
3481+
3482+
private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String> namespaces,
3483+
List<String> primaryBrokers
3484+
) {
3485+
// setup ns-isolation-policy in both the clusters.
3486+
Map<String, String> parameters1 = new HashMap<>();
3487+
parameters1.put("min_limit", "1");
3488+
parameters1.put("usage_threshold", "100");
3489+
List<String> nsRegexList = new ArrayList<>(namespaces);
3490+
3491+
return NamespaceIsolationData.builder()
3492+
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
3493+
.namespaces(nsRegexList)
3494+
.primary(primaryBrokers)
3495+
.secondary(Collections.singletonList(""))
3496+
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
3497+
.policyType(AutoFailoverPolicyType.min_available)
3498+
.parameters(parameters1)
3499+
.build())
3500+
.unloadScope(scope)
3501+
.build();
3502+
}
3503+
3504+
private boolean allTopicsUnloaded(List<String> topics) {
3505+
for (String topic : topics) {
3506+
if (pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
3507+
return false;
3508+
}
3509+
}
3510+
return true;
3511+
}
3512+
3513+
private void loadTopics(List<String> topics) throws PulsarClientException, ExecutionException, InterruptedException {
3514+
// create a topic by creating a producer so that the topic is present on the broker
3515+
for (String topic : topics) {
3516+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
3517+
producer.close();
3518+
pulsar.getBrokerService().getTopicIfExists(topic).get();
3519+
}
3520+
3521+
// All namespaces are loaded onto broker. Assert that
3522+
for (String topic : topics) {
3523+
assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
3524+
}
3525+
}
3526+
3527+
/**
3528+
* Validates that the namespace isolation policy set and update is unloading only the relevant namespaces based on
3529+
* the unload scope provided.
3530+
*
3531+
* @param topicType persistent or non persistent.
3532+
* @param policyName policy name.
3533+
* @param nsPrefix unique namespace prefix.
3534+
* @param totalNamespaces total namespaces to create. Only the end part. Each namespace also gets a topic t1.
3535+
* @param initialScope unload scope while creating the policy.
3536+
* @param initialNamespaceRegex namespace regex while creating the policy.
3537+
* @param initialLoadedNS expected namespaces to be still loaded after the policy create call. Remaining namespaces
3538+
* will be asserted to be unloaded within 20 seconds.
3539+
* @param updatedScope unload scope while updating the policy.
3540+
* @param updatedNamespaceRegex namespace regex while updating the policy.
3541+
* @param updatedLoadedNS expected namespaces to be loaded after policy update call. Remaining namespaces will be
3542+
* asserted to be unloaded within 20 seconds.
3543+
* @throws PulsarAdminException
3544+
* @throws PulsarClientException
3545+
* @throws ExecutionException
3546+
* @throws InterruptedException
3547+
*/
3548+
private void testIsolationPolicyUnloadsNSWithScope(String topicType, String policyName, String nsPrefix,
3549+
List<String> totalNamespaces,
3550+
NamespaceIsolationPolicyUnloadScope initialScope,
3551+
List<String> initialNamespaceRegex, List<String> initialLoadedNS,
3552+
NamespaceIsolationPolicyUnloadScope updatedScope,
3553+
List<String> updatedNamespaceRegex, List<String> updatedLoadedNS,
3554+
List<String> updatedBrokerRegex)
3555+
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException {
3556+
3557+
// Create all namespaces
3558+
List<String> allTopics = new ArrayList<>();
3559+
for (String namespacePart: totalNamespaces) {
3560+
admin.namespaces().createNamespace(nsPrefix + namespacePart, Set.of("test"));
3561+
allTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3562+
}
3563+
// Load all topics so that they are present. Assume topic t1 under each namespace
3564+
loadTopics(allTopics);
3565+
3566+
// Create the policy
3567+
NamespaceIsolationData nsPolicyData1 = createPolicyData(
3568+
initialScope, initialNamespaceRegex, Collections.singletonList(".*")
3569+
);
3570+
admin.clusters().createNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
3571+
3572+
List<String> initialLoadedTopics = new ArrayList<>();
3573+
for (String namespacePart: initialLoadedNS) {
3574+
initialLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3575+
}
3576+
3577+
List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
3578+
initialUnloadedTopics.removeAll(initialLoadedTopics);
3579+
3580+
// Assert that all topics (and thus ns) not under initialLoadedNS namespaces are unloaded
3581+
if (initialUnloadedTopics.isEmpty()) {
3582+
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
3583+
TimeUnit.SECONDS.sleep(5);
3584+
} else {
3585+
Awaitility.await()
3586+
.atMost(10, TimeUnit.SECONDS)
3587+
.until(() -> allTopicsUnloaded(initialUnloadedTopics));
3588+
}
3589+
// Assert that all topics under initialLoadedNS are still present
3590+
initialLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
3591+
3592+
// Load the topics again
3593+
loadTopics(allTopics);
3594+
3595+
// Update policy using updatedScope with updated namespace regex
3596+
nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex, updatedBrokerRegex);
3597+
admin.clusters().updateNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
3598+
3599+
List<String> updatedLoadedTopics = new ArrayList<>();
3600+
for (String namespacePart : updatedLoadedNS) {
3601+
updatedLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
3602+
}
3603+
3604+
List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
3605+
updatedUnloadedTopics.removeAll(updatedLoadedTopics);
3606+
3607+
// Assert that all topics (and thus ns) not under updatedLoadedNS namespaces are unloaded
3608+
if (updatedUnloadedTopics.isEmpty()) {
3609+
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
3610+
TimeUnit.SECONDS.sleep(5);
3611+
} else {
3612+
Awaitility.await()
3613+
.atMost(10, TimeUnit.SECONDS)
3614+
.until(() -> allTopicsUnloaded(updatedUnloadedTopics));
3615+
}
3616+
// Assert that all topics under updatedLoadedNS are still present
3617+
updatedLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
3618+
3619+
}
3620+
3621+
@Test(dataProvider = "topicType")
3622+
public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) throws Exception {
3623+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3624+
testIsolationPolicyUnloadsNSWithScope(
3625+
topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3626+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3627+
all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
3628+
Collections.singletonList(".*")
3629+
);
3630+
}
3631+
3632+
@Test(dataProvider = "topicType")
3633+
public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
3634+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3635+
testIsolationPolicyUnloadsNSWithScope(
3636+
topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3637+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3638+
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
3639+
Collections.singletonList(".*")
3640+
);
3641+
}
3642+
3643+
@Test(dataProvider = "topicType")
3644+
public void testIsolationPolicyUnloadsNSWithNoneScope(final String topicType) throws Exception {
3645+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3646+
testIsolationPolicyUnloadsNSWithScope(
3647+
topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3648+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3649+
none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2", "c1"),
3650+
Collections.singletonList(".*")
3651+
);
3652+
}
3653+
3654+
@Test(dataProvider = "topicType")
3655+
public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String topicType) throws Exception {
3656+
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
3657+
// As per changed flag, only c1 should unload, but due to primary change, both a* and c* will.
3658+
testIsolationPolicyUnloadsNSWithScope(
3659+
topicType, "policy-primary-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
3660+
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
3661+
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
3662+
List.of(".*", "broker.*")
3663+
);
3664+
}
34993665
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public interface NamespaceIsolationData {
3131

3232
AutoFailoverPolicyData getAutoFailoverPolicy();
3333

34+
NamespaceIsolationPolicyUnloadScope getUnloadScope();
35+
3436
void validate();
3537

3638
interface Builder {
@@ -42,6 +44,8 @@ interface Builder {
4244

4345
Builder autoFailoverPolicy(AutoFailoverPolicyData autoFailoverPolicyData);
4446

47+
Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);
48+
4549
NamespaceIsolationData build();
4650
}
4751

0 commit comments

Comments
 (0)