From debf0ff034c796b3892751793b9dae5d1be3af66 Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Mon, 29 Jul 2024 21:53:41 +0530 Subject: [PATCH 1/7] fix ns isolation policy for repl namespaces --- .../broker/admin/impl/ClustersBase.java | 52 +++++++++++++++++-- .../apache/pulsar/broker/admin/AdminTest.java | 2 +- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2f064d7b37720..2fa26af409c30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -55,6 +56,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; @@ -705,7 +707,9 @@ public void setNamespaceIsolationPolicy( @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String policyName, @ApiParam(value = "The namespace isolation policy data", required = true) - NamespaceIsolationDataImpl policyData + NamespaceIsolationDataImpl policyData, + @DefaultValue("true") + @QueryParam("unloadBundles") boolean unload ) { validateSuperUserAccessAsync() .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -723,7 +727,13 @@ public void setNamespaceIsolationPolicy( nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData)) + }).thenCompose(__ -> { + if (unload) { + return filterAndUnloadMatchedNamespaceAsync(cluster, policyData); + } else { + return CompletableFuture.completedFuture(null); + } + }) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -757,7 +767,8 @@ public void setNamespaceIsolationPolicy( /** * Get matched namespaces; call unload for each namespaces. */ - private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) { + private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, + NamespaceIsolationDataImpl policyData) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); @@ -770,8 +781,13 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIs .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant)); return FutureUtil.waitForAll(completableFutureStream) .thenApply(namespaces -> { - // if namespace match any policy regex, add it to ns list to be unload. + // Filter namespaces that have current cluster in their replication_clusters + // if namespace match any policy regex, add it to ns list to be unloaded. return namespaces.stream() + .filter(namespaceName -> adminClient.namespaces() + .getPoliciesAsync(namespaceName) + .thenApply(policies -> policies.replication_clusters.contains(cluster)) + .join()) .filter(namespaceName -> policyData.getNamespaces().stream().anyMatch(namespaceName::matches)) .collect(Collectors.toList()); @@ -781,7 +797,33 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIs return CompletableFuture.completedFuture(null); } List> futures = shouldUnloadNamespaces.stream() - .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) + .map(namespaceName -> { + try { + return adminClient.namespaces() + .getPolicies(namespaceName); + } catch (PulsarAdminException e) { + log.warn("[{}] Failed to get policy for {} namespace.", clientAppId(), + namespaceName, e); + throw new RuntimeException(e); + } + }) + .map(policies -> { + final List> unloadFutures = new ArrayList<>(); + List boundaries = policies.bundles.getBoundaries(); + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + try { + unloadFutures.add( + pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync( + namespaceName.toString(), bundle)); + } catch (PulsarServerException e) { + log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, + e); + throw new RestException(e); + } + } + return FutureUtil.waitForAll(unloadFutures); + }) .collect(Collectors.toList()); return FutureUtil.waitForAll(futures) .thenAccept(__ -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 2894903c0d0c1..9d216014e7738 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -309,7 +309,7 @@ public void clusters() throws Exception { .build()) .build(); asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx, - "use", "policy1", policyData)); + "use", "policy1", policyData, true)); asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { From 81ba58a40e11288592aa9ec95402906178e3bfe2 Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Mon, 29 Jul 2024 22:12:20 +0530 Subject: [PATCH 2/7] Revert "fix ns isolation policy for repl namespaces" This reverts commit debf0ff034c796b3892751793b9dae5d1be3af66. --- .../broker/admin/impl/ClustersBase.java | 52 ++----------------- .../apache/pulsar/broker/admin/AdminTest.java | 2 +- 2 files changed, 6 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2fa26af409c30..2f064d7b37720 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -36,7 +36,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -56,7 +55,6 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; @@ -707,9 +705,7 @@ public void setNamespaceIsolationPolicy( @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String policyName, @ApiParam(value = "The namespace isolation policy data", required = true) - NamespaceIsolationDataImpl policyData, - @DefaultValue("true") - @QueryParam("unloadBundles") boolean unload + NamespaceIsolationDataImpl policyData ) { validateSuperUserAccessAsync() .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -727,13 +723,7 @@ public void setNamespaceIsolationPolicy( nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> { - if (unload) { - return filterAndUnloadMatchedNamespaceAsync(cluster, policyData); - } else { - return CompletableFuture.completedFuture(null); - } - }) + }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData)) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -767,8 +757,7 @@ public void setNamespaceIsolationPolicy( /** * Get matched namespaces; call unload for each namespaces. */ - private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, - NamespaceIsolationDataImpl policyData) { + private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); @@ -781,13 +770,8 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant)); return FutureUtil.waitForAll(completableFutureStream) .thenApply(namespaces -> { - // Filter namespaces that have current cluster in their replication_clusters - // if namespace match any policy regex, add it to ns list to be unloaded. + // if namespace match any policy regex, add it to ns list to be unload. return namespaces.stream() - .filter(namespaceName -> adminClient.namespaces() - .getPoliciesAsync(namespaceName) - .thenApply(policies -> policies.replication_clusters.contains(cluster)) - .join()) .filter(namespaceName -> policyData.getNamespaces().stream().anyMatch(namespaceName::matches)) .collect(Collectors.toList()); @@ -797,33 +781,7 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus return CompletableFuture.completedFuture(null); } List> futures = shouldUnloadNamespaces.stream() - .map(namespaceName -> { - try { - return adminClient.namespaces() - .getPolicies(namespaceName); - } catch (PulsarAdminException e) { - log.warn("[{}] Failed to get policy for {} namespace.", clientAppId(), - namespaceName, e); - throw new RuntimeException(e); - } - }) - .map(policies -> { - final List> unloadFutures = new ArrayList<>(); - List boundaries = policies.bundles.getBoundaries(); - for (int i = 0; i < boundaries.size() - 1; i++) { - String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); - try { - unloadFutures.add( - pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync( - namespaceName.toString(), bundle)); - } catch (PulsarServerException e) { - log.error("[{}] Failed to unload namespace {}", clientAppId(), namespaceName, - e); - throw new RestException(e); - } - } - return FutureUtil.waitForAll(unloadFutures); - }) + .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) .collect(Collectors.toList()); return FutureUtil.waitForAll(futures) .thenAccept(__ -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 9d216014e7738..2894903c0d0c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -309,7 +309,7 @@ public void clusters() throws Exception { .build()) .build(); asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx, - "use", "policy1", policyData, true)); + "use", "policy1", policyData)); asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { From 670f9a2e5505fa4f0a72e32575200ca737464d0d Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Mon, 29 Jul 2024 22:15:45 +0530 Subject: [PATCH 3/7] fix ns isolation policy for repl namespaces --- .../broker/admin/impl/ClustersBase.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2f064d7b37720..7472775abe2d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -55,6 +56,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; @@ -705,7 +707,9 @@ public void setNamespaceIsolationPolicy( @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String policyName, @ApiParam(value = "The namespace isolation policy data", required = true) - NamespaceIsolationDataImpl policyData + NamespaceIsolationDataImpl policyData, + @DefaultValue("true") + @QueryParam("unloadBundles") boolean unload ) { validateSuperUserAccessAsync() .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -722,8 +726,14 @@ public void setNamespaceIsolationPolicy( ).thenCompose(nsIsolationPolicies -> { nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() - .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData)) + .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); + }).thenCompose(__ -> { + if (unload) { + return filterAndUnloadMatchedNamespaceAsync(cluster, policyData); + } else { + return CompletableFuture.completedFuture(null); + } + }) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -757,7 +767,8 @@ public void setNamespaceIsolationPolicy( /** * Get matched namespaces; call unload for each namespaces. */ - private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) { + private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, + NamespaceIsolationDataImpl policyData) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); @@ -770,8 +781,13 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(NamespaceIs .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant)); return FutureUtil.waitForAll(completableFutureStream) .thenApply(namespaces -> { - // if namespace match any policy regex, add it to ns list to be unload. + // Filter namespaces that have current cluster in their replication_clusters + // if namespace match any policy regex, add it to ns list to be unloaded. return namespaces.stream() + .filter(namespaceName -> adminClient.namespaces() + .getPoliciesAsync(namespaceName) + .thenApply(policies -> policies.replication_clusters.contains(cluster)) + .join()) .filter(namespaceName -> policyData.getNamespaces().stream().anyMatch(namespaceName::matches)) .collect(Collectors.toList()); From 32da1002a88a8997adc0107b6b238c85240f5080 Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Mon, 29 Jul 2024 22:30:12 +0530 Subject: [PATCH 4/7] fix admin tests --- .../src/test/java/org/apache/pulsar/broker/admin/AdminTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 2894903c0d0c1..9d216014e7738 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -309,7 +309,7 @@ public void clusters() throws Exception { .build()) .build(); asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx, - "use", "policy1", policyData)); + "use", "policy1", policyData, true)); asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { From 00819ecddc03caf623d3101905e920277aee0027 Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Mon, 29 Jul 2024 22:58:19 +0530 Subject: [PATCH 5/7] remove unused import --- .../java/org/apache/pulsar/broker/admin/impl/ClustersBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 23fb1c7d445f4..36ea193a643b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -56,7 +56,6 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; From a2e37fe304c4479ddf6c83cf6b62946b69ed90ba Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Tue, 30 Jul 2024 01:38:47 +0530 Subject: [PATCH 6/7] add unload bundle flag in pulsar admin (+CLI) and change default --- .../broker/admin/impl/ClustersBase.java | 2 +- .../apache/pulsar/client/admin/Clusters.java | 37 +++++++++++++++++-- .../client/admin/internal/ClustersImpl.java | 25 +++++++------ .../cli/CmdNamespaceIsolationPolicy.java | 6 ++- 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 36ea193a643b9..f31fa10bac6fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -708,7 +708,7 @@ public void setNamespaceIsolationPolicy( @PathParam("policyName") String policyName, @ApiParam(value = "The namespace isolation policy data", required = true) NamespaceIsolationDataImpl policyData, - @DefaultValue("true") + @DefaultValue("false") @QueryParam("unloadBundles") boolean unload ) { validateSuperUserAccessAsync() diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 53e6680946566..3cf014862cc02 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -38,6 +38,12 @@ * Admin interface for clusters management. */ public interface Clusters { + + /** + * Defaults for all the flags. + */ + boolean UNLOAD_BUNDLE_DEFAULT = false; + /** * Get the list of clusters. *

@@ -418,9 +424,15 @@ Map getNamespaceIsolationPolicies(String cluster * Unexpected error */ void createNamespaceIsolationPolicy( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException; + default void createNamespaceIsolationPolicy( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + throws PulsarAdminException { + createNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Create a namespace isolation policy for a cluster asynchronously. *

@@ -437,7 +449,13 @@ void createNamespaceIsolationPolicy( * @return */ CompletableFuture createNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles); + + default CompletableFuture createNamespaceIsolationPolicyAsync( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { + return createNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Returns list of active brokers with namespace-isolation policies attached to it. @@ -506,9 +524,15 @@ CompletableFuture getBrokerWithNamespaceIsolationP * Unexpected error */ void updateNamespaceIsolationPolicy( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException; + default void updateNamespaceIsolationPolicy( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + throws PulsarAdminException { + updateNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Update a namespace isolation policy for a cluster asynchronously. *

@@ -526,7 +550,12 @@ void updateNamespaceIsolationPolicy( * */ CompletableFuture updateNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles); + + default CompletableFuture updateNamespaceIsolationPolicyAsync( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { + return updateNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } /** * Delete a namespace isolation policy for a cluster. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 231d4506d6173..02385ccc55c49 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -202,26 +202,26 @@ public CompletableFuture getBrokerWithNamespaceIso @Override public void createNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public CompletableFuture createNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { - return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public void updateNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public CompletableFuture updateNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { - return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override @@ -236,13 +236,14 @@ public CompletableFuture deleteNamespaceIsolationPolicyAsync(String cluste } private void setNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData)); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles)); } private CompletableFuture setNamespaceIsolationPolicyAsync(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) { - WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName) + .queryParam("unloadBundles", unloadBundles); return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index e9896decd8c96..8ede00294491d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -73,12 +73,16 @@ private class SetPolicy extends CliCommand { required = true, split = ",") private Map autoFailoverPolicyParams; + @Option(names = "--unloadBundles", description = "Unload namespace bundles after applying policy") + private boolean unloadBundles; + void run() throws PulsarAdminException { // validate and create the POJO NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary, autoFailoverPolicyTypeName, autoFailoverPolicyParams); - getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData); + getAdmin().clusters() + .createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData, unloadBundles); } } From 38a2349592c59f898795c5e67d2d5f5ef98d92a6 Mon Sep 17 00:00:00 2001 From: iosdev747 Date: Wed, 31 Jul 2024 22:00:40 +0530 Subject: [PATCH 7/7] add filter to unload namespaces added/removed by policy change --- .../broker/admin/impl/ClustersBase.java | 28 ++++++++-- ...ApiNamespaceIsolationMultiBrokersTest.java | 56 ++++++++++++++++--- .../policies/NamespaceIsolationPolicy.java | 7 +++ .../impl/NamespaceIsolationPolicyImpl.java | 5 ++ 4 files changed, 83 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index e9a8ce2f8ba8e..5eeebac4db96b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -33,8 +33,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -58,6 +60,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -724,10 +727,15 @@ public void setNamespaceIsolationPolicy( .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap()) .thenApply(__ -> new NamespaceIsolationPolicies())) ).thenCompose(nsIsolationPolicies -> { + NamespaceIsolationPolicy currentIsolationPolicy = nsIsolationPolicies.getPolicyByName(policyName); + List oldNamespaceRegex = currentIsolationPolicy == null ? new ArrayList<>() : + currentIsolationPolicy.getNamespaces(); nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() - .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> unload ? filterAndUnloadMatchedNamespaceAsync(cluster, policyData) : + .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()) + .thenApply(policy -> oldNamespaceRegex); + }).thenCompose(oldNSRegex -> unload ? filterAndUnloadMatchedNamespaceAsync(cluster, policyData, + oldNSRegex) : CompletableFuture.completedFuture(null)) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", @@ -763,21 +771,33 @@ public void setNamespaceIsolationPolicy( * Get matched namespaces; call unload for each namespaces. */ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, - NamespaceIsolationDataImpl policyData) { + NamespaceIsolationDataImpl policyData, + List oldNSRegex) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } + List currentRegex = policyData.getNamespaces(); + // (new ∩ old), namespaces to remove matching this regex + List commonRegex = currentRegex.stream().distinct().filter(oldNSRegex::contains).toList(); // compile regex patterns once - List namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); + List excludeNamespacePatterns = commonRegex.stream().map(Pattern::compile).toList(); + // (new ⋃ old), namespaces to keep matching this regex + List namespacePatterns = + Stream.concat(currentRegex.stream(), oldNSRegex.stream()).map(Pattern::compile).toList(); return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { List>> filteredNamespacesForEachTenant = tenants.stream() .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { List> namespaceNamesInCluster = namespaces.stream() + // filter namespaces using ns regex from current policy .filter(namespaceName -> namespacePatterns.stream() .anyMatch(pattern -> pattern.matcher(namespaceName).matches())) + // remove namespaces using old ns regex + // only unloads namespaces matching delta of policy, (new ⋃ old) - (new ∩ old) + .filter(((Predicate) (namespaceName -> excludeNamespacePatterns.stream() + .anyMatch(pattern -> pattern.matcher(namespaceName).matches()))).negate()) .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) .thenApply(policies -> policies.replication_clusters.contains(cluster) ? namespaceName : null)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java index da7d95d677af8..6828ac9dcf026 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.MultiBrokerBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.ClusterData; @@ -48,6 +49,11 @@ public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseT PulsarAdmin localAdmin; PulsarAdmin remoteAdmin; + @Override + protected int numberOfAdditionalBrokers() { + return 4; + } + @Override protected void doInitConf() throws Exception { super.doInitConf(); @@ -69,23 +75,39 @@ public void setupClusters() throws Exception { .createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build()); remoteAdmin.clusters() .createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build()); + setupForTenant("A"); + setupForTenant("B"); + } + + private void setupForTenant(String prefix) throws PulsarAdminException { TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2")); - localAdmin.tenants().createTenant("prop-ig", tenantInfo); - localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1")); + localAdmin.tenants().createTenant(prefix+"prop-ig", tenantInfo); + localAdmin.namespaces().createNamespace(prefix+"prop-ig/ns1", Set.of("test", "cluster-1")); + localAdmin.namespaces().createNamespace(prefix+"prop-ig/n1", Set.of("test", "cluster-1")); + localAdmin.topics().createNonPartitionedTopic(prefix+"prop-ig/ns1/t1"); + } + + public void testNamespaceIsolationPolicyForReplNSWithUnload() throws Exception { + testNamespaceIsolationPolicyForReplNS("A", "policy-1", true); + } + + public void testNamespaceIsolationPolicyForReplNSWithoutUnload() throws Exception { + testNamespaceIsolationPolicyForReplNS("B", "policy-2", false); } - public void testNamespaceIsolationPolicyForReplNS() throws Exception { + private void testNamespaceIsolationPolicyForReplNS(String prefix, String policyName1, boolean unload) throws Exception { - // Verify that namespace is not present in cluster-2. - Set replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters; + // Verify that namespaces are not present in cluster-2. + Set replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/ns1").replication_clusters; + Assert.assertFalse(replicationClusters.contains("cluster-2")); + replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/n1").replication_clusters; Assert.assertFalse(replicationClusters.contains("cluster-2")); // setup ns-isolation-policy in both the clusters. - String policyName1 = "policy-1"; Map parameters1 = new HashMap<>(); parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - List nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*")); + List nsRegexList = new ArrayList<>(Arrays.asList(prefix+"prop-ig/ns1.*")); NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() // "prop-ig/ns1" is present in test cluster, policy set on test2 should work @@ -98,17 +120,33 @@ public void testNamespaceIsolationPolicyForReplNS() throws Exception { .build()) .build(); - localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1); + // 1. Create policy should work in local cluster + localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); // verify policy is present in local cluster Map policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); assertEquals(policiesMap.get(policyName1), nsPolicyData1); - remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1); + // 2. Create policy should work in remote cluster + remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1, unload); // verify policy is present in remote cluster policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2"); assertEquals(policiesMap.get(policyName1), nsPolicyData1); + // 3. Update (add) policy should work in local cluster + nsPolicyData1.getNamespaces().add(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces + localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); + // verify policy is present in local cluster + policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + + // 4. Update (remove) policy should work in local cluster + nsPolicyData1.getNamespaces().remove(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces + localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); + // verify policy is present in local cluster + policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java index bd28d30d4cee9..1742ae25e8d67 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java @@ -43,6 +43,13 @@ public interface NamespaceIsolationPolicy { */ List getSecondaryBrokers(); + /** + * Get the list of namespace regex used in policy. + * + * @return + */ + List getNamespaces(); + /** * Get the list of primary brokers for the namespace according to the policy. * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java index af3663869fa02..9e451e9e67e84 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java @@ -76,6 +76,11 @@ public List getSecondaryBrokers() { return this.secondary; } + @Override + public List getNamespaces() { + return this.namespaces; + } + @Override public List findPrimaryBrokers(List availableBrokers, NamespaceName namespace) { if (!this.matchNamespaces(namespace.toString())) {