Skip to content

Commit 5b04c98

Browse files
committed
feat(metrics): track consumer-fetch-response-size
- add metrics registry to decode func - add metric to track the size of fetch response
1 parent b87a616 commit 5b04c98

17 files changed

+66
-36
lines changed

balance_strategy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -609,9 +609,9 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti
609609
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
610610
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
611611
userDataV1 := &StickyAssignorUserDataV1{}
612-
if err := decode(userDataBytes, userDataV1); err != nil {
612+
if err := decode(userDataBytes, userDataV1, nil); err != nil {
613613
userDataV0 := &StickyAssignorUserDataV0{}
614-
if err := decode(userDataBytes, userDataV0); err != nil {
614+
if err := decode(userDataBytes, userDataV0, nil); err != nil {
615615
return nil, err
616616
}
617617
return userDataV0, nil

broker.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
435435
return
436436
}
437437

438-
if err := versionedDecode(packets, res, request.version()); err != nil {
438+
if err := versionedDecode(packets, res, request.version(), b.conf.MetricRegistry); err != nil {
439439
// Malformed response
440440
cb(nil, err)
441441
return
@@ -1023,13 +1023,13 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
10231023
return nil
10241024
}
10251025

1026-
return handleResponsePromise(req, res, promise)
1026+
return b.handleResponsePromise(req, res, promise)
10271027
}
10281028

1029-
func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
1029+
func (b *Broker) handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
10301030
select {
10311031
case buf := <-promise.packets:
1032-
return versionedDecode(buf, res, req.version())
1032+
return versionedDecode(buf, res, req.version(), b.conf.MetricRegistry)
10331033
case err := <-promise.errors:
10341034
return err
10351035
}
@@ -1121,7 +1121,7 @@ func (b *Broker) responseReceiver() {
11211121
}
11221122

11231123
decodedHeader := responseHeader{}
1124-
err = versionedDecode(header, &decodedHeader, response.headerVersion)
1124+
err = versionedDecode(header, &decodedHeader, response.headerVersion, b.conf.MetricRegistry)
11251125
if err != nil {
11261126
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
11271127
dead = err
@@ -1182,7 +1182,7 @@ func (b *Broker) authenticateViaSASLv1() error {
11821182
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
11831183
return handshakeErr
11841184
}
1185-
handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom)
1185+
handshakeErr = b.handleResponsePromise(handshakeRequest, handshakeResponse, prom)
11861186
if handshakeErr != nil {
11871187
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
11881188
return handshakeErr
@@ -1202,7 +1202,7 @@ func (b *Broker) authenticateViaSASLv1() error {
12021202
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
12031203
return nil, authErr
12041204
}
1205-
authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom)
1205+
authErr = b.handleResponsePromise(authenticateRequest, authenticateResponse, prom)
12061206
if authErr != nil {
12071207
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
12081208
return nil, authErr
@@ -1280,7 +1280,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
12801280
b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
12811281
res := &SaslHandshakeResponse{}
12821282

1283-
err = versionedDecode(payload, res, 0)
1283+
err = versionedDecode(payload, res, 0, b.conf.MetricRegistry)
12841284
if err != nil {
12851285
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
12861286
return err

consumer_group_members_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
5959
}
6060

6161
meta2 := new(ConsumerGroupMemberMetadata)
62-
err = decode(buf, meta2)
62+
err = decode(buf, meta2, nil)
6363
if err != nil {
6464
t.Error("Failed to decode data", err)
6565
} else if !reflect.DeepEqual(meta, meta2) {
@@ -69,10 +69,10 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
6969

7070
func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
7171
meta := new(ConsumerGroupMemberMetadata)
72-
if err := decode(groupMemberMetadataV1, meta); err != nil {
72+
if err := decode(groupMemberMetadataV1, meta, nil); err != nil {
7373
t.Error("Failed to decode V1 data", err)
7474
}
75-
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
75+
if err := decode(groupMemberMetadataV1Bad, meta, nil); err != nil {
7676
t.Error("Failed to decode V1 'bad' data", err)
7777
}
7878
}
@@ -94,7 +94,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
9494
}
9595

9696
amt2 := new(ConsumerGroupMemberAssignment)
97-
err = decode(buf, amt2)
97+
err = decode(buf, amt2, nil)
9898
if err != nil {
9999
t.Error("Failed to decode data", err)
100100
} else if !reflect.DeepEqual(amt, amt2) {

consumer_metadata_response_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestConsumerMetadataResponseError(t *testing.T) {
2626
testEncodable(t, "", response, consumerMetadataResponseError)
2727

2828
decodedResp := &ConsumerMetadataResponse{}
29-
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
29+
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0, nil); err != nil {
3030
t.Error("could not decode: ", err)
3131
}
3232

describe_groups_response.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAs
250250
return nil, nil
251251
}
252252
assignment := new(ConsumerGroupMemberAssignment)
253-
err := decode(gmd.MemberAssignment, assignment)
253+
err := decode(gmd.MemberAssignment, assignment, nil)
254254
return assignment, err
255255
}
256256

@@ -259,6 +259,6 @@ func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMeta
259259
return nil, nil
260260
}
261261
metadata := new(ConsumerGroupMemberMetadata)
262-
err := decode(gmd.MemberMetadata, metadata)
262+
err := decode(gmd.MemberMetadata, metadata, nil)
263263
return metadata, err
264264
}

encoder_decoder.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,15 @@ type versionedDecoder interface {
5757

5858
// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
5959
// interpreted using Kafka's encoding rules.
60-
func decode(buf []byte, in decoder) error {
60+
func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
6161
if buf == nil {
6262
return nil
6363
}
6464

65-
helper := realDecoder{raw: buf}
65+
helper := realDecoder{
66+
raw: buf,
67+
registry: metricRegistry,
68+
}
6669
err := in.decode(&helper)
6770
if err != nil {
6871
return err
@@ -75,12 +78,15 @@ func decode(buf []byte, in decoder) error {
7578
return nil
7679
}
7780

78-
func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
81+
func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
7982
if buf == nil {
8083
return nil
8184
}
8285

83-
helper := realDecoder{raw: buf}
86+
helper := realDecoder{
87+
raw: buf,
88+
registry: metricRegistry,
89+
}
8490
err := in.decode(&helper, version)
8591
if err != nil {
8692
return err

fetch_response.go

+11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"errors"
55
"sort"
66
"time"
7+
8+
"github.com/rcrowley/go-metrics"
79
)
810

911
const invalidPreferredReplicaID = -1
@@ -60,6 +62,12 @@ type FetchResponseBlock struct {
6062
}
6163

6264
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
65+
metricRegistry := pd.metricRegistry()
66+
var sizeMetric metrics.Histogram
67+
if metricRegistry != nil {
68+
sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
69+
}
70+
6371
tmp, err := pd.getInt16()
6472
if err != nil {
6573
return err
@@ -115,6 +123,9 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
115123
if err != nil {
116124
return err
117125
}
126+
if sizeMetric != nil {
127+
sizeMetric.Update(int64(recordsSize))
128+
}
118129

119130
recordsDecoder, err := pd.getSubset(int(recordsSize))
120131
if err != nil {

join_group_response.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata
2121
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
2222
for _, member := range r.Members {
2323
meta := new(ConsumerGroupMemberMetadata)
24-
if err := decode(member.Metadata, meta); err != nil {
24+
if err := decode(member.Metadata, meta, nil); err != nil {
2525
return nil, err
2626
}
2727
members[member.MemberId] = *meta

message_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestMessageDecodingVersion1(t *testing.T) {
236236

237237
func TestMessageDecodingUnknownVersions(t *testing.T) {
238238
message := Message{Version: 2}
239-
err := decode(emptyV2Message, &message)
239+
err := decode(emptyV2Message, &message, nil)
240240
if err == nil {
241241
t.Error("Decoding did not produce an error for an unknown magic byte")
242242
}

packet_decoder.go

+5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package sarama
22

3+
import "github.com/rcrowley/go-metrics"
4+
35
// PacketDecoder is the interface providing helpers for reading with Kafka's encoding rules.
46
// Types implementing Decoder only need to worry about calling methods like GetString,
57
// not about how a string is represented in Kafka.
@@ -40,6 +42,9 @@ type packetDecoder interface {
4042
// Stacks, see PushDecoder
4143
push(in pushDecoder) error
4244
pop() error
45+
46+
// To record metrics when provided
47+
metricRegistry() metrics.Registry
4348
}
4449

4550
// PushDecoder is the interface for decoding fields like CRCs and lengths where the validity

real_decoder.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package sarama
33
import (
44
"encoding/binary"
55
"math"
6+
7+
"github.com/rcrowley/go-metrics"
68
)
79

810
var (
@@ -15,9 +17,10 @@ var (
1517
)
1618

1719
type realDecoder struct {
18-
raw []byte
19-
off int
20-
stack []pushDecoder
20+
raw []byte
21+
off int
22+
stack []pushDecoder
23+
registry metrics.Registry
2124
}
2225

2326
// primitives
@@ -459,3 +462,7 @@ func (rd *realDecoder) pop() error {
459462

460463
return in.check(rd.off, rd.raw)
461464
}
465+
466+
func (rd *realDecoder) metricRegistry() metrics.Registry {
467+
return rd.registry
468+
}

record_batch.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
186186
}
187187

188188
b.recordsLen = len(recBuffer)
189-
err = decode(recBuffer, recordsArray(b.Records))
189+
err = decode(recBuffer, recordsArray(b.Records), nil)
190190
if errors.Is(err, ErrInsufficientData) {
191191
b.PartialTrailingRecord = true
192192
b.Records = nil

records_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ func TestLegacyRecords(t *testing.T) {
3333
set = &MessageSet{}
3434
r = Records{}
3535

36-
err = decode(exp, set)
36+
err = decode(exp, set, nil)
3737
if err != nil {
3838
t.Fatal(err)
3939
}
40-
err = decode(buf, &r)
40+
err = decode(buf, &r, nil)
4141
if err != nil {
4242
t.Fatal(err)
4343
}
@@ -110,11 +110,11 @@ func TestDefaultRecords(t *testing.T) {
110110
batch = &RecordBatch{}
111111
r = Records{}
112112

113-
err = decode(exp, batch)
113+
err = decode(exp, batch, nil)
114114
if err != nil {
115115
t.Fatal(err)
116116
}
117-
err = decode(buf, &r)
117+
err = decode(buf, &r, nil)
118118
if err != nil {
119119
t.Fatal(err)
120120
}

request.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func decodeRequest(r io.Reader) (*request, int, error) {
109109
bytesRead += len(encodedReq)
110110

111111
req := &request{}
112-
if err := decode(encodedReq, req); err != nil {
112+
if err := decode(encodedReq, req, nil); err != nil {
113113
return nil, bytesRead, err
114114
}
115115

request_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
2121
}
2222

2323
func testDecodable(t *testing.T, name string, out decoder, in []byte) {
24-
err := decode(in, out)
24+
err := decode(in, out, nil)
2525
if err != nil {
2626
t.Error("Decoding", name, "failed:", err)
2727
}
2828
}
2929

3030
func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []byte, version int16) {
31-
err := versionedDecode(in, out, version)
31+
err := versionedDecode(in, out, version, nil)
3232
if err != nil {
3333
t.Error("Decoding", name, "version", version, "failed:", err)
3434
}
@@ -99,7 +99,7 @@ func testResponse(t *testing.T, name string, res protocolBody, expected []byte)
9999
}
100100

101101
decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(versionedDecoder)
102-
if err := versionedDecode(encoded, decoded, res.version()); err != nil {
102+
if err := versionedDecode(encoded, decoded, res.version(), nil); err != nil {
103103
t.Error("Decoding", name, "failed:", err)
104104
}
105105

sarama.go

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ Consumer related metrics:
7171
| consumer-fetch-rate | meter | Fetch requests/second sent to all brokers |
7272
| consumer-fetch-rate-for-broker-<broker> | meter | Fetch requests/second sent to a given broker |
7373
| consumer-fetch-rate-for-topic-<topic> | meter | Fetch requests/second sent for a given topic |
74+
| consumer-fetch-response-size | histogram | Distribution of the fetch response size in bytes |
7475
| consumer-group-join-total-<GroupID> | counter | Total count of consumer group join attempts |
7576
| consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
7677
| consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |

sync_group_response.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type SyncGroupResponse struct {
1515

1616
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
1717
assignment := new(ConsumerGroupMemberAssignment)
18-
err := decode(r.MemberAssignment, assignment)
18+
err := decode(r.MemberAssignment, assignment, nil)
1919
return assignment, err
2020
}
2121

0 commit comments

Comments
 (0)