diff --git a/broker.go b/broker.go index 51e34a6f3..7c559cfcb 100644 --- a/broker.go +++ b/broker.go @@ -1135,9 +1135,9 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) { if version >= 1 { b.rack, err = pd.getNullableString() - } - if err != nil { - return err + if err != nil { + return err + } } b.addr = net.JoinHostPort(host, fmt.Sprint(port)) diff --git a/consumer_group.go b/consumer_group.go index 02d75089a..f47eac725 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -472,6 +472,9 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 5 req.GroupInstanceId = c.groupInstanceId + if c.config.Version.IsAtLeast(V2_4_0_0) { + req.Version = 6 + } } meta := &ConsumerGroupMemberMetadata{ diff --git a/join_group_request.go b/join_group_request.go index 3fb41774c..e8cbc46cd 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -13,6 +13,10 @@ func (p *GroupProtocol) decode(pd packetDecoder) (err error) { return err } p.Metadata, err = pd.getBytes() + if err != nil { + return err + } + _, err = pd.getEmptyTaggedFieldArray() return err } @@ -23,6 +27,7 @@ func (p *GroupProtocol) encode(pe packetEncoder) (err error) { if err := pe.putBytes(p.Metadata); err != nil { return err } + pe.putEmptyTaggedFieldArray() return nil } @@ -93,6 +98,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { if err := pe.putBytes(metadata); err != nil { return err } + pe.putEmptyTaggedFieldArray() } } else { if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil { @@ -105,6 +111,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -157,7 +164,8 @@ func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol) } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *JoinGroupRequest) key() int16 { @@ -169,15 +177,28 @@ func (r *JoinGroupRequest) version() int16 { } func (r *JoinGroupRequest) headerVersion() int16 { + if r.Version >= 6 { + return 2 + } return 1 } func (r *JoinGroupRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 5 + return r.Version >= 0 && r.Version <= 6 +} + +func (r *JoinGroupRequest) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *JoinGroupRequest) isFlexibleVersion(version int16) bool { + return version >= 6 } func (r *JoinGroupRequest) requiredVersion() KafkaVersion { switch r.Version { + case 6: + return V2_4_0_0 case 5: return V2_3_0_0 case 4: diff --git a/join_group_request_test.go b/join_group_request_test.go index eb13c7d2c..0ff5b150a 100644 --- a/join_group_request_test.go +++ b/join_group_request_test.go @@ -99,6 +99,20 @@ var ( 0, 3, 'o', 'n', 'e', // Protocol name 0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata } + + joinGroupRequestV6 = []byte{ + 10, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID + 0, 0, 0, 100, // Session timeout + 0, 0, 0, 200, // Rebalance timeout + 12, 'O', 'n', 'e', 'P', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Member ID + 4, 'g', 'i', 'd', // GroupInstanceId + 9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type + 2, // 1 group protocol + 4, 'o', 'n', 'e', // Protocol name + 4, 0x01, 0x02, 0x03, // protocol metadata + 0, // empty tagged fields + 0, // empty tagged fields + } ) func TestJoinGroupRequestV3plus(t *testing.T) { @@ -129,6 +143,26 @@ func TestJoinGroupRequestV3plus(t *testing.T) { }, }, }, + { + "v6", + 6, + joinGroupRequestV6, + &JoinGroupRequest{ + Version: 6, + GroupId: "TestGroup", + SessionTimeout: 100, + RebalanceTimeout: 200, + MemberId: "OneProtocol", + GroupInstanceId: &groupInstanceId, + ProtocolType: "consumer", + GroupProtocols: map[string][]byte{ + "one": {1, 2, 3}, + }, + OrderedGroupProtocols: []*GroupProtocol{ + {Name: "one", Metadata: []byte{1, 2, 3}}, + }, + }, + }, } for _, c := range tests { request := new(JoinGroupRequest) diff --git a/join_group_response.go b/join_group_response.go index 0a8ea4f48..bdabe0a8a 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -81,8 +81,10 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error { if err := pe.putBytes(member.Metadata); err != nil { return err } + pe.putEmptyTaggedFieldArray() } + pe.putEmptyTaggedFieldArray() return nil } @@ -121,7 +123,8 @@ func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) return err } if n == 0 { - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } r.Members = make([]GroupMember, n) @@ -145,9 +148,14 @@ func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) } r.Members[i] = GroupMember{MemberId: memberId, GroupInstanceId: groupInstanceId, Metadata: memberMetadata} + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *JoinGroupResponse) key() int16 { @@ -159,15 +167,28 @@ func (r *JoinGroupResponse) version() int16 { } func (r *JoinGroupResponse) headerVersion() int16 { + if r.Version >= 6 { + return 1 + } return 0 } func (r *JoinGroupResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 5 + return r.Version >= 0 && r.Version <= 6 +} + +func (r *JoinGroupResponse) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *JoinGroupResponse) isFlexibleVersion(version int16) bool { + return version >= 6 } func (r *JoinGroupResponse) requiredVersion() KafkaVersion { switch r.Version { + case 6: + return V2_4_0_0 case 5: return V2_3_0_0 case 4: diff --git a/join_group_response_test.go b/join_group_response_test.go index e86602732..7d2c00406 100644 --- a/join_group_response_test.go +++ b/join_group_response_test.go @@ -190,6 +190,32 @@ var ( 0, 3, 'g', 'i', 'd', // GroupInstanceId 0, 0, 0, 3, 1, 2, 3, // Metadata } + + joinGroupResponseV6 = []byte{ + 0, 0, 0, 100, // ThrottleTimeMs + 0x00, 0x00, // No error + 0x00, 0x01, 0x02, 0x03, // Generation ID + 9, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen + 4, 'f', 'o', 'o', // Leader ID + 4, 'b', 'a', 'r', // Member ID + 2, // One member info + 4, 'm', 'i', 'd', // memberId + 4, 'g', 'i', 'd', // GroupInstanceId + 4, 1, 2, 3, // Metadata + 0, // empty tagged fields + 0, // empty tagged fields + } + + joinGroupResponseV6MemberIDRequired = []byte{ + 0, 0, 0, 100, // throttle time + 0, 79, // member ID required error + 255, 255, 255, 255, // generation id + 1, // protocol name + 1, // leader id + 4, 'f', 'o', 'o', + 1, // no members + 0, // empty tagged fields + } ) func TestJoinGroupResponse3plus(t *testing.T) { @@ -217,6 +243,38 @@ func TestJoinGroupResponse3plus(t *testing.T) { }, }, }, + { + "v6", + 6, + joinGroupResponseV6, + &JoinGroupResponse{ + Version: 6, + ThrottleTime: 100, + Err: ErrNoError, + GenerationId: 0x00010203, + GroupProtocol: "protocol", + LeaderId: "foo", + MemberId: "bar", + Members: []GroupMember{ + {"mid", &groupInstanceId, []byte{1, 2, 3}}, + }, + }, + }, + { + "v6 member id required", + 6, + joinGroupResponseV6MemberIDRequired, + &JoinGroupResponse{ + Version: 6, + ThrottleTime: 100, + Err: ErrMemberIdRequired, + GenerationId: -1, + GroupProtocol: "", + LeaderId: "", + MemberId: "foo", + Members: nil, + }, + }, } for _, c := range tests { response := new(JoinGroupResponse) diff --git a/real_decoder.go b/real_decoder.go index 9f74b86aa..ddeb5435d 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -479,7 +479,7 @@ func (rd *realFlexibleDecoder) getStringLength() (int, error) { func (rd *realFlexibleDecoder) getString() (string, error) { length, err := rd.getStringLength() - if err != nil { + if err != nil || length == -1 { return "", err } diff --git a/request_test.go b/request_test.go index f8bd8dc7f..581d08232 100644 --- a/request_test.go +++ b/request_test.go @@ -344,8 +344,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { apiKeyOffsetFetch: 6, // up from 5 // TODO: FindCoordinatorRequest v3 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyFindCoordinator: 3, // up from 2 - // TODO: JoinGroupRequest v6 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyJoinGroup: 6, // up from 6 + apiKeyJoinGroup: 6, // up from 5 apiKeyHeartbeat: 4, // up from 3 apiKeyLeaveGroup: 4, // up from 2 apiKeySyncGroup: 4, // up from 3