Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static NamespaceBundle suBundleFromPath(String path, NamespaceBundleFacto
Range<Long> range = getHashRange(parts[5]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3], parts[4]), range);
} else {
// this is a V2 path prop/namespace/hash
// this is a V2 path tenant/namespace/hash
Range<Long> range = getHashRange(parts[4]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3]), range);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
*
* <pre>
* eg:
* if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
* if topic : persistent://tenant/ns/my-topic is a partitioned topic with 2 partitions then
* broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
*
* However, if broker tries to start producer with root topic "my-topic" then client-lib internally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,9 @@ public static String getDefaultBundleRange() {
}

/*
* @param path - path for the namespace policies ex. /admin/policies/prop/cluster/namespace
* @param path - path for the namespace policies ex. /admin/policies/tenant/namespace
*
* @returns namespace with path, ex. prop/cluster/namespace
* @returns namespace with path, ex. tenant/namespace
*/
public static String getNamespaceFromPoliciesPath(String path) {
if (path.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ protected void cleanup() throws Exception {

@BeforeMethod
protected void setupTest() throws Exception {
namespace = "prop/" + UUID.randomUUID();
namespace = BrokerTestUtil.newUniqueName("tenant/ns");
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
assertTrue(admin.namespaces().getNamespaces("tenant").contains(namespace));
admin.namespaces().setRetention(namespace, new RetentionPolicies(3, 10));
try (PulsarAdmin admin2 = createPulsarAdmin()) {
Awaitility.await().untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,20 +1132,19 @@ public void testReplicationPeerCluster() throws Exception {
ClusterData.builder().serviceUrl("http://broker.messaging.east1.example.com:8080").build());
admin.clusters().createCluster("us-east2",
ClusterData.builder().serviceUrl("http://broker.messaging.east2.example.com:8080").build());
admin.clusters().createCluster("global", ClusterData.builder().build());

List<String> allClusters = admin.clusters().getClusters();
Collections.sort(allClusters);
assertEquals(allClusters,
List.of("test", "us-east1", "us-east2", "us-west1", "us-west2", "us-west3", "us-west4"));

final String property = newUniqueName("peer-prop");
final String tenant = newUniqueName("peer-tenant");
Set<String> allowedClusters = Set.of("us-west1", "us-west2", "us-west3", "us-west4", "us-east1",
"us-east2", "global");
TenantInfoImpl propConfig = new TenantInfoImpl(Set.of("test"), allowedClusters);
admin.tenants().createTenant(property, propConfig);
"us-east2", "test");
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("test"), allowedClusters);
admin.tenants().createTenant(tenant, tenantInfo);

final String namespace = property + "/global/conflictPeer";
final String namespace = tenant + "/conflictPeer";
admin.namespaces().createNamespace(namespace);

admin.clusters().updatePeerClusterNames("us-west1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
protected void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("prop",
admin.tenants().createTenant("tenant",
new TenantInfoImpl(Set.of("appid1"), Set.of("test")));
admin.namespaces().createNamespace("prop/ns-abc");
admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Set.of("test"));
admin.namespaces().createNamespace("tenant/ns-abc");
admin.namespaces().setNamespaceReplicationClusters("tenant/ns-abc", Set.of("test"));
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
Expand Down Expand Up @@ -172,7 +172,7 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c
}

String key = "legendtkl";
final String topicName = "persistent://prop/ns-abc/my-topic";
final String topicName = "persistent://tenant/ns-abc/my-topic";
final String messagePredicate = "my-message-" + key + "-";
final int numberOfMessages = 30;

Expand All @@ -188,7 +188,7 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c
producer.send(message.getBytes());
}

persistentTopics.getLastMessageId(asyncResponse, "prop", "ns-abc", "my-topic", true);
persistentTopics.getLastMessageId(asyncResponse, "tenant", "ns-abc", "my-topic", true);
Awaitility.await().until(() -> id[0] != null);
Assert.assertTrue(((MessageIdImpl) id[0]).getLedgerId() >= 0);
Assert.assertEquals(numberOfMessages - 1, ((MessageIdImpl) id[0]).getEntryId());
Expand All @@ -200,7 +200,7 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c
String message = messagePredicate + i;
producer.send(message.getBytes());
}
persistentTopics.getLastMessageId(asyncResponse, "prop", "ns-abc", "my-topic", true);
persistentTopics.getLastMessageId(asyncResponse, "tenant", "ns-abc", "my-topic", true);
while (id[0] == messageId) {
Thread.sleep(1);
}
Expand All @@ -217,7 +217,7 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c
*/
@Test
public void testGetLastMessageIdWhenTopicWithoutData() throws Exception {
final String topic = "persistent://prop/ns-abc/testGetLastMessageIdWhenTopicWithoutData-" + UUID.randomUUID();
final String topic = "persistent://tenant/ns-abc/testGetLastMessageIdWhenTopicWithoutData-" + UUID.randomUUID();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testValidatePartitionedTopicNameInvalid() {

@Test
public void testValidatePartitionedTopicMetadata() throws Exception {
String tenant = "prop";
String tenant = "tenant";
String namespace = "ns-abc";
String partitionedTopic = "partitionedTopic";
String nonPartitionedTopic = "notPartitionedTopic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,19 @@
.validateTenantOperation(this.testOtherTenant, null);

doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.PERSISTENCE, PolicyOperation.WRITE);

doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);

doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.PERSISTENCE, PolicyOperation.WRITE);

doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);

nsSvc = pulsar.getNamespaceService();
Expand Down Expand Up @@ -296,12 +296,12 @@

mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/policies/my-tenant/use/my-namespace-3");
&& path.equals("/admin/policies/my-tenant/my-namespace-3");
});
try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant,
"use", "my-namespace-3", BundlesData.builder().build()));
fail("should have failed");

Check failure on line 304 in pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 3

NamespacesTest.testCreateNamespaces

should have failed
} catch (RestException e) {
// Ok
}
Expand Down Expand Up @@ -370,7 +370,7 @@

mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies/my-tenant/use");
&& path.equals("/admin/policies/my-tenant");
});
try {
namespaces.getNamespacesForCluster(this.testTenant, this.testLocalCluster);
Expand Down Expand Up @@ -1106,7 +1106,7 @@
@Test
public void testValidateAdminAccessOnTenant() throws Exception {
try {
final String tenant = "prop";
final String tenant = "tenant";
pulsar.getConfiguration().setAuthenticationEnabled(true);
pulsar.getConfiguration().setAuthorizationEnabled(true);
pulsar.getPulsarResources().getTenantResources().createTenant(tenant,
Expand Down Expand Up @@ -2207,7 +2207,7 @@

mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/policies/my-tenant/use/my-namespace-3");
&& path.equals("/admin/policies/my-tenant/my-namespace-3");
});
try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ public void setup() throws Exception {
createTestNamespaces(this.testLocalNamespaces);

doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.PERSISTENCE, PolicyOperation.WRITE);

doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);

nsSvc = pulsar.getNamespaceService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ public void testLoadSheddingWithNamespaceIsolationPolicies() throws Exception {

final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "my-tenant/use/my-ns";
final String namespace = "my-tenant/my-ns";
final String bundle = "0x00000000_0xffffffff";
final String brokerHost = pulsar1.getAdvertisedAddress();
final String brokerAddress = brokerHost + ":8080";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
Expand Down Expand Up @@ -54,7 +54,7 @@ protected void cleanup() throws Exception {

@Test
public void testCreateNamespaceWithDefaultBundles() throws Exception {
String namespaceName = "prop/" + UUID.randomUUID().toString();
String namespaceName = BrokerTestUtil.newUniqueName("tenant/ns");

admin.namespaces().createNamespace(namespaceName);

Expand All @@ -65,7 +65,7 @@ public void testCreateNamespaceWithDefaultBundles() throws Exception {

@Test
public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Exception {
String namespaceName = "prop/" + UUID.randomUUID().toString();
String namespaceName = BrokerTestUtil.newUniqueName("tenant/ns");
String topicName = "persistent://" + namespaceName + "/my-topic5";

admin.namespaces().createNamespace(namespaceName);
Expand All @@ -87,7 +87,7 @@ public void testSplitBundleUpdatesLocalPoliciesWithoutOverwriting() throws Excep

@Test
public void testBundleSplitListener() throws Exception {
String namespaceName = "prop/" + UUID.randomUUID().toString();
String namespaceName = BrokerTestUtil.newUniqueName("tenant/ns");
String topicName = "persistent://" + namespaceName + "/my-topic5";
admin.namespaces().createNamespace(namespaceName);
@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -59,7 +59,7 @@ public void testNamespaceBundleOwnershipListener() throws Exception {
final AtomicBoolean onLoad = new AtomicBoolean(false);
final AtomicBoolean unLoad = new AtomicBoolean(false);

final String namespace = "prop/" + UUID.randomUUID().toString();
final String namespace = BrokerTestUtil.newUniqueName("tenant/ns");

pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {

Expand All @@ -82,7 +82,7 @@ public void unLoad(NamespaceBundle bundle) {
});

admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
assertTrue(admin.namespaces().getNamespaces("tenant").contains(namespace));

final String topic = "persistent://" + namespace + "/os-0";

Expand Down Expand Up @@ -123,9 +123,9 @@ public boolean test(NamespaceBundle namespaceBundle) {

@Test
public void testGetAllPartitions() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString();
final String namespace = BrokerTestUtil.newUniqueName("tenant/ns");
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
assertTrue(admin.namespaces().getNamespaces("tenant").contains(namespace));

final String topicName = "persistent://" + namespace + "/os";
admin.topics().createPartitionedTopic(topicName, 6);
Expand All @@ -149,7 +149,7 @@ public void testNamespaceBundleLookupOnwershipListener() throws Exception,
final AtomicInteger onLoad = new AtomicInteger(0);
final AtomicInteger unLoad = new AtomicInteger(0);

final String namespace = "prop/" + UUID.randomUUID().toString();
final String namespace = BrokerTestUtil.newUniqueName("tenant/ns");

pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
@Override
Expand All @@ -171,7 +171,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
});

admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
assertTrue(admin.namespaces().getNamespaces("tenant").contains(namespace));

final String topic = "persistent://" + namespace + "/os-0";
Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down
Loading
Loading