[KIP-848] Feature branch #4610
and response with basic cgrp_consumer flow
- Added topic id to topic_partition_t while reading from buffer
- Added new methods and way to add topic_id to topic partition private
- Added new configs group.protocol and group.remote.assignor
- Added ConsumerGroupHeartbeat API Request Contract
- Added ConsumerGroupApi Request without handling different cases
- Working ConsumerGroupHeartbeat API with proper response
- Properly receiving assigned partitions with topic_id from the Response
- Added metadata request as well after parsing the response. Separate it out from to the topic partition branch
- Added metadata response flow with rko
- Updated OffsetFetch to v9
- Removed unrequired fields from ConsumerGroupHeartbeat API to make it work with AK > 3.6
- OffsetFetch working fine. Able to consume.
- Changed subscribed list to use correct field
- Fixed a few memory leaks
- Some more memory leak fixes. Added updating of the subscribed topics list
- Minor changes
- [KIP-848] Added new configs group.protocol and group.remote.assignor (#4414)
- Added new configs group.protocol and group.remote.assignor
- Removed printfs and updated the hardcoded single topic for metadata request
- Removed some changes related to removed changes in the protocols
- [KIP-848] Added topic id to topic_partition_t while reading from buffer (#4416)
- Updating topic name from metadata response for all the requested topic_ids instead of just 1.
- Style fixes and fixed skip tag issue in buf_*_topic_partition
- Changed variable for next assignment
- Added topic name while reading topic partition buffer
- Changed variable name from assignments to assignment
- Rebased with master
- WIP: assignment, revocation, leave group flow
- Remove print statements
- Remove print statement left
- Separate rd_kafka_cgrp_consumer_assignment_done
- Allow changing subscription to empty
- Expedite next heartbeat
- Static group membership and max poll interval checks
- Expedite next heartbeat
- Fix existing protocol
- Partial implementation of reconciliation and next assignment handling
- Uniform tests handling across scripts and KRaft mode
- Run tests with group.protocol=consumer and reusable condition to skip mock cluster
- Test 0113 partial
- Test 0018
- Test 0113 stickiness
- Test 0113 complete except regex subscription and u_multiple_subscription_changes(true)
- Skip some tests, fix subscription change
- Test 0029 exclusion clarified
- Debug statements
- Introduce current assignment: rename rkcg_current_target_assignments to rkcg_target_assignment; rename rkcg_next_target_assignments to rkcg_next_target_assignment
- change to ConsumerGroupHeartbeat in logs
- Add remote assignor to debug log
- Fix rd_kafka_buf_write_topic_partitions not using topic ids for comparison
…onse and various fixes (#4634)
- Added error handling to ConsumerGroupHeartbeat API
- Added new error types: UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR
- Added partial acknowledgement flow
- Upgraded OffsetCommit Request and response to v9
- Fixed metadata being called with duplicate topic id
- Fixed next_target_assignment not getting reset to NULL
- Fixed member stuck if fenced during rebalancing
- Fixed segfault with current and target assignment while resetting consumer group
- Fixed segfault due to deleted topic in metadata
- Fixed leave not being called if a consumer without any assignment leaves
improve documentation of Metadata functions
Metadata cache was cleared on full metadata refresh, leading to unnecessary refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating cache for existing or hinted entries instead of clearing them. Happening since 2.1.0
…g a consumer group (#4678) A metadata call made before the member joins the consumer group could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating the consumer group following a metadata refresh only in safe states. Happening since 2.1.0
Metadata refreshes without partition leader change could lead to a loop of metadata calls at fixed intervals. Solved by stopping metadata refresh when all existing metadata is non-stale. Happening since 2.3.0
- rename 'generic' protocol to 'classic'
- consumer group serve timer to awake the loop earlier
- compare and find in topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where HB response is received after unsubscription
- use topic name from current assignment if it's missing from metadata
- expedite heartbeat simplification and fixes to next interval check and to the place where number of retries is increased
- expedite HB after changing state back to INIT
- use the CONSUMER_F_WAIT_REJOIN to trigger the rejoin instead of calling it from max poll interval timer
- schedule timer for next execution; reschedule it if expected earlier; expedite through scheduling cgrp serve timer
- treat unsupported feature error as fatal
- avoid removing partitions not matched by a new subscription immediately, for a possible desynchronization with the coordinator, with said partitions not being consumed anymore
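The "compare and find by topic id only" item can be pictured with a small standalone sketch. The `tp_t` and `topic_id_t` types and both function names below are hypothetical stand-ins, not librdkafka structures, and the sketch assumes the partition number remains part of the lookup key while the topic name is ignored.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins; not the librdkafka types. */
typedef struct {
        uint8_t bytes[16]; /* 128-bit topic UUID */
} topic_id_t;

typedef struct {
        topic_id_t topic_id;
        const char *topic; /* may be NULL until metadata resolves the name */
        int32_t partition;
} tp_t;

/* Order by topic id, then by partition. The topic name is never consulted. */
static int tp_cmp_by_id(const tp_t *a, const tp_t *b) {
        int r = memcmp(a->topic_id.bytes, b->topic_id.bytes,
                       sizeof(a->topic_id.bytes));
        if (r != 0)
                return r;
        return (a->partition > b->partition) - (a->partition < b->partition);
}

/* Linear search for (topic id, partition). Returns NULL when not found. */
static const tp_t *tp_find_by_id(const tp_t *list, size_t cnt,
                                 const topic_id_t *id, int32_t partition) {
        tp_t key;
        size_t i;

        assert(list != NULL || cnt == 0);
        key.topic_id  = *id;
        key.topic     = NULL;
        key.partition = partition;

        for (i = 0; i < cnt; i++) {
                if (tp_cmp_by_id(&list[i], &key) == 0)
                        return &list[i];
        }
        return NULL;
}
```

Because the name is not part of the key, an entry whose `topic` is still NULL is found the same way as one whose name is already known.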
…ch (#4681)
- Added new errors to manual commit.
- improvements to OffsetCommit and OffsetFetch error code handling.
Publishing partial comments to facilitate progress.
if (request->rkbuf_reqhdr.ApiVersion >= 1)
        rd_kafka_buf_read_throttle_time(rkbuf);

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
It's possibly a good idea to parse the error message as well, the field after this, if possible (for more info).
Yes, this feels like an improvement in many other places as well. We should take it up as a separate improvement, which might involve some design changes in error message parsing as well. I agree, though, that we should at least log the error message for now.
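As a rough illustration of the suggestion in this thread, here is a standalone sketch that reads an error code followed by an error message. It does not use the librdkafka buffer macros: `cursor_t`, `read_i16`, `read_uvarint` and `read_error` are hypothetical names. It assumes the message is a Kafka compact nullable string (unsigned varint holding length + 1, with 0 meaning null), as used by flexible protocol versions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical cursor over a response buffer; not a librdkafka type. */
typedef struct {
        const uint8_t *p;
        size_t len;
        size_t pos;
} cursor_t;

/* Read a big-endian int16. Returns 0 on success, -1 on underflow. */
static int read_i16(cursor_t *c, int16_t *out) {
        assert(c->pos <= c->len);
        if (c->len - c->pos < 2)
                return -1;
        *out = (int16_t)(((uint16_t)c->p[c->pos] << 8) | c->p[c->pos + 1]);
        c->pos += 2;
        return 0;
}

/* Read an unsigned varint: 7 bits per byte, least significant group first. */
static int read_uvarint(cursor_t *c, uint32_t *out) {
        uint32_t v = 0;
        int shift;

        for (shift = 0; shift < 35; shift += 7) {
                uint8_t b;
                if (c->pos >= c->len)
                        return -1;
                b = c->p[c->pos++];
                v |= (uint32_t)(b & 0x7f) << shift;
                if (!(b & 0x80)) {
                        *out = v;
                        return 0;
                }
        }
        return -1; /* more than 5 bytes: malformed */
}

/* Read ErrorCode, then the compact nullable ErrorMessage that follows it.
 * The message is copied into msg, NUL-terminated and truncated to fit.
 * *is_null is set to 1 when no message was sent. */
static int read_error(cursor_t *c, int16_t *code, char *msg, size_t msg_size,
                      int *is_null) {
        uint32_t n;
        size_t slen, copy;

        if (msg_size == 0 || read_i16(c, code) || read_uvarint(c, &n))
                return -1;
        msg[0]   = '\0';
        *is_null = (n == 0);
        if (n == 0)
                return 0;
        slen = (size_t)n - 1;
        if (c->len - c->pos < slen)
                return -1;
        copy = slen < msg_size - 1 ? slen : msg_size - 1;
        memcpy(msg, c->p + c->pos, copy);
        msg[copy] = '\0';
        c->pos += slen;
        return 0;
}
```

A caller could then log `msg` next to the numeric code whenever `*is_null` is 0, which covers the "at least log it for now" part without changing how errors are propagated.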
int8_t are_assignments_present;
rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
if (!RD_KAFKAP_STR_IS_NULL(&member_id)) {
minor: move this if block up near member id parse
rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms);

int8_t are_assignments_present;
rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
minor: Can use rd_kafka_buf_read_bool and rd_bool_t for this instead of i8
…g of the response. (#4691)
A partition migration could happen, using stale metadata, when the partition was undergoing a validation and being retried because of an error. Solved by doing a partition migration only with a non-stale leader epoch. Happening since 2.1.0
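The "non-stale leader epoch" condition can be read as a simple guard. The sketch below is one plausible interpretation, not the actual fix: the function name is hypothetical, and it assumes that -1 means "epoch unknown" and that an unknown epoch on either side does not block the migration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical guard: returns 1 when a leader update carrying new_epoch
 * may be applied over the currently known cur_epoch, 0 when it is stale.
 * Assumption: -1 means "epoch unknown" and never blocks the update. */
static int leader_epoch_allows_migration(int32_t cur_epoch,
                                         int32_t new_epoch) {
        assert(cur_epoch >= -1 && new_epoch >= -1);
        if (cur_epoch == -1 || new_epoch == -1)
                return 1;
        return new_epoch >= cur_epoch;
}
```

With a guard of this shape, metadata carrying an older epoch than the one already seen would be ignored rather than triggering a migration during a validation retry.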
More in-progress comments
int32_t PartArrayCnt;
char *topic_name;
int j;

rd_kafka_buf_read_str(rkbuf, &topic);

// if(ApiVersion >= 9) {
minor: fix comment style and add TODO (whenever we start getting topic ID from broker, we just need to uncomment this to make it work, right?)
Approving for soak-test run
This will be the feature branch for KIP-848 until we complete the implementation.
Each PR will be squashed, as with master, and finally there will be a fast-forward merge.