Skip to content

Commit

Permalink
Fix create consumer on partitioned topic while disable topic auto cre…
Browse files Browse the repository at this point in the history
…ation. (apache#5572)

### Motivation

Currently, disable the topic auto creation will cause consumer create failed on a partitioned topic. Since the partitioned topic is already created, so we should handle the topic partition create when disable the topic auto creation.

### Modifications

By default, create partitioned topics also try to create all partitions, and if create partitions failed, users can use `create-missed-partitions` to repair.

If users already have a partitioned topic without created partitions, can also use `create-missed-partitions` to repair.
  • Loading branch information
codelipenghui authored Jan 9, 2020
1 parent 4f177e6 commit 602f1c2
Show file tree
Hide file tree
Showing 19 changed files with 398 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,13 @@ public List<String> getChildren(String path, boolean watch) throws KeeperExcepti
if (path.length() >= item.length()) {
continue;
}

String child = item.substring(path.length() + 1);
if (!child.contains("/")) {
children.add(child);
String child = item.substring(path.length());
if (child.indexOf("/") == 0) {
child = child.substring(1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
}
}
}
}
Expand Down Expand Up @@ -465,10 +468,13 @@ public void getChildren(final String path, boolean watcher, final Children2Callb
} else if (item.equals(path)) {
continue;
} else {
String child = item.substring(path.length() + 1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
String child = item.substring(path.length());
if (child.indexOf("/") == 0) {
child = child.substring(1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.servlet.ServletContext;
Expand Down Expand Up @@ -66,9 +68,9 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -111,6 +113,11 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void zkCreateOptimisticAsync(String path, byte[] content, AsyncCallback.StringCallback callback) {
ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, callback, null);
}

protected boolean zkPathExists(String path) throws KeeperException, InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
Expand All @@ -119,6 +126,21 @@ protected boolean zkPathExists(String path) throws KeeperException, InterruptedE
return false;
}

protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue());
globalZk().sync(path, (rc2, s, ctx) -> {
if (KeeperException.Code.OK.intValue() != rc2) {
rc.set(rc2);
}
latch.countDown();
}, null);
latch.await();
if (KeeperException.Code.OK.intValue() != rc.get()) {
throw KeeperException.create(KeeperException.Code.get(rc.get()));
}
}

/**
* Get the domain of the topic (whether it's persistent or non-persistent)
*/
Expand Down Expand Up @@ -233,6 +255,37 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}

protected void tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
return;
}
for (int i = 0; i < numPartitions; i++) {
tryCreatePartitionAsync(i);
}
}

private void tryCreatePartitionAsync(final int partition) {
zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
}
});
}

protected NamespaceName namespaceName;

protected void validateNamespaceName(String property, String namespace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static String partitionedTopicPath(TopicName name) {
name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName());
}

public static String managedLedgerPath(TopicName name) {
return "/managed-ledgers/" + name.getPersistenceNamingEncoding();
}

public static String namespacePoliciesPath(NamespaceName name) {
return adminPath(POLICIES, name.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
public class PersistentTopicsBase extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);

public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
Expand Down Expand Up @@ -414,8 +413,9 @@ protected void internalCreatePartitionedTopic(int numPartitions) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
tryCreatePartitionsAsync(numPartitions);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down Expand Up @@ -540,6 +540,13 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
}
}

protected void internalCreateMissedPartitions() {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions);
}
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
Expand Down Expand Up @@ -627,8 +634,8 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
try {
globalZk().delete(path, -1);
globalZkCache().invalidate(path);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
return;
Expand Down Expand Up @@ -1846,24 +1853,28 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
}

admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
if (stats.subscriptions.size() == 0) {
result.complete(null);
} else {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();

subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}

FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
});
});
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
// The first partition doesn't exist, so there are currently to subscriptions to recreate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public void createPartitionedTopic(
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void createNonPartitionedTopic(
*/
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand All @@ -270,6 +270,30 @@ public void updatePartitionedTopic(
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
}


@POST
@Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
@ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void createMissedPartitions(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {

validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalCreateMissedPartitions();
}

@GET
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
Expand Down Expand Up @@ -1822,10 +1820,15 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
partitionedTopicPath(topicName), content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
// we wait for the data to be synced in all quorums and the observers
executor().schedule(
SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)),
PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS);
// Sync data to all quorums and the observers
pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName),
(rc2, path2, ctx2) -> {
if (rc2 == KeeperException.Code.OK.intValue()) {
partitionedTopicFuture.complete(configMetadata);
} else {
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2));
}
}, null);
} else {
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,9 +815,8 @@ public void partitionedTopics(String topicName) throws Exception {

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4);

// check if the virtual topic doesn't get created
List<String> topics = admin.topics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 0);
assertEquals(topics.size(), 4);

assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
0);
Expand All @@ -829,15 +828,8 @@ public void partitionedTopics(String topicName) throws Exception {
assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).partitions.size(),
0);

try {
admin.topics().getSubscriptions(partitionedTopicName);
fail("should have failed");
} catch (PulsarAdminException e) {
// ok
assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
} catch (Exception e) {
fail(e.getMessage());
}
List<String> subscriptions = admin.topics().getSubscriptions(partitionedTopicName);
assertEquals(subscriptions.size(), 0);

// create consumer and subscription
PulsarClient client = PulsarClient.builder()
Expand Down
Loading

0 comments on commit 602f1c2

Please sign in to comment.