diff --git a/pkg/kmsg/api.go b/pkg/kmsg/api.go index 812a90f3..da99c772 100644 --- a/pkg/kmsg/api.go +++ b/pkg/kmsg/api.go @@ -136,6 +136,15 @@ type Response interface { RequestKind() Request } +// UnsafeReadFrom, implemented by all requests and responses generated in this +// package, switches to using unsafe slice-to-string conversions when reading. +// This can be used to avoid a lot of garbage, but it means to have to be +// careful when using any strings in structs: if you hold onto the string, the +// underlying response slice will not be garbage collected. +type UnsafeReadFrom interface { + UnsafeReadFrom([]byte) error +} + // ThrottleResponse represents a response that could have a throttle applied by // Kafka. // @@ -235,6 +244,16 @@ func StringPtr(in string) *string { // point of this type is that it does not contain a version number inside it, // but it is versioned: if decoding v1 fails, this falls back to v0. func (s *StickyMemberMetadata) ReadFrom(src []byte) error { + return s.readFrom(src, false) +} + +// UnsafeReadFrom is the same as ReadFrom, but uses unsafe slice to string +// conversions to reduce garbage. +func (s *StickyMemberMetadata) UnsafeReadFrom(src []byte) error { + return s.readFrom(src, true) +} + +func (s *StickyMemberMetadata) readFrom(src []byte, unsafe bool) error { b := kbin.Reader{Src: src} numAssignments := b.ArrayLen() if numAssignments < 0 { @@ -243,10 +262,16 @@ func (s *StickyMemberMetadata) ReadFrom(src []byte) error { need := numAssignments - int32(cap(s.CurrentAssignment)) if need > 0 { s.CurrentAssignment = append(s.CurrentAssignment[:cap(s.CurrentAssignment)], make([]StickyMemberMetadataCurrentAssignment, need)...) + } else { + s.CurrentAssignment = s.CurrentAssignment[:numAssignments] } - s.CurrentAssignment = s.CurrentAssignment[:numAssignments] for i := int32(0); i < numAssignments; i++ { - topic := b.String() + var topic string + if unsafe { + topic = b.UnsafeString() + } else { + topic = b.String() + } numPartitions := b.ArrayLen() if numPartitions < 0 { numPartitions = 0 @@ -256,10 +281,11 @@ func (s *StickyMemberMetadata) ReadFrom(src []byte) error { need := numPartitions - int32(cap(a.Partitions)) if need > 0 { a.Partitions = append(a.Partitions[:cap(a.Partitions)], make([]int32, need)...) + } else { + a.Partitions = a.Partitions[:numPartitions] } - a.Partitions = a.Partitions[:0] - for i := numPartitions; i > 0; i-- { - a.Partitions = append(a.Partitions, b.Int32()) + for i := range a.Partitions { + a.Partitions[i] = b.Int32() } } if len(b.Src) > 0 {