Skip to content

Commit

Permalink
Fix validateGlobalNamespaceOwnership wrap exception issue. (#14269)
Browse files Browse the repository at this point in the history
### Motivation
When Rest API call `AdminResource#validateGlobalNamespaceOwnership`, broker will execute `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`.
In `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`:
https://github.com/apache/pulsar/blob/6d717a08ef8cfcac032caee06105285594baf09f/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java#L773-L802

Line 780, 794, and 801 has thrown RestException.
But `validateGlobalNamespaceOwnership ` has wrapped the exception :
https://github.com/apache/pulsar/blob/6d717a08ef8cfcac032caee06105285594baf09f/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java#L202-L216

This could make the user confused that the log printed is not matched with the REST API.
  • Loading branch information
Technoboy- authored Feb 15, 2022
1 parent 44a273d commit 18d9f1b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ protected void validateGlobalNamespaceOwnership() {
} catch (IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name or namespace is not valid");
} catch (RestException re) {
if (re.getResponse().getStatus() == Status.NOT_FOUND.getStatusCode()) {
throw new RestException(Status.NOT_FOUND, "Namespace not found");
}
throw new RestException(Status.PRECONDITION_FAILED, "Namespace does not have any clusters configured");
throw re;
} catch (Exception e) {
log.warn("Failed to validate global cluster configuration : ns={} emsg={}", namespaceName, e.getMessage());
throw new RestException(Status.SERVICE_UNAVAILABLE, "Failed to validate global cluster configuration");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,9 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
validationFuture.complete(null);
}
} else {
String msg = String.format("Policies not found for %s namespace", namespace.toString());
String msg = String.format("Namespace %s not found", namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg));
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace not found"));
}
}).exceptionally(ex -> {
String msg = String.format("Failed to validate global cluster configuration : cluster=%s ns=%s emsg=%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
protected Field uriField;
protected UriInfo uriInfo;
private NonPersistentTopics nonPersistentTopic;
private NamespaceResources namespaceResources;

@BeforeClass
public void initPersistentTopics() throws Exception {
Expand All @@ -133,6 +135,7 @@ protected void setup() throws Exception {
nonPersistentTopic = spy(NonPersistentTopics.class);
nonPersistentTopic.setServletContext(new MockServletContext());
nonPersistentTopic.setPulsar(pulsar);
namespaceResources = mock(NamespaceResources.class);
doReturn(false).when(nonPersistentTopic).isRequestHttps();
doReturn(null).when(nonPersistentTopic).originalPrincipal();
doReturn("test").when(nonPersistentTopic).clientAppId();
Expand Down Expand Up @@ -446,6 +449,33 @@ public void testCreatePartitionedTopic() {
});
}

@Test
public void testCreateTopicWithReplicationCluster() {
final String topicName = "test-topic-ownership";
NamespaceName namespaceName = NamespaceName.get(testTenant, testNamespace);
CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<>();
Policies policies = new Policies();
policyFuture.complete(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()).thenReturn(namespaceResources);
doReturn(policyFuture).when(namespaceResources).getPoliciesAsync(namespaceName);
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace does not have any clusters configured"));
// Test policy not exist and return 'Namespace not found'
CompletableFuture<Optional<Policies>> policyFuture2 = new CompletableFuture<>();
policyFuture2.complete(Optional.empty());
doReturn(policyFuture2).when(namespaceResources).getPoliciesAsync(namespaceName);
response = mock(AsyncResponse.class);
errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
}

@Test(expectedExceptions = RestException.class)
public void testCreateNonPartitionedTopicWithInvalidName() {
final String topicName = "standard-topic-partition-10";
Expand Down

0 comments on commit 18d9f1b

Please sign in to comment.