diff --git a/admin.go b/admin.go index 2a2919b01..8f2a44d69 100644 --- a/admin.go +++ b/admin.go @@ -440,7 +440,10 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { } // Versions 0, 1, 2, and 3 are the same. - if ca.conf.Version.IsAtLeast(V2_1_0_0) { + // Version 4 is first flexible version. + if ca.conf.Version.IsAtLeast(V2_4_0_0) { + request.Version = 4 + } else if ca.conf.Version.IsAtLeast(V2_1_0_0) { request.Version = 3 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { request.Version = 2 diff --git a/delete_topics_request.go b/delete_topics_request.go index 76a9fd28a..cb9e9244c 100644 --- a/delete_topics_request.go +++ b/delete_topics_request.go @@ -17,7 +17,9 @@ func NewDeleteTopicsRequest(version KafkaVersion, topics []string, timeout time. Topics: topics, Timeout: timeout, } - if version.IsAtLeast(V2_1_0_0) { + if version.IsAtLeast(V2_4_0_0) { + d.Version = 4 + } else if version.IsAtLeast(V2_1_0_0) { d.Version = 3 } else if version.IsAtLeast(V2_0_0_0) { d.Version = 2 @@ -32,7 +34,7 @@ func (d *DeleteTopicsRequest) encode(pe packetEncoder) error { return err } pe.putInt32(int32(d.Timeout / time.Millisecond)) - + pe.putEmptyTaggedFieldArray() return nil } @@ -46,7 +48,9 @@ func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error } d.Timeout = time.Duration(timeout) * time.Millisecond d.Version = version - return nil + + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *DeleteTopicsRequest) key() int16 { @@ -58,15 +62,28 @@ func (d *DeleteTopicsRequest) version() int16 { } func (d *DeleteTopicsRequest) headerVersion() int16 { + if d.Version >= 4 { + return 2 + } return 1 } +func (d *DeleteTopicsRequest) isFlexible() bool { + return d.isFlexibleVersion(d.Version) +} + +func (d *DeleteTopicsRequest) isFlexibleVersion(version int16) bool { + return version >= 4 +} + func (d *DeleteTopicsRequest) isValidVersion() bool { - return d.Version >= 0 && d.Version <= 3 + return d.Version >= 0 && d.Version <= 4 } func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion { switch d.Version { + case 4: + return V2_4_0_0 case 3: return V2_1_0_0 case 2: diff --git a/delete_topics_request_test.go b/delete_topics_request_test.go index 18a6bd0fe..26c152479 100644 --- a/delete_topics_request_test.go +++ b/delete_topics_request_test.go @@ -7,12 +7,21 @@ import ( "time" ) -var deleteTopicsRequest = []byte{ - 0, 0, 0, 2, - 0, 5, 't', 'o', 'p', 'i', 'c', - 0, 5, 'o', 't', 'h', 'e', 'r', - 0, 0, 0, 100, -} +var ( + deleteTopicsRequest = []byte{ + 0, 0, 0, 2, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 100, + } + deleteTopicsRequestV4 = []byte{ + 3, + 6, 't', 'o', 'p', 'i', 'c', + 6, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 100, + 0, // empty tagged fields + } +) func TestDeleteTopicsRequestV0(t *testing.T) { req := &DeleteTopicsRequest{ @@ -33,3 +42,13 @@ func TestDeleteTopicsRequestV1(t *testing.T) { testRequest(t, "", req, deleteTopicsRequest) } + +func TestDeleteTopicsRequestV4(t *testing.T) { + req := &DeleteTopicsRequest{ + Version: 4, + Topics: []string{"topic", "other"}, + Timeout: 100 * time.Millisecond, + } + + testRequest(t, "", req, deleteTopicsRequestV4) +} diff --git a/delete_topics_response.go b/delete_topics_response.go index 998982739..37fac52a7 100644 --- a/delete_topics_response.go +++ b/delete_topics_response.go @@ -1,6 +1,8 @@ package sarama -import "time" +import ( + "time" +) type DeleteTopicsResponse struct { Version int16 @@ -25,8 +27,10 @@ func (d *DeleteTopicsResponse) encode(pe packetEncoder) error { return err } pe.putInt16(int16(errorCode)) + pe.putEmptyTaggedFieldArray() } + pe.putEmptyTaggedFieldArray() return nil } @@ -59,9 +63,13 @@ func (d *DeleteTopicsResponse) decode(pd packetDecoder, version int16) (err erro } d.TopicErrorCodes[topic] = KError(errorCode) + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (d *DeleteTopicsResponse) key() int16 { @@ -73,15 +81,28 @@ func (d *DeleteTopicsResponse) version() int16 { } func (d *DeleteTopicsResponse) headerVersion() int16 { + if d.Version >= 4 { + return 1 + } return 0 } +func (d *DeleteTopicsResponse) isFlexible() bool { + return d.isFlexibleVersion(d.Version) +} + +func (d *DeleteTopicsResponse) isFlexibleVersion(version int16) bool { + return version >= 4 +} + func (d *DeleteTopicsResponse) isValidVersion() bool { - return d.Version >= 0 && d.Version <= 3 + return d.Version >= 0 && d.Version <= 4 } func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion { switch d.Version { + case 4: + return V2_4_0_0 case 3: return V2_1_0_0 case 2: diff --git a/delete_topics_response_test.go b/delete_topics_response_test.go index 674faa2d1..3bc72bd2a 100644 --- a/delete_topics_response_test.go +++ b/delete_topics_response_test.go @@ -20,6 +20,15 @@ var ( 0, 5, 't', 'o', 'p', 'i', 'c', 0, 0, } + + deleteTopicsResponseV4 = []byte{ + 0, 0, 0, 100, + 2, + 6, 't', 'o', 'p', 'i', 'c', + 0, 0, + 0, // empty tagged fields + 0, // empty tagged fields + } ) func TestDeleteTopicsResponse(t *testing.T) { @@ -35,4 +44,7 @@ func TestDeleteTopicsResponse(t *testing.T) { resp.ThrottleTime = 100 * time.Millisecond testResponse(t, "version 1", resp, deleteTopicsResponseV1) + + resp.Version = 4 + testResponse(t, "version 4", resp, deleteTopicsResponseV4) } diff --git a/functional_admin_test.go b/functional_admin_test.go index 0553a89a5..c11b02b37 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -397,3 +397,25 @@ func TestFuncAdminDeleteGroup(t *testing.T) { t.Fatal("Expected no group. Found ", len(groups), "groups.") } } + +func TestFuncAdminDeleteTopic(t *testing.T) { + checkKafkaVersion(t, "0.10.0.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + config := NewFunctionalTestConfig() + adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, adminClient) + + err = adminClient.CreateTopic("delete_topic_test", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) + if err != nil { + t.Fatal(err) + } + err = adminClient.DeleteTopic("delete_topic_test") + if err != nil { + t.Fatal(err) + } +} diff --git a/request_test.go b/request_test.go index 81cc45f87..6b480d9da 100644 --- a/request_test.go +++ b/request_test.go @@ -358,8 +358,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { apiKeyApiVersions: 3, // up from 2 // TODO: CreateTopicsRequest v5 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyCreateTopics: 5, // up from 3 - // TODO: DeleteTopicsRequest v4 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyDeleteTopics: 4, // up from 3 + apiKeyDeleteTopics: 4, // up from 3 apiKeyInitProducerId: 2, // up from 1 apiKeyDeleteGroups: 2, // up from 1 apiKeyElectLeaders: 2, // up from 0