Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 6793][broker] Add config of maxNumPartitionsPerPartitionedTopic #6794

Merged
merged 3 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ messagePublishBufferCheckIntervalInMillis=100
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ maxConsumersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int retentionCheckIntervalInSeconds = 120;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of partitions per partitioned topic.\n"
+ "If try to create or update partitioned topics by exceeded number of partitions, then fail."
)
private int maxNumPartitionsPerPartitionedTopic = 0;

/**** --- Messaging Protocols --- ****/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
Expand All @@ -738,11 +739,16 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
return;
}
if (maxPartitions > 0 && numPartitions > maxPartitions) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions));
return;
}
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
} else {

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,16 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) {
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
}

// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
}
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
}

if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public void modifyDeduplication(@PathParam("property") String property, @PathPar
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 406, message = "The number of partitions should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void revokePermissionsOnTopic(@PathParam("property") String property,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down Expand Up @@ -180,6 +181,7 @@ public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist") })
public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 406, message = "The number of partitions should be less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
public void setAutoTopicCreation(
@Suspended final AsyncResponse asyncResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public PersistentTopicInternalStats getInternalStats(
@ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "The tenant/namespace does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exists"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public void revokePermissionsOnTopic(
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
Expand Down Expand Up @@ -264,7 +265,7 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1894,8 +1894,10 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, "Number of partitions should be less than or equal to " + maxPartitions);

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -1245,4 +1244,29 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
}

@Test
public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-success";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3);

try {
admin.topics().createPartitionedTopic(topic, 2);
} catch (Exception e) {
fail("should not throw any exceptions");
}
}

@Test
public void testMaxNumPartitionsPerPartitionedTopicFailure() {
final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-failure";
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

try {
admin.topics().createPartitionedTopic(topic, 3);
fail("should throw exception when number of partitions exceed than max partitions");
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,22 @@ public void testNotAllowSubscriptionTopicCreation() throws Exception{
}

}

@Test
public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-11";
final String subscriptionName = "test-topic-sub-11";

try {
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
fail("should throw exception when number of partitions exceed than max partitions");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
}
}
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no version information |true|
|statusFilePath| Path for the file used to determine the rotation status for the broker when responding to service discovery health checks ||
|preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to use only brokers running the latest software version (to minimize impact to bundles) |false|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
|tlsEnabled| Enable TLS |false|
|tlsCertificateFilePath| Path for the TLS certificate file ||
|tlsKeyFilePath| Path for the TLS private key file ||
Expand Down Expand Up @@ -353,6 +354,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|statusFilePath| The path for the file used to determine the rotation status for the broker when responding to service discovery health checks |/usr/local/apache/htdocs|
|maxUnackedMessagesPerConsumer| The maximum number of unacknowledged messages allowed to be received by consumers on a shared subscription. The broker will stop sending messages to a consumer once this limit is reached or until the consumer begins acknowledging messages. A value of 0 disables the unacked message limit check and thus allows consumers to receive messages without any restrictions. |50000|
|maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer. |200000|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
|authenticationEnabled| Enable authentication for the broker. |false|
|authenticationProviders| A comma-separated list of class names for authentication providers. |false|
|authorizationEnabled| Enforce authorization in brokers. |false|
Expand Down