Skip to content

Commit

Permalink
[fix] [broker] fix replicated namespaces filter in filterAndUnloadMat…
Browse files Browse the repository at this point in the history
…chedNamespaceAsync (#23100)

Co-authored-by: Lari Hotari <[email protected]>
  • Loading branch information
2 people authored and Technoboy- committed Aug 1, 2024
1 parent 2add3ab commit 6c01bb0
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -683,8 +683,8 @@ public void setNamespaceIsolationPolicy(
).thenCompose(nsIsolationPolicies -> {
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(policyData))
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
Expand Down Expand Up @@ -718,42 +718,53 @@ public void setNamespaceIsolationPolicy(
/**
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
NamespaceIsolationDataImpl policyData) {
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
return adminClient.tenants().getTenantsAsync()
.thenCompose(tenants -> {
Stream<CompletableFuture<List<String>>> completableFutureStream = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant));
return FutureUtil.waitForAll(completableFutureStream)
.thenApply(namespaces -> {
// if namespace match any policy regex, add it to ns list to be unload.
return namespaces.stream()
.filter(namespaceName ->
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
.collect(Collectors.toList());
});
}).thenCompose(shouldUnloadNamespaces -> {
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures)
.thenAccept(__ -> {
try {
// write load info to load manager to make the load happens fast
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
} catch (Exception e) {
log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
}
});
});
// compile regex patterns once
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
.filter(namespaceName -> namespacePatterns.stream()
.anyMatch(pattern -> pattern.matcher(namespaceName).matches()))
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster)
? namespaceName : null))
.collect(Collectors.toList());
return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
__ -> namespaceNamesInCluster.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
})).toList();
return FutureUtil.waitForAll(filteredNamespacesForEachTenant)
.thenApply(__ -> filteredNamespacesForEachTenant.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}).thenCompose(shouldUnloadNamespaces -> {
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
try {
// write load info to load manager to make the load happens fast
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
} catch (Exception e) {
log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
}
});
});
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* Test multi-broker admin api.
*/
@Slf4j
@Test(groups = "broker-admin")
public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseTest {

PulsarAdmin localAdmin;
PulsarAdmin remoteAdmin;

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}

@Override
protected void onCleanup() {
super.onCleanup();
}

@BeforeClass
public void setupClusters() throws Exception {
localAdmin = getAllAdmins().get(1);
remoteAdmin = getAllAdmins().get(2);
String localBrokerWebService = additionalPulsarTestContexts.get(0).getPulsarService().getWebServiceAddress();
String remoteBrokerWebService = additionalPulsarTestContexts.get(1).getPulsarService().getWebServiceAddress();
localAdmin.clusters()
.createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build());
remoteAdmin.clusters()
.createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2"));
localAdmin.tenants().createTenant("prop-ig", tenantInfo);
localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1"));
}

public void testNamespaceIsolationPolicyForReplNS() throws Exception {

// Verify that namespace is not present in cluster-2.
Set<String> replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
Assert.assertFalse(replicationClusters.contains("cluster-2"));

// setup ns-isolation-policy in both the clusters.
String policyName1 = "policy-1";
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
List<String> nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*"));

NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
.namespaces(nsRegexList)
.primary(Collections.singletonList(".*"))
.secondary(Collections.singletonList(""))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.build();

localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// verify policy is present in local cluster
Map<String, ? extends NamespaceIsolationData> policiesMap =
localAdmin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1);
// verify policy is present in remote cluster
policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

}

}

0 comments on commit 6c01bb0

Please sign in to comment.