Skip to content

Commit

Permalink
[Issue 6793][broker] Add config of maxNumPartitionsPerPartitionedTopic (
Browse files Browse the repository at this point in the history
apache#6794)

Fixes apache#6793

### Motivation
Now broker, can't set max number of partitions per partitioned topic. So we can't prevent to create partitioned topic by many partitions from server side.
In this PR, introduce limit of max partitions to broker and to be able to control it.

### Modifications
Add `maxNumPartitionsPerPartitionedTopic` config to broker and compare with numPartitions at create or update partitioned topic.
If the config is set to `0`(is default), then disable the check.
  • Loading branch information
equanz authored and Huanli-Meng committed May 27, 2020
1 parent 498a6e4 commit fd13250
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 3 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ messagePublishBufferCheckIntervalInMillis=100
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ maxConsumersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int retentionCheckIntervalInSeconds = 120;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of partitions per partitioned topic.\n"
+ "If try to create or update partitioned topics by exceeded number of partitions, then fail."
)
private int maxNumPartitionsPerPartitionedTopic = 0;

/**** --- Messaging Protocols --- ****/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
Expand All @@ -738,11 +739,16 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
return;
}
if (maxPartitions > 0 && numPartitions > maxPartitions) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions));
return;
}
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
} else {

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,16 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) {
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
}

// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
}
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
}

if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public void modifyDeduplication(@PathParam("property") String property, @PathPar
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 406, message = "The number of partitions should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void revokePermissionsOnTopic(@PathParam("property") String property,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down Expand Up @@ -180,6 +181,7 @@ public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist") })
public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 406, message = "The number of partitions should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(
@Suspended final AsyncResponse asyncResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public PersistentTopicInternalStats getInternalStats(
@ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "The tenant/namespace does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exists"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public void revokePermissionsOnTopic(
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
Expand Down Expand Up @@ -264,7 +265,7 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1894,8 +1894,10 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, "Number of partitions should be less than or equal to " + maxPartitions);

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -1245,4 +1244,29 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
}

@Test
public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-success";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);

try {
admin.topics().createPartitionedTopic(topic, 2);
} catch (Exception e) {
fail("should not throw any exceptions");
}
}

@Test
public void testMaxNumPartitionsPerPartitionedTopicFailure() {
final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-failure";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

try {
admin.topics().createPartitionedTopic(topic, 3);
fail("should throw exception when number of partitions exceed than max partitions");
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,22 @@ public void testNotAllowSubscriptionTopicCreation() throws Exception{
}

}

@Test
public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-11";
final String subscriptionName = "test-topic-sub-11";

try {
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
fail("should throw exception when number of partitions exceed than max partitions");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
}
}
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no version information |true|
|statusFilePath| Path for the file used to determine the rotation status for the broker when responding to service discovery health checks ||
|preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to use only brokers running the latest software version (to minimize impact to bundles) |false|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
|tlsEnabled| Enable TLS |false|
|tlsCertificateFilePath| Path for the TLS certificate file ||
|tlsKeyFilePath| Path for the TLS private key file ||
Expand Down Expand Up @@ -355,6 +356,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|statusFilePath| The path for the file used to determine the rotation status for the broker when responding to service discovery health checks |/usr/local/apache/htdocs|
|maxUnackedMessagesPerConsumer| The maximum number of unacknowledged messages allowed to be received by consumers on a shared subscription. The broker will stop sending messages to a consumer once this limit is reached or until the consumer begins acknowledging messages. A value of 0 disables the unacked message limit check and thus allows consumers to receive messages without any restrictions. |50000|
|maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer. |200000|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
|authenticationEnabled| Enable authentication for the broker. |false|
|authenticationProviders| A comma-separated list of class names for authentication providers. |false|
|authorizationEnabled| Enforce authorization in brokers. |false|
Expand Down

0 comments on commit fd13250

Please sign in to comment.