Skip to content

Commit

Permalink
Make all message metrics vectors with partition id as label
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 13, 2021
1 parent 08dff9b commit 020c680
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
7 changes: 4 additions & 3 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
}

// message arrived early enough
t.svc.messagesReceived.Inc()
t.svc.endToEndRoundtripLatency.WithLabelValues(strconv.Itoa(msg.partition)).Observe(latency.Seconds())
pID := strconv.Itoa(msg.partition)
t.svc.messagesReceived.WithLabelValues(pID).Inc()
t.svc.endToEndRoundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())

// We mark the message as arrived so that we neither mark it as lost later nor overwrite the modified message
// in the cache.
Expand All @@ -92,7 +93,7 @@ func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) {

created := msg.creationTime()
age := time.Since(created)
t.svc.lostMessages.Inc()
t.svc.lostMessages.WithLabelValues(strconv.Itoa(msg.partition)).Inc()

t.logger.Info("message lost/expired",
zap.Int64("age_ms", age.Milliseconds()),
Expand Down
8 changes: 4 additions & 4 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type Service struct {
messagesProducedInFlight *prometheus.GaugeVec
messagesProducedTotal *prometheus.CounterVec
messagesProducedFailed *prometheus.CounterVec
messagesReceived prometheus.Counter
messagesReceived *prometheus.CounterVec
offsetCommits prometheus.Counter
lostMessages prometheus.Counter
lostMessages *prometheus.CounterVec

endToEndAckLatency *prometheus.HistogramVec
endToEndRoundtripLatency *prometheus.HistogramVec
Expand Down Expand Up @@ -131,9 +131,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
svc.messagesProducedInFlight = makeGaugeVec("messages_produced_in_flight", []string{"partition_id"}, "Number of messages that kminion's end-to-end test produced but has not received an answer for yet")
svc.messagesProducedTotal = makeCounterVec("messages_produced_total", []string{"partition_id"}, "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
svc.messagesProducedFailed = makeCounterVec("messages_produced_failed_total", []string{"partition_id"}, "Number of messages failed to produce to Kafka because of a timeout or failure")
svc.messagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
svc.messagesReceived = makeCounterVec("messages_received_total", []string{"partition_id"}, "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets")
svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration")
svc.lostMessages = makeCounterVec("messages_lost_total", []string{"partition_id"}, "Number of messages that have been produced successfully but not received within the configured SLA duration")

// Latency Histograms
// More detailed info about how long stuff took
Expand Down

0 comments on commit 020c680

Please sign in to comment.