Consumer intermittently hangs when committing offsets #1728


Closed

jeffwidman opened this issue Mar 5, 2019 · 8 comments

@jeffwidman
Contributor

jeffwidman commented Mar 5, 2019

There is some sort of deadlock that we are intermittently hitting within kafka-python when our applications call commit(). The consumer drops out of the group without the process actually dying, and the only fix is to restart the process.

This is hurting us badly: we are having to restart multiple applications every six hours. However, I have not yet been able to reliably reproduce the problem. Here is what I have so far.

Our code wraps kafka-python in a thin layer, so the names in the pseudo-code/configs may differ slightly, but it should be relatively clear what's happening.

Kafka Broker Version: 1.0.1

kafka-python Version: commit eed59ba (this is newer than 1.4.4, as I wanted to be sure we picked up the latest deadlock fix from a4f0cb8)

KafkaConsumer config:

{
    "max_poll_records": 5,
    "connections_max_idle_ms":540000,
    "request_timeout_ms":305000,
    "session_timeout_ms":10000,
    "heartbeat_interval_ms":3000,
    "auto_offset_reset": "earliest"
}
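
A sketch of roughly how our thin wrapper constructs the consumer from that config (the secret_* names are the redacted placeholders used in the logs below):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "secret_topic",
    bootstrap_servers=["kafka05.net:9092"],
    group_id="secret_group.ns_consumer",
    max_poll_records=5,
    connections_max_idle_ms=540000,
    request_timeout_ms=305000,
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,
    auto_offset_reset="earliest",
)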

Application pseudo-code:

import logging

# `consumer` is the KafkaConsumer constructed above; process_messages() is
# our application logic.
while True:
    logging.info("About to call consumer.poll for new messages")
    messages = consumer.poll()
    logging.info("Finished consumer.poll, now process_messages")
    process_messages(messages)
    logging.info("Finished process_messages, now committing new offsets")
    consumer.commit()

I enabled debug logging for kafka-python, and noticed a difference between healthy and unhealthy logs.

Here are healthy logs:

2019-03-05 00:55:35,422     INFO __main__                 ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:35,422    DEBUG    kafka.coordinator.consumer      consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822203, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:35,422    DEBUG    kafka.protocol.parser           parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,423    DEBUG    kafka.conn                      conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 401: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,426    DEBUG    kafka.protocol.parser           parser.py:0133 NSLog: Received correlation id: 401
2019-03-05 00:55:35,426    DEBUG    kafka.protocol.parser           parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:35,427    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427    DEBUG    kafka.coordinator.consumer      consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)
# offset successfully committed, control returns to the application which calls poll() again
2019-03-05 00:55:35,427     INFO    __main__                        ns_consumer.py:0195 NSLog: About to call consumer.poll for new messages

Here are the stuck logs:

2019-03-05 00:55:51,023     INFO    __main__                        ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:51,023    DEBUG    kafka.coordinator.consumer      consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822204, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:51,023    DEBUG    kafka.protocol.parser           parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,024    DEBUG    kafka.conn                      conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 402: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,034    DEBUG    kafka.protocol.parser           parser.py:0133 NSLog: Received correlation id: 402
2019-03-05 00:55:51,034    DEBUG    kafka.protocol.parser           parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:51,034    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 402 (10.4370117188 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:32:22,882    DEBUG    kafka.coordinator                        base.py:0801 NSLog: Heartbeat: secret_group.ns_consumer[33844] secret_id
2019-03-05 00:32:22,883    DEBUG    kafka.protocol.parser                      parser.py:0053 NSLog: Sending request HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,883    DEBUG    kafka.conn                        conn.py:0768 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 1406: HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,884    DEBUG    kafka.protocol.parser                      parser.py:0133 NSLog: Received correlation id: 1406
2019-03-05 00:32:22,884    DEBUG    kafka.protocol.parser                      parser.py:0160 NSLog: Processing response HeartbeatResponse_v0
2019-03-05 00:32:22,884    DEBUG    kafka.conn                        conn.py:0816 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 1406 (1.1990070343 ms): HeartbeatResponse_v0(error_code=0)
2019-03-05 00:32:22,885    DEBUG    kafka.coordinator                        base.py:0814 NSLog: Received successful heartbeat response for group secret_group.ns_consumer
# Logs continue to show heartbeating until the heartbeat timer expires, then the heartbeat thread issues a LeaveGroupRequest
# Also within the rest of the logs are intermittent metadata calls. I'm not sure which thread is issuing them; for some reason the Python logging formatter wasn't printing any info for `thread`/`threadName`.

Diff'ing the two, it appears that something is breaking between the following two lines:

2019-03-05 00:55:35,427    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427    DEBUG    kafka.coordinator.consumer      consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)

I am not sure if the problem is in the main thread or the background heartbeat thread:

  • The background heartbeat thread might be getting swapped in, taking a lock, and never releasing it.
  • The main thread could be getting stuck and simply sitting idle, in which case the background thread is working as expected.
  • The background thread might be reading the offset-commit response, in which case the main thread never receives it... I think this is unlikely, but the Python logging formatter wasn't printing anything for thread/threadName, so I cannot completely rule it out yet (see the logging sketch below). I assume that if this happened, the main thread would have thrown an error timing out the offset-commit request, but I have not yet confirmed that the code works that way.
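
Getting thread info into the logs needs nothing kafka-specific; plain stdlib logging config will do it. A sketch of what I should have configured from the start:

import logging

# %(thread)d / %(threadName)s are standard LogRecord attributes, so every
# record shows which thread emitted it.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(thread)d %(threadName)s "
           "%(name)s %(filename)s:%(lineno)04d %(message)s",
)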
@jeffwidman
Contributor Author

jeffwidman commented Mar 11, 2019

I upgraded a production box to 8c07925 and processes still intermittently hang.

However, when the application calls consumer.commit(), the logs are now more insightful:

  1. Two exceptions are printed... the main thread processes the OffsetCommitResponse as if it were a HeartbeatResponse, and then processes the HeartbeatResponse as if it were an OffsetCommitResponse; the two responses have been swapped.
  2. After the exceptions are handled, the main thread receives all subsequent responses. At the bottom of the failure, notice that the background heartbeat thread continues to fire heartbeat requests, while the main thread handles the heartbeat responses. I checked: in normal operation, the background heartbeat thread handles its own heartbeat responses.

Raw logs of a failure:

2019-03-08 21:27:22,821    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.coordinator.consumer                    consumer.py:0616 Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=192): OffsetAndMetadata(offset=11742039, metadata='')} for group secret_group to 3
2019-03-08 21:27:22,821    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.protocol.parser                      parser.py:0053 Sending request OffsetCommitRequest_v2(consumer_group='secret_group', consumer_group_generation_id=34110, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=192, offset=11742039, metadata='')])])
2019-03-08 21:27:22,823    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat   897 kafka.coordinator                        base.py:0801 Heartbeat: secret_group[34110] secret_id
2019-03-08 21:27:22,824    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat  897 kafka.protocol.parser                      parser.py:0053 Sending request HeartbeatRequest_v0(group='secret_group', generation_id=34110, member_id=u'secret_id')
2019-03-08 21:27:22,824    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat  897   kafka.conn                        conn.py:0760 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Request 303: HeartbeatRequest_v0(group='secret_group', generation_id=34110, member_id=u'secret_id')
2019-03-08 21:27:22,823    DEBUG secret_hostname 26934  140614132643584 MainThread  897   kafka.conn                        conn.py:0760 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Request 302: OffsetCommitRequest_v2(consumer_group='secret_group', consumer_group_generation_id=34110, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=192, offset=11742039, metadata='')])])
2019-03-08 21:27:22,854    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.protocol.parser                      parser.py:0133 Received correlation id: 302
2019-03-08 21:27:22,855    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.protocol.parser                      parser.py:0160 Processing response OffsetCommitResponse_v2
2019-03-08 21:27:22,855    DEBUG secret_hostname 26934  140614132643584 MainThread  897   kafka.conn                        conn.py:0827 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Response 303 (31.4252376556 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=192, error_code=0)])])
2019-03-08 21:27:22,856    ERROR secret_hostname 26934  140614132643584 MainThread  897 kafka.future                      future.py:0081 Error processing callback
Traceback (most recent call last):
  File "/home/NSKOPE/jeff/kafka-python/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/home/NSKOPE/jeff/kafka-python/kafka/coordinator/base.py", line 811, in _handle_heartbeat_response
    error_type = Errors.for_code(response.error_code)
AttributeError: 'OffsetCommitResponse_v2' object has no attribute 'error_code'
2019-03-08 21:27:22,858    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.protocol.parser                      parser.py:0133 Received correlation id: 303
2019-03-08 21:27:22,858    DEBUG secret_hostname 26934  140614132643584 MainThread  897 kafka.protocol.parser                      parser.py:0160 Processing response HeartbeatResponse_v0
2019-03-08 21:27:22,859    DEBUG secret_hostname 26934  140614132643584 MainThread  897   kafka.conn                        conn.py:0827 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Response 302 (31.6119194031 ms): HeartbeatResponse_v0(error_code=0)
2019-03-08 21:27:22,859    ERROR secret_hostname 26934  140614132643584 MainThread  897 kafka.future                      future.py:0081 Error processing callback
Traceback (most recent call last):
  File "/home/NSKOPE/jeff/kafka-python/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/home/NSKOPE/jeff/kafka-python/kafka/coordinator/consumer.py", line 629, in _handle_offset_commit_response
    for topic, partitions in response.topics:
AttributeError: 'HeartbeatResponse_v0' object has no attribute 'topics'
2019-03-11 20:57:36,586    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.coordinator                        base.py:0801 Heartbeat: secret_group[34333] secret_id
2019-03-11 20:57:36,586    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.protocol.parser                      parser.py:0053 Sending request HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:57:36,586    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat   kafka.conn                        conn.py:0760 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Request 312: HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:57:36,590    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.protocol.parser                      parser.py:0133 Received correlation id: 312
2019-03-11 20:57:36,590    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.protocol.parser                      parser.py:0160 Processing response HeartbeatResponse_v0
2019-03-11 20:57:36,590    DEBUG secret_hostname 26934  140614132643584 MainThread   kafka.conn                        conn.py:0827 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Response 312 (3.5719871521 ms): HeartbeatResponse_v0(error_code=0)
2019-03-11 20:57:36,591    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.coordinator                        base.py:0814 Received successful heartbeat response for group secret_group
2019-03-11 20:57:58,000    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.coordinator                        base.py:0801 Heartbeat: secret_group[34333] secret_id
2019-03-11 20:57:58,000    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.protocol.parser                      parser.py:0053 Sending request HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:57:58,001    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat   kafka.conn                        conn.py:0760 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Request 313: HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:57:58,003    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.protocol.parser                      parser.py:0133 Received correlation id: 313
2019-03-11 20:57:58,003    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.protocol.parser                      parser.py:0160 Processing response HeartbeatResponse_v0
2019-03-11 20:57:58,003    DEBUG secret_hostname 26934  140614132643584 MainThread   kafka.conn                        conn.py:0827 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Response 313 (1.34110450745 ms): HeartbeatResponse_v0(error_code=0)
2019-03-11 20:57:58,004    DEBUG secret_hostname 26934  140614132643584 MainThread kafka.coordinator                        base.py:0814 Received successful heartbeat response for group secret_group

Raw logs of a successful background heartbeat--note that the background thread both issues the request and handles the response:

2019-03-11 20:54:33,470    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.coordinator                        base.py:0801 Heartbeat: secret_group[34333] secret_id
2019-03-11 20:54:33,471    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.protocol.parser                      parser.py:0053 Sending request HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:54:33,472    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat   kafka.conn                        conn.py:0760 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Request 13: HeartbeatRequest_v0(group='secret_group', generation_id=34333, member_id=u'secret_id')
2019-03-11 20:54:33,511     INFO secret_hostname 26934  140614132643584 MainThread <application logs some stuff>
2019-03-11 20:54:33,626    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.protocol.parser                      parser.py:0133 Received correlation id: 13
2019-03-11 20:54:33,626    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.protocol.parser                      parser.py:0160 Processing response HeartbeatResponse_v0
2019-03-11 20:54:33,626    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat   kafka.conn                        conn.py:0827 <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.XXX.XXX.XX', 9092)]> Response 13 (154.263019562 ms): HeartbeatResponse_v0(error_code=0)
2019-03-11 20:54:33,627    DEBUG secret_hostname 26934  140613456471808 secret_group-heartbeat kafka.coordinator                        base.py:0814 Received successful heartbeat response for group secret_group
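
To illustrate why swapped responses blow up with exactly these AttributeErrors, here's a tiny standalone repro (toy response types and simplified handlers, not kafka-python's actual classes):

from collections import namedtuple

OffsetCommitResponse_v2 = namedtuple("OffsetCommitResponse_v2", ["topics"])
HeartbeatResponse_v0 = namedtuple("HeartbeatResponse_v0", ["error_code"])

def _handle_heartbeat_response(response):
    return response.error_code  # OffsetCommitResponse_v2 has no .error_code

def _handle_offset_commit_response(response):
    return response.topics      # HeartbeatResponse_v0 has no .topics

# Each callback receives the *other* request's response:
for handler, response in [
    (_handle_heartbeat_response, OffsetCommitResponse_v2(topics=[])),
    (_handle_offset_commit_response, HeartbeatResponse_v0(error_code=0)),
]:
    try:
        handler(response)
    except AttributeError as e:
        print(e)  # mirrors the two tracebacks above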

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Mar 11, 2019

Hmm, seems like out-of-order delivery of packets; as you can see above, you got the OffsetCommit response in the heartbeat thread. Maybe this is related to the change to async send.

@jeffwidman
Contributor Author

jeffwidman commented Mar 11, 2019

I started logging the thread names/IDs (I should have done this earlier, but this code has some messy custom logging attached, so it wasn't easy to do), and now this feels even weirder.

When this bug happens, the foreground thread starts receiving all responses, including heartbeat responses. But the root cause isn't the foreground thread taking a permanent lock, because the background heartbeat thread continues to emit heartbeat requests; it just never receives the responses.

@dpkp
Owner

dpkp commented Mar 12, 2019 via email

@jeffwidman
Contributor Author

jeffwidman commented Mar 12, 2019

this error means that the connection's in-flight-requests queue is getting out of sync w/ the actual in-flight requests.

Interesting. I had assumed it meant that the wrong callback was getting attached to the OffsetCommitRequest future. The logs seemed to indicate the response was getting correctly decoded/processed until shortly before the callback _handle_offset_commit_response() was supposed to be invoked. I spent a good portion of this afternoon going down that rabbit hole.

FWIW, because the stack is asynchronous and communicates via futures -- it should not matter which thread actually processes the response.

Thanks for clarifying. That makes sense.

So the overall hypothesis of requests being mis-ordered due to concurrent send() calls sounds plausible. However, am I missing something in my mental model of how the response processing works?

I didn't realize the IFR was used when determining how to decode a response--I thought the IFR list was only used to cap the number of in-flight requests and to time out requests that had sat in the queue too long. For response decoding, I assumed the client used the Kafka protocol API key to decode the rest of the response, and then the correlation ID was used to identify the existing request future and fire its callbacks, as well as to remove it from the IFR queue. So the response would be fully decoded/processed by the client without regard to any state in the IFR queue. I never actually stepped through the code; this just seemed like the intuitive way to do it, so I could easily be overlooking something.
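
To make that assumed model concrete, here's a toy sketch of purely correlation-ID-based dispatch (hypothetical names, not kafka-python's actual code):

class Future(object):
    def success(self, value):
        print("resolved:", value)

in_flight = {}  # correlation_id -> (decoder, future)

def record_send(correlation_id, decoder, future):
    in_flight[correlation_id] = (decoder, future)

def handle_receive(correlation_id, payload):
    # Decode and dispatch by correlation ID alone; arrival order never matters.
    decoder, future = in_flight.pop(correlation_id)
    future.success(decoder(payload))

record_send(302, lambda p: ("OffsetCommitResponse_v2", p), Future())
record_send(303, lambda p: ("HeartbeatResponse_v0", p), Future())
handle_receive(303, b"...")  # out-of-order arrival would be harmless here
handle_receive(302, b"...")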

The problem I see with tying the knowledge of how to process a response to the request's position in the IFR queue is: what happens if we send two requests to the same broker and the broker returns the responses out of order? I'm not aware of any promise the Kafka brokers make about returning responses to clients in order.

For example, we might send an OffsetCommitRequest, followed immediately by a HeartbeatRequest. The OffsetCommit typically must be replicated to 3 brokers, so it's likely to sit in purgatory on the broker, whereas the HeartbeatResponse can come back immediately. But if the OffsetCommit was added to the IFR first, won't that throw an exception when the HeartbeatResponse comes back first and the client tries to decode it as an OffsetCommitResponse?
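
By contrast, a toy sketch of position-based (FIFO) matching, where the decode schema comes from the oldest in-flight request, so an early HeartbeatResponse would hit the OffsetCommit schema (again hypothetical names, just to illustrate the concern):

from collections import deque

in_flight = deque()  # expected response schemas, in send order

in_flight.append("OffsetCommitResponse_v2")  # offset commit sent first
in_flight.append("HeartbeatResponse_v0")     # heartbeat sent second

# Suppose the broker answered the heartbeat first. FIFO matching pops the
# offset-commit schema and applies it to the heartbeat payload:
expected_schema = in_flight.popleft()        # -> "OffsetCommitResponse_v2"
actual_response = "HeartbeatResponse_v0"
print("decoding %s bytes with the %s schema -> garbage or an exception"
      % (actual_response, expected_schema))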

@dpkp
Owner

dpkp commented Mar 12, 2019 via email

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Mar 12, 2019

I can confirm. The ordering is strict as hell: the broker will only process one request at a time per connection. The quotas and throttling work on top of this design. To avoid this issue for heartbeats, the Java client uses a separate connection specifically for coordinator requests, so they don't conflict with fetches (not sure where commits are sent, though).
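
So the invariant a client has to preserve is that, per connection, the IFR queue order matches the byte order on the wire. A minimal sketch of one way to hold that invariant under concurrent senders (an illustration only, not the actual fix that landed):

import threading
from collections import deque


class BrokerConnectionSketch(object):
    def __init__(self, sock):
        self._sock = sock
        self._send_lock = threading.Lock()
        self.in_flight_requests = deque()

    def send(self, correlation_id, request_bytes):
        # Enqueue and write under one lock so no other thread can interleave
        # between the bookkeeping and the socket write.
        with self._send_lock:
            self.in_flight_requests.append(correlation_id)
            self._sock.sendall(request_bytes)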

@jeffwidman
Contributor Author

jeffwidman commented Mar 12, 2019

If you both say it, then it must be true. 😄

And yeah, now that you mention it, I remember reading that the correlation ID is not guaranteed to be unique... so that wouldn't work...

Fixed by #1733.
