Skip to content

Commit

Permalink
message tracker, better comments, use time.Now() everywhere (no point…
Browse files Browse the repository at this point in the history
… in dealing with millis)
  • Loading branch information
rikimaru0345 committed May 19, 2021
1 parent 3754be9 commit 192bf30
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 140 deletions.
103 changes: 51 additions & 52 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)

func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
func (s *Service) startConsumeMessages(ctx context.Context) {
client := s.client
topicName := s.config.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
Expand All @@ -33,81 +33,73 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return
default:
fetches := client.PollFetches(ctx)
receiveTimestamp := time.Now()

// Log all errors and continue afterwards as we might get errors and still have some fetch results
errors := fetches.Errors()
for _, err := range errors {
// Log all errors and continue afterwards as we might get errors and still have some fetch results
s.logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
}

receiveTimestampMs := timeNowMs()

//
// Process messages
iter := fetches.RecordIter()
var record *kgo.Record
for !iter.Done() {
record = iter.Next()

if record == nil {
continue
fetches.EachRecord(func(record *kgo.Record) {
if record != nil {
s.processMessage(record, receiveTimestamp)
}
})
}
}
}

s.processMessage(record, receiveTimestampMs)
}
func (s *Service) commitOffsets(ctx context.Context) {
client := s.client

//
// Commit offsets for processed messages
// todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
// we want to do it manually, seperately for each partition, so we can track how long it took
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
//
// Commit offsets for processed messages
// todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
// we want to do it manually, seperately for each partition, so we can track how long it took
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

startCommitTimestamp := timeNowMs()
startCommitTimestamp := time.Now()

client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// got commit response
client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// got commit response
latency := time.Since(startCommitTimestamp)

latencyMs := timeNowMs() - startCommitTimestamp
commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
if err != nil {
s.logger.Error("offset commit failed", zap.Error(err), zap.Int64("latencyMilliseconds", latency.Milliseconds()))
return
}

for _, t := range r.Topics {
for _, p := range t.Partitions {
err := kerr.ErrorForCode(p.ErrorCode)
if err != nil {
s.logger.Error("offset commit failed", zap.Error(err), zap.Int64("latencyMilliseconds", commitLatency.Milliseconds()))
return
}

for _, t := range r.Topics {
for _, p := range t.Partitions {
err := kerr.ErrorForCode(p.ErrorCode)
if err != nil {
s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err))
}
}
}

// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, commitLatency)
s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err))
}
})
}
}

}
// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, latency)
}
})
}

}

// todo: then also create a "tracker" that knows about in-flight messages, and the latest successful roundtrips

// processMessage takes a message and:
// - checks if it matches minionID and latency
// - updates metrics accordingly
func (s *Service) processMessage(record *kgo.Record, receiveTimestampMs float64) {
// processMessage:
// - deserializes the message
// - checks if it is from us, or from another kminion process running somewhere else
// - hands it off to the service, which then reports metrics on it
func (s *Service) processMessage(record *kgo.Record, receiveTimestamp time.Time) {
var msg EndToEndMessage
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
return // maybe older version
Expand All @@ -117,7 +109,14 @@ func (s *Service) processMessage(record *kgo.Record, receiveTimestampMs float64)
return // not from us
}

latency := time.Duration((receiveTimestampMs - msg.Timestamp) * float64(time.Millisecond))
// restore partition, which was not serialized
msg.partition = int(record.Partition)

created := msg.creationTime()
latency := receiveTimestamp.Sub(created)

s.onRoundtrip(record.Partition, latency)

// notify the tracker that the message arrived
s.messageTracker.onMessageArrived(&msg)
}
6 changes: 3 additions & 3 deletions e2e/group_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type groupTracker struct {
groupId string // our own groupId
potentiallyEmptyGroups map[string]time.Time // groupName -> utc timestamp when the group was first seen

isNotAuthorized bool
isNotAuthorized bool // if we get a not authorized response while trying to delete old groups, this will be set to true, essentially disabling the tracker
}

func newGroupTracker(svc *Service, ctx context.Context) *groupTracker {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
_, exists := g.potentiallyEmptyGroups[name]
if !exists {
// add it with the current timestamp
now := time.Now().UTC()
now := time.Now()
g.potentiallyEmptyGroups[name] = now
g.logger.Debug("new empty kminion group, adding to tracker", zap.String("group", name), zap.Time("firstSeen", now))
}
Expand All @@ -134,7 +134,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
exists, _ := containsStr(matchingGroups, name)
if exists {
// still there, check age and maybe delete it
age := time.Now().UTC().Sub(firstSeen)
age := time.Now().Sub(firstSeen)
if age > oldGroupMaxAge {
// group was unused for too long, delete it
groupsToDelete = append(groupsToDelete, name)
Expand Down
106 changes: 106 additions & 0 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package e2e

import (
"context"
"time"

goCache "github.com/patrickmn/go-cache"
"go.uber.org/zap"
)

// messageTracker keeps track of messages (wow)
//
// When we successfully send a mesasge, it will be added to this tracker.
// Later, when we receive the message back in the consumer, the message is marked as completed and removed from the tracker.
// If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost.
// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration,
// but it would probably be a good idea to also have a metric that reports the number of lost messages.
//
// When we fail to send a message, it isn't tracked.
//
// todo: We should probably report that in the roundtrip metric as infinite duration.
// since, if one broker is offline, we can't produce to the partition it leads,
// but we are still able to produce to other partitions led by other brokers.
// This should add at least a little protection against people who only alert on messages_produced and messages_received.
//
// Alternatively, maybe some sort of "failed count" metric could be a good idea?
//
type messageTracker struct {
svc *Service
logger *zap.Logger
ctx context.Context
cache *goCache.Cache
}

func newMessageTracker(svc *Service) *messageTracker {

defaultExpirationTime := svc.config.Consumer.RoundtripSla
cleanupInterval := 1 * time.Second

t := &messageTracker{
svc: svc,
logger: svc.logger.Named("message-tracker"),
cache: goCache.New(defaultExpirationTime, cleanupInterval),
}

t.cache.OnEvicted(func(key string, item interface{}) {
t.onMessageExpired(key, item.(*EndToEndMessage))
})

return t
}

func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
t.cache.SetDefault(msg.MessageID, &msg)
}

func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
cachedMessageInterface, _, found := t.cache.GetWithExpiration(arrivedMessage.MessageID)
if !found {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
}

actualExpireTime := arrivedMessage.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
if time.Now().Before(actualExpireTime) {
// message arrived early enough

// timeUntilExpire := time.Until(actualExpireTime)
// t.logger.Debug("message arrived",
// zap.Duration("timeLeft", timeUntilExpire),
// zap.Duration("age", ),
// zap.Int("partition", msg.partition),
// zap.String("messageId", msg.MessageID),
// )
} else {
// Message arrived late, but was still in cache.
// Maybe we could log something like "message arrived after the sla"...
//
// But for now we don't report it as "lost" in the log (because it actually *did* arrive just now, just too late).
// The metrics will report it as 'duration infinite' anyway.
}

// Set it as arrived, so we don't log it as lost in 'onMessageExpired' and remove it from the tracker
msg := cachedMessageInterface.(*EndToEndMessage)
msg.hasArrived = true
t.cache.Delete(msg.MessageID)
}

func (t *messageTracker) onMessageExpired(key string, msg *EndToEndMessage) {

if msg.hasArrived {
// message did, in fact, arrive (doesn't matter here if soon enough of barely expired)
// don't log anything
return
}

created := msg.creationTime()
age := time.Since(created)

t.logger.Debug("message lost/expired",
zap.Int64("ageMilliseconds", age.Milliseconds()),
zap.Int("partition", msg.partition),
zap.String("messageId", msg.MessageID),
)
}
8 changes: 0 additions & 8 deletions e2e/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ import (
Why do we want to do that?
We want to test all brokers with our "end-to-end test" test (sending message, receiving it again, measuring latency).
To do that, we need to ensure we send a message to each broker.
todo:
Of course that also requires that we have exactly as many partitions as we have brokers,
and that each broker leads exactly one of our test partitions.
However, we only create the topic initially (with the right amount of partitions and proper leader balancing over the brokers).
So should two or more partitions of our topic end up being led (/hosted) by the same broker somehow, we neither detect nor fix that currently.
*/

// Partitioner: Creates a TopicPartitioner for a given topic name
Expand Down
Loading

0 comments on commit 192bf30

Please sign in to comment.