diff --git a/admin.go b/admin.go index 498aec0d2..4b8f02995 100644 --- a/admin.go +++ b/admin.go @@ -249,8 +249,9 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO return errors.New("you must specify topic details") } - topicDetails := make(map[string]*TopicDetail) - topicDetails[topic] = detail + topicDetails := map[string]*TopicDetail{ + topic: detail, + } request := NewCreateTopicsRequest( ca.conf.Version, @@ -366,7 +367,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) { return nil, err } - topicsDetailsMap := make(map[string]TopicDetail) + topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics)) var describeConfigsResources []*ConfigResource @@ -375,7 +376,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) { NumPartitions: int32(len(topic.Partitions)), } if len(topic.Partitions) > 0 { - topicDetails.ReplicaAssignment = map[int32][]int32{} + topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions)) for _, partition := range topic.Partitions { topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas } @@ -479,8 +480,12 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ return ErrInvalidTopic } - topicPartitions := make(map[string]*TopicPartition) - topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment} + topicPartitions := map[string]*TopicPartition{ + topic: { + Count: count, + Assignment: assignment, + }, + } request := &CreatePartitionsRequest{ TopicPartitions: topicPartitions, @@ -615,13 +620,14 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i partitionPerBroker[broker] = append(partitionPerBroker[broker], partition) } for broker, partitions := range partitionPerBroker { - topics := make(map[string]*DeleteRecordsRequestTopic) - recordsToDelete := make(map[int32]int64) + recordsToDelete := make(map[int32]int64, len(partitions)) for _, p := range partitions { recordsToDelete[p] = partitionOffsets[p] } - topics[topic] = &DeleteRecordsRequestTopic{ - PartitionOffsets: recordsToDelete, + topics := map[string]*DeleteRecordsRequestTopic{ + topic: { + PartitionOffsets: recordsToDelete, + }, } request := &DeleteRecordsRequest{ Topics: topics, @@ -1032,10 +1038,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e return } - groups := make(map[string]string) - maps.Copy(groups, response.Groups) - - groupMaps <- groups + groupMaps <- maps.Clone(response.Groups) }(b, ca.conf) } @@ -1204,7 +1207,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 close(logDirsResults) close(errChan) - allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata) + allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds)) for logDirsResult := range logDirsResults { allLogDirs[logDirsResult.id] = logDirsResult.logdirs } diff --git a/balance_strategy.go b/balance_strategy.go index 73066d26e..59f894867 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -313,8 +313,7 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later preBalanceAssignment := deepCopyAssignment(currentAssignment) - preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer)) - maps.Copy(preBalancePartitionConsumers, currentPartitionConsumer) + preBalancePartitionConsumers := maps.Clone(currentPartitionConsumer) reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer) @@ -322,7 +321,7 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) { currentAssignment = deepCopyAssignment(preBalanceAssignment) - currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers)) + clear(currentPartitionConsumer) maps.Copy(currentPartitionConsumer, preBalancePartitionConsumers) } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index af41b522d..4064ed5aa 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -2202,8 +2202,7 @@ func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMe for i, memberID := range members { for assignedTopic := range plan[memberID] { - found := slices.Contains(consumers[memberID].Topics, assignedTopic) - if !found { + if !slices.Contains(consumers[memberID].Topics, assignedTopic) { t.Errorf("Consumer %s had assigned topic %q that wasn't in the list of assignable topics %v", memberID, assignedTopic, consumers[memberID].Topics) t.FailNow() } diff --git a/mockbroker.go b/mockbroker.go index a985d6e10..d913d44e1 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -86,8 +86,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) { // and uses the found MockResponse instance to generate an appropriate reply. // If the request type is not found in the map then nothing is sent. func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { - fnMap := make(map[string]MockResponse) - maps.Copy(fnMap, handlerMap) + fnMap := maps.Clone(handlerMap) b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() mockResponse := fnMap[reqTypeName] @@ -102,8 +101,7 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { // request is received by the broker, it looks up the request type in the map // and invoke the found RequestHandlerFunc instance to generate an appropriate reply. func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) { - fnMap := make(map[string]requestHandlerFunc) - maps.Copy(fnMap, handlerMap) + fnMap := maps.Clone(handlerMap) b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() return fnMap[reqTypeName](req) diff --git a/tools/kafka-console-producer/kafka-console-producer.go b/tools/kafka-console-producer/kafka-console-producer.go index 4e65caa71..f3cc81de7 100644 --- a/tools/kafka-console-producer/kafka-console-producer.go +++ b/tools/kafka-console-producer/kafka-console-producer.go @@ -103,8 +103,7 @@ func main() { if *headers != "" { var hdrs []sarama.RecordHeader - arrHdrs := strings.SplitSeq(*headers, ",") - for h := range arrHdrs { + for h := range strings.SplitSeq(*headers, ",") { if header := strings.Split(h, ":"); len(header) != 2 { printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo") } else {