Skip to content

Commit

Permalink
Print aggregated error summary
Browse files Browse the repository at this point in the history
If fetching partition offsets fail due to offline brokers or missing partition leader we no longer print one log message per failed partition. Instead we will print a log line that summarizes the number of failed requests grouped by error type. An additional log is printed that informs how many topics are affected by this.
  • Loading branch information
weeco committed Jun 9, 2021
1 parent 3c7cfad commit 1ffd02b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 18 deletions.
40 changes: 39 additions & 1 deletion minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package minion
import (
"context"
"fmt"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
"time"

Expand Down Expand Up @@ -60,5 +62,41 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO
req := kmsg.NewListOffsetsRequest()
req.Topics = topicReqs

return req.RequestWith(ctx, s.client)
res, err := req.RequestWith(ctx, s.client)
if err != nil {
return res, err
}

// Log inner errors before returning them. We do that inside of this function to avoid duplicate logging as the response
// are cached for each scrape anyways.
//
// Create two metrics to aggregate error logs in few messages. Logging one message per occured partition error
// is too much. Typical errors are LEADER_NOT_AVAILABLE etc.
errorCountByErrCode := make(map[int16]int)
errorCountByTopic := make(map[string]int)

// Iterate on all partitions
for _, topic := range res.Topics {
for _, partition := range topic.Partitions {
err := kerr.TypedErrorForCode(partition.ErrorCode)
if err != nil {
errorCountByErrCode[partition.ErrorCode]++
errorCountByTopic[topic.Topic]++
}
}
}

// Print log line for each error type
for errCode, count := range errorCountByErrCode {
typedErr := kerr.TypedErrorForCode(errCode)
s.logger.Warn("failed to list some partitions watermarks",
zap.Error(typedErr),
zap.Int("error_count", count))
}
if len(errorCountByTopic) > 0 {
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
zap.Int("topics_with_errors", len(errorCountByTopic)))
}

return res, nil
}
4 changes: 2 additions & 2 deletions prometheus/collect_consumer_group_lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMar
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Warn("failed to get partition low water mark, inner kafka error",
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
Expand All @@ -243,7 +243,7 @@ func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMar
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Warn("failed to get partition high water mark, inner kafka error",
e.logger.Debug("failed to get partition high water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
Expand Down
40 changes: 25 additions & 15 deletions prometheus/collect_topic_partition_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
return false
}

// Process Low Watermarks
for _, topic := range lowWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
continue
}

waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Error("failed to fetch partition low water mark", zap.Error(err))
hasErrors = true
isOk = false
continue
}
Expand All @@ -50,24 +53,28 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
strconv.Itoa(int(partition.Partition)),
)
}
ch <- prometheus.MustNewConstMetric(
e.topicLowWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
)
// We only want to report the sum of all partition marks if we receive watermarks from all partition
if !hasErrors {
ch <- prometheus.MustNewConstMetric(
e.topicLowWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
)
}
}

for _, topic := range highWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
continue
}
waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Error("failed to fetch partition high water mark", zap.Error(err))
isOk = true
hasErrors = true
isOk = false
continue
}
waterMarkSum += partition.Offset
Expand All @@ -83,12 +90,15 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
strconv.Itoa(int(partition.Partition)),
)
}
ch <- prometheus.MustNewConstMetric(
e.topicHighWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
)
// We only want to report the sum of all partition marks if we receive watermarks from all partitions
if !hasErrors {
ch <- prometheus.MustNewConstMetric(
e.topicHighWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
)
}
}

return isOk
Expand Down

0 comments on commit 1ffd02b

Please sign in to comment.