Correlation IDs do not match: kafka python offset commit issues #1529

Closed
charankooram opened this issue Jun 22, 2018 · 6 comments


charankooram commented Jun 22, 2018

I am using kafka-python for my stream and consistently run into this issue when trying to commit offsets using the commit_async API. The log below is captured in the error callback function. Could someone please point me to the proper way to triage this issue?

2018-06-21 15:05:46,285 ERROR 112:kafka_qa_ingestion.py(6145) - Exception happened committing offset - CorrelationIdError: <BrokerConnection node_id=1 host=messagebroker1.test3.companycom/10.174.11.5 port=9092>: Correlation IDs do not match: sent 4792, recv 4791

I was hoping this would resolve itself, since it looked like a low-level problem. But after a while the producer just crashes, and I'd love any help figuring this out.

kafka-python version: 1.3.5
Kafka version: 0.11
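
For readers unfamiliar with the error in the log above, here is a minimal sketch of the kind of check that produces it. This is a simplified toy model, not kafka-python's actual code: the class `FifoConnection` and its methods are hypothetical names. It assumes the client numbers each request, queues the numbers in send order, and expects responses to come back in that same order.

```python
from collections import deque


class CorrelationIdError(Exception):
    """Stand-in for the library error of the same name (hypothetical here)."""


class FifoConnection:
    """Toy connection that expects responses in the order requests were queued."""

    def __init__(self):
        self._next_id = 0
        self.in_flight = deque()  # correlation IDs awaiting a response

    def send(self):
        # Assign the next correlation ID and remember it as in flight.
        correlation_id = self._next_id
        self._next_id += 1
        self.in_flight.append(correlation_id)
        return correlation_id

    def receive(self, recv_id):
        # The oldest in-flight request must match the response that arrived.
        sent_id = self.in_flight.popleft()
        if sent_id != recv_id:
            raise CorrelationIdError(
                "Correlation IDs do not match: sent %d, recv %d" % (sent_id, recv_id))
        return recv_id


conn = FifoConnection()
conn.send()  # correlation ID 0
conn.send()  # correlation ID 1
try:
    conn.receive(1)  # response for ID 1 arrives while ID 0 heads the queue
except CorrelationIdError as exc:
    print(exc)  # Correlation IDs do not match: sent 0, recv 1
```

In this model, a mismatch such as "sent 4792, recv 4791" means the bookkeeping of in-flight requests and the order of responses disagree by one entry, which is consistent with, but does not prove, a race between threads sharing one connection.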

charankooram (Author) commented Jun 22, 2018

# Code snippets and configuration (excerpt from inside a class method)
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import OffsetAndMetadata

self.consumer = KafkaConsumer(
    self.topic,
    group_id=consumer_grp_name,
    bootstrap_servers=[source],
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    max_in_flight_requests_per_connection=16000,
    max_poll_records=500,
)
self.producer = KafkaProducer(
    bootstrap_servers=[destination],
    max_in_flight_requests_per_connection=16000,
)

def on_send_success(record_metadata, topic_partition, offset_and_meta_data):
    # Runs once the destination broker acknowledges the produced record.
    def on_success_offset_commit(args):
        self.logger.debug("Offset successfully committed - {}".format(args))

    def on_err_offset_commit(ex):
        self.logger.error("Exception happened committing offset - {}".format(ex))

    self.logger.debug("{},{},{}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))
    # Commit the consumed offset only after the matching send has succeeded.
    self.consumer.commit_async(
        offsets={topic_partition: offset_and_meta_data}
    ).add_callback(on_success_offset_commit).add_errback(on_err_offset_commit)

def on_send_error(excp):
    self.logger.error(
        "Exception happened sending messages to destination kafka broker",
        exc_info=excp)
    self._send_alert(excp)

# Forward the consumed record; the callbacks above handle the outcome.
self.producer.send(topic=self.topic, value=record.value).add_errback(
    on_send_error
).add_callback(
    on_send_success,
    topic_partition=topic_partition,
    offset_and_meta_data=OffsetAndMetadata(offset=record.offset, metadata=record.offset),
)
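
One thing worth noting about the snippet above: producer callbacks run on the producer's background sender thread, while kafka-python documents KafkaConsumer as not thread-safe. Whether that is the cause of this particular error is an assumption, not something confirmed in this thread. A minimal sketch of one way to keep all commits on the consumer's own thread follows; `OffsetHandoff` and its methods are hypothetical names, and the sketch deliberately uses only the standard library.

```python
import queue


class OffsetHandoff:
    """Collects offsets from producer callbacks; drained by the consumer thread.

    Hypothetical helper for illustration, not part of kafka-python.
    """

    def __init__(self):
        self._pending = queue.Queue()  # thread-safe handoff between threads

    def record_sent(self, topic_partition, offset):
        # Safe to call from the producer's callback thread.
        self._pending.put((topic_partition, offset))

    def drain(self):
        # Call only from the thread that owns the consumer.
        # Returns the highest acknowledged offset seen per partition.
        latest = {}
        while True:
            try:
                topic_partition, offset = self._pending.get_nowait()
            except queue.Empty:
                break
            latest[topic_partition] = max(offset, latest.get(topic_partition, offset))
        return latest


handoff = OffsetHandoff()
handoff.record_sent(("my-topic", 0), 5)
handoff.record_sent(("my-topic", 0), 3)
handoff.record_sent(("my-topic", 1), 7)
print(handoff.drain())  # {('my-topic', 0): 5, ('my-topic', 1): 7}
```

Under this design, `on_send_success` would only call `record_sent`, and the poll loop would call `drain()` and pass the result to `commit_async` itself. Two caveats: Kafka expects the committed offset to be the next offset to read (last processed offset plus one), and taking the highest acknowledged offset can skip a record whose send failed, so a real implementation would need to handle send errors before committing.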

charankooram (Author) commented

Can someone please look at this issue and point me to a way to resolve it? My code often breaks because of this, and I am not confident moving to production until it is resolved or I move to some other streaming library.

isamaru (Contributor) commented Mar 20, 2019

I can reproduce this race condition when using 1.4.5 and doing a manual commit (from the main thread).
It is basically the same thing that was fixed as #1733 but it is happening in protocol/parser.py instead of conn.py.

It is not critical as the worker recovers on its own, but I would still like to fix it.

@dpkp, do you agree that we can apply in protocol/parser.py the same technique you used in conn.py (changing the in-flight requests (IFR) queue to a dict)?
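
To illustrate the queue-versus-dict idea in general terms, here is a small simulation. It is a sketch under stated assumptions, not the code in conn.py or protocol/parser.py: the function names are hypothetical, and it assumes the race shows up as two requests being recorded as in flight in the opposite order to the one in which their responses arrive.

```python
from collections import deque


def match_fifo(queued_ids, response_ids):
    """Match each response against the head of a FIFO queue.

    Returns a list of (expected, received) pairs for every mismatch.
    """
    pending = deque(queued_ids)
    mismatches = []
    for recv_id in response_ids:
        expected = pending.popleft()
        if expected != recv_id:
            mismatches.append((expected, recv_id))
    return mismatches


def match_dict(queued_ids, response_ids):
    """Match each response by looking up its correlation ID in a dict."""
    pending = {cid: "request-%d" % cid for cid in queued_ids}
    mismatches = []
    for recv_id in response_ids:
        if pending.pop(recv_id, None) is None:
            mismatches.append((None, recv_id))  # response for an unknown request
    return mismatches


# Two threads raced: ID 4792 was recorded as in flight before ID 4791,
# but the responses arrive in ID order.
queued = [4792, 4791]
responses = [4791, 4792]

print(match_fifo(queued, responses))  # [(4792, 4791), (4791, 4792)]
print(match_dict(queued, responses))  # []
```

The dict lookup does not care about insertion order, so the same interleaving that trips the FIFO check is matched cleanly. It does not remove the underlying race; it only makes the response matching tolerant of it.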

dpkp (Owner) commented Mar 20, 2019

@isamaru this is a different issue than #1744. protocol/parser.py did not exist in the 1.3.5 release; 1.4 includes a substantial refactor of the protocol layer. Anyone seeing a Correlation ID mismatch on 1.3.5 should simply upgrade to 1.4.
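
A quick way to check whether an installation predates the 1.4 refactor mentioned above is to compare version strings. The helper names below are hypothetical and the parsing is a deliberately simple sketch that only understands dotted numeric versions.

```python
def parse_version(version):
    """Turn '1.3.5' into (1, 3, 5), stopping at the first non-numeric piece."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def needs_upgrade(installed, minimum="1.4"):
    """Return True when the installed version is older than the minimum."""
    return parse_version(installed) < parse_version(minimum)


print(needs_upgrade("1.3.5"))  # True
print(needs_upgrade("1.4.5"))  # False
```

With kafka-python installed, the installed version string is available as `kafka.__version__` and can be passed to `needs_upgrade`.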

dpkp closed this as completed Mar 20, 2019

isamaru (Contributor) commented Mar 20, 2019

@dpkp should I open a new issue for 1.4.5? Or I can try to fix it in one patch with #1744, even though it's a separate issue.

dpkp (Owner) commented Mar 20, 2019 via email
