[fix][broker] Do not use IO thread for consumerFlow in Shared subscription #16304

Merged
merged 2 commits into from
Jul 13, 2022

Conversation

eolivelli
Contributor

@eolivelli eolivelli commented Jun 30, 2022

Motivation

In consumerFlow for Shared subscriptions (PersistentDispatcherMultipleConsumers) we execute readMoreEntries directly on the calling thread, which belongs to the pulsar-io thread pool.

In PersistentDispatcherSingleActiveConsumer (Failover/Exclusive) we defer the execution of that code to another thread pool.

@dave2wave reported that in OMB tests with offloaders, tests using a Shared subscription perform noticeably worse.

Modifications

Move the execution of readMoreEntries to the topic thread.
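
As a rough illustration of this kind of hand-off (not the actual patch), the sketch below shows a flow handler that only schedules the read and returns to its caller. The class `FlowOffloadSketch`, the executor, the thread name `topic-worker-sketch`, and the body of `readMoreEntries` are all hypothetical stand-ins, not Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a dispatcher; this is not Pulsar's real class.
class FlowOffloadSketch {
    // Assumption: a dedicated single-thread executor plays the role of the topic thread.
    private final ExecutorService topicExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "topic-worker-sketch");
        t.setDaemon(true);
        return t;
    });

    // Called on the caller's (IO) thread: it only schedules the read and returns.
    public CompletableFuture<String> consumerFlow(int permits) {
        return CompletableFuture.supplyAsync(() -> readMoreEntries(permits), topicExecutor);
    }

    // Stand-in for the real read; it just reports the thread it ran on.
    private String readMoreEntries(int permits) {
        return Thread.currentThread().getName();
    }

    public void close() {
        topicExecutor.shutdown();
    }

    public static void main(String[] args) {
        FlowOffloadSketch dispatcher = new FlowOffloadSketch();
        try {
            System.out.println("caller thread: " + Thread.currentThread().getName());
            System.out.println("read ran on:   " + dispatcher.consumerFlow(10).join());
        } finally {
            dispatcher.close();
        }
    }
}
```

The caller never runs the read itself, so whatever the read does (including taking locks) no longer occupies the thread that handled the flow command.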

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

  • doc-not-needed

@eolivelli eolivelli added the area/broker and doc-not-needed (Your PR changes do not impact docs) labels Jun 30, 2022
@eolivelli eolivelli self-assigned this Jun 30, 2022
@eolivelli
Contributor Author

@hangc0276 here are more details about BlobStoreBackedReadHandleImpl.

This is where the read happens (it is actually "triggered") for the Shared subscription (PersistentDispatcherMultipleConsumers):

2022-06-30T12:23:19,689+0200 [pulsar-io-18-21] INFO  org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl - Ledger 0: reading 95 - 95
java.lang.Exception: Reading 95 95
at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.readAsync(BlobStoreBackedReadHandleImpl.java:105) ~[?:?]
at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:211) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:188) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncReadEntry$24(ManagedLedgerImpl.java:1899) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1899) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:?]
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) [?:?]
at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) [?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) [?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) [?:?]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) [?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) [?:?]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) [?:?]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) [?:?]
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) [managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:393) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:252) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.consumerFlow(PersistentDispatcherMultipleConsumers.java:224) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentSubscription.consumerFlow(PersistentSubscription.java:363) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.Consumer.flowPermits(Consumer.java:691) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.ServerCnx.handleFlow(ServerCnx.java:1583) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]

And this is the Failover subscription (PersistentDispatcherSingleActiveConsumer):

2022-06-30T12:25:07,666+0200 [broker-topic-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl - Ledger 0: reading 195 - 237
java.lang.Exception: Reading 195 237
at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.readAsync(BlobStoreBackedReadHandleImpl.java:105) ~[?:?]
at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:292) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:242) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1997) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:1969) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncReadEntries$17(ManagedLedgerImpl.java:1779) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:1779) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntries(ManagedCursorImpl.java:721) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntries(ManagedCursorImpl.java:704) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:855) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:363) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalConsumerFlow(PersistentDispatcherSingleActiveConsumer.java:285) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$consumerFlow$5(PersistentDispatcherSingleActiveConsumer.java:261) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger.jar:2.10.1.1-SNAPSHOT]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.14.5.1.0.1.jar:4.14.5.1.0.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
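
The `java.lang.Exception: Reading ...` header in both traces suggests they were produced by logging an exception created at the call site; that is an inference, not something stated in this PR. A minimal sketch of that general debugging technique follows. The class `CallSiteTraceSketch`, the method `describeCallSite`, and the thread name `io-sketch-1` are hypothetical names used only for illustration.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper showing how to find out which thread reaches a given call site.
class CallSiteTraceSketch {
    // Returns "[thread-name] " followed by the stack trace of the current call site.
    public static String describeCallSite(String message) {
        StringWriter out = new StringWriter();
        out.append('[').append(Thread.currentThread().getName()).append("] ");
        PrintWriter writer = new PrintWriter(out);
        // The exception is never thrown; it is created only to capture the current stack.
        new Exception(message).printStackTrace(writer);
        writer.flush();
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Run the call on a named thread so the thread name shows up in the output.
        Thread t = new Thread(
                () -> System.out.print(describeCallSite("Reading 95 95")), "io-sketch-1");
        t.start();
        t.join();
    }
}
```

Comparing the thread name and the bottom frames of two such traces is what makes the difference visible here: one read is triggered from `pulsar-io-18-21` via `ServerCnx.handleFlow`, the other from `broker-topic-workers-OrderedExecutor-3-0` via a `SafeRunnable`.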

@eolivelli
Contributor Author

We are doing more performance testing with @dave2wave. I will mark this patch as "ready" when we have full confirmation of the benefits.

@eolivelli eolivelli marked this pull request as ready for review July 4, 2022 12:24
@eolivelli
Contributor Author

@dave2wave reported that this patch brings a small improvement, but not a large one.

Still, I think it is better to move this synchronized part out of the main "pulsar-io" thread pool.

Contributor

@hangc0276 hangc0276 left a comment

LGTM. We'd better execute it in a separate thread pool instead of pulsar-io.
Ping @codelipenghui @merlimat, please help take a look.

@eolivelli
Contributor Author

/pulsarbot rerun-failure-checks

Contributor

@codelipenghui codelipenghui left a comment

Why not just switch to another thread for reading offloaded data? I think that for the consume flow without an offloader, we don't need this change.

It would also be better to have test data for this change, to confirm that it really resolves the problem.

@eolivelli
Contributor Author

Actually, the read from the offloader already happens in another thread.
But we are still contending on the Dispatcher instance from a shared thread pool that is used for broker IO.

This patch doesn't bring much improvement in performance, but it is a good cleanup that simplifies how threading works for the Dispatcher. I was surprised when I found this difference between the two dispatchers.
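
To make the contention point concrete, here is a minimal sketch under stated assumptions: `DispatcherContentionSketch`, `flowDirect`, `flowDeferred`, and the `dispatcher-sketch` thread are hypothetical and only mimic a dispatcher whose state is guarded by its own monitor. It is not the code changed in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical dispatcher whose state is guarded by its own monitor.
class DispatcherContentionSketch {
    private int totalPermits;

    // Assumption: one dedicated thread performs all dispatcher work.
    private final ExecutorService dispatcherExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "dispatcher-sketch");
        t.setDaemon(true);
        return t;
    });

    // Direct variant: the calling (IO) thread has to acquire the dispatcher monitor itself.
    public synchronized int flowDirect(int permits) {
        totalPermits += permits;
        return totalPermits;
    }

    // Deferred variant: the caller only enqueues; the monitor is taken on the dispatcher thread.
    public CompletableFuture<Integer> flowDeferred(int permits) {
        return CompletableFuture.supplyAsync(() -> flowDirect(permits), dispatcherExecutor);
    }

    public static void main(String[] args) {
        DispatcherContentionSketch dispatcher = new DispatcherContentionSketch();
        CompletableFuture<Integer> pending;
        synchronized (dispatcher) {
            // While this thread holds the monitor, a flowDirect call from any other thread
            // would block. flowDeferred returns immediately; only the dispatcher thread waits.
            pending = dispatcher.flowDeferred(5);
            System.out.println("done while monitor held: " + pending.isDone()); // prints false
        }
        System.out.println("total permits: " + pending.join()); // prints 5
    }
}
```

The lock is still contended in the deferred variant, but the thread that waits for it is no longer one of the shared threads serving broker IO.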

@eolivelli
Contributor Author

It would also be better to have test data for this change, to confirm that it really resolves the problem.

@dave2wave confirmed that in his testing with OpenMessaging benchmarks this patch is a small win.

So @codelipenghui, I believe it is worth making this change.

I am sending other patches in this area in order to improve Shared subscriptions with offloaders.

@eolivelli eolivelli force-pushed the fix/shared-no-synch-flow-offloader branch 2 times, most recently from 129491a to 14d990d Compare July 8, 2022 12:20
@eolivelli
Contributor Author

@hangc0276 @codelipenghui thanks for your review, I have simplified the patch.
No need for an additional review, but I wanted to let you know that the patch is a little bit different.

The last change is about the fact that this class does not use the Topic Ordered executor, so I have updated the code to use the same thread pool we already use when calling readMoreEntries().

@eolivelli eolivelli force-pushed the fix/shared-no-synch-flow-offloader branch from 14d990d to de1cdde Compare July 11, 2022 12:04
@eolivelli eolivelli merged commit 9f70219 into apache:master Jul 13, 2022
@eolivelli eolivelli deleted the fix/shared-no-synch-flow-offloader branch July 13, 2022 09:59
@eolivelli eolivelli added this to the 2.11.0 milestone Jul 13, 2022
gaozhangmin pushed a commit to gaozhangmin/pulsar that referenced this pull request Jul 14, 2022
wuxuanqicn pushed a commit to wuxuanqicn/pulsar that referenced this pull request Jul 14, 2022
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
…request !70)

[fix][broker] Do not use IO thread for consumerFlow in Shared subscription (apache#16304)
@dao-jun
Member

dao-jun commented Apr 9, 2024

@eolivelli It looks like this does not change anything: BrokerService#executor also returns the I/O thread pool. Is there some other mechanism involved?
