Skip to content

Commit debf0ff

Browse files
committed
fix ns isolation policy for repl namespaces
1 parent fc2e314 commit debf0ff

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

+47-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.stream.Collectors;
3737
import java.util.stream.Stream;
3838
import javax.ws.rs.DELETE;
39+
import javax.ws.rs.DefaultValue;
3940
import javax.ws.rs.GET;
4041
import javax.ws.rs.POST;
4142
import javax.ws.rs.PUT;
@@ -55,6 +56,7 @@
5556
import org.apache.pulsar.broker.admin.AdminResource;
5657
import org.apache.pulsar.broker.web.RestException;
5758
import org.apache.pulsar.client.admin.PulsarAdmin;
59+
import org.apache.pulsar.client.admin.PulsarAdminException;
5860
import org.apache.pulsar.common.naming.Constants;
5961
import org.apache.pulsar.common.naming.NamedEntity;
6062
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -705,7 +707,9 @@ public void setNamespaceIsolationPolicy(
705707
@ApiParam(value = "The namespace isolation policy name", required = true)
706708
@PathParam("policyName") String policyName,
707709
@ApiParam(value = "The namespace isolation policy data", required = true)
708-
NamespaceIsolationDataImpl policyData
710+
NamespaceIsolationDataImpl policyData,
711+
@DefaultValue("true")
712+
@QueryParam("unloadBundles") boolean unload
709713
) {
710714
validateSuperUserAccessAsync()
711715
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
@@ -723,7 +727,13 @@ public void setNamespaceIsolationPolicy(
723727
nsIsolationPolicies.setPolicy(policyName, policyData);
724728
return namespaceIsolationPolicies()
725729
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
726-
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData))
730+
}).thenCompose(__ -> {
731+
if (unload) {
732+
return filterAndUnloadMatchedNamespaceAsync(cluster, policyData);
733+
} else {
734+
return CompletableFuture.completedFuture(null);
735+
}
736+
})
727737
.thenAccept(__ -> {
728738
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
729739
clientAppId(), cluster, policyName);
@@ -757,7 +767,8 @@ public void setNamespaceIsolationPolicy(
757767
/**
758768
* Get matched namespaces; call unload for each namespaces.
759769
*/
760-
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
770+
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
771+
NamespaceIsolationDataImpl policyData) {
761772
PulsarAdmin adminClient;
762773
try {
763774
adminClient = pulsar().getAdminClient();
@@ -770,8 +781,13 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIs
770781
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant));
771782
return FutureUtil.waitForAll(completableFutureStream)
772783
.thenApply(namespaces -> {
773-
// if namespace match any policy regex, add it to ns list to be unload.
784+
// Filter namespaces that have current cluster in their replication_clusters
785+
// if namespace match any policy regex, add it to ns list to be unloaded.
774786
return namespaces.stream()
787+
.filter(namespaceName -> adminClient.namespaces()
788+
.getPoliciesAsync(namespaceName)
789+
.thenApply(policies -> policies.replication_clusters.contains(cluster))
790+
.join())
775791
.filter(namespaceName ->
776792
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
777793
.collect(Collectors.toList());
@@ -781,7 +797,33 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIs
781797
return CompletableFuture.completedFuture(null);
782798
}
783799
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
784-
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
800+
.map(namespaceName -> {
801+
try {
802+
return adminClient.namespaces()
803+
.getPolicies(namespaceName);
804+
} catch (PulsarAdminException e) {
805+
log.warn("[{}] Failed to get policy for {} namespace.", clientAppId(),
806+
namespaceName, e);
807+
throw new RuntimeException(e);
808+
}
809+
})
810+
.map(policies -> {
811+
final List<CompletableFuture<Void>> unloadFutures = new ArrayList<>();
812+
List<String> boundaries = policies.bundles.getBoundaries();
813+
for (int i = 0; i < boundaries.size() - 1; i++) {
814+
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
815+
try {
816+
unloadFutures.add(
817+
pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(
818+
namespaceName.toString(), bundle));
819+
} catch (PulsarServerException e) {
820+
log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName,
821+
e);
822+
throw new RestException(e);
823+
}
824+
}
825+
return FutureUtil.waitForAll(unloadFutures);
826+
})
785827
.collect(Collectors.toList());
786828
return FutureUtil.waitForAll(futures)
787829
.thenAccept(__ -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ public void clusters() throws Exception {
309309
.build())
310310
.build();
311311
asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx,
312-
"use", "policy1", policyData));
312+
"use", "policy1", policyData, true));
313313
asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
314314

315315
try {

0 commit comments

Comments
 (0)