
Commit cc9a4e9

committed
- remove SLA gauge metrics; they can be inferred from the 'infinite' bucket in the latency histograms
- add methods to handle metric events (onAck, onRoundtrip, onOffsetCommit); these will also soon be used to extract e2e into its own package
- add _total and _seconds suffixes to metrics for best practices
1 parent a1ca198 commit cc9a4e9
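
The reasoning behind dropping the SLA gauges: every Prometheus histogram carries an implicit '+Inf' bucket, so "within SLA" can be computed at query time instead of being pre-baked into a gauge. A minimal sketch of that idea (not code from this commit — the metric name mirrors the e2e roundtrip histogram, and the bucket boundaries are made-up values standing in for whatever createHistogramBuckets produces):

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Hypothetical histogram mirroring the e2e roundtrip latency metric; the real
	// buckets come from createHistogramBuckets(cfg.EndToEnd.Consumer.RoundtripSla).
	roundtripLatency := prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "kminion",
		Subsystem: "end_to_end",
		Name:      "roundtrip_latency_seconds",
		Buckets:   []float64{0.25, 0.5, 1, 2}, // assume 2s is the configured roundtrip SLA
	})

	roundtripLatency.Observe(0.8) // within SLA: lands in le="1", le="2" and the implicit le="+Inf" bucket
	roundtripLatency.Observe(3.5) // SLA violation: only counted in le="+Inf"

	// At query time, something like
	//   rate(..._bucket{le="2"}[5m]) / rate(..._count[5m])
	// yields the fraction of roundtrips within the SLA; the removed is_within_*_sla
	// gauges only exposed a 0/1 snapshot of the same information.
}
```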

File tree

4 files changed: 75 additions, 78 deletions


minion/endtoend_consumer.go

+14-28
@@ -46,7 +46,7 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
             zap.Error(err.Err))
         }

-        receiveTimestamp := timeNowMs()
+        receiveTimestampMs := timeNowMs()

         //
         // Process messages
@@ -59,13 +59,13 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
                 continue
             }

-            s.processMessage(record, receiveTimestamp)
+            s.processMessage(record, receiveTimestampMs)
         }

         //
         // Commit offsets for processed messages
-        // todo:
-        // - do we need to keep track of what offset to commit for which partition??
+        // todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
+        // we want to do it manually, separately for each partition, so we can track how long it took
         if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

             startCommitTimestamp := timeNowMs()
@@ -74,18 +74,14 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
                 // got commit response
                 if err != nil {
                     s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
-                    s.setCachedItem("end_to_end_consumer_offset_availability", false, 120*time.Second)
-                } else {
-                    commitLatencySec := float64(timeNowMs()-startCommitTimestamp) / float64(1000)
-                    s.endToEndCommitLatency.Observe(commitLatencySec)
-                    s.endToEndMessagesCommitted.Inc()
-
-                    if commitLatencySec <= s.Cfg.EndToEnd.Consumer.CommitSla.Seconds() {
-                        s.endToEndWithinCommitSla.Set(1)
-                    } else {
-                        s.endToEndWithinCommitSla.Set(0)
-                    }
+                    return
                 }
+
+                latencyMs := timeNowMs() - startCommitTimestamp
+                commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
+
+                // todo: partitionID
+                s.onOffsetCommit(0, commitLatency)
             })
         }
     }
@@ -99,7 +95,7 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
 // processMessage takes a message and:
 // - checks if it matches minionID and latency
 // - updates metrics accordingly
-func (s *Service) processMessage(record *kgo.Record, receiveTimestamp int64) {
+func (s *Service) processMessage(record *kgo.Record, receiveTimestampMs float64) {
     var msg EndToEndMessage
     if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
         return // maybe older version
@@ -109,17 +105,7 @@ func (s *Service) processMessage(record *kgo.Record, receiveTimestamp int64) {
         return // not from us
     }

-    if msg.Timestamp < s.lastRoundtripTimestamp {
-        return // msg older than what we recently processed (out of order, should never happen)
-    }
-
-    latencyMs := receiveTimestamp - msg.Timestamp
-    if latencyMs > s.Cfg.EndToEnd.Consumer.RoundtripSla.Milliseconds() {
-        s.endToEndWithinRoundtripSla.Set(0)
-        return // too late!
-    }
+    latency := time.Duration((receiveTimestampMs - msg.Timestamp) * float64(time.Millisecond))

-    s.lastRoundtripTimestamp = msg.Timestamp
-    s.endToEndMessagesReceived.Inc()
-    s.endToEndRoundtripLatency.Observe(float64(latencyMs) / 1000)
+    s.onRoundtrip(record.Partition, latency)
 }
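
Both of the new call sites above turn a float64 millisecond difference into a time.Duration before handing it to the handler methods on Service. A tiny self-contained sketch of that conversion (the timestamp values are made up):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// timeNowMs() now returns float64 milliseconds, so latencies are computed as a
	// float difference and then scaled by float64(time.Millisecond) to get a Duration.
	receiveTimestampMs := 512.5 // hypothetical values, in milliseconds
	sendTimestampMs := 500.0
	latency := time.Duration((receiveTimestampMs - sendTimestampMs) * float64(time.Millisecond))
	fmt.Println(latency) // 12.5ms
}
```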

minion/endtoend_producer.go

+6-10
@@ -4,14 +4,15 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "time"

     "github.com/google/uuid"
     "github.com/twmb/franz-go/pkg/kgo"
 )

 type EndToEndMessage struct {
-    MinionID  string `json:"minionID"`
-    Timestamp int64  `json:"timestamp"`
+    MinionID  string  `json:"minionID"`
+    Timestamp float64 `json:"timestamp"`
 }

 func (s *Service) produceToManagementTopic(ctx context.Context) error {
@@ -33,18 +34,13 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {

     err = s.kafkaSvc.Client.Produce(ctx, record, func(r *kgo.Record, err error) {
         endTime := timeNowMs()
-        ackDuration := endTime - startTime
+        ackDurationMs := endTime - startTime
+        ackDuration := time.Duration(ackDurationMs) * time.Millisecond

         if err != nil {
             fmt.Printf("record had a produce error: %v\n", err)
         } else {
-            s.endToEndMessagesAcked.Inc()
-
-            if ackDuration < s.Cfg.EndToEnd.Producer.AckSla.Milliseconds() {
-                s.endToEndWithinRoundtripSla.Set(1)
-            } else {
-                s.endToEndWithinRoundtripSla.Set(0)
-            }
+            s.onAck(r.Partition, ackDuration)
         }
     })
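Switching Timestamp to float64 keeps the struct in step with the new float64-millisecond timeNowMs(); a short sketch (the minion ID and timestamp are invented) showing that the fractional milliseconds survive the JSON roundtrip, where an int64 field would have forced truncation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same shape as the updated struct in endtoend_producer.go.
type EndToEndMessage struct {
	MinionID  string  `json:"minionID"`
	Timestamp float64 `json:"timestamp"`
}

func main() {
	// With a float64 field, the sub-millisecond fraction produced by timeNowMs()
	// survives marshalling and unmarshalling unchanged.
	in := EndToEndMessage{MinionID: "example-minion", Timestamp: 1617184467123.456}
	raw, _ := json.Marshal(in)

	var out EndToEndMessage
	_ = json.Unmarshal(raw, &out)
	fmt.Printf("%s %.3f\n", out.MinionID, out.Timestamp) // example-minion 1617184467123.456
}
```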

minion/endtoend_topic.go

+5-5
@@ -22,11 +22,11 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
     // TopicMetadataArray could be empty, therefore needs to do this check beforehand
     topicMetadataArray := topicMetadata.Topics
     if len(topicMetadataArray) == 0 {
-        return fmt.Errorf("Unable to retrieve metadata, please make sure the brokers are up and/or you have right to access them")
+        return fmt.Errorf("unable to retrieve metadata, please make sure the brokers are up and/or you have the right to access them")
     }
     doesTopicReachable := topicMetadata.Topics[0].Topic != ""
     if !doesTopicReachable {
-        return fmt.Errorf("Unable to retrieve metadata, please make sure the brokers are up and/or you have right to access them")
+        return fmt.Errorf("unable to retrieve metadata, please make sure the brokers are up and/or you have the right to access them")
     }

     // Create the management end to end topic if it does not exist
@@ -43,7 +43,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
     // topicMetadata.Brokers will return all the available brokers from the cluster
     isNumBrokerValid := len(topicMetadata.Brokers) >= expectedReplicationFactor
     if !isNumBrokerValid {
-        return fmt.Errorf("Current cluster size differs from the expected size. Expected broker: %v NumOfBroker: %v", len(topicMetadata.Brokers), expectedReplicationFactor)
+        return fmt.Errorf("current cluster size differs from the expected size. expected broker: %v NumOfBroker: %v", len(topicMetadata.Brokers), expectedReplicationFactor)
     }

     // Check the number of Partition per broker, if it is too low create partition
@@ -244,6 +244,6 @@ func (s *Service) initEndToEnd(ctx context.Context) {
     }
 }

-func timeNowMs() int64 {
-    return time.Now().UnixNano() / int64(time.Millisecond)
+func timeNowMs() float64 {
+    return float64(time.Now().UnixNano()) / float64(time.Millisecond)
 }

minion/service.go

+50-35
@@ -36,25 +36,18 @@ type Service struct {
     storage *Storage

     // EndToEnd
-    minionID               string // unique identifier, reported in metrics, in case multiple instances run at the same time
-    lastRoundtripTimestamp int64  // creation time (in utc ms) of the message that most recently passed the roundtripSla check
+    minionID               string  // unique identifier, reported in metrics, in case multiple instances run at the same time
+    lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check

     // EndToEnd Metrics
     endToEndMessagesProduced  prometheus.Counter
     endToEndMessagesAcked     prometheus.Counter
     endToEndMessagesReceived  prometheus.Counter
     endToEndMessagesCommitted prometheus.Counter

-    endToEndWithinAckSla       prometheus.Gauge
-    endToEndWithinRoundtripSla prometheus.Gauge
-    endToEndWithinCommitSla    prometheus.Gauge
-
-    endToEndProduceLatency   prometheus.Histogram
+    endToEndAckLatency       prometheus.Histogram
     endToEndRoundtripLatency prometheus.Histogram
     endToEndCommitLatency    prometheus.Histogram
-
-    // todo: produce latency histogram
-
 }

 func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
@@ -92,14 +85,6 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {

     // End-to-End metrics
     if cfg.EndToEnd.Enabled {
-        makeGauge := func(name string, help string) prometheus.Gauge {
-            return promauto.NewGauge(prometheus.GaugeOpts{
-                Namespace: metricNamespace,
-                Subsystem: "end_to_end",
-                Name:      name,
-                Help:      help,
-            })
-        }
         makeCounter := func(name string, help string) prometheus.Counter {
             return promauto.NewCounter(prometheus.CounterOpts{
                 Namespace: metricNamespace,
@@ -119,26 +104,18 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
         }

         // Low-level info
-        // Users can construct stuff like "message commits failed" themselves from those
-        service.endToEndMessagesProduced = makeCounter("messages_produced", "Number of messages that kminion's end-to-end test has tried to send to kafka")
-        service.endToEndMessagesAcked = makeCounter("messages_acked", "Number of messages kafka acknowledged as produced")
-        service.endToEndMessagesReceived = makeCounter("messages_received", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
-        service.endToEndMessagesCommitted = makeCounter("messages_committed", "Number of *matching* messages kminion successfully commited as read/processed. See 'messages_received' for what 'matching' means. Kminion will commit late/mismatching messages to kafka as well, but those won't be counted in this metric.")
-
-        // High-level SLA reporting
-        // Simple gauges that report if stuff is within the configured SLAs
-        // Naturally those will potentially not trigger if, for example, only a single message is lost in-between scrap intervals.
-        gaugeHelp := "Will be either 0 (false) or 1 (true), depending on the durations (SLAs) configured in kminion's config"
-        service.endToEndWithinAckSla = makeGauge("is_within_ack_sla", "Reports whether messages can be produced. A message is only considered 'produced' when the broker has sent an ack within the configured timeout. "+gaugeHelp)
-        service.endToEndWithinRoundtripSla = makeGauge("is_within_roundtrip_sla", "Reports whether or not kminion receives the test messages it produces within the configured timeout. "+gaugeHelp)
-        service.endToEndWithinCommitSla = makeGauge("is_within_commit_sla", "Reports whether or not kminion can successfully commit offsets for the messages it receives/processes within the configured timeout. "+gaugeHelp)
+        // Users can construct alerts like "can't produce messages" themselves from those
+        service.endToEndMessagesProduced = makeCounter("messages_produced_total", "Number of messages that kminion's end-to-end test has tried to send to kafka")
+        service.endToEndMessagesAcked = makeCounter("messages_acked_total", "Number of messages kafka acknowledged as produced")
+        service.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it arrives within the configured roundtrip SLA (and it matches the minionID)")
+        service.endToEndMessagesCommitted = makeCounter("messages_committed_total", "Number of *matching* messages kminion successfully committed as read/processed. See 'messages_received' for what 'matching' means. Kminion will commit late/mismatching messages to kafka as well, but those won't be counted in this metric.")

         // Latency Histograms
         // More detailed info about how long stuff took
-        // Since histograms also have an 'infinite' bucket, they can be used to detect small hickups that won't trigger the SLA gauges
-        service.endToEndProduceLatency = makeHistogram("produce_latency", cfg.EndToEnd.Producer.AckSla, "Time until we received an ack for a produced message")
-        service.endToEndRoundtripLatency = makeHistogram("roundtrip_latency", cfg.EndToEnd.Consumer.RoundtripSla, "Time it took between sending (producing) and receiving (consuming) a message")
-        service.endToEndCommitLatency = makeHistogram("commit_latency", cfg.EndToEnd.Consumer.CommitSla, "Time kafka took to respond to kminion's offset commit")
+        // Since histograms also have an 'infinite' bucket, they can be used to detect small hiccups and "lost" messages
+        service.endToEndAckLatency = makeHistogram("produce_latency_seconds", cfg.EndToEnd.Producer.AckSla, "Time until we received an ack for a produced message")
+        service.endToEndRoundtripLatency = makeHistogram("roundtrip_latency_seconds", cfg.EndToEnd.Consumer.RoundtripSla, "Time it took between sending (producing) and receiving (consuming) a message")
+        service.endToEndCommitLatency = makeHistogram("commit_latency_seconds", cfg.EndToEnd.Consumer.CommitSla, "Time kafka took to respond to kminion's offset commit")
     }

     return service, nil
@@ -236,3 +213,41 @@ func createHistogramBuckets(maxLatency time.Duration) []float64 {

     return bucket
 }
+
+// called from e2e when a message is acknowledged
+func (s *Service) onAck(partitionId int32, duration time.Duration) {
+    s.endToEndMessagesAcked.Inc()
+    s.endToEndAckLatency.Observe(duration.Seconds())
+}
+
+// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
+func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
+    if duration > s.Cfg.EndToEnd.Consumer.RoundtripSla {
+        return // message is too old
+    }
+
+    // todo: track "lastRoundtripMessage"
+    // if msg.Timestamp < s.lastRoundtripTimestamp {
+    //     return // msg older than what we recently processed (out of order, should never happen)
+    // }
+
+    s.endToEndMessagesReceived.Inc()
+    s.endToEndRoundtripLatency.Observe(duration.Seconds())
+}
+
+// called from e2e when an offset commit is confirmed
+func (s *Service) onOffsetCommit(partitionId int32, duration time.Duration) {
+
+    // todo:
+    // if the commit took too long, don't count it in 'commits' but add it to the histogram?
+    // and how do we want to handle cases where we get an error??
+    // should we have another metric that tells us about failed commits? or a label on the counter?
+
+    s.endToEndCommitLatency.Observe(duration.Seconds())
+
+    if duration > s.Cfg.EndToEnd.Consumer.CommitSla {
+        return
+    }
+
+    s.endToEndMessagesCommitted.Inc()
+}
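
The hunk above touches createHistogramBuckets(maxLatency time.Duration) []float64 without showing its body. Purely as an illustration of the property the commit message relies on — bucket boundaries ending at the configured SLA, so anything slower shows up only in the implicit '+Inf' bucket — a stand-in helper might look roughly like this (this is not the repository's actual implementation):

```go
package main

import (
	"fmt"
	"time"
)

// exampleHistogramBuckets is an illustrative stand-in for createHistogramBuckets:
// a handful of exponentially spaced boundaries whose largest finite value is the
// configured SLA, so any observation slower than the SLA is visible only in '+Inf'.
func exampleHistogramBuckets(maxLatency time.Duration) []float64 {
	const count = 8
	buckets := make([]float64, count)
	boundary := maxLatency.Seconds()
	for i := count - 1; i >= 0; i-- {
		buckets[i] = boundary
		boundary /= 2
	}
	return buckets
}

func main() {
	// For a 2s commit SLA this yields [0.015625 ... 0.5 1 2].
	fmt.Println(exampleHistogramBuckets(2 * time.Second))
}
```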
