Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,14 +1176,24 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
_ = b.Open(conf) // Ensure that broker is opened

request := &DescribeLogDirsRequest{}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
if ca.conf.Version.IsAtLeast(V3_3_0_0) {
request.Version = 4
} else if ca.conf.Version.IsAtLeast(V3_2_0_0) {
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
response, err := b.DescribeLogDirs(request)
if err != nil {
errChan <- err
return
}
if !errors.Is(response.ErrorCode, ErrNoError) {
errChan <- response.ErrorCode
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
logDirs[b.ID()] = response.LogDirs
logDirsMaps <- logDirs
Expand Down
92 changes: 77 additions & 15 deletions describe_log_dirs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,62 @@ type DescribeLogDirsRequestTopic struct {
}

func (r *DescribeLogDirsRequest) encode(pe packetEncoder) error {
isFlexible := r.Version >= 2

length := len(r.DescribeTopics)
if length == 0 {
// In order to query all topics we must send null
length = -1
}

if err := pe.putArrayLength(length); err != nil {
return err
}

for _, d := range r.DescribeTopics {
if err := pe.putString(d.Topic); err != nil {
if isFlexible {
pe.putCompactArrayLength(length)
} else {
if err := pe.putArrayLength(length); err != nil {
return err
}
}

if err := pe.putInt32Array(d.PartitionIDs); err != nil {
return err
for _, d := range r.DescribeTopics {
if isFlexible {
if err := pe.putCompactString(d.Topic); err != nil {
return err
}

if err := pe.putCompactInt32Array(d.PartitionIDs); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
} else {
if err := pe.putString(d.Topic); err != nil {
return err
}

if err := pe.putInt32Array(d.PartitionIDs); err != nil {
return err
}
}
}
if isFlexible {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
n, err := pd.getArrayLength()
isFlexible := r.Version >= 2

var n int
var err error
if isFlexible {
n, err = pd.getCompactArrayLength()
} else {
n, err = pd.getArrayLength()
}
if err != nil {
return err
}

if n == -1 {
n = 0
}
Expand All @@ -57,20 +85,43 @@ func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
for i := 0; i < n; i++ {
topics[i] = DescribeLogDirsRequestTopic{}

topic, err := pd.getString()
var topic string
if isFlexible {
topic, err = pd.getCompactString()
} else {
topic, err = pd.getString()
}
if err != nil {
return err
}
topics[i].Topic = topic

pIDs, err := pd.getInt32Array()
var pIDs []int32
if isFlexible {
pIDs, err = pd.getCompactInt32Array()
} else {
pIDs, err = pd.getInt32Array()
}
if err != nil {
return err
}
topics[i].PartitionIDs = pIDs
if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}
}
r.DescribeTopics = topics

if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -83,16 +134,27 @@ func (r *DescribeLogDirsRequest) version() int16 {
}

func (r *DescribeLogDirsRequest) headerVersion() int16 {
if r.Version >= 2 {
return 2
}
return 1
}

func (r *DescribeLogDirsRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 1
return r.Version >= 0 && r.Version <= 4
}

func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
if r.Version > 0 {
switch r.Version {
case 4:
return V3_3_0_0
case 3:
return V3_2_0_0
case 2:
return V2_6_0_0
case 1:
return V2_0_0_0
default:
return V1_0_0_0
}
return V1_0_0_0
}
26 changes: 25 additions & 1 deletion describe_log_dirs_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ var (
0, 0, 0, 25, // PartitionID 25
0, 0, 0, 26, // PartitionID 26
}
emptyDescribeLogDirsRequestV2 = []byte{0, 0} // Compact empty array length
topicDescribeLogDirsRequestV2 = []byte{
2, // DescribeTopics array, Array length 1+1
7, // Topic name length 6+1
'r', 'a', 'n', 'd', 'o', 'm', // Topic name
3, // PartitionIDs int32 array, Array length 2+1
0, 0, 0, 25, // PartitionID 25
0, 0, 0, 26, // PartitionID 26
0, 0, // empty tagged fields
}
)

func TestDescribeLogDirsRequest(t *testing.T) {
Expand All @@ -29,5 +39,19 @@ func TestDescribeLogDirsRequest(t *testing.T) {
PartitionIDs: []int32{25, 26},
},
}
testRequest(t, "no topics", request, topicDescribeLogDirsRequest)
testRequest(t, "with topics", request, topicDescribeLogDirsRequest)

request = &DescribeLogDirsRequest{
Version: 2,
DescribeTopics: []DescribeLogDirsRequestTopic{},
}
testRequest(t, "no topics v2", request, emptyDescribeLogDirsRequestV2)

request.DescribeTopics = []DescribeLogDirsRequestTopic{
{
Topic: "random",
PartitionIDs: []int32{25, 26},
},
}
testRequest(t, "with topics v2", request, topicDescribeLogDirsRequestV2)
}
Loading
Loading