From d9b4fbe99fef15bff16519698780a28f66336d9d Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Thu, 8 Jul 2021 14:51:18 -0600
Subject: [PATCH] consumer group: add "group" to all log messages

May help debugging. It may be worth adding the member_id too, but that
is less easy / may require some locks.
---
 pkg/kgo/consumer_group.go | 126 ++++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 60 deletions(-)

diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index c4a0bfe5..7e77d0dd 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -179,7 +179,7 @@ func (c *consumer) initGroup() {
 	}
 
 	if !g.cfg.autocommitDisable && g.cfg.autocommitInterval > 0 {
-		g.cl.cfg.logger.Log(LogLevelInfo, "beginning autocommit loop")
+		g.cfg.logger.Log(LogLevelInfo, "beginning autocommit loop", "group", g.cfg.group)
 		go g.loopCommit()
 	}
 }
@@ -192,7 +192,7 @@ func (c *consumer) initGroup() {
 // dedicated goroutine until the group is left.
 func (g *groupConsumer) manage() {
 	defer close(g.manageDone)
-	g.cl.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle")
+	g.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle", "group", g.cfg.group)
 
 	var consecutiveErrors int
 	for {
@@ -210,7 +210,7 @@ func (g *groupConsumer) manage() {
 		}
 		hook := func() {
-			g.cl.cfg.hooks.each(func(h Hook) {
+			g.cfg.hooks.each(func(h Hook) {
 				if h, ok := h.(HookGroupManageError); ok {
 					h.OnGroupManageError(err)
 				}
 			})
@@ -266,8 +266,9 @@ func (g *groupConsumer) manage() {
 		// Waiting for the backoff is a good time to update our
 		// metadata; maybe the error is from stale metadata.
 		consecutiveErrors++
-		backoff := g.cl.cfg.retryBackoff(consecutiveErrors)
-		g.cl.cfg.logger.Log(LogLevelError, "join and sync loop errored",
+		backoff := g.cfg.retryBackoff(consecutiveErrors)
+		g.cfg.logger.Log(LogLevelError, "join and sync loop errored",
+			"group", g.cfg.group,
 			"err", err,
 			"consecutive_errors", consecutiveErrors,
 			"backoff", backoff,
@@ -314,10 +315,9 @@ func (g *groupConsumer) leave() (wait func()) {
 	}
 
 	if g.cfg.instanceID == nil {
-		g.cl.cfg.logger.Log(LogLevelInfo,
-			"leaving group",
+		g.cfg.logger.Log(LogLevelInfo, "leaving group",
 			"group", g.cfg.group,
-			"memberID", g.memberID, // lock not needed now since nothing can change it (manageDone)
+			"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
 		)
 		// If we error when leaving, there is not much
 		// we can do. We may as well just return.
@@ -419,9 +419,9 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 		g.c.mu.Unlock()
 
 		if !g.cooperative {
-			g.cl.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "revoking", g.nowAssigned)
+			g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned)
 		} else {
-			g.cl.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "revoking", g.nowAssigned)
+			g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned)
 		}
 		if g.cfg.onRevoked != nil {
 			g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned)
@@ -481,9 +481,9 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 
 	if len(lost) > 0 || stage == revokeThisSession {
 		if len(lost) == 0 {
-			g.cl.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke at the end of a session even though no partitions were lost")
+			g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke at the end of a session even though no partitions were lost", "group", g.cfg.group)
 		} else {
-			g.cl.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke", "lost", lost, "stage", stage)
+			g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke", "group", g.cfg.group, "lost", lost, "stage", stage)
 		}
 		if g.cfg.onRevoked != nil {
 			g.cfg.onRevoked(g.ctx, g.cl, lost)
@@ -597,7 +597,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
 
 	s := newAssignRevokeSession()
 	added, lost := g.diffAssigned()
-	g.cl.cfg.logger.Log(LogLevelInfo, "new group session begun", "added", added, "lost", lost)
+	g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", added, "lost", lost)
 	s.prerevoke(g, lost) // for cooperative consumers
 
 	// Since we have joined the group, we immediately begin heartbeating.
@@ -606,7 +606,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
 
 	ctx, cancel := context.WithCancel(g.ctx)
 	go func() {
 		defer cancel() // potentially kill offset fetching
-		g.cl.cfg.logger.Log(LogLevelInfo, "beginning heartbeat loop")
+		g.cfg.logger.Log(LogLevelInfo, "beginning heartbeat loop", "group", g.cfg.group)
 		hbErrCh <- g.heartbeat(fetchErrCh, s)
 	}()
@@ -624,7 +624,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
 		go func() {
 			defer close(fetchDone)
 			defer close(fetchErrCh)
-			g.cl.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "added", added)
+			g.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "group", g.cfg.group, "added", added)
 			fetchErrCh <- g.fetchOffsets(ctx, added)
 		}()
 	} else {
@@ -715,7 +715,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 		}
 
 		if heartbeat {
-			g.cl.cfg.logger.Log(LogLevelDebug, "heartbeating")
+			g.cfg.logger.Log(LogLevelDebug, "heartbeating", "group", g.cfg.group)
 			req := &kmsg.HeartbeatRequest{
 				Group:      g.cfg.group,
 				Generation: g.generation,
@@ -726,7 +726,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 			if resp, err = req.RequestWith(g.ctx, g.cl); err == nil {
 				err = kerr.ErrorForCode(resp.ErrorCode)
 			}
-			g.cl.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "err", err)
+			g.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "group", g.cfg.group, "err", err)
 			if force != nil {
 				force(err)
 			}
@@ -737,7 +737,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 		// revoke, we wait for it to complete regardless of any future
 		// error.
 		if didMetadone && didRevoke {
-			g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat loop complete", "err", lastErr)
+			g.cfg.logger.Log(LogLevelInfo, "heartbeat loop complete", "group", g.cfg.group, "err", lastErr)
 			return lastErr
 		}
 
@@ -746,9 +746,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 		}
 
 		if lastErr == nil {
-			g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "err", err)
+			g.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "group", g.cfg.group, "err", err)
 		} else {
-			g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat errored again while waiting for user revoke to finish", "err", err)
+			g.cfg.logger.Log(LogLevelInfo, "heartbeat errored again while waiting for user revoke to finish", "group", g.cfg.group, "err", err)
 		}
 
 		// Since we errored, we must revoke.
@@ -827,7 +827,7 @@ func (g *groupConsumer) rejoin() {
 // Joins and then syncs, issuing the two slow requests in goroutines to allow
 // for group cancelation to return early.
 func (g *groupConsumer) joinAndSync() error {
-	g.cl.cfg.logger.Log(LogLevelInfo, "joining group")
+	g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
 	g.leader.set(false)
 
 start:
@@ -871,7 +871,7 @@ start:
 		goto start
 	}
 	if err != nil {
-		g.cl.cfg.logger.Log(LogLevelWarn, "join group failed", "err", err)
+		g.cfg.logger.Log(LogLevelWarn, "join group failed", "group", g.cfg.group, "err", err)
 		return err
 	}
 
@@ -890,7 +890,7 @@ start:
 		synced   = make(chan struct{})
 	)
 
-	g.cl.cfg.logger.Log(LogLevelInfo, "syncing", "protocol_type", g.cfg.protocol, "protocol", protocol)
+	g.cfg.logger.Log(LogLevelInfo, "syncing", "group", g.cfg.group, "protocol_type", g.cfg.protocol, "protocol", protocol)
 	go func() {
 		defer close(synced)
 		syncResp, err = syncReq.RequestWith(g.ctx, g.cl)
@@ -907,10 +907,10 @@ start:
 
 	if err = g.handleSyncResp(protocol, syncResp); err != nil {
 		if err == kerr.RebalanceInProgress {
-			g.cl.cfg.logger.Log(LogLevelInfo, "sync failed with RebalanceInProgress, rejoining")
+			g.cfg.logger.Log(LogLevelInfo, "sync failed with RebalanceInProgress, rejoining", "group", g.cfg.group)
 			goto start
 		}
-		g.cl.cfg.logger.Log(LogLevelWarn, "sync group failed", "err", err)
+		g.cfg.logger.Log(LogLevelWarn, "sync group failed", "group", g.cfg.group, "err", err)
 		return err
 	}
 
@@ -924,13 +924,13 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
 		g.mu.Lock()
 		g.memberID = resp.MemberID // KIP-394
 		g.mu.Unlock()
-		g.cl.cfg.logger.Log(LogLevelInfo, "join returned MemberIDRequired, rejoining with response's MemberID", "memberID", resp.MemberID)
+		g.cfg.logger.Log(LogLevelInfo, "join returned MemberIDRequired, rejoining with response's MemberID", "group", g.cfg.group, "member_id", resp.MemberID)
 		return true, "", nil, nil
 	case kerr.UnknownMemberID:
 		g.mu.Lock()
 		g.memberID = ""
 		g.mu.Unlock()
-		g.cl.cfg.logger.Log(LogLevelInfo, "join returned UnknownMemberID, rejoining without a member id")
+		g.cfg.logger.Log(LogLevelInfo, "join returned UnknownMemberID, rejoining without a member id", "group", g.cfg.group)
 		return true, "", nil, nil
 	}
 	return // Request retries as necesary, so this must be a failure
@@ -950,9 +950,10 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
 	leader := resp.LeaderID == resp.MemberID
 	if leader {
 		g.leader.set(true)
-		g.cl.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
-			"memberID", g.memberID,
-			"instanceID", g.cfg.instanceID,
+		g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
+			"group", g.cfg.group,
+			"member_id", g.memberID,
+			"instance_id", g.cfg.instanceID,
 			"generation", g.generation,
 			"balance_protocol", protocol,
 			"leader", true,
@@ -964,9 +965,10 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
 		}
 
 	} else {
-		g.cl.cfg.logger.Log(LogLevelInfo, "joined",
-			"memberID", g.memberID,
-			"instanceID", g.cfg.instanceID,
+		g.cfg.logger.Log(LogLevelInfo, "joined",
+			"group", g.cfg.group,
+			"member_id", g.memberID,
+			"instance_id", g.cfg.instanceID,
 			"generation", g.generation,
 			"leader", false,
 		)
 
@@ -986,7 +988,7 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
 
 	assigned, err := b.ParseSyncAssignment(resp.MemberAssignment)
 	if err != nil {
-		g.cl.cfg.logger.Log(LogLevelError, "sync assignment parse failed", "err", err)
+		g.cfg.logger.Log(LogLevelError, "sync assignment parse failed", "group", g.cfg.group, "err", err)
 		return err
 	}
 
@@ -995,7 +997,7 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
 		fmt.Fprintf(&sb, "%s%v", topic, partitions)
 		sb.WriteString(", ")
 	}
-	g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", strings.TrimSuffix(sb.String(), ", "))
+	g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", strings.TrimSuffix(sb.String(), ", "))
 
 	// Past this point, we will fall into the setupAssigned prerevoke code,
 	// meaning for cooperative, we will revoke what we need to.
@@ -1003,7 +1005,6 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
 		g.lastAssigned = g.nowAssigned
 	}
 	g.nowAssigned = assigned
-	g.cl.cfg.logger.Log(LogLevelInfo, "synced successfully", "assigned", g.nowAssigned)
 	return nil
 }
 
@@ -1065,11 +1066,11 @@ start:
 	select {
 	case <-fetchDone:
 	case <-ctx.Done():
-		g.cl.cfg.logger.Log(LogLevelError, "fetch offsets failed due to context cancelation")
+		g.cfg.logger.Log(LogLevelError, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
 		return ctx.Err()
 	}
 	if err != nil {
-		g.cl.cfg.logger.Log(LogLevelError, "fetch offsets failed with non-retriable error", "err", err)
+		g.cfg.logger.Log(LogLevelError, "fetch offsets failed with non-retriable error", "group", g.cfg.group, "err", err)
 		return err
 	}
 
@@ -1083,7 +1084,11 @@ start:
 			// pending transaction that should be committing soon.
 			// We sleep for 1s and retry fetching offsets.
 			if err == kerr.UnstableOffsetCommit {
-				g.cl.cfg.logger.Log(LogLevelInfo, "fetch offsets failed with UnstableOffsetCommit, waiting 1s and retrying")
+				g.cfg.logger.Log(LogLevelInfo, "fetch offsets failed with UnstableOffsetCommit, waiting 1s and retrying",
+					"group", g.cfg.group,
+					"topic", rTopic.Topic,
+					"partition", rPartition.Partition,
+				)
 				select {
 				case <-ctx.Done():
 				case <-time.After(time.Second):
@@ -1100,7 +1105,7 @@ start:
 				offset.epoch = rPartition.LeaderEpoch
 			}
 			if rPartition.Offset == -1 {
-				offset = g.cl.cfg.resetOffset
+				offset = g.cfg.resetOffset
 			}
 			topicOffsets[rPartition.Partition] = offset
 		}
@@ -1110,7 +1115,7 @@ start:
 	for fetchedTopic := range offsets {
 		if !groupTopics.hasTopic(fetchedTopic) {
 			delete(offsets, fetchedTopic)
-			g.cl.cfg.logger.Log(LogLevelWarn, "member was assigned topic that we did not ask for in ConsumeTopics! skipping assigning this topic!", "topic", fetchedTopic)
+			g.cfg.logger.Log(LogLevelWarn, "member was assigned topic that we did not ask for in ConsumeTopics! skipping assigning this topic!", "group", g.cfg.group, "topic", fetchedTopic)
 		}
 	}
 
@@ -1150,10 +1155,10 @@ start:
 		}
 	}
 
-	if g.cl.cfg.logger.Level() >= LogLevelDebug {
-		g.cl.cfg.logger.Log(LogLevelDebug, "fetched committed offsets", "fetched", offsets)
+	if g.cfg.logger.Level() >= LogLevelDebug {
+		g.cfg.logger.Log(LogLevelDebug, "fetched committed offsets", "group", g.cfg.group, "fetched", offsets)
 	} else {
-		g.cl.cfg.logger.Log(LogLevelInfo, "fetched committed offsets")
+		g.cfg.logger.Log(LogLevelInfo, "fetched committed offsets", "group", g.cfg.group)
 	}
 	return nil
 }
@@ -1268,7 +1273,7 @@ type uncommitted map[string]map[int32]uncommit
 
 // updateUncommitted sets the latest uncommitted offset.
 func (g *groupConsumer) updateUncommitted(fetches Fetches) {
 	var b bytes.Buffer
-	debug := g.cl.cfg.logger.Level() >= LogLevelDebug
+	debug := g.cfg.logger.Level() >= LogLevelDebug
 	g.mu.Lock()
 	defer g.mu.Unlock()
@@ -1325,7 +1330,7 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
 	if debug {
 		update := b.String()
 		update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic
-		g.cl.cfg.logger.Log(LogLevelDebug, "updated uncommitted", "to", update)
+		g.cfg.logger.Log(LogLevelDebug, "updated uncommitted", "group", g.cfg.group, "to", update)
 	}
 }
 
@@ -1344,7 +1349,7 @@ func (g *groupConsumer) updateCommitted(
 	}
 	if g.uncommitted == nil || // just in case
 		len(req.Topics) != len(resp.Topics) { // bad kafka
-		g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)))
+		g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)), "group", g.cfg.group)
 		return
 	}
 
@@ -1356,7 +1361,7 @@ func (g *groupConsumer) updateCommitted(
 	})
 
 	var b bytes.Buffer
-	debug := g.cl.cfg.logger.Level() >= LogLevelDebug
+	debug := g.cfg.logger.Level() >= LogLevelDebug
 
 	for i := range resp.Topics {
 		reqTopic := &req.Topics[i]
@@ -1365,7 +1370,7 @@ func (g *groupConsumer) updateCommitted(
 		if topic == nil || // just in case
 			reqTopic.Topic != respTopic.Topic || // bad kafka
 			len(reqTopic.Partitions) != len(respTopic.Partitions) { // same
-			g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)))
+			g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)), "group", g.cfg.group)
 			continue
 		}
 
@@ -1387,11 +1392,11 @@ func (g *groupConsumer) updateCommitted(
 				continue
 			}
 			if reqPart.Partition != respPart.Partition { // bad kafka
-				g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition))
+				g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition), "group", g.cfg.group)
 				continue
 			}
 			if respPart.ErrorCode != 0 {
-				g.cl.cfg.logger.Log(LogLevelWarn, "unable to commit offset for topic partition", "topic", reqTopic.Topic, "partition", reqPart.Partition, "error_code", respPart.ErrorCode)
+				g.cfg.logger.Log(LogLevelWarn, "unable to commit offset for topic partition", "group", g.cfg.group, "topic", reqTopic.Topic, "partition", reqPart.Partition, "error_code", respPart.ErrorCode)
 				continue
 			}
 
@@ -1418,23 +1423,24 @@ func (g *groupConsumer) updateCommitted(
 	if debug {
 		update := b.String()
 		update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic
-		g.cl.cfg.logger.Log(LogLevelDebug, "updated committed", "to", update)
+		g.cfg.logger.Log(LogLevelDebug, "updated committed", "group", g.cfg.group, "to", update)
 	}
 }
 
 func (g *groupConsumer) defaultCommitCallback(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
 	if err != nil {
 		if err != context.Canceled {
-			g.cl.cfg.logger.Log(LogLevelError, "default commit failed", "err", err)
+			g.cfg.logger.Log(LogLevelError, "default commit failed", "group", g.cfg.group, "err", err)
 		} else {
-			g.cl.cfg.logger.Log(LogLevelDebug, "default commit canceled")
+			g.cfg.logger.Log(LogLevelDebug, "default commit canceled", "group", g.cfg.group)
 		}
 		return
 	}
 	for _, topic := range resp.Topics {
 		for _, partition := range topic.Partitions {
 			if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
-				g.cl.cfg.logger.Log(LogLevelError, "in default commit: unable to commit offsets for topic partition",
+				g.cfg.logger.Log(LogLevelError, "in default commit: unable to commit offsets for topic partition",
+					"group", g.cfg.group,
 					"topic", topic.Topic,
 					"partition", partition.Partition,
 					"error", err)
@@ -1461,7 +1467,7 @@ func (g *groupConsumer) loopCommit() {
 		// leaving a group).
 		g.mu.Lock()
 		if !g.blockAuto {
-			g.cl.cfg.logger.Log(LogLevelDebug, "autocommitting")
+			g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
 			g.commit(g.ctx, g.getUncommittedLocked(true), g.cfg.commitCallback)
 		}
 		g.mu.Unlock()
@@ -1783,8 +1789,8 @@ func (g *groupConsumer) commitOffsetsSync(
 	done := make(chan struct{})
 	defer func() { <-done }()
 
-	g.cl.cfg.logger.Log(LogLevelDebug, "in CommitOffsetsSync", "with", uncommitted)
-	defer g.cl.cfg.logger.Log(LogLevelDebug, "left CommitOffsetsSync")
+	g.cfg.logger.Log(LogLevelDebug, "in CommitOffsetsSync", "group", g.cfg.group, "with", uncommitted)
+	defer g.cfg.logger.Log(LogLevelDebug, "left CommitOffsetsSync", "group", g.cfg.group)
 
 	if onDone == nil {
 		onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
@@ -1958,12 +1964,12 @@ func (g *groupConsumer) commit(
 		select {
 		case <-priorDone:
 		default:
-			g.cl.cfg.logger.Log(LogLevelDebug, "canceling prior commit to issue another")
+			g.cfg.logger.Log(LogLevelDebug, "canceling prior commit to issue another", "group", g.cfg.group)
 			priorCancel()
 			<-priorDone
 		}
 	}
 
-	g.cl.cfg.logger.Log(LogLevelDebug, "issuing commit", "uncommitted", uncommitted)
+	g.cfg.logger.Log(LogLevelDebug, "issuing commit", "group", g.cfg.group, "uncommitted", uncommitted)
 	for topic, partitions := range uncommitted {
 		req.Topics = append(req.Topics, kmsg.OffsetCommitRequestTopic{