diff --git a/consumer_group.go b/consumer_group.go index 48af3b42e..02d75089a 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -574,6 +574,10 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 3 req.GroupInstanceId = c.groupInstanceId + // Version 4 is the first flexible version + if c.config.Version.IsAtLeast(V2_4_0_0) { + req.Version = 4 + } } return coordinator.Heartbeat(req) diff --git a/heartbeat_request.go b/heartbeat_request.go index c8b4f02b9..21b99712c 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -29,6 +29,7 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -49,7 +50,8 @@ func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) { } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *HeartbeatRequest) key() int16 { @@ -61,15 +63,28 @@ func (r *HeartbeatRequest) version() int16 { } func (r *HeartbeatRequest) headerVersion() int16 { + if r.Version >= 4 { + return 2 + } return 1 } func (r *HeartbeatRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 3 + return r.Version >= 0 && r.Version <= 4 +} + +func (r *HeartbeatRequest) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *HeartbeatRequest) isFlexibleVersion(version int16) bool { + return version >= 4 } func (r *HeartbeatRequest) requiredVersion() KafkaVersion { switch r.Version { + case 4: + return V2_4_0_0 case 3: return V2_3_0_0 case 2: diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index abcddf1be..7fe924570 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -26,6 +26,20 @@ var ( 0, 3, 'b', 'a', 'z', // Member ID 255, 255, // Group Instance ID } + basicHeartbeatRequestV4_GID = []byte{ + 4, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 4, 'b', 'a', 'z', // Member ID + 4, 'g', 'i', 'd', // Group Instance ID + 0, // empty tagged fields + } + basicHeartbeatRequestV4_NOGID = []byte{ + 4, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 4, 'b', 'a', 'z', // Member ID + 0, // Group Instance ID + 0, // empty tagged fields + } ) func TestHeartbeatRequest(t *testing.T) { @@ -71,6 +85,30 @@ func TestHeartbeatRequest(t *testing.T) { GroupInstanceId: nil, }, }, + { + "v4_basic", + 4, + basicHeartbeatRequestV4_GID, + &HeartbeatRequest{ + Version: 4, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: &groupInstanceId, + }, + }, + { + "v4_basic", + 4, + basicHeartbeatRequestV4_NOGID, + &HeartbeatRequest{ + Version: 4, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: nil, + }, + }, } for _, c := range tests { testEncodable(t, c.CaseName, c.Message, c.MessageBytes) diff --git a/heartbeat_response.go b/heartbeat_response.go index 0371527c2..d753957c9 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -17,6 +17,7 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error { pe.putInt32(r.ThrottleTime) } pe.putKError(r.Err) + pe.putEmptyTaggedFieldArray() return nil } @@ -33,7 +34,8 @@ func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error { return err } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *HeartbeatResponse) key() int16 { @@ -45,15 +47,28 @@ func (r *HeartbeatResponse) version() int16 { } func (r *HeartbeatResponse) headerVersion() int16 { + if r.Version >= 4 { + return 1 + } return 0 } func (r *HeartbeatResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 3 + return r.Version >= 0 && r.Version <= 4 +} + +func (r *HeartbeatResponse) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *HeartbeatResponse) isFlexibleVersion(version int16) bool { + return version >= 4 } func (r *HeartbeatResponse) requiredVersion() KafkaVersion { switch r.Version { + case 4: + return V2_4_0_0 case 3: return V2_3_0_0 case 2: diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index 071810c70..928c518c7 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -19,6 +19,16 @@ var ( 0, 0, 0, 100, 0, byte(ErrFencedInstancedId), } + heartbeatResponseNoError_V4 = []byte{ + 0, 0, 0, 100, + 0, 0, + 0, // empty tagged fields + } + heartbeatResponseError_V4 = []byte{ + 0, 0, 0, 100, + 0, byte(ErrFencedInstancedId), + 0, // empty tagged fields + } ) func TestHeartbeatResponse(t *testing.T) { @@ -57,6 +67,26 @@ func TestHeartbeatResponse(t *testing.T) { ThrottleTime: 100, }, }, + { + "v4_noErr", + 4, + heartbeatResponseNoError_V4, + &HeartbeatResponse{ + Version: 4, + Err: ErrNoError, + ThrottleTime: 100, + }, + }, + { + "v4_Err", + 4, + heartbeatResponseError_V4, + &HeartbeatResponse{ + Version: 4, + Err: ErrFencedInstancedId, + ThrottleTime: 100, + }, + }, } for _, c := range tests { testEncodable(t, c.CaseName, c.Message, c.MessageBytes) diff --git a/request_test.go b/request_test.go index 16932d29c..f8bd8dc7f 100644 --- a/request_test.go +++ b/request_test.go @@ -346,8 +346,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // apiKeyFindCoordinator: 3, // up from 2 // TODO: JoinGroupRequest v6 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyJoinGroup: 6, // up from 6 - // TODO: HeartbeatRequest v4 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyHeartbeat: 4, // up from 3 + apiKeyHeartbeat: 4, // up from 3 apiKeyLeaveGroup: 4, // up from 2 apiKeySyncGroup: 4, // up from 3 apiKeyDescribeGroups: 5, // up from 3