Skip to content

Commit

Permalink
Use consistent naming convention for log fields
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 3, 2021
1 parent a234a98 commit eddb2b8
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 39 deletions.
4 changes: 2 additions & 2 deletions e2e/client_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type clientHooks struct {
func newEndToEndClientHooks(logger *zap.Logger) *clientHooks {

return &clientHooks{
logger: logger.Named("e2e-hooks"),
logger: logger.Named("e2e_hooks"),
currentCoordinator: &atomic.Value{},
}
}
Expand All @@ -33,7 +33,7 @@ func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration,
}
c.logger.Debug("kafka connection succeeded",
zap.String("host", meta.Host), zap.Int32("broker_id", meta.NodeID),
zap.Duration("dial_duration", dialDur))
zap.Int64("dial_duration_ms", dialDur.Milliseconds()))
}

func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
Expand Down
4 changes: 2 additions & 2 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
func (s *Service) startConsumeMessages(ctx context.Context) {
client := s.client
s.logger.Info("Starting to consume end-to-end topic",
zap.String("topicName", s.config.TopicManagement.Name),
zap.String("groupId", s.groupId))
zap.String("topic_name", s.config.TopicManagement.Name),
zap.String("group_id", s.groupId))

for {
select {
Expand Down
6 changes: 3 additions & 3 deletions e2e/group_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type groupTracker struct {
func newGroupTracker(cfg Config, logger *zap.Logger, client *kgo.Client, groupID string) *groupTracker {
return &groupTracker{
cfg: cfg,
logger: logger.Named("groupTracker"),
logger: logger.Named("group_tracker"),
client: client,
groupId: groupID,
potentiallyEmptyGroups: make(map[string]time.Time),
Expand Down Expand Up @@ -147,7 +147,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups(ctx context.Context) erro

resp, ok := shard.Resp.(*kmsg.DeleteGroupsResponse)
if !ok {
g.logger.Error("failed to cast shard response to DeleteGroupsResponse while handling an error for deleting groups", zap.String("shardHost", shard.Meta.Host), zap.Int32("broker_id", shard.Meta.NodeID), zap.NamedError("shardError", shard.Err))
g.logger.Error("failed to cast shard response to DeleteGroupsResponse while handling an error for deleting groups", zap.String("shard_host", shard.Meta.Host), zap.Int32("broker_id", shard.Meta.NodeID), zap.Error(shard.Err))
continue
}

Expand All @@ -165,7 +165,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups(ctx context.Context) erro
}
}
}
g.logger.Info("deleted old consumer groups", zap.Strings("deletedGroups", deletedGroups))
g.logger.Info("deleted old consumer groups", zap.Strings("deleted_groups", deletedGroups))

if foundNotAuthorizedError {
g.logger.Info("disabling trying to delete old kminion consumer-groups since one of the last delete results had an 'GroupAuthorizationFailed' error")
Expand Down
6 changes: 3 additions & 3 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newMessageTracker(svc *Service) *messageTracker {

t := &messageTracker{
svc: svc,
logger: svc.logger.Named("message-tracker"),
logger: svc.logger.Named("message_tracker"),
cache: goCache.New(defaultExpirationTime, cleanupInterval),
}

Expand Down Expand Up @@ -97,8 +97,8 @@ func (t *messageTracker) onMessageExpired(key string, msg *EndToEndMessage) {
age := time.Since(created)

t.logger.Debug("message lost/expired",
zap.Int64("ageMilliseconds", age.Milliseconds()),
zap.Int64("age_ms", age.Milliseconds()),
zap.Int("partition", msg.partition),
zap.String("messageId", msg.MessageID),
zap.String("message_id", msg.MessageID),
)
}
4 changes: 2 additions & 2 deletions e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *Service) produceLatencyMessages(ctx context.Context) {
err := s.produceSingleMessage(ctx, i)
if err != nil {
s.logger.Error("failed to produce to end-to-end topic",
zap.String("topicName", s.config.TopicManagement.Name),
zap.String("topic_name", s.config.TopicManagement.Name),
zap.Int("partition", i),
zap.Error(err))
}
Expand Down Expand Up @@ -97,7 +97,7 @@ func createEndToEndRecord(minionID string, topicName string, partition int) (*kg

record := &kgo.Record{
Topic: topicName,
Value: []byte(mjson),
Value: mjson,
Partition: int32(partition), // we set partition for producing so our customPartitioner can make use of it
}

Expand Down
23 changes: 11 additions & 12 deletions e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.Metadat

partitionsToAdd := expectedPartitions - len(meta.Topics[0].Partitions)
s.logger.Warn("e2e test topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...",
zap.Int("expectedPartitionCount", expectedPartitions),
zap.Int("actualPartitionCount", len(meta.Topics[0].Partitions)),
zap.Int("brokerCount", len(meta.Brokers)),
zap.Int("config.partitionsPerBroker", s.config.TopicManagement.PartitionsPerBroker),
zap.Int("partitionsToAdd", partitionsToAdd),
zap.Int("expected_partition_count", expectedPartitions),
zap.Int("actual_partition_count", len(meta.Topics[0].Partitions)),
zap.Int("broker_count", len(meta.Brokers)),
zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
zap.Int("partitions_to_add", partitionsToAdd),
)

topic := kmsg.NewCreatePartitionsRequestTopic()
Expand Down Expand Up @@ -224,8 +224,7 @@ func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.Metadat
if tErr != nil || (topicResponse.ErrorMessage != nil && *topicResponse.ErrorMessage != "") {
s.logger.Error("error in createPartitionsResponse",
zap.String("topic", topicResponse.Topic),
zap.Stringp("errorMessage", topicResponse.ErrorMessage),
zap.NamedError("topicError", tErr),
zap.Error(tErr),
)
nestedErrors++
}
Expand All @@ -243,11 +242,11 @@ func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.Metad
totalPartitions := brokerCount * topicCfg.PartitionsPerBroker

s.logger.Info("e2e topic does not exist, creating it...",
zap.String("topicName", topicCfg.Name),
zap.Int("partitionsPerBroker", topicCfg.PartitionsPerBroker),
zap.Int("replicationFactor", topicCfg.ReplicationFactor),
zap.Int("brokerCount", brokerCount),
zap.Int("totalPartitions", totalPartitions),
zap.String("topic_name", topicCfg.Name),
zap.Int("partitions_per_broker", topicCfg.PartitionsPerBroker),
zap.Int("replication_factor", topicCfg.ReplicationFactor),
zap.Int("broker_count", brokerCount),
zap.Int("total_partitions", totalPartitions),
)

topic := kmsg.NewCreateTopicsRequestTopic()
Expand Down
18 changes: 3 additions & 15 deletions e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,8 @@ import (
"go.uber.org/zap"
)

// create histogram buckets for metrics reported by 'end-to-end'
// todo:
/*
- custom, much simpler, exponential buckets
we know:
- we want to go from 5ms to 'max'
- we want to double each time
- doubling 5ms might not get us to 'max' exactly
questions:
- can we slightly adjust the factor so we hit 'max' exactly?
- or can we adjust 'max'?
(and if so, better to overshoot or undershoot?)
- or should we just set the last bucket to 'max' exactly?
*/
// createHistogramBuckets creates the buckets for the histogram based on the number of desired buckets (10) and the
// upper bucket size.
func createHistogramBuckets(maxLatency time.Duration) []float64 {
// Since this is an exponential bucket we need to take Log base2 or binary as the upper bound
// Divide by 10 for the argument because the base is counted as 20ms and we want to normalize it as base 2 instead of 20
Expand Down Expand Up @@ -59,7 +47,7 @@ func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) int {
if err != nil {
s.logger.Error("error committing partition offset",
zap.String("topic", t.Topic),
zap.Int32("partitionId", p.Partition),
zap.Int32("partition_id", p.Partition),
zap.Error(err),
)
errCount++
Expand Down

0 comments on commit eddb2b8

Please sign in to comment.