Skip to content

Commit

Permalink
[KIP-848] HB Error Code, Partial ack flow, OffsetCommit Request, Resp…
Browse files Browse the repository at this point in the history
…onse and various fixes (#4634)

- Added error handling to ConsumerGroupHeartbeat API
- Added type new errors - UNRELEASED_INSTANCE_ID and UNSUPPORTED_ASSIGNOR
- Added partial acknowledgement flow
- Upgraded OffsetCommit Request and response to v9
- Fixed metadata being called with duplicate topic id
- Fixed next_target_assignment not getting reset to NULL
- Fixed member stuck if fenced during rebalancing
- Fixed segfault with current and target assignment while resetting consumer group
- Fixed segfault due to deleted topic in metadata
- Fixed leave not being called if the consumer without any assignment leaves
  • Loading branch information
pranavrth authored Mar 12, 2024
1 parent 8b0d538 commit 228362d
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 222 deletions.
78 changes: 39 additions & 39 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,45 +1972,45 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |

| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------|-----------|----------------|
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 9 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| 68 | ConsumerGroupHeartbeat | 0 | 0 |

# Recommendations for language binding developers

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ int main(int argc, char **argv) {
rd_kafka_destroy(rk);

return 0;
}
}
6 changes: 6 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,12 @@ typedef enum {
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID = 100,
/** The member epoch is fenced by the group coordinator */
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
/** The instance ID is still used by another member in the
* consumer group */
RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID = 111,
/** The assignor or its version range is not supported by the consumer
* group */
RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112,
/** The member epoch is stale */
RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
RD_KAFKA_RESP_ERR_END_ALL,
Expand Down
Loading

0 comments on commit 228362d

Please sign in to comment.