Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
return errors.New("you must specify topic details")
}

topicDetails := make(map[string]*TopicDetail)
topicDetails[topic] = detail
topicDetails := map[string]*TopicDetail{
topic: detail,
}

request := NewCreateTopicsRequest(
ca.conf.Version,
Expand Down Expand Up @@ -366,7 +367,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
return nil, err
}

topicsDetailsMap := make(map[string]TopicDetail)
topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics))

var describeConfigsResources []*ConfigResource

Expand All @@ -375,7 +376,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
NumPartitions: int32(len(topic.Partitions)),
}
if len(topic.Partitions) > 0 {
topicDetails.ReplicaAssignment = map[int32][]int32{}
topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions))
for _, partition := range topic.Partitions {
topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
}
Expand Down Expand Up @@ -479,8 +480,12 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
return ErrInvalidTopic
}

topicPartitions := make(map[string]*TopicPartition)
topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
topicPartitions := map[string]*TopicPartition{
topic: {
Count: count,
Assignment: assignment,
},
}

request := &CreatePartitionsRequest{
TopicPartitions: topicPartitions,
Expand Down Expand Up @@ -615,13 +620,14 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
}
for broker, partitions := range partitionPerBroker {
topics := make(map[string]*DeleteRecordsRequestTopic)
recordsToDelete := make(map[int32]int64)
recordsToDelete := make(map[int32]int64, len(partitions))
for _, p := range partitions {
recordsToDelete[p] = partitionOffsets[p]
}
topics[topic] = &DeleteRecordsRequestTopic{
PartitionOffsets: recordsToDelete,
topics := map[string]*DeleteRecordsRequestTopic{
topic: {
PartitionOffsets: recordsToDelete,
},
}
request := &DeleteRecordsRequest{
Topics: topics,
Expand Down Expand Up @@ -1032,10 +1038,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
return
}

groups := make(map[string]string)
maps.Copy(groups, response.Groups)

groupMaps <- groups
groupMaps <- maps.Clone(response.Groups)
}(b, ca.conf)
}

Expand Down Expand Up @@ -1204,7 +1207,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
close(logDirsResults)
close(errChan)

allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
for logDirsResult := range logDirsResults {
allLogDirs[logDirsResult.id] = logDirsResult.logdirs
}
Expand Down
5 changes: 2 additions & 3 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,15 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart

// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
preBalanceAssignment := deepCopyAssignment(currentAssignment)
preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
maps.Copy(preBalancePartitionConsumers, currentPartitionConsumer)
preBalancePartitionConsumers := maps.Clone(currentPartitionConsumer)

reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)

// if we are not preserving existing assignments and we have made changes to the current assignment
// make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
currentAssignment = deepCopyAssignment(preBalanceAssignment)
currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
clear(currentPartitionConsumer)
maps.Copy(currentPartitionConsumer, preBalancePartitionConsumers)
}

Expand Down
3 changes: 1 addition & 2 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2202,8 +2202,7 @@ func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMe

for i, memberID := range members {
for assignedTopic := range plan[memberID] {
found := slices.Contains(consumers[memberID].Topics, assignedTopic)
if !found {
if !slices.Contains(consumers[memberID].Topics, assignedTopic) {
t.Errorf("Consumer %s had assigned topic %q that wasn't in the list of assignable topics %v", memberID, assignedTopic, consumers[memberID].Topics)
t.FailNow()
}
Expand Down
6 changes: 2 additions & 4 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) {
// and uses the found MockResponse instance to generate an appropriate reply.
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
fnMap := make(map[string]MockResponse)
maps.Copy(fnMap, handlerMap)
fnMap := maps.Clone(handlerMap)
b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := fnMap[reqTypeName]
Expand All @@ -102,8 +101,7 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
// request is received by the broker, it looks up the request type in the map
// and invoke the found RequestHandlerFunc instance to generate an appropriate reply.
func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) {
fnMap := make(map[string]requestHandlerFunc)
maps.Copy(fnMap, handlerMap)
fnMap := maps.Clone(handlerMap)
b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
return fnMap[reqTypeName](req)
Expand Down
3 changes: 1 addition & 2 deletions tools/kafka-console-producer/kafka-console-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ func main() {

if *headers != "" {
var hdrs []sarama.RecordHeader
arrHdrs := strings.SplitSeq(*headers, ",")
for h := range arrHdrs {
for h := range strings.SplitSeq(*headers, ",") {
if header := strings.Split(h, ":"); len(header) != 2 {
printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo")
} else {
Expand Down
Loading