Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-8: Introduce peer cluster for global namespace redirection #903

Merged
merged 4 commits into from
Nov 21, 2017

Conversation

rdhabalia
Copy link
Contributor

@rdhabalia rdhabalia commented Nov 15, 2017

Motivation

As described in PIP-8, introducing a "peer-group" which can help a broker to redirect a lookup/admin request for global-namespace to appropriate cluster that actually owns (cluster part of replication-clusters) the namespace.

So, if a cluster cluster-A receives a lookup/admin request for a global-namespace for which cluster-A is not part of replication-clusters then cluster-A will check with peer which is part of replication-clusters and redirects request to that appropriate peer cluster.

It can help to partition namespaces across multiple clusters and with peer-group abstraction, client can access them with a single service-url.

Modifications

  • Admin API: to add/update/get peer-clusters
  • Lookups (Http/BinaryProto): Partitioned-metadata and Topic lookup supports request redirection to appropriate peer-cluster for a global namespace
  • Admin-API : namespaces and persistent/non-persistent supports peer-group for a global namespace
  • CLI tool : add cli-tool commands
  • Validation on PartitionedMetadata-Lookup: Fail request if global namespace's replication-clusters doesn't contain current/peer-clusters (Earlier this validation was only present at lookup only). So, client can't create producer/consumer object and don't do internal retry for lookup.

Result

It will support peer-group described on PIP-8.
Note: I will create a separate PR for admin-api doc after reviewing/merging this one.

@rdhabalia rdhabalia added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature labels Nov 15, 2017
@rdhabalia rdhabalia added this to the 1.21.0-incubating milestone Nov 15, 2017
@rdhabalia rdhabalia self-assigned this Nov 15, 2017
@rdhabalia rdhabalia requested a review from merlimat November 15, 2017 02:27
@apache apache deleted a comment from asfgit Nov 15, 2017
@rdhabalia rdhabalia requested a review from saandrews November 15, 2017 21:12

log.warn(msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use log.warn("Global namespace missing local cluster name in replication list : local_cluster=%s ns={} repl_clusters={}", localCluster, namespace.toString() like other logs ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually this is existing logging already present into master. The reason of creating string msg is: we can use it while logging at line#584 and while creating exception-message at line#585. If we don't do it then probably we will create duplicate info-msg multiple times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see

@rdhabalia
Copy link
Contributor Author

rdhabalia commented Nov 16, 2017

ping @merlimat @saandrews

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Change looks good!


public class ClusterData {
private String serviceUrl;
private String serviceUrlTls;
private String brokerServiceUrl;
private String brokerServiceUrlTls;
// For given Cluster1(us-west1, us-east1) and Cluster2(us-west2, us-east2)
// Peer: [us-west1 -> us-west2] and [us-east1 -> us-east2]
private List<String> peerClusterNames;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just use Set<String> since it's supposed to contain unique values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true. However, one of the requirement was to keep the ordering to provide the preference while looking in the peer-list. So, we can change it to LinkedHashSet to keep the ordering and unique elements.?

@rdhabalia
Copy link
Contributor Author

@merlimat updated peerClusterNames type as LinkedHashSet.

// if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
// redirect to the peer-cluster
if (peerClusterData != null) {
URI redirect = getRedirectionUrl(peerClusterData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid cyclic request between clusters due to misconfiguration or other issues, should the request indicate that it's a redirected request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may not need it because it won't do multiple redirections. The cluster which receives the lookup-request for a global-namespace, if that cluster is not part of the replication-clusters then it will find out peer which is part of replication-cluster and can own that global-namespace. So, cluster will only redirect to peer if it finds out that selected peer can own the namespace else this cluster will reject the request.
So, peer which will receive redirected-lookup request will always own this namespace and will never do another redirection.

@apache apache deleted a comment from asfgit Nov 18, 2017
@apache apache deleted a comment from asfgit Nov 20, 2017
@rdhabalia
Copy link
Contributor Author

@saandrews I updated PR with change which validates that peer-clusters can't coexist into replication-cluster list while setting replication-cluster for a given namespace.

if (peerClusters != null && !peerClusters.isEmpty()) {
SetView<String> conflictPeerClusters = Sets.intersection(peerClusters, clusters);
if (!conflictPeerClusters.isEmpty()) {
log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also log replication clusters?

private void validatePeerClusterConflict(String clusterName, Set<String> clusters) {
try {
Optional<ClusterData> clusterData = clustersCache().get(path("clusters", clusterName));
if (clusterData.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the cluster isn't there, shouldn't we throw exception?

@rdhabalia
Copy link
Contributor Author

@saandrews addressed comments.

@rdhabalia rdhabalia merged commit f602e68 into apache:master Nov 21, 2017
jai1 pushed a commit to jai1/pulsar that referenced this pull request Feb 8, 2018
…e#903)

* PIP-8: Introduce peer cluster for global namespace redirection

* Change peerClusterNames type to LinkedHashSet

* Validate peer-cluster can't coexit in replication-cluster list

* log replication-cluster and check valid-cluster-data
sijie pushed a commit that referenced this pull request Dec 11, 2020
…oid deadlocks (#8877)

### Motivation

Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time.

[threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)

The thread "ForkJoinPool.commonPool-worker-120" was locking an instance of `ManagedLedgerImpl`. And this thread seemed to be waiting for `subscriptions`, which is an instance of `ConcurrentOpenHashMap`, to be unlocked. Many other threads were blocked because the lock on the `ManagedLedgerImpl` instance was not released.

```
"ForkJoinPool.commonPool-worker-120" #903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
        - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```

The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1".
```
"pulsar-msg-expiry-monitor-24-1" #304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
```

I can't understand why "pulsar-msg-expiry-monitor-24-1" was stuck. However, it seems that this deadlock can be avoided if `subscriptions` is not locked when checking for message expiration, so I created this PR. If anyone can explain why "pulsar-msg-expiry-monitor-24-1" was stuck, please let me know.

### Modifications

When expiring messages for each subscription, copy the values of `subscriptions` as `List` and execute `forEach()` for that `List` instance.
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Dec 1, 2021
…oid deadlocks (apache#8877)

Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time.

[threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip)

The thread "ForkJoinPool.commonPool-worker-120" was locking an instance of `ManagedLedgerImpl`. And this thread seemed to be waiting for `subscriptions`, which is an instance of `ConcurrentOpenHashMap`, to be unlocked. Many other threads were blocked because the lock on the `ManagedLedgerImpl` instance was not released.

```
"ForkJoinPool.commonPool-worker-120" apache#903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720)
        - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```

The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1".
```
"pulsar-msg-expiry-monitor-24-1" #304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source)
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948)
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
```

I can't understand why "pulsar-msg-expiry-monitor-24-1" was stuck. However, it seems that this deadlock can be avoided if `subscriptions` is not locked when checking for message expiration, so I created this PR. If anyone can explain why "pulsar-msg-expiry-monitor-24-1" was stuck, please let me know.

When expiring messages for each subscription, copy the values of `subscriptions` as `List` and execute `forEach()` for that `List` instance.

(cherry picked from commit d08ac1d)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants