Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug in doCommitOffsets of PulsarKafkaConsumer [Ver: 2.11.0] #48

Open
avinash-fk opened this issue May 4, 2023 · 0 comments · May be fixed by #49
Open

Bug in doCommitOffsets of PulsarKafkaConsumer [Ver: 2.11.0] #48

avinash-fk opened this issue May 4, 2023 · 0 comments · May be fixed by #49

Comments

@avinash-fk
Copy link

Describe the bug

The 2.11.0 version of pulsar-client-kafka-compat has bug that failes commitSync always. This seems to have been introduced in this PR - #37.

if (consumer instanceof MultiTopicsConsumerImpl) {
                msgId = new TopicMessageIdImpl(topicPartition.topic(), tp.topic(), msgId);
}

Now in MultiTopicsConsumerImpl map of the consumer with key as a topic for non-partitioned topic and topicPartitionName for the partitioned topic.

TopicMessageIdImpl takes the partition name as the first argument, but here topic name is passed. In the case of the partitioned topic, the value should have been suffixed with -partition-. But what is passed is just the topic name for which there is no consumer created in MultiTopicConsumerImpl resulting

Caused by: org.apache.pulsar.client.api.PulsarClientException$NotConnectedException: Not connected to broker
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doAcknowledge(MultiTopicsConsumerImpl.java:503)
	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:650)
	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:616)
	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:601)
	at org.apache.kafka.clients.consumer.PulsarKafkaConsumer.lambda$doCommitOffsets$9(PulsarKafkaConsumer.java:526)
	at java.util.Collections$SingletonMap.forEach(Collections.java:4912)
	at org.apache.kafka.clients.consumer.PulsarKafkaConsumer.doCommitOffsets(PulsarKafkaConsumer.java:517)
@avinash-fk avinash-fk linked a pull request May 4, 2023 that will close this issue
1 task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant