Skip to content

Commit

Permalink
[PIP-143] Support split bundle by specified boundaries (#13796)
Browse files Browse the repository at this point in the history
* support split bundle by specified boundaries

* add license header

* fix checkstyle

* fix test NPE

* fix test

* fix checkstyle

* fix test

* apply comments

* remove un-used import

* apply comments

* apply comments

* fix test
  • Loading branch information
aloyszhang authored Mar 11, 2022
1 parent e0687e3 commit 7998c44
Show file tree
Hide file tree
Showing 26 changed files with 749 additions and 84 deletions.
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
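# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
# "specified_positions_divide" divides the bundle at the hash positions (split boundaries) specified in the split request.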
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
Expand Down
2 changes: 1 addition & 1 deletion deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
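# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
# "specified_positions_divide" divides the bundle at the hash positions (split boundaries) specified in the split request.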
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2089,7 +2089,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Supported algorithms name for namespace bundle split"
)
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide",
"topic_count_equally_divide");
"topic_count_equally_divide", "specified_positions_divide");
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,8 +96,10 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
Expand Down Expand Up @@ -1104,8 +1107,8 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName,
boolean authoritative, boolean unload, String splitAlgorithmName) {
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
validateSuperUserAccess();
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
Expand All @@ -1126,15 +1129,20 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String

List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)
&& !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
}
}

NamespaceBundle nsBundle;

try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
Expand All @@ -1144,7 +1152,7 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
}

pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName))
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
Expand All @@ -1162,6 +1170,56 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
});
}

/**
 * Computes the hash position of topics inside a namespace bundle and resumes {@code asyncResponse}
 * with a {@link TopicHashPositions} result.
 *
 * <p>The permission check, the policy lookup and the bundle validation run synchronously, so any
 * exception they raise propagates to the caller. Failures that happen afterwards (while fetching the
 * topic list or computing the positions) are reported through {@code asyncResponse}.
 *
 * @param asyncResponse response resumed with the result, or with a {@link RestException} on failure
 * @param bundleRange   range of the bundle to inspect
 * @param topics        encoded topic names to look up (each one is passed through {@code Codec.decode});
 *                      when {@code null} or empty, every topic found in the bundle is reported
 */
protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
    }
    validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
    Policies policies = getNamespacePolicies(namespaceName);
    NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
            false, true);
    pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
            (allTopicsInThisBundle, throwable) -> {
                if (throwable != null) {
                    log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
                            namespaceName, bundle, throwable);
                    asyncResponse.resume(new RestException(throwable));
                    // Stop here: the topic list is not available and the response is already resumed.
                    return;
                }
                try {
                    Map<String, Long> topicHashPositions = new HashMap<>();
                    if (topics == null || topics.isEmpty()) {
                        // if topics is empty, return all topics' hash position in this bundle
                        allTopicsInThisBundle.forEach(t -> topicHashPositions.put(t,
                                pulsar().getNamespaceService().getNamespaceBundleFactory()
                                        .getLongHashCode(t)));
                    } else {
                        for (String encodedTopic : topics) {
                            TopicName topicName = TopicName.get(Codec.decode(encodedTopic));
                            if (topicName.getPartitionIndex() == -1) {
                                // No partition index in the name: report every topic of the bundle that
                                // shares the same partitioned topic name.
                                String partitionedName = topicName.getPartitionedTopicName();
                                allTopicsInThisBundle.stream()
                                        .filter(t -> TopicName.get(t).getPartitionedTopicName()
                                                .equals(partitionedName))
                                        .forEach(partition -> topicHashPositions.put(partition,
                                                pulsar().getNamespaceService().getNamespaceBundleFactory()
                                                        .getLongHashCode(partition)));
                            } else {
                                // A specific partition: key and hash use the canonical name that was
                                // matched against the bundle, not the raw caller-supplied string.
                                String fullName = topicName.toString();
                                if (allTopicsInThisBundle.contains(fullName)) {
                                    topicHashPositions.put(fullName,
                                            pulsar().getNamespaceService().getNamespaceBundleFactory()
                                                    .getLongHashCode(fullName));
                                }
                            }
                        }
                    }
                    asyncResponse.resume(
                            new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
                } catch (Exception e) {
                    // An exception escaping this callback would be swallowed by the future and leave
                    // the request suspended, so surface it to the client instead.
                    log.error("[{}] {} Failed to compute topic hash positions for bundle {}.", clientAppId(),
                            namespaceName, bundle, e);
                    asyncResponse.resume(new RestException(e));
                }
            });
}

private String getBundleRange(String bundleName) {
if (BundleType.LARGEST.toString().equals(bundleName)) {
return findLargestBundleWithTopics(namespaceName).getBundleRange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,18 +651,35 @@ public void splitNamespaceBundle(
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
try {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
internalSplitNamespaceBundle(asyncResponse, bundleRange,
authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

/**
 * REST endpoint (v1 path layout) returning the hash positions of topics in a namespace bundle.
 *
 * <p>Validates the namespace name and delegates to {@code internalGetTopicHashPositions}. Exceptions
 * thrown synchronously by either step are routed to {@code asyncResponse}, the same way
 * {@code splitNamespaceBundle} in this class handles them.
 *
 * @param property      tenant (property) of the namespace
 * @param cluster       cluster of the namespace
 * @param namespace     local namespace name
 * @param bundle        bundle range to inspect
 * @param topics        topics to look up, passed through to {@code internalGetTopicHashPositions}
 * @param asyncResponse suspended response resumed with the result or the failure
 */
@GET
@Path("/{property}/{cluster}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Namespace does not exist")})
public void getTopicHashPositions(
        @PathParam("property") String property, @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundle,
        @QueryParam("topics") List<String> topics,
        @Suspended AsyncResponse asyncResponse) {
    try {
        validateNamespaceName(property, cluster, namespace);
        internalGetTopicHashPositions(asyncResponse, bundle, topics);
    } catch (WebApplicationException wae) {
        asyncResponse.resume(wae);
    } catch (Exception e) {
        asyncResponse.resume(new RestException(e));
    }
}

@POST
@Path("/{property}/{cluster}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,18 +570,35 @@ public void splitNamespaceBundle(
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName) {

@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {
try {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
internalSplitNamespaceBundle(asyncResponse,
bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

/**
 * REST endpoint (v2 path layout) returning the hash positions of topics in a namespace bundle.
 *
 * <p>Validates the namespace name and delegates to {@code internalGetTopicHashPositions}. Exceptions
 * thrown synchronously by either step are routed to {@code asyncResponse}, the same way
 * {@code splitNamespaceBundle} in this class handles them.
 *
 * @param tenant        tenant of the namespace
 * @param namespace     local namespace name
 * @param bundleRange   bundle range to inspect
 * @param topics        topics to look up, passed through to {@code internalGetTopicHashPositions}
 * @param asyncResponse suspended response resumed with the result or the failure
 */
@GET
@Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Namespace does not exist")})
public void getTopicHashPositions(
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("topics") List<String> topics,
        @Suspended AsyncResponse asyncResponse) {
    try {
        validateNamespaceName(tenant, namespace);
        internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
    } catch (WebApplicationException wae) {
        asyncResponse.resume(wae);
    } catch (Exception e) {
        asyncResponse.resume(new RestException(e));
    }
}

@POST
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
Expand Down Expand Up @@ -797,11 +798,12 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio
* @throws Exception
*/
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {

final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm);
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, boundaries);

return unloadFuture;
}
Expand All @@ -810,13 +812,20 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
boolean unload,
AtomicInteger counter,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> {
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
if (ex == null) {
if (splitBoundaries == null || splitBoundaries.size() == 0) {
LOG.info("[{}] No valid boundary found in {} to split bundle {}",
bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
completionFuture.complete(null);
return;
}
try {
bundleFactory.splitBundles(bundle,
2 /* by default split into 2 */, splitBoundary)
bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)
.thenAccept(splittedBundles -> {
// Split and updateNamespaceBundles. Update may fail because of concurrent write to
// Zookeeper.
Expand All @@ -829,14 +838,13 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,

checkNotNull(splittedBundles.getLeft());
checkNotNull(splittedBundles.getRight());
checkArgument(splittedBundles.getRight().size() == 2,
"bundle has to be split in two bundles");
checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1,
"bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles");
NamespaceName nsname = bundle.getNamespaceObject();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}",
LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}",
nsname.toString(), bundle.getBundleRange(), counter.get(),
splittedBundles.getRight().get(0).getBundleRange(),
splittedBundles.getRight().get(1).getBundleRange());
splittedBundles.getRight());
}
try {
// take ownership of newly split bundles
Expand Down Expand Up @@ -880,7 +888,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
&& (counter.decrementAndGet() >= 0)) {
pulsar.getOrderedExecutor()
.execute(() -> splitAndOwnBundleOnceAndRetry(
bundle, unload, counter, completionFuture, splitAlgorithm));
bundle, unload, counter, completionFuture, splitAlgorithm, boundaries));
} else if (t instanceof IllegalArgumentException) {
completionFuture.completeExceptionally(t);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.naming;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.pulsar.broker.namespace.NamespaceService;

/**
 * Bundles the inputs of a namespace bundle split into a single argument object.
 *
 * <p>Lombok generates a getter for every field, a no-argument constructor and an all-arguments
 * constructor taking {@code (service, bundle, positions)} in that order.
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class BundleSplitOption {
// Namespace service supplied by the code requesting the split.
private NamespaceService service;
// The bundle that is to be split.
private NamespaceBundle bundle;
// Split positions supplied with the split request.
// NOTE(review): presumably null or empty when the chosen algorithm computes its own boundary - confirm.
private List<Long> positions;
}
Loading

0 comments on commit 7998c44

Please sign in to comment.