diff --git a/e2e/message_tracker.go b/e2e/message_tracker.go index 6da8a09..f544c92 100644 --- a/e2e/message_tracker.go +++ b/e2e/message_tracker.go @@ -73,8 +73,9 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) { } // message arrived early enough - t.svc.messagesReceived.Inc() - t.svc.endToEndRoundtripLatency.WithLabelValues(strconv.Itoa(msg.partition)).Observe(latency.Seconds()) + pID := strconv.Itoa(msg.partition) + t.svc.messagesReceived.WithLabelValues(pID).Inc() + t.svc.endToEndRoundtripLatency.WithLabelValues(pID).Observe(latency.Seconds()) // We mark the message as arrived so that we won't mark the message as lost and overwrite that modified message // into the cache. @@ -92,7 +93,7 @@ func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) { created := msg.creationTime() age := time.Since(created) - t.svc.lostMessages.Inc() + t.svc.lostMessages.WithLabelValues(strconv.Itoa(msg.partition)).Inc() t.logger.Info("message lost/expired", zap.Int64("age_ms", age.Milliseconds()), diff --git a/e2e/service.go b/e2e/service.go index 6e30c62..e8ba76b 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -33,9 +33,9 @@ type Service struct { messagesProducedInFlight *prometheus.GaugeVec messagesProducedTotal *prometheus.CounterVec messagesProducedFailed *prometheus.CounterVec - messagesReceived prometheus.Counter + messagesReceived *prometheus.CounterVec offsetCommits prometheus.Counter - lostMessages prometheus.Counter + lostMessages *prometheus.CounterVec endToEndAckLatency *prometheus.HistogramVec endToEndRoundtripLatency *prometheus.HistogramVec @@ -131,9 +131,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k svc.messagesProducedInFlight = makeGaugeVec("messages_produced_in_flight", []string{"partition_id"}, "Number of messages that kminion's end-to-end test produced but has not received an answer for yet") svc.messagesProducedTotal = makeCounterVec("messages_produced_total", []string{"partition_id"}, "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka") svc.messagesProducedFailed = makeCounterVec("messages_produced_failed_total", []string{"partition_id"}, "Number of messages failed to produce to Kafka because of a timeout or failure") - svc.messagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") + svc.messagesReceived = makeCounterVec("messages_received_total", []string{"partition_id"}, "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets") - svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration") + svc.lostMessages = makeCounterVec("messages_lost_total", []string{"partition_id"}, "Number of messages that have been produced successfully but not received within the configured SLA duration") // Latency Histograms // More detailed info about how long stuff took