Skip to content

Commit d89fa9d

Browse files
committed
Add DescribeLogDirs to admin client
1 parent 9501120 commit d89fa9d

File tree

4 files changed

+144
-0
lines changed

4 files changed

+144
-0
lines changed

admin.go

+48
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ type ClusterAdmin interface {
101101
// Get information about the nodes in the cluster
102102
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
103103

104+
// Get information about all log directories on the given set of brokers
105+
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
106+
104107
// Close shuts down the admin and closes underlying client.
105108
Close() error
106109
}
@@ -878,3 +881,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
878881

879882
return nil
880883
}
884+
885+
func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
886+
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
887+
888+
// Query brokers in parallel, since we may have to query multiple brokers
889+
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
890+
errors := make(chan error, len(brokerIds))
891+
wg := sync.WaitGroup{}
892+
893+
for _, b := range brokerIds {
894+
wg.Add(1)
895+
broker, err := ca.findBroker(b)
896+
if err != nil {
897+
Logger.Printf("Unable to find broker with ID = %v\n", b)
898+
continue
899+
}
900+
go func(b *Broker, conf *Config) {
901+
defer wg.Done()
902+
_ = b.Open(conf) // Ensure that broker is opened
903+
904+
response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
905+
if err != nil {
906+
errors <- err
907+
return
908+
}
909+
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
910+
logDirs[b.ID()] = response.LogDirs
911+
logDirsMaps <- logDirs
912+
}(broker, ca.conf)
913+
}
914+
915+
wg.Wait()
916+
close(logDirsMaps)
917+
close(errors)
918+
919+
for logDirsMap := range logDirsMaps {
920+
for id, logDirs := range logDirsMap {
921+
allLogDirs[id] = logDirs
922+
}
923+
}
924+
925+
// Intentionally return only the first error for simplicity
926+
err = <-errors
927+
return
928+
}

admin_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -1309,3 +1309,51 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
13091309
seedBroker2.BrokerID(), b.ID())
13101310
}
13111311
}
1312+
1313+
func TestDescribeLogDirs(t *testing.T) {
1314+
seedBroker := NewMockBroker(t, 1)
1315+
defer seedBroker.Close()
1316+
1317+
seedBroker.SetHandlerByMap(map[string]MockResponse{
1318+
"MetadataRequest": NewMockMetadataResponse(t).
1319+
SetController(seedBroker.BrokerID()).
1320+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1321+
"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
1322+
SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
1323+
})
1324+
1325+
config := NewConfig()
1326+
config.Version = V1_0_0_0
1327+
1328+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1329+
if err != nil {
1330+
t.Fatal(err)
1331+
}
1332+
1333+
logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
1334+
if err != nil {
1335+
t.Fatal(err)
1336+
}
1337+
1338+
if len(logDirsPerBroker) != 1 {
1339+
t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
1340+
}
1341+
logDirs := logDirsPerBroker[seedBroker.BrokerID()]
1342+
if len(logDirs) != 1 {
1343+
t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
1344+
}
1345+
logDirsBroker := logDirs[0]
1346+
if logDirsBroker.ErrorCode != ErrNoError {
1347+
t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
1348+
}
1349+
if logDirsBroker.Path != "/tmp/logs" {
1350+
t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
1351+
}
1352+
if len(logDirsBroker.Topics) != 2 {
1353+
t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
1354+
}
1355+
err = admin.Close()
1356+
if err != nil {
1357+
t.Fatal(err)
1358+
}
1359+
}

describe_log_dirs_response.go

+6
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
8484
return err
8585
}
8686

87+
if err := pe.putArrayLength(len(r.Topics)); err != nil {
88+
return err
89+
}
8790
for _, topic := range r.Topics {
8891
if err := topic.encode(pe); err != nil {
8992
return err
@@ -137,6 +140,9 @@ func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
137140
return err
138141
}
139142

143+
if err := pe.putArrayLength(len(r.Partitions)); err != nil {
144+
return err
145+
}
140146
for _, partition := range r.Partitions {
141147
if err := partition.encode(pe); err != nil {
142148
return err

mockresponses.go

+42
Original file line numberDiff line numberDiff line change
@@ -1029,3 +1029,45 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead
10291029
}
10301030
return resp
10311031
}
1032+
1033+
type MockDescribeLogDirsResponse struct {
1034+
t TestReporter
1035+
logDirs []DescribeLogDirsResponseDirMetadata
1036+
}
1037+
1038+
func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
1039+
return &MockDescribeLogDirsResponse{t: t}
1040+
}
1041+
1042+
func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
1043+
topics := []DescribeLogDirsResponseTopic{}
1044+
for topic := range topicPartitions {
1045+
partitions := []DescribeLogDirsResponsePartition{}
1046+
for i := 0; i < topicPartitions[topic]; i++ {
1047+
partitions = append(partitions, DescribeLogDirsResponsePartition{
1048+
PartitionID: int32(i),
1049+
IsTemporary: false,
1050+
OffsetLag: int64(0),
1051+
Size: int64(1234),
1052+
})
1053+
}
1054+
topics = append(topics, DescribeLogDirsResponseTopic{
1055+
Topic: topic,
1056+
Partitions: partitions,
1057+
})
1058+
}
1059+
logDir := DescribeLogDirsResponseDirMetadata{
1060+
ErrorCode: ErrNoError,
1061+
Path: logDirPath,
1062+
Topics: topics,
1063+
}
1064+
m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
1065+
return m
1066+
}
1067+
1068+
func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1069+
resp := &DescribeLogDirsResponse{
1070+
LogDirs: m.logDirs,
1071+
}
1072+
return resp
1073+
}

0 commit comments

Comments
 (0)