From e48c03c4287dbf0210661cdd2799f25c918a5980 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 29 Jun 2021 19:28:03 -0600
Subject: [PATCH] client: log "seed #" rather than a large negative for seed brokers

Previously, seed brokers were logged with highly negative numbers, which
was confusing. Now, we log "seed 0", "seed 1", etc.

We still use negatives in int32 returns, but that is well documented:
BrokerMetadata.NodeID describes how negative IDs are used for seeds.

Lastly, this cleans up some wording on producing to an unknown topic for
the first time.
---
 pkg/kgo/broker.go   | 45 ++++++++++++++++++++++++++-------------------
 pkg/kgo/producer.go |  8 ++++----
 pkg/kgo/sink.go     | 28 ++++++++++++++--------------
 pkg/kgo/source.go   |  6 +++---
 4 files changed, 47 insertions(+), 40 deletions(-)

diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index 41e6b4c6..8dd16e1f 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -62,6 +62,13 @@ type promisedResp struct {
 	readEnqueue time.Time
 }
 
+func logID(id int32) string {
+	if id >= -10 {
+		return strconv.FormatInt(int64(id), 10)
+	}
+	return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
+}
+
 // BrokerMetadata is metadata for a broker.
 //
 // This struct mirrors kmsg.MetadataResponseBroker.
@@ -289,7 +296,7 @@ func (b *broker) handleReqs() {
 			// can only have an expiry if we went the authenticate
 			// flow, so we know we are authenticating again.
 			// For KIP-368.
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", cxn.b.meta.NodeID)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", logID(cxn.b.meta.NodeID))
 			if err := cxn.sasl(); err != nil {
 				pr.promise(nil, err)
 				cxn.die()
@@ -426,11 +433,11 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
 		deadCh: make(chan struct{}),
 	}
 	if err = cxn.init(isProduceCxn); err != nil {
-		b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+		b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
 		cxn.closeConn()
 		return nil, err
 	}
-	b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", b.meta.NodeID)
+	b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", logID(b.meta.NodeID))
 
 	b.reapMu.Lock()
 	defer b.reapMu.Unlock()
@@ -513,7 +520,7 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
 
 // connect connects to the broker's addr, returning the new connection.
 func (b *broker) connect(ctx context.Context) (net.Conn, error) {
-	b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", b.meta.NodeID)
+	b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
 	start := time.Now()
 	conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr)
 	since := time.Since(start)
@@ -523,10 +530,10 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
 		}
 	})
 	if err != nil {
-		b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+		b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
 		return nil, fmt.Errorf("unable to dial: %w", err)
 	} else {
-		b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", b.meta.NodeID)
+		b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
 	}
 	return conn, nil
 }
@@ -574,13 +581,13 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
 
 	if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
 		if err := cxn.requestAPIVersions(); err != nil {
-			cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", cxn.b.meta.NodeID, "err", err)
+			cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
 			return err
 		}
 	}
 
 	if err := cxn.sasl(); err != nil {
-		cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", cxn.b.meta.NodeID, "err", err)
+		cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err)
 		return err
 	}
 
@@ -613,7 +620,7 @@ start:
 		ClientSoftwareName:    cxn.cl.cfg.softwareName,
 		ClientSoftwareVersion: cxn.cl.cfg.softwareVersion,
 	}
-	cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", cxn.b.meta.NodeID, "version", maxVersion)
+	cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", logID(cxn.b.meta.NodeID), "version", maxVersion)
 	corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 	if writeErr != nil {
 		cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -646,7 +653,7 @@ start:
 			// EventHubs erroneously replies with v1, so we check
 			// for that as well.
 			srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", cxn.b.meta.NodeID)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
 			maxVersion = 0
 			goto start
 		}
@@ -682,7 +689,7 @@ start:
 	if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 {
 		req.Mechanism = mechanism.Name()
 		req.Version = cxn.versions[req.Key()]
-		cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", cxn.b.meta.NodeID)
+		cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID))
 		corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 		if writeErr != nil {
 			cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -716,7 +723,7 @@ start:
 		}
 		authenticate = req.Version == 1
 	}
-	cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", cxn.b.meta.NodeID, "mechanism", mechanism.Name(), "authenticate", authenticate)
+	cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate)
 	cxn.mechanism = mechanism
 	return cxn.doSasl(authenticate)
 }
@@ -751,7 +758,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 			binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
 			buf = append(buf, clientWrite...)
 
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", cxn.b.meta.NodeID, "step", step)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step)
 			_, err, _, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now())
 
 			cxn.cl.bufPool.put(buf)
@@ -770,7 +777,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 				SASLAuthBytes: clientWrite,
 			}
 			req.Version = cxn.versions[req.Key()]
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", cxn.b.meta.NodeID, "version", req.Version, "step", step)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step)
 
 			corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 
@@ -823,7 +830,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 		}
 		now := time.Now()
 		cxn.expiry = now.Add(time.Duration(lifetimeMillis)*time.Millisecond - time.Second)
-		cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", cxn.b.meta.NodeID, "reauthenticate_in", cxn.expiry.Sub(now))
+		cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
 	}
 	return nil
 }
@@ -871,7 +878,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
 		}
 	})
 	if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-		logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", cxn.b.meta.NodeID, "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
+		logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", logID(cxn.b.meta.NodeID), "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
 	}
 
 	if writeErr != nil {
@@ -1054,7 +1061,7 @@ func (cxn *brokerCxn) readResponse(
 		}
 	})
 	if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-		logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
+		logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", logID(cxn.b.meta.NodeID), "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
 	}
 
 	if readErr != nil {
@@ -1247,9 +1254,9 @@ func (cxn *brokerCxn) handleResps() {
 		rawResp, err := cxn.readResponse(pr.ctx, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader, pr.readTimeout, pr.bytesWritten, pr.writeWait, pr.timeToWrite, pr.readEnqueue)
 		if err != nil {
 			if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
-				cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err)
+				cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "successful_reads", successes, "err", err)
 			} else {
-				cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "err", err)
+				cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
 			}
 			pr.promise(nil, err)
 			return
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 6b265c4f..a3dd05ff 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -616,7 +616,7 @@ func (cl *Client) waitUnknownTopic(
 	topic string,
 	unknown *unknownTopicProduces,
 ) {
-	cl.cfg.logger.Log(LogLevelInfo, "waiting for metadata to produce to unknown topic", "topic", topic)
+	cl.cfg.logger.Log(LogLevelInfo, "producing to a new topic for the first time, fetching metadata to learn its partitions", "topic", topic)
 	var after <-chan time.Time
 	if timeout := cl.cfg.recordTimeout; timeout > 0 {
 		timer := time.NewTimer(cl.cfg.recordTimeout)
@@ -633,10 +633,10 @@ func (cl *Client) waitUnknownTopic(
 			err = errRecordTimeout
 		case retriableErr, ok := <-unknown.wait:
 			if !ok {
-				cl.cfg.logger.Log(LogLevelInfo, "done waiting for unknown topic", "topic", topic)
+				cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic)
 				return // metadata was successful!
 			}
-			cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", retriableErr)
+			cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, retrying wait", "topic", topic, "err", retriableErr)
 			tries++
 			if int64(tries) >= cl.cfg.produceRetries {
 				err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
@@ -659,7 +659,7 @@ func (cl *Client) waitUnknownTopic(
 	if nowUnknown != unknown {
 		return
 	}
-	cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, done retrying, failing all records", "topic", topic)
+	cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, done retrying, failing all records", "topic", topic, "err", err)
 
 	delete(p.unknownTopics, topic)
 	cl.failUnknownTopicRecords(topic, unknown, err)
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 86bcfbb5..58544766 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -276,7 +276,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 		s.cl.cfg.logger.Log(LogLevelWarn, "unable to load producer ID, bumping client's buffered record load errors by 1 and retrying")
 		return true // whatever caused our produce, we did nothing, so keep going
 	default:
-		s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err)
+		s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", logID(s.nodeID), "err", err)
 		fallthrough
 	case ErrClientClosed:
 		s.cl.failBufferedRecords(err)
@@ -321,7 +321,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 			// because that can lead to undesirable behavior
 			// with produce request vs. end txn (KAFKA-12671)
 			s.cl.failProducerID(id, epoch, err)
-			s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", s.nodeID, "err", err)
+			s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
 		}
 		return false
 	}
@@ -413,7 +413,7 @@ func (s *sink) issueTxnReq(
 	for _, topic := range resp.Topics {
 		topicBatches, ok := req.batches[topic.Topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic)
+			s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic)
 			continue
 		}
 		for _, partition := range topic.Partitions {
@@ -427,7 +427,7 @@ func (s *sink) issueTxnReq(
 
 			batch, ok := topicBatches[partition.Partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic, "partition", partition.Partition)
+				s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic, "partition", partition.Partition)
 				continue
 			}
 
@@ -501,7 +501,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 		s.cl.failBufferedRecords(ErrClientClosed)
 
 	default:
-		s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", s.nodeID, "err", err)
+		s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", logID(s.nodeID), "err", err)
 		fallthrough
 
 	case isRetriableBrokerErr(err):
@@ -525,7 +525,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 		defer func() {
 			update := b.String()
 			update = strings.TrimSuffix(update, ", ")
-			s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", s.nodeID, "to", update)
+			s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", logID(s.nodeID), "to", update)
 		}()
 	}
 
@@ -589,7 +589,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 		topic := rTopic.Topic
 		partitions, ok := req.batches[topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", s.nodeID, "topic", topic)
+			s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
 			delete(req.metrics, topic)
 			continue // should not hit this
 		}
@@ -603,7 +603,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 			partition := rPartition.Partition
 			batch, ok := partitions[partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", s.nodeID, "topic", rTopic.Topic, "partition", partition)
+				s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
 				delete(tmetrics, partition)
 				continue // should not hit this
 			}
@@ -640,7 +640,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 	}
 
 	if len(req.batches) > 0 {
-		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", s.nodeID)
+		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
 		s.handleRetryBatches(req.batches, 0, true, false)
 	}
 	if len(reqRetry) > 0 {
@@ -746,7 +746,7 @@ func (s *sink) handleReqRespBatch(
 
 		if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
 			s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
-				"broker", s.nodeID,
+				"broker", logID(s.nodeID),
 				"topic", topic,
 				"partition", partition,
 				"producer_id", producerID,
@@ -776,7 +776,7 @@ func (s *sink) handleReqRespBatch(
 		// We should not be here, since this error occurs in the
 		// context of transactions, which are caught above.
 		s.cl.cfg.logger.Log(LogLevelInfo, fmt.Sprintf("batch errored with %s, failing the producer ID and resetting all sequence numbers", err.(*kerr.Error).Message),
-			"broker", s.nodeID,
+			"broker", logID(s.nodeID),
 			"topic", topic,
 			"partition", partition,
 			"producer_id", producerID,
@@ -796,7 +796,7 @@ func (s *sink) handleReqRespBatch(
 
 	case err == kerr.DuplicateSequenceNumber: // ignorable, but we should not get
 		s.cl.cfg.logger.Log(LogLevelInfo, "received unexpected duplicate sequence number, ignoring and treating batch as successful",
-			"broker", s.nodeID,
+			"broker", logID(s.nodeID),
 			"topic", topic,
 			"partition", partition,
 		)
@@ -805,7 +805,7 @@
 	default:
 		if err != nil {
 			s.cl.cfg.logger.Log(LogLevelInfo, "batch in a produce request failed",
-				"broker", s.nodeID,
+				"broker", logID(s.nodeID),
 				"topic", topic,
 				"partition", partition,
 				"err", err,
@@ -1133,7 +1133,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 	if len(recBuf.batches) == 0 {
 		return
 	}
-	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
+	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 	batch0 := recBuf.batches[0]
 	batch0.tries++
 	failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 2353b397..b70b2832 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -623,15 +623,15 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 				// If the epoch was zero, the broker did not even
 				// establish a session for us (and thus is maxed on
 				// sessions). We stop trying.
-				s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", s.nodeID)
+				s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", logID(s.nodeID))
 				s.session.kill()
 			} else {
-				s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", s.nodeID)
+				s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", logID(s.nodeID))
 				s.session.reset()
 			}
 			return
 		case kerr.InvalidFetchSessionEpoch:
-			s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", s.nodeID, "err", err)
+			s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", logID(s.nodeID), "err", err)
 			s.session.reset()
 			return
 		}
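
For illustration only, not part of the patch: a minimal, self-contained sketch of the mapping the new logID helper performs. It assumes seed brokers are assigned internal node IDs counting up from math.MinInt32, which is what the helper's subtraction implies; the main function and its sample IDs below are hypothetical.

	package main

	import (
		"fmt"
		"math"
		"strconv"
	)

	// logID mirrors the helper added above: ordinary broker IDs are formatted
	// as-is, while the far-negative internal seed IDs render as "seed 0",
	// "seed 1", and so on.
	func logID(id int32) string {
		if id >= -10 {
			return strconv.FormatInt(int64(id), 10)
		}
		return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
	}

	func main() {
		fmt.Println(logID(3))                 // "3": a broker learned from metadata
		fmt.Println(logID(math.MinInt32))     // "seed 0": first seed broker (assumed ID scheme)
		fmt.Println(logID(math.MinInt32 + 1)) // "seed 1": second seed broker (assumed ID scheme)
	}

The >= -10 cutoff presumably keeps small negative sentinels (such as -1 for an unknown broker) formatted as plain numbers while still catching the seed range near math.MinInt32.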