diff --git a/admin.go b/admin.go index feae1929d..fc500f090 100644 --- a/admin.go +++ b/admin.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "maps" "math/rand" "strconv" "sync" @@ -1032,9 +1033,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e } groups := make(map[string]string) - for group, typ := range response.Groups { - groups[group] = typ - } + maps.Copy(groups, response.Groups) groupMaps <- groups }(b, ca.conf) @@ -1045,9 +1044,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e close(errChan) for groupMap := range groupMaps { - for group, protocolType := range groupMap { - allGroups[group] = protocolType - } + maps.Copy(allGroups, groupMap) } // Intentionally return only the first error for simplicity @@ -1208,9 +1205,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 close(errChan) for logDirsMap := range logDirsMaps { - for id, logDirs := range logDirsMap { - allLogDirs[id] = logDirs - } + maps.Copy(allLogDirs, logDirsMap) } // Intentionally return only the first error for simplicity diff --git a/balance_strategy.go b/balance_strategy.go index 5946c962f..73066d26e 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -4,7 +4,9 @@ import ( "container/heap" "errors" "fmt" + "maps" "math" + "slices" "sort" "strings" ) @@ -233,7 +235,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad delete(unvisitedPartitions, partition) currentPartitionConsumers[partition] = memberID - if !strsContains(members[memberID].Topics, partition.Topic) { + if !slices.Contains(members[memberID].Topics, partition.Topic) { unassignedPartitions = append(unassignedPartitions, partition) continue } @@ -279,15 +281,6 @@ func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[strin }, nil) } -func strsContains(s []string, value string) bool { - for _, entry := range s { - if entry == value { - return true - } - } - return false -} - // Balance assignments across consumers for maximum fairness and stickiness. func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) { initializing := len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 @@ -321,9 +314,7 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later preBalanceAssignment := deepCopyAssignment(currentAssignment) preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer)) - for k, v := range currentPartitionConsumer { - preBalancePartitionConsumers[k] = v - } + maps.Copy(preBalancePartitionConsumers, currentPartitionConsumer) reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer) @@ -332,15 +323,11 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) { currentAssignment = deepCopyAssignment(preBalanceAssignment) currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers)) - for k, v := range preBalancePartitionConsumers { - currentPartitionConsumer[k] = v - } + maps.Copy(currentPartitionConsumer, preBalancePartitionConsumers) } // add the fixed assignments (those that could not change) back - for consumer, assignments := range fixedAssignments { - currentAssignment[consumer] = assignments - } + maps.Copy(currentAssignment, fixedAssignments) } // NewBalanceStrategyRoundRobin returns a round-robin balance strategy, @@ -659,12 +646,7 @@ func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssig } func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool { - for _, assignment := range assignments { - if assignment == topic { - return true - } - } - return false + return slices.Contains(assignments, topic) } func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment { diff --git a/balance_strategy_test.go b/balance_strategy_test.go index e603cff6f..af41b522d 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -8,6 +8,7 @@ import ( "math" "math/rand" "reflect" + "slices" "sort" "testing" "time" @@ -2123,9 +2124,8 @@ func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) { } topics[fmt.Sprintf("topic%d", i)] = partitions } - b.ResetTimer() - for n := 0; n < b.N; n++ { + for b.Loop() { if _, err := s.Plan(members, topics); err != nil { b.Errorf("Error building plan in benchmark: %v", err) } @@ -2163,9 +2163,8 @@ func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssig for i := 0; i < 1; i++ { delete(members, fmt.Sprintf("consumer%d", i)) } - b.ResetTimer() - for n := 0; n < b.N; n++ { + for b.Loop() { if _, err := s.Plan(members, topics); err != nil { b.Errorf("Error building plan in benchmark: %v", err) } @@ -2203,13 +2202,7 @@ func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMe for i, memberID := range members { for assignedTopic := range plan[memberID] { - found := false - for _, assignableTopic := range consumers[memberID].Topics { - if assignableTopic == assignedTopic { - found = true - break - } - } + found := slices.Contains(consumers[memberID].Topics, assignedTopic) if !found { t.Errorf("Consumer %s had assigned topic %q that wasn't in the list of assignable topics %v", memberID, assignedTopic, consumers[memberID].Topics) t.FailNow() diff --git a/broker.go b/broker.go index 187b516b8..d838a407b 100644 --- a/broker.go +++ b/broker.go @@ -1670,7 +1670,7 @@ func buildClientFirstMessage(token *AccessToken) ([]byte, error) { ext = "\x01" + mapToString(token.Extensions, "=", "\x01") } - resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext)) + resp := fmt.Appendf(nil, "n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext) return resp, nil } diff --git a/broker_test.go b/broker_test.go index 6701c895f..01a5f6349 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1408,7 +1408,7 @@ func BenchmarkBroker_Open(b *testing.B) { metrics.UseNilMetrics = false conf := NewTestConfig() conf.Version = V1_0_0_0 - for i := 0; i < b.N; i++ { + for b.Loop() { err := broker.Open(conf) if err != nil { b.Fatal(err) @@ -1425,7 +1425,7 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) { metrics.UseNilMetrics = true conf := NewTestConfig() conf.Version = V1_0_0_0 - for i := 0; i < b.N; i++ { + for b.Loop() { err := broker.Open(conf) if err != nil { b.Fatal(err) diff --git a/client.go b/client.go index 0d510ee92..3b117235d 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,7 @@ import ( "math" "math/rand" "net" + "slices" "sort" "strings" "sync" @@ -517,10 +518,8 @@ func (client *client) RefreshMetadata(topics ...string) error { // Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper // error. This handles the case by returning an error instead of sending it // off to Kafka. See: https://github.com/IBM/sarama/pull/38#issuecomment-26362310 - for _, topic := range topics { - if topic == "" { - return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return - } + if slices.Contains(topics, "") { + return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return } return client.metadataRefresh(topics) } diff --git a/delete_records_request.go b/delete_records_request.go index 9d9486169..05538ddc9 100644 --- a/delete_records_request.go +++ b/delete_records_request.go @@ -1,6 +1,7 @@ package sarama import ( + "slices" "sort" "time" ) @@ -111,7 +112,7 @@ func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error { for partition := range t.PartitionOffsets { keys = append(keys, partition) } - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + slices.Sort(keys) for _, partition := range keys { pe.putInt32(partition) pe.putInt64(t.PartitionOffsets[partition]) diff --git a/delete_records_response.go b/delete_records_response.go index 8f787b18c..c1c15aaed 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -1,6 +1,7 @@ package sarama import ( + "slices" "sort" "time" ) @@ -117,7 +118,7 @@ func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error { for partition := range t.Partitions { keys = append(keys, partition) } - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + slices.Sort(keys) for _, partition := range keys { pe.putInt32(partition) if err := t.Partitions[partition].encode(pe); err != nil { diff --git a/metrics_test.go b/metrics_test.go index dcd9c2a36..6669dbdfc 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -41,8 +41,8 @@ func TestGetMetricNameForBroker(t *testing.T) { func Benchmark_getMetricNameForTopic(b *testing.B) { b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { + + for b.Loop() { name := getMetricNameForTopic("sarama", "says.hello") if name != "sarama-for-topic-says_hello" { b.Fail() @@ -54,8 +54,8 @@ func Benchmark_getMetricNameForBroker(b *testing.B) { broker := &Broker{id: 1965} b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { + + for b.Loop() { name := getMetricNameForBroker("summer", broker) if name != "summer-for-broker-1965" { b.Fail() diff --git a/mockbroker.go b/mockbroker.go index 2c5e7cadd..a985d6e10 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "maps" "net" "reflect" "strconv" @@ -86,9 +87,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) { // If the request type is not found in the map then nothing is sent. func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { fnMap := make(map[string]MockResponse) - for k, v := range handlerMap { - fnMap[k] = v - } + maps.Copy(fnMap, handlerMap) b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() mockResponse := fnMap[reqTypeName] @@ -104,9 +103,7 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { // and invoke the found RequestHandlerFunc instance to generate an appropriate reply. func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) { fnMap := make(map[string]requestHandlerFunc) - for k, v := range handlerMap { - fnMap[k] = v - } + maps.Copy(fnMap, handlerMap) b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() return fnMap[reqTypeName](req) diff --git a/mocks/mocks.go b/mocks/mocks.go index be4007668..f1a75a56b 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -16,6 +16,7 @@ package mocks import ( "errors" "fmt" + "maps" "github.com/IBM/sarama" ) @@ -86,9 +87,7 @@ func (pc *TopicConfig) SetDefaultPartitions(n int32) { // SetPartitions sets the number of partitions the partitioners will see for specific topics. This // only applies to messages produced after setting them. func (pc *TopicConfig) SetPartitions(partitions map[string]int32) { - for p, n := range partitions { - pc.overridePartitions[p] = n - } + maps.Copy(pc.overridePartitions, partitions) } func (pc *TopicConfig) partitions(topic string) int32 { diff --git a/tools/kafka-console-producer/kafka-console-producer.go b/tools/kafka-console-producer/kafka-console-producer.go index 43e15ab9f..4e65caa71 100644 --- a/tools/kafka-console-producer/kafka-console-producer.go +++ b/tools/kafka-console-producer/kafka-console-producer.go @@ -103,8 +103,8 @@ func main() { if *headers != "" { var hdrs []sarama.RecordHeader - arrHdrs := strings.Split(*headers, ",") - for _, h := range arrHdrs { + arrHdrs := strings.SplitSeq(*headers, ",") + for h := range arrHdrs { if header := strings.Split(h, ":"); len(header) != 2 { printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo") } else { diff --git a/zstd_test.go b/zstd_test.go index 119772b1d..0ca55625f 100644 --- a/zstd_test.go +++ b/zstd_test.go @@ -18,7 +18,7 @@ func BenchmarkZstdMemoryConsumption(b *testing.B) { gomaxprocsBackup := runtime.GOMAXPROCS(cpus) b.ReportAllocs() - for i := 0; i < b.N; i++ { + for b.Loop() { for j := 0; j < 2*cpus; j++ { _, _ = zstdCompress(params, nil, buf) }