Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"math/rand"
"strconv"
"sync"
Expand Down Expand Up @@ -1032,9 +1033,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

groups := make(map[string]string)
for group, typ := range response.Groups {
groups[group] = typ
}
maps.Copy(groups, response.Groups)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we’re also making the map on line 1034, we can replace this with groups := maps.Clone(response.Groups)


groupMaps <- groups
}(b, ca.conf)
Expand All @@ -1045,9 +1044,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
close(errChan)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
allGroups[group] = protocolType
}
maps.Copy(allGroups, groupMap)
}

// Intentionally return only the first error for simplicity
Expand Down Expand Up @@ -1208,9 +1205,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
close(errChan)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
allLogDirs[id] = logDirs
}
maps.Copy(allLogDirs, logDirsMap)
}

// Intentionally return only the first error for simplicity
Expand Down
32 changes: 7 additions & 25 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"container/heap"
"errors"
"fmt"
"maps"
"math"
"slices"
"sort"
"strings"
)
Expand Down Expand Up @@ -233,7 +235,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
delete(unvisitedPartitions, partition)
currentPartitionConsumers[partition] = memberID

if !strsContains(members[memberID].Topics, partition.Topic) {
if !slices.Contains(members[memberID].Topics, partition.Topic) {
unassignedPartitions = append(unassignedPartitions, partition)
continue
}
Expand Down Expand Up @@ -279,15 +281,6 @@ func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[strin
}, nil)
}

func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
return true
}
}
return false
}

// Balance assignments across consumers for maximum fairness and stickiness.
func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
initializing := len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0
Expand Down Expand Up @@ -321,9 +314,7 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
preBalanceAssignment := deepCopyAssignment(currentAssignment)
preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
for k, v := range currentPartitionConsumer {
preBalancePartitionConsumers[k] = v
}
maps.Copy(preBalancePartitionConsumers, currentPartitionConsumer)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also a good candidate for maps.Clone


reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)

Expand All @@ -332,15 +323,11 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
currentAssignment = deepCopyAssignment(preBalanceAssignment)
currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
for k, v := range preBalancePartitionConsumers {
currentPartitionConsumer[k] = v
}
maps.Copy(currentPartitionConsumer, preBalancePartitionConsumers)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maps.Clone.

}

// add the fixed assignments (those that could not change) back
for consumer, assignments := range fixedAssignments {
currentAssignment[consumer] = assignments
}
maps.Copy(currentAssignment, fixedAssignments)
}

// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
Expand Down Expand Up @@ -659,12 +646,7 @@ func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssig
}

func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
for _, assignment := range assignments {
if assignment == topic {
return true
}
}
return false
return slices.Contains(assignments, topic)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to replace the actual function calls to this unexported function to slices.Contains itself?

}

func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
Expand Down
15 changes: 4 additions & 11 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"math/rand"
"reflect"
"slices"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -2123,9 +2124,8 @@ func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
}
topics[fmt.Sprintf("topic%d", i)] = partitions
}
b.ResetTimer()

for n := 0; n < b.N; n++ {
for b.Loop() {
if _, err := s.Plan(members, topics); err != nil {
b.Errorf("Error building plan in benchmark: %v", err)
}
Expand Down Expand Up @@ -2163,9 +2163,8 @@ func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssig
for i := 0; i < 1; i++ {
delete(members, fmt.Sprintf("consumer%d", i))
}
b.ResetTimer()

for n := 0; n < b.N; n++ {
for b.Loop() {
if _, err := s.Plan(members, topics); err != nil {
b.Errorf("Error building plan in benchmark: %v", err)
}
Expand Down Expand Up @@ -2203,13 +2202,7 @@ func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMe

for i, memberID := range members {
for assignedTopic := range plan[memberID] {
found := false
for _, assignableTopic := range consumers[memberID].Topics {
if assignableTopic == assignedTopic {
found = true
break
}
}
found := slices.Contains(consumers[memberID].Topics, assignedTopic)
if !found {
Comment on lines +2205 to 2206
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to go through a temporary local variable anymore, we could just do the slices.Contains directly in the if condition.

t.Errorf("Consumer %s had assigned topic %q that wasn't in the list of assignable topics %v", memberID, assignedTopic, consumers[memberID].Topics)
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
}

resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
resp := fmt.Appendf(nil, "n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext)

return resp, nil
Comment on lines +1673 to 1675
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn’t seem to be necessary to pass through a temporary local variable, but the composed return might look more odd.

}
Expand Down
4 changes: 2 additions & 2 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,7 @@ func BenchmarkBroker_Open(b *testing.B) {
metrics.UseNilMetrics = false
conf := NewTestConfig()
conf.Version = V1_0_0_0
for i := 0; i < b.N; i++ {
for b.Loop() {
err := broker.Open(conf)
if err != nil {
b.Fatal(err)
Expand All @@ -1425,7 +1425,7 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) {
metrics.UseNilMetrics = true
conf := NewTestConfig()
conf.Version = V1_0_0_0
for i := 0; i < b.N; i++ {
for b.Loop() {
err := broker.Open(conf)
if err != nil {
b.Fatal(err)
Expand Down
7 changes: 3 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"net"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -517,10 +518,8 @@ func (client *client) RefreshMetadata(topics ...string) error {
// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
// error. This handles the case by returning an error instead of sending it
// off to Kafka. See: https://github.com/IBM/sarama/pull/38#issuecomment-26362310
for _, topic := range topics {
if topic == "" {
return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
}
if slices.Contains(topics, "") {
return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
}
return client.metadataRefresh(topics)
}
Expand Down
3 changes: 2 additions & 1 deletion delete_records_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"slices"
"sort"
"time"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error {
for partition := range t.PartitionOffsets {
keys = append(keys, partition)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
slices.Sort(keys)
for _, partition := range keys {
pe.putInt32(partition)
pe.putInt64(t.PartitionOffsets[partition])
Expand Down
3 changes: 2 additions & 1 deletion delete_records_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"slices"
"sort"
"time"
)
Expand Down Expand Up @@ -117,7 +118,7 @@ func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error {
for partition := range t.Partitions {
keys = append(keys, partition)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
slices.Sort(keys)
for _, partition := range keys {
pe.putInt32(partition)
if err := t.Partitions[partition].encode(pe); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestGetMetricNameForBroker(t *testing.T) {

func Benchmark_getMetricNameForTopic(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {

for b.Loop() {
name := getMetricNameForTopic("sarama", "says.hello")
if name != "sarama-for-topic-says_hello" {
b.Fail()
Expand All @@ -54,8 +54,8 @@ func Benchmark_getMetricNameForBroker(b *testing.B) {
broker := &Broker{id: 1965}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {

for b.Loop() {
name := getMetricNameForBroker("summer", broker)
if name != "summer-for-broker-1965" {
b.Fail()
Expand Down
9 changes: 3 additions & 6 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"net"
"reflect"
"strconv"
Expand Down Expand Up @@ -86,9 +87,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) {
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
fnMap := make(map[string]MockResponse)
for k, v := range handlerMap {
fnMap[k] = v
}
maps.Copy(fnMap, handlerMap)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maps.Clone.

b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := fnMap[reqTypeName]
Expand All @@ -104,9 +103,7 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
// and invoke the found RequestHandlerFunc instance to generate an appropriate reply.
func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) {
fnMap := make(map[string]requestHandlerFunc)
for k, v := range handlerMap {
fnMap[k] = v
}
maps.Copy(fnMap, handlerMap)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maps.Clone.

b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
return fnMap[reqTypeName](req)
Expand Down
5 changes: 2 additions & 3 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mocks
import (
"errors"
"fmt"
"maps"

"github.com/IBM/sarama"
)
Expand Down Expand Up @@ -86,9 +87,7 @@ func (pc *TopicConfig) SetDefaultPartitions(n int32) {
// SetPartitions sets the number of partitions the partitioners will see for specific topics. This
// only applies to messages produced after setting them.
func (pc *TopicConfig) SetPartitions(partitions map[string]int32) {
for p, n := range partitions {
pc.overridePartitions[p] = n
}
maps.Copy(pc.overridePartitions, partitions)
}

func (pc *TopicConfig) partitions(topic string) int32 {
Expand Down
4 changes: 2 additions & 2 deletions tools/kafka-console-producer/kafka-console-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func main() {

if *headers != "" {
var hdrs []sarama.RecordHeader
arrHdrs := strings.Split(*headers, ",")
for _, h := range arrHdrs {
arrHdrs := strings.SplitSeq(*headers, ",")
for h := range arrHdrs {
Comment on lines +106 to +107
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to pass this through a temporary local variable.

if header := strings.Split(h, ":"); len(header) != 2 {
printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo")
} else {
Expand Down
2 changes: 1 addition & 1 deletion zstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func BenchmarkZstdMemoryConsumption(b *testing.B) {

gomaxprocsBackup := runtime.GOMAXPROCS(cpus)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for b.Loop() {
for j := 0; j < 2*cpus; j++ {
_, _ = zstdCompress(params, nil, buf)
}
Expand Down
Loading