Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Use dedicated executor for requests in BinaryProtoLookupService #23378

Merged
merged 5 commits into from
Oct 15, 2024

Conversation

nodece
Copy link
Member

@nodece nodece commented Sep 30, 2024

Motivation

I'm testing Pulsar 3.0.6. We have a Pulsar interceptor, which records the data(web request/binary command) to the topic by the producer, but I got this error:

[ZKMetadataStore-12-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.100.102:60796] Created subscription on topic persistent://as/system/__change_events / reader-e710cb132a
[pulsar-io-6-2] WARN org.apache.pulsar.client.impl.ClientCnx - [/192.168.100.102:15002] Got exception java.lang.UnsupportedOperationException
	at org.apache.pulsar.common.protocol.PulsarDecoder.handlePartitionMetadataRequest(PulsarDecoder.java:508)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)

This indicates the broker sends a PartitionMetadataRequest command to the client, this is a bizarre behavior.

How to reproduce this issue?

  1. Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.
  2. The producer will change the type of pulsar response command, which will be replaced with PARTITION_METADATA.
image

Due to Pulsar's use of ThreadLocal for command creation, the command instances are being unexpectedly modified.

Modifications

  • Add lookup executor for requests in BinaryProtoLookupService.
  • Add shared lookup executor for the broker client.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 30, 2024
@nodece nodece requested review from lhotari and dao-jun September 30, 2024 13:46
@nodece nodece self-assigned this Sep 30, 2024
@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

Due to Pulsar's use of ThreadLocal for command creation, the command instances are being unexpectedly modified.

Thanks for bringing this up. This seems to be a severe bug and it would justify to report it separately. It's easy to miss this PR that this is addressing a critical issue.

@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

Is this the only case where this can be reproduced?

@lhotari
Copy link
Member

lhotari commented Oct 1, 2024

  1. Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

@nodece I think that the correct solution would be to call new BaseCommand().copyFrom(command) in the interceptor to make a copy of the command. That's what is needed whenever the thread locals are referenced later.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct solution for keeping a reference in the onPulsarCommand broker interceptor implementation would be to use new BaseCommand().copyFrom(command). That's why I don't think that the solution in the PR directly related to addressing the problem.

@nodece
Copy link
Member Author

nodece commented Oct 2, 2024

Create a producer asynchronously in the org.apache.pulsar.broker.intercept.BrokerInterceptor#onPulsarCommand.

Is this the only case where this can be reproduced?

At present, it looks like this.

@nodece I think that the correct solution would be to call new BaseCommand().copyFrom(command) in the interceptor to make a copy of the command. That's what is needed whenever the thread locals are referenced later.

My interceptor doesn't change the command. Please notice the 1 and 2 in my picture, the broker and client in the same k thread.

  1. org.apache.pulsar.broker.service.PulsarCommandSenderImpl#sendSuccessResponse:
    public void sendSuccessResponse(long requestId) {
        // step-1: caller in the zk thread.
        BaseCommand command = Commands.newSuccessCommand(requestId);
        // step-2: call the interceptor method.
        safeIntercept(command, cnx);
        ByteBuf outBuf = Commands.serializeWithSize(command);
        writeAndFlush(outBuf);
    }

Use the ThreadLocal to create the command in the zk thread.

  1. My intercept method:
    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
        // caller in the zk thread.
        Producer<byte[]> producer = client.newProducer().topic("my-topic").create();
        producer.send(command.getType().name().getBytes(StandardCharsets.UTF_8));
    }

The produce creation needs to do the lookup and then get partition metadata, which requests the broker by Commands.newLookup and Commands.newPartitionMetadataRequest commands.

We are using the ThreadLocal to create the lookup or partition metadata requests, so the sendSuccessResponse command is broken in the step-2.

The current solution version uses a dedicated executor to create commands and make requests in the client.

Pulsar uses implicit executor in many places, so often we are not sure who the executor is, as a result, this accident occurred, clarifying the executor is not a bad thing.

@lhotari
Copy link
Member

lhotari commented Oct 8, 2024

Pulsar uses implicit executor in many places, so often we are not sure who the executor is, as a result, this accident occurred, clarifying the executor is not a bad thing.

It's very hard to be sure whether it had performance impacts. When an executor is defined, it will add extra overhead of queuing to the other executor and the extra thread switching overhead. It depends a lot on the situation whether this causes performance regressions or not.

@nodece
Copy link
Member Author

nodece commented Oct 9, 2024

Do you have any suggestions or recommendations for resolving this issue?

@lhotari
Copy link
Member

lhotari commented Oct 9, 2024

Do you have any suggestions or recommendations for resolving this issue?

@nodece The changes in this PR LGTM. There's no risk of additional overhead of using the separate executor since the number of ops is relatively low for the operations that it applies to. I'll approve this PR. Thanks for the contribution!

lhotari
lhotari previously approved these changes Oct 9, 2024
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari dismissed their stale review October 9, 2024 14:53

A lot of tests fail. This is causing problems.

@nodece nodece force-pushed the use-dedicated-executor-lookup branch from f2fdbbf to ae23e52 Compare October 10, 2024 04:02
@nodece nodece requested a review from lhotari October 10, 2024 04:03
@nodece nodece force-pushed the use-dedicated-executor-lookup branch from ae23e52 to df0949b Compare October 10, 2024 04:06
@nodece nodece force-pushed the use-dedicated-executor-lookup branch from df0949b to 4f31a8e Compare October 10, 2024 06:38
@codecov-commenter
Copy link

codecov-commenter commented Oct 10, 2024

Codecov Report

Attention: Patch coverage is 79.06977% with 9 lines in your changes missing coverage. Please review.

Project coverage is 74.32%. Comparing base (bbc6224) to head (4c3db76).
Report is 678 commits behind head on master.

Files with missing lines Patch % Lines
...e/pulsar/client/impl/BinaryProtoLookupService.java 80.95% 3 Missing and 1 partial ⚠️
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 77.77% 4 Missing ⚠️
...n/java/org/apache/pulsar/broker/PulsarService.java 75.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23378      +/-   ##
============================================
+ Coverage     73.57%   74.32%   +0.75%     
- Complexity    32624    34402    +1778     
============================================
  Files          1877     1943      +66     
  Lines        139502   146973    +7471     
  Branches      15299    16191     +892     
============================================
+ Hits         102638   109239    +6601     
- Misses        28908    29302     +394     
- Partials       7956     8432     +476     
Flag Coverage Δ
inttests 27.65% <58.13%> (+3.06%) ⬆️
systests 24.40% <51.16%> (+0.08%) ⬆️
unittests 73.70% <79.06%> (+0.85%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...n/java/org/apache/pulsar/broker/PulsarService.java 84.28% <75.00%> (+1.91%) ⬆️
...e/pulsar/client/impl/BinaryProtoLookupService.java 81.16% <80.95%> (-1.38%) ⬇️
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 75.20% <77.77%> (+0.90%) ⬆️

... and 647 files with indirect coverage changes

@nodece nodece force-pushed the use-dedicated-executor-lookup branch from 2d02348 to e52bd09 Compare October 14, 2024 13:35
@nodece nodece requested a review from lhotari October 14, 2024 13:36
@nodece
Copy link
Member Author

nodece commented Oct 15, 2024

Ping @lhotari

Copy link
Member

@dao-jun dao-jun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@nodece nodece merged commit f98297f into apache:master Oct 15, 2024
52 checks passed
@nodece nodece deleted the use-dedicated-executor-lookup branch October 15, 2024 13:41
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good work @nodece

nodece added a commit to nodece/pulsar that referenced this pull request Oct 15, 2024
…pService (apache#23378)

Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <[email protected]>
nodece added a commit to nodece/pulsar that referenced this pull request Oct 16, 2024
…pService (apache#23378)

Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <[email protected]>
nodece added a commit that referenced this pull request Oct 16, 2024
@nodece
Copy link
Member Author

nodece commented Oct 16, 2024

It was cherry-picked to the branch-3.0 by #23461.

@lhotari
Copy link
Member

lhotari commented Oct 16, 2024

It was cherry-picked to the branch-3.0 by #23461.

@nodece Please notify the dev mailing list about this change. This PR could potentially cause regressions so it's better to keep others informed about the change in branch-3.0. I'm not against changing this, but it's hard to know if there are regressions before it has been extensively tested. Are you going to test branch-3.0 with this change?

@nodece
Copy link
Member Author

nodece commented Oct 16, 2024

@lhotari Thanks for your reminder! I sent a notice to the dev mailing list.

Are you going to test branch-3.0 with this change?

I tested this change in a private ecosystem, it works fine.

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Oct 21, 2024
lhotari pushed a commit that referenced this pull request Oct 21, 2024
…pService (#23378)

Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit f98297f)
lhotari pushed a commit that referenced this pull request Oct 22, 2024
…pService (#23378)

Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit f98297f)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Oct 24, 2024
nodece added a commit to nodece/pulsar that referenced this pull request Dec 5, 2024
…pService (apache#23378)

Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <[email protected]>
nodece added a commit to ascentstream/pulsar that referenced this pull request Dec 5, 2024
…pService (apache#23378) (#28)

* [fix][client] Use dedicated executor for requests in BinaryProtoLookupService (apache#23378)

Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <[email protected]>

* Fix aircompressor license

Signed-off-by: Zixuan Liu <[email protected]>

---------

Signed-off-by: Zixuan Liu <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants