Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix subscription topic name error. #16719

Merged
merged 6 commits into from
Jul 23, 2022

Conversation

shibd
Copy link
Member

@shibd shibd commented Jul 21, 2022

#16374 #16556 #16375 #16599

Motivation

This is a bug, after [PIP-145], PatternMultiTopicsConsumer can receive CommandWatchTopicUpdate from broker to subscribe new topic.

But this topic name is with the partition index(case: public/default/test-topic-partition-0), This will produce unexpected behavior when executing the following subscribe method.

client.getPartitionedTopicMetadata(topicName)
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage());
subscribeResult.completeExceptionally(ex1);
return null;
});

The reasons for these failed unit tests are:

When the client receives the CommandWatchTopicUpdate command, it will enter the logic of processing non-partitions, then only process the consumer collection and not the partitionedTopics collection

} else {
allTopicPartitionsNumber.incrementAndGet();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
consumers.compute(topicName, (key, existingValue) -> {
if (existingValue != null) {
String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return existingValue;
} else {
internalConfig.setStartPaused(paused);
ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
-1, subFuture, createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
} else {
newConsumer.resume();
}
}
return newConsumer;
}
});

Then, when we execute the consumer1.run(consumer1.getRecheckPatternTimeout()) to trigger the update subscription on the unit test, By executing the following logic, will cause topic-4 to be removed and added at the same time. ultimately destabilizes unit tests (maybe deletes will be executed after additions)

final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
for (String partition : getPartitions()) {
TopicName topicName = TopicName.get(partition);
if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
oldTopics.add(partition);
}
}

Print log here:
image

Can See:

2022-07-21T19:43:35,883 - INFO  - [pulsar-client-io-397-1:PatternMultiTopicsConsumerImpl@154] - debug add: [persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer]
2022-07-21T19:43:35,883 - INFO  - [pulsar-client-io-397-1:PatternMultiTopicsConsumerImpl@155] - debug remove: [persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-0, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-1, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-2, persistent://my-property/my-ns/pattern-topic-4-AutoSubscribePatternConsumer-partition-3]

Modifications

  • Change subscribe logic, use the getPartitionedTopicName().

Documentation

  • doc-not-needed
    (Please explain why)

@shibd
Copy link
Member Author

shibd commented Jul 21, 2022

@codelipenghui @gaozhangmin @Technoboy- @nodece Please help review.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

@codelipenghui codelipenghui added this to the 2.11.0 milestone Jul 21, 2022
@codelipenghui codelipenghui added type/bug The PR fixed a bug or issue reported a bug component/client-java labels Jul 21, 2022
@shibd shibd changed the title [fix][client] Fix topic name subscribe error. [fix][client] Fix subscription topic name error. Jul 21, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 21, 2022
Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

@mattisonchao
Copy link
Member

@andrasbeni Could you help to review it?

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a unit test to protect this change?

@codelipenghui codelipenghui added the release/blocker Indicate the PR or issue that should block the release until it gets resolved label Jul 22, 2022
@shibd
Copy link
Member Author

shibd commented Jul 22, 2022

Could you add a unit test to protect this change?

@BewareMyPower Thanks for your reminder, I looked again, and It is found that we cannot be modify the logic in the MultiTopicsConsumerImpl.subscribeAsync method. Because users may use consumer like this:

        client.newConsumer().topic(
                        "persistent://public/dafault/test-topic1-partition-0",
                        "persistent://public/dafault/test-topic2-partition-1"
         );

If force subscription to PartitionTopic inside the method, Then all partitions of test-topic1 and test-topic2 will be subscribed. This is a breaking change.

So, I revert the original changes.

The new changes are: Inside the PatternTopicsChangedListener.onTopicsAdded method let it use partitioned name to subscribe.

@codelipenghui @mattisonchao @Technoboy- @nodece @BewareMyPower Sorry, please help review again.

@codelipenghui
Copy link
Contributor

The new changes are: Inside the PatternTopicsChangedListener.onTopicsAdded method let it use partitioned name to subscribe.

Make sense.

@shibd shibd force-pushed the fix_subscribe_topic_name branch from 45e8938 to a17c5e9 Compare July 22, 2022 08:49
@shibd shibd force-pushed the fix_subscribe_topic_name branch from a17c5e9 to 2d4d36a Compare July 22, 2022 08:53
…ernMultiTopicsConsumerImpl.java

Co-authored-by: Yunze Xu <[email protected]>
@shibd
Copy link
Member Author

shibd commented Jul 23, 2022

/pulsarbot run-failure-checks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client cherry-picked/branch-2.11 doc-not-needed Your PR changes do not impact docs release/blocker Indicate the PR or issue that should block the release until it gets resolved type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants