Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] fix how ns-isolation-policy API works for replicated namespaces #23094

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
Expand Down Expand Up @@ -706,7 +707,9 @@ public void setNamespaceIsolationPolicy(
@ApiParam(value = "The namespace isolation policy name", required = true)
@PathParam("policyName") String policyName,
@ApiParam(value = "The namespace isolation policy data", required = true)
NamespaceIsolationDataImpl policyData
NamespaceIsolationDataImpl policyData,
@DefaultValue("false")
@QueryParam("unloadBundles") boolean unload
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
Expand All @@ -723,8 +726,14 @@ public void setNamespaceIsolationPolicy(
).thenCompose(nsIsolationPolicies -> {
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData))
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> {
if (unload) {
return filterAndUnloadMatchedNamespaceAsync(cluster, policyData);
} else {
return CompletableFuture.completedFuture(null);
}
})
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
Expand Down Expand Up @@ -758,7 +767,8 @@ public void setNamespaceIsolationPolicy(
/**
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
NamespaceIsolationDataImpl policyData) {
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
Expand All @@ -771,8 +781,13 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIs
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant));
return FutureUtil.waitForAll(completableFutureStream)
.thenApply(namespaces -> {
// if namespace match any policy regex, add it to ns list to be unload.
// Filter namespaces that have current cluster in their replication_clusters
// if namespace match any policy regex, add it to ns list to be unloaded.
return namespaces.stream()
.filter(namespaceName -> adminClient.namespaces()
.getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster))
.join())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be a blocking operation. it would be better to make it asynchronous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the filter to work, I need to call join to wait for the future completion. The reason for doing it this way is that policy is required to decide whether to remove the namespace or not. Can you suggest how I can make it async? Nothing better comes to my mind that doesn't need too much code refactoring...

Copy link
Member

@lhotari lhotari Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you suggest how I can make it async?

There's a real problem already with the existing code, even without unloading calls. I've explained some of that in the previous comment in #23094 (comment) .
One of the problems is that all tenants and namespaces will be listed concurrently at once, without any concurrency limits. That alone will cause problems.

To fix the problem, the solution for making asynchronous calls will need concurrency limits. I'd suggest introducing a dependency to

    <dependency>
        <groupId>com.spotify</groupId>
        <artifactId>completable-futures</artifactId>
        <version>0.3.6</version>
    </dependency>

and using the https://github.com/spotify/completable-futures/blob/master/src/main/java/com/spotify/futures/ConcurrencyReducer.java class for controlling the concurrency.

This challenge is that this is a systemic problem at a higher level and solving this problem in this PR might feel overly complex. However, it's possible to handle it incrementally and refactor later.

For making the code asynchronous without blocking calls, composition is needed by using thenCompose/thenApply.
In this case, it's not trivial, so it requires a bit more thought than usual since the unlimited concurrency problem needs to also be solved.

.filter(namespaceName ->
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public void clusters() throws Exception {
.build())
.build();
asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx,
"use", "policy1", policyData));
"use", "policy1", policyData, true));
asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
* Admin interface for clusters management.
*/
public interface Clusters {

/**
* Defaults for all the flags.
*/
boolean UNLOAD_BUNDLE_DEFAULT = false;

/**
* Get the list of clusters.
* <p/>
Expand Down Expand Up @@ -418,9 +424,15 @@ Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster
* Unexpected error
*/
void createNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles)
throws PulsarAdminException;

default void createNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException {
createNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Create a namespace isolation policy for a cluster asynchronously.
* <p/>
Expand All @@ -437,7 +449,13 @@ void createNamespaceIsolationPolicy(
* @return
*/
CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles);

default CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return createNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}


/**
* Returns list of active brokers with namespace-isolation policies attached to it.
Expand Down Expand Up @@ -506,9 +524,15 @@ CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationP
* Unexpected error
*/
void updateNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles)
throws PulsarAdminException;

default void updateNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException {
updateNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Update a namespace isolation policy for a cluster asynchronously.
* <p/>
Expand All @@ -526,7 +550,12 @@ void updateNamespaceIsolationPolicy(
*
*/
CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles);

default CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return updateNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Delete a namespace isolation policy for a cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,26 +202,26 @@ public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIso

@Override
public void createNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public void updateNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
Expand All @@ -236,13 +236,14 @@ public CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluste
}

private void setNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData));
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles));
}

private CompletableFuture<Void> setNamespaceIsolationPolicyAsync(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)
.queryParam("unloadBundles", unloadBundles);
return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ private class SetPolicy extends CliCommand {
required = true, split = ",")
private Map<String, String> autoFailoverPolicyParams;

@Option(names = "--unloadBundles", description = "Unload namespace bundles after applying policy")
private boolean unloadBundles;

void run() throws PulsarAdminException {
// validate and create the POJO
NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary,
autoFailoverPolicyTypeName, autoFailoverPolicyParams);

getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData);
getAdmin().clusters()
.createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData, unloadBundles);
}
}

Expand Down