Consumers should not be able to commit during a rebalance #4059

Open
3 of 7 tasks
roxelo opened this issue Nov 9, 2022 · 13 comments · May be fixed by #4908

Comments

@roxelo

roxelo commented Nov 9, 2022

Description

Now that librdkafka supports the cooperative-sticky partition assignment strategy, we should ensure that consumers that commit offsets manually can’t commit offsets during a rebalance, as doing so triggers a follow-up rebalance.

I don’t think there are any valid use cases for allowing this type of behavior:

  1. This behavior could lead to an unbounded number of rebalances. Rebalances will only stop once none of the consumers attempt to commit offsets during a rebalance.
  2. The Java Kafka Consumer lib does not appear to allow clients to commit offsets during a rebalance.
  3. The auto commit callback in librdkafka does not commit offsets when there is an ongoing rebalance.

I believe this issue with manual commits is isolated to the cooperative-sticky strategy. When a consumer uses the eager strategy, a rebalance starts with all the partitions being revoked and ends when new partitions have been assigned to the consumer. As a result, the consumer will never attempt to commit offsets during the rebalance, because it owns no partitions and so has no offsets to commit. With cooperative-sticky, however, we can’t ensure that the consumer won’t attempt to commit offsets during a rebalance, as it might still own partitions while the rebalance is in progress. Furthermore, clients have no way of knowing whether a rebalance is ongoing, so they can’t stop consumers from committing offsets when necessary.

I see three potential solutions to this problem, but I think the first one makes the most sense to implement:

  1. Add the same check that exists in the auto commit callback to rd_kafka_commit. We can return an error to the client to let them know why we did not attempt to commit offsets.
  2. Provide a new API to librdkafka clients that lets them check whether a rebalance is ongoing, so they can prevent consumers from committing offsets on their end.
  3. Give clients the option (by passing a flag of some sort) to decide whether rd_kafka_commit should skip committing offsets during rebalances.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): v1.8.2
  • Apache Kafka version: v2.7.1
  • librdkafka client configuration:
"partition.assignment.strategy": "cooperative-sticky",
"enable.auto.commit": false,
  • Operating system: <REPLACE with e.g., Centos 5 (x64)>
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented Nov 9, 2022

I think the root of the problem lies in the OffsetCommit response error handling: we're probably getting an ERR_ILLEGAL_GENERATION back from the coordinator at this point, which causes us to rejoin the group, hence the rebalance.
Even if we add a rebalancing check to commit(), it is still possible for us to receive this error due to timing, so I think it might be better to also improve the error handling here and not trigger a rejoin if the group is rebalancing.

@roxelo
Author

roxelo commented Nov 9, 2022

Agreed, improving the error handling would make a great difference as well. I haven't been able to confirm this yet, but I believe a new rebalance is also triggered when the error is ERR_REBALANCE_IN_PROGRESS.

@mironovdm

It would be great if this behaviour were documented somewhere in the wiki or in the config description. We spent quite some effort figuring out what the heck was going on and why the cooperative-sticky rebalance was not working as expected and led to endless rebalances.

We use confluent_kafka for Python. I tried to implement a workaround using the on_assign/on_revoke callbacks to prevent manual commits for some time after these callbacks are triggered, but I noticed that the group generation id can change even before the callbacks are triggered. As I understand it, with confluent_kafka for Python there is currently no way to know for sure that a rebalance is in progress, so we can't use manual commits with cooperative-sticky. I also noticed that when auto commit is enabled, librdkafka can send OffsetCommitRequests while a rebalance is in progress, and this doesn't lead to an invalid group generation id error. I see this offset commit happen between the on_revoke callback being triggered in one consumer and on_assign in another.

so I think it might be better to also improve the error handling here and not trigger a rejoin if the group is rebalancing.

Doesn't this contradict the behavior described in this KIP document? It says:

If received UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION from join-group / sync-group / commit / heartbeat response: reset generation / clear member-id correspondingly, call rebalance listener's onPartitionsLost for all the partition and then re-join group with empty assigned partition.

@wmorgan6796

Just adding a data point: we see this exact behavior with our Kafka setup. I can try to take a crack at fixing this in my off time, but I'm wondering whether anyone on the Confluent side is working on this (@milindl @emasab), since if you're using manual offset commits you cannot use cooperative rebalancing today in any context.

@scanterog

is there any ongoing work for this?

@wmorgan6796

@scanterog see #4220, but in short, I think this issue has dropped off the radar. Unfortunately the fix that @roxelo created (and that I attempted to get merged) broke other behavior in librdkafka. @milindl said the librdkafka team was looking into this internally, but from my side there is little I can do, as my place of work doesn't pay for Confluent support and therefore we have no sway over their roadmap. If anyone interested in getting this more attention on the Confluent side is a paying Confluent customer, I highly recommend they push via their TAMs to get traction on this.

@massimeddu-sj

Same issue here. Hopefully this will be prioritized by Confluent because it makes cooperative-sticky unusable with manual commits.

@emasab
Contributor

emasab commented Aug 27, 2024

Commits are possible during a rebalance, before a partition is revoked the user can commit offsets for the revoked partitions in the rebalance callback, that is possible in Java client too.
What isn't needed is to avoid a re-join when the ILLEGAL_GENERATION error happens, as proposed by Magnus.

@massimeddu-sj

Commits are possible during a rebalance, before a partition is revoked the user can commit offsets for the revoked partitions in the rebalance callback, that is possible in Java client too. What isn't needed is to avoid a re-join when the ILLEGAL_GENERATION error happens, as proposed by Magnus.

Thanks for looking into this @emasab. Any hope to see this sorted out soon?

Thank you!

@emasab
Contributor

emasab commented Aug 28, 2024

@massimeddu-sj at the moment there's a strategy that should prevent this error during commit and a subsequent rebalance.
It requires you to have a rebalance callback set, but that is often the case when using manual offset management. The consumer should always commit in the consume loop, and only when a message or an EOF is returned.
Between the first and second phase of the cooperative rebalance the revoke callback is called, and the consume call returns with no messages after calling it. Then the second rejoin is started, which increases the generation id; there is a second assignment callback, again with the consume call returning no message, and finally messages start coming again. If you commit manually only when you have a new message or receive a partition EOF, that error should be avoided.
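
For readers who want something concrete, here is a minimal sketch of that strategy in Python. It is an illustration under assumptions, not code from librdkafka or from this thread: `consume_once`, `handle` and `eof_code` are hypothetical names, and the consumer is passed in so that any object with `poll()` and `commit()` can be used. To stay conservative it commits only after a real message and skips the commit on a partition EOF, which is a subset of what is described above.

```python
import logging

logger = logging.getLogger("consumer")


def on_assign(consumer, partitions):
    # Rebalance callback: only logs. No commit is attempted from here.
    logger.info("assigned: %s", partitions)


def on_revoke(consumer, partitions):
    # Rebalance callback: only logs in this sketch.
    logger.info("revoked: %s", partitions)


def consume_once(consumer, handle, eof_code, timeout=1.0):
    """Poll once and commit only if a real message was returned.

    Returns True when a commit was attempted, False otherwise.
    """
    msg = consumer.poll(timeout)
    if msg is None:
        # poll() came back empty-handed, for example after serving a
        # rebalance callback: do not commit here.
        return False
    err = msg.error()
    if err is not None:
        if err.code() == eof_code:
            # Partition EOF event: nothing to handle, nothing new to commit.
            return False
        raise RuntimeError(str(err))
    handle(msg)
    # Commit the offset of the message that was just handled.
    consumer.commit(message=msg, asynchronous=False)
    return True
```

With confluent_kafka this would be wired up roughly as `consumer.subscribe(topics, on_assign=on_assign, on_revoke=on_revoke)` followed by a loop calling `consume_once(consumer, message_handler, KafkaError._PARTITION_EOF)`.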

@massimeddu-sj

@massimeddu-sj at the moment there's a strategy that should prevent this error during commit and a subsequent rebalance. It requires you to have a rebalance callback set, but that is often the case when using manual offset management. The consumer should always commit in the consume loop, and only when a message or an EOF is returned. Between the first and second phase of the cooperative rebalance the revoke callback is called, and the consume call returns with no messages after calling it. Then the second rejoin is started, which increases the generation id; there is a second assignment callback, again with the consume call returning no message, and finally messages start coming again. If you commit manually only when you have a new message or receive a partition EOF, that error should be avoided.

Thank you very much for the additional information, @emasab. Unfortunately I'm not too familiar with the rebalance/revoke callbacks, and I'm not confident about overriding the default implementation. It would be great if you could share a snippet or implementation example.

My current implementation is actually quite simple:

from confluent_kafka import Consumer, KafkaError

consumer_conf = {
    [...]
    'enable.auto.commit': False,
    'partition.assignment.strategy': 'cooperative-sticky'
}

kafka_consumer = Consumer(consumer_conf, logger=logger)

while True:
    message = kafka_consumer.poll()

    # poll() returned no message: nothing to handle or commit
    if message is None:
        continue

    if message.error():
        if message.error().code() == KafkaError._PARTITION_EOF:
            # End of partition event: log it and skip the handler,
            # since this is not a real message
            logger.error('%% %s [%d] reached end at offset %d\n' %
                        (message.topic(), message.partition(), message.offset()))
            continue
        else:
            raise KafkaConsumerException(Exception(message.error()))

    message_handler(message)

    # Synchronous manual commit after each handled message
    kafka_consumer.commit(asynchronous=False)

golanbz added a commit to golanbz/kafkaflow that referenced this issue Sep 2, 2024
…world" scenarios (e.g., cooperative sticky).

* Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at confluentinc/librdkafka#4059).
golanbz added a commit to golanbz/kafkaflow that referenced this issue Sep 2, 2024
…world" scenarios (e.g., cooperative sticky). Fixes issue Farfetch#557 and Fixes issue Farfetch#456

* Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at confluentinc/librdkafka#4059).
@sjportalatin

Hi team, for anyone still looking for a workaround: we seem to have gotten around this issue by using Kafka auto commit, as suggested. However, we still want control over when offsets are committed, so we use store_offsets() to mark them ready for commit. This requires setting enable.auto.offset.store to False and enable.auto.commit to True. In our code, anywhere we would use commit() we now use store_offsets([offsets]) and let the client commit automatically on the auto.commit.interval.ms schedule.

This way, the cooperative-sticky assignment does not trigger endless rebalances and we also have control over which offsets to commit. Hope this helps!
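
A minimal sketch of this workaround follows, assuming confluent_kafka for Python. The broker address, group id and interval value are placeholders, and `process_and_store` and `handle` are hypothetical names introduced only for illustration. The consumer is passed in, so the function works with any object exposing `poll()` and `store_offsets()`.

```python
# Configuration for the store_offsets() workaround.
consumer_conf = {
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "example-group",            # placeholder
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": True,         # the client commits in the background
    "enable.auto.offset.store": False,  # but only offsets stored explicitly
    "auto.commit.interval.ms": 5000,    # placeholder interval
}


def process_and_store(consumer, handle, timeout=1.0):
    """Poll once, handle the message, then mark its offset as ready to commit.

    Returns True if an offset was stored, False if poll() returned nothing.
    """
    msg = consumer.poll(timeout)
    if msg is None:
        return False
    if msg.error() is not None:
        raise RuntimeError(str(msg.error()))
    handle(msg)
    # Replaces the former commit() call: the offset is only stored here and
    # is committed later by the background auto commit.
    consumer.store_offsets(message=msg)
    return True
```

The consumer would be created with `Consumer(consumer_conf)` and `process_and_store(consumer, message_handler)` called in the consume loop. Because offsets are stored only after `handle` returns, a message whose handler raises is never marked for commit.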

@emasab
Contributor

emasab commented Nov 20, 2024

If you commit manually only when you have a new message or receive a partition EOF, that error should be avoided.

TL;DR: manual commits during a rebalance will be sent, but they won't cause an additional rebalance or the loss of the assignment.

It can happen that the consumer receives a message after the first rejoin of the cooperative incremental assignment. In that case the partitions are resumed, and only later is the second rejoin done to redistribute the revoked partitions.

The message can be committed manually, causing an ILLEGAL_GENERATION error.
That is possible, but must not cause it to lose the assignment while rebalance is in progress, as in this Java client code.

The generation id was incorrectly reset to -1 on an OffsetCommit ILLEGAL_GENERATION error and, immediately after that, the second SyncGroup failed with the same error because of the wrong generation id, this time causing lost partitions.

Difference from the Java client: the Java client avoids sending the commit, and the exception is a RebalanceInProgressException instead.
Since users already expect an ILLEGAL_GENERATION error, in librdkafka we are leaving this change to the error code for 3.x.
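
On the application side, a manual commit sent in the middle of a rebalance can therefore still fail with ILLEGAL_GENERATION. The sketch below shows one way a Python caller might tolerate that, assuming the commit raises an exception whose first argument exposes a `code()` method, as `confluent_kafka.KafkaException` does with its `KafkaError`. `commit_tolerating_rebalance` is a hypothetical helper, and treating REBALANCE_IN_PROGRESS the same way is my own assumption. The numeric values are the Kafka protocol error codes.

```python
ILLEGAL_GENERATION = 22     # Kafka protocol error code
REBALANCE_IN_PROGRESS = 27  # Kafka protocol error code


def commit_tolerating_rebalance(consumer, message,
                                tolerated=(ILLEGAL_GENERATION,
                                           REBALANCE_IN_PROGRESS)):
    """Commit synchronously; return False instead of raising when the
    commit was rejected with one of the tolerated rebalance errors."""
    try:
        consumer.commit(message=message, asynchronous=False)
        return True
    except Exception as exc:
        # Duck-typed check so the sketch runs without the client installed:
        # look for an error object with a code() method in args[0].
        err = exc.args[0] if exc.args else None
        code = err.code() if hasattr(err, "code") else None
        if code in tolerated:
            # The offset was not committed; the caller can commit again
            # after a later message once the rebalance has settled.
            return False
        raise
```

Whether to skip, retry or surface the failure is an application decision. The sketch only separates the rebalance-related errors from everything else, which is re-raised unchanged.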

@emasab emasab added the bug label Nov 20, 2024
airlock-confluentinc bot pushed a commit that referenced this issue Nov 20, 2024