diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index 41e6b4c6..8dd16e1f 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -62,6 +62,13 @@ type promisedResp struct {
 	readEnqueue time.Time
 }
 
+func logID(id int32) string {
+	if id >= -10 {
+		return strconv.FormatInt(int64(id), 10)
+	}
+	return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
+}
+
 // BrokerMetadata is metadata for a broker.
 //
 // This struct mirrors kmsg.MetadataResponseBroker.
@@ -289,7 +296,7 @@ func (b *broker) handleReqs() {
 			// can only have an expiry if we went the authenticate
 			// flow, so we know we are authenticating again.
 			// For KIP-368.
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", cxn.b.meta.NodeID)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", logID(cxn.b.meta.NodeID))
 			if err := cxn.sasl(); err != nil {
 				pr.promise(nil, err)
 				cxn.die()
@@ -426,11 +433,11 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
 		deadCh:   make(chan struct{}),
 	}
 	if err = cxn.init(isProduceCxn); err != nil {
-		b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+		b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
 		cxn.closeConn()
 		return nil, err
 	}
-	b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", b.meta.NodeID)
+	b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", logID(b.meta.NodeID))
 
 	b.reapMu.Lock()
 	defer b.reapMu.Unlock()
@@ -513,7 +520,7 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
 
 // connect connects to the broker's addr, returning the new connection.
 func (b *broker) connect(ctx context.Context) (net.Conn, error) {
-	b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", b.meta.NodeID)
+	b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
 	start := time.Now()
 	conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr)
 	since := time.Since(start)
@@ -523,10 +530,10 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
 		}
 	})
 	if err != nil {
-		b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+		b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
 		return nil, fmt.Errorf("unable to dial: %w", err)
 	} else {
-		b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", b.meta.NodeID)
+		b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
 	}
 	return conn, nil
 }
@@ -574,13 +581,13 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
 
 	if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
 		if err := cxn.requestAPIVersions(); err != nil {
-			cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", cxn.b.meta.NodeID, "err", err)
+			cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
 			return err
 		}
 	}
 
 	if err := cxn.sasl(); err != nil {
-		cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", cxn.b.meta.NodeID, "err", err)
+		cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err)
 		return err
 	}
 
@@ -613,7 +620,7 @@ start:
 		ClientSoftwareName:    cxn.cl.cfg.softwareName,
 		ClientSoftwareVersion: cxn.cl.cfg.softwareVersion,
 	}
-	cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", cxn.b.meta.NodeID, "version", maxVersion)
+	cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", logID(cxn.b.meta.NodeID), "version", maxVersion)
 	corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 	if writeErr != nil {
 		cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -646,7 +653,7 @@ start:
 			// EventHubs erroneously replies with v1, so we check
 			// for that as well.
 			srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", cxn.b.meta.NodeID)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
 			maxVersion = 0
 			goto start
 		}
@@ -682,7 +689,7 @@ start:
 	if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 {
 		req.Mechanism = mechanism.Name()
 		req.Version = cxn.versions[req.Key()]
-		cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", cxn.b.meta.NodeID)
+		cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID))
 		corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 		if writeErr != nil {
 			cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -716,7 +723,7 @@ start:
 		}
 		authenticate = req.Version == 1
 	}
-	cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", cxn.b.meta.NodeID, "mechanism", mechanism.Name(), "authenticate", authenticate)
+	cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate)
 	cxn.mechanism = mechanism
 	return cxn.doSasl(authenticate)
 }
@@ -751,7 +758,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 			binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
 			buf = append(buf, clientWrite...)
 
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", cxn.b.meta.NodeID, "step", step)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step)
 			_, err, _, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now())
 			cxn.cl.bufPool.put(buf)
 
@@ -770,7 +777,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 				SASLAuthBytes: clientWrite,
 			}
 			req.Version = cxn.versions[req.Key()]
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", cxn.b.meta.NodeID, "version", req.Version, "step", step)
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step)
 
 			corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
 
@@ -823,7 +830,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 			}
 			now := time.Now()
 			cxn.expiry = now.Add(time.Duration(lifetimeMillis)*time.Millisecond - time.Second)
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", cxn.b.meta.NodeID, "reauthenticate_in", cxn.expiry.Sub(now))
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
 		}
 		return nil
 	}
@@ -871,7 +878,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
 		}
 	})
 	if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-		logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", cxn.b.meta.NodeID, "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
+		logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", logID(cxn.b.meta.NodeID), "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
 	}
 
 	if writeErr != nil {
@@ -1054,7 +1061,7 @@ func (cxn *brokerCxn) readResponse(
 		}
 	})
 	if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-		logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
+		logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", logID(cxn.b.meta.NodeID), "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
 	}
 
 	if readErr != nil {
@@ -1247,9 +1254,9 @@ func (cxn *brokerCxn) handleResps() {
 		rawResp, err := cxn.readResponse(pr.ctx, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader, pr.readTimeout, pr.bytesWritten, pr.writeWait, pr.timeToWrite, pr.readEnqueue)
 		if err != nil {
 			if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
-				cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err)
+				cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "successful_reads", successes, "err", err)
 			} else {
-				cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "err", err)
+				cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
 			}
 			pr.promise(nil, err)
 			return
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 6b265c4f..a3dd05ff 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -616,7 +616,7 @@ func (cl *Client) waitUnknownTopic(
 	topic string,
 	unknown *unknownTopicProduces,
 ) {
-	cl.cfg.logger.Log(LogLevelInfo, "waiting for metadata to produce to unknown topic", "topic", topic)
+	cl.cfg.logger.Log(LogLevelInfo, "producing to a new topic for the first time, fetching metadata to learn its partitions", "topic", topic)
 	var after <-chan time.Time
 	if timeout := cl.cfg.recordTimeout; timeout > 0 {
 		timer := time.NewTimer(cl.cfg.recordTimeout)
@@ -633,10 +633,10 @@ func (cl *Client) waitUnknownTopic(
 			err = errRecordTimeout
 		case retriableErr, ok := <-unknown.wait:
 			if !ok {
-				cl.cfg.logger.Log(LogLevelInfo, "done waiting for unknown topic", "topic", topic)
+				cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic)
 				return // metadata was successful!
 			}
-			cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", retriableErr)
+			cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, retrying wait", "topic", topic, "err", retriableErr)
 			tries++
 			if int64(tries) >= cl.cfg.produceRetries {
 				err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
@@ -659,7 +659,7 @@ func (cl *Client) waitUnknownTopic(
 	if nowUnknown != unknown {
 		return
 	}
-	cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, done retrying, failing all records", "topic", topic)
+	cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, done retrying, failing all records", "topic", topic, "err", err)
 
 	delete(p.unknownTopics, topic)
 	cl.failUnknownTopicRecords(topic, unknown, err)
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 86bcfbb5..58544766 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -276,7 +276,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 			s.cl.cfg.logger.Log(LogLevelWarn, "unable to load producer ID, bumping client's buffered record load errors by 1 and retrying")
 			return true // whatever caused our produce, we did nothing, so keep going
 		default:
-			s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err)
+			s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", logID(s.nodeID), "err", err)
 			fallthrough
 		case ErrClientClosed:
 			s.cl.failBufferedRecords(err)
@@ -321,7 +321,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 		// because that can lead to undesirable behavior
 		// with produce request vs. end txn (KAFKA-12671)
 		s.cl.failProducerID(id, epoch, err)
-		s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", s.nodeID, "err", err)
+		s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
 	}
 	return false
 }
@@ -413,7 +413,7 @@ func (s *sink) issueTxnReq(
 	for _, topic := range resp.Topics {
 		topicBatches, ok := req.batches[topic.Topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic)
+			s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic)
 			continue
 		}
 		for _, partition := range topic.Partitions {
@@ -427,7 +427,7 @@ func (s *sink) issueTxnReq(
 
 			batch, ok := topicBatches[partition.Partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic, "partition", partition.Partition)
+				s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic, "partition", partition.Partition)
 				continue
 			}
 
@@ -501,7 +501,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
 		s.cl.failBufferedRecords(ErrClientClosed)
 
 	default:
-		s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", s.nodeID, "err", err)
+		s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", logID(s.nodeID), "err", err)
 		fallthrough
 
 	case isRetriableBrokerErr(err):
@@ -525,7 +525,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 		defer func() {
 			update := b.String()
 			update = strings.TrimSuffix(update, ", ")
-			s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", s.nodeID, "to", update)
+			s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", logID(s.nodeID), "to", update)
 		}()
 	}
 
@@ -589,7 +589,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 		topic := rTopic.Topic
 		partitions, ok := req.batches[topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", s.nodeID, "topic", topic)
+			s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
 			delete(req.metrics, topic)
 			continue // should not hit this
 		}
@@ -603,7 +603,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 			partition := rPartition.Partition
 			batch, ok := partitions[partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", s.nodeID, "topic", rTopic.Topic, "partition", partition)
+				s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
 				delete(tmetrics, partition)
 				continue // should not hit this
 			}
@@ -640,7 +640,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 	}
 
 	if len(req.batches) > 0 {
-		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", s.nodeID)
+		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
 		s.handleRetryBatches(req.batches, 0, true, false)
 	}
 	if len(reqRetry) > 0 {
@@ -746,7 +746,7 @@ func (s *sink) handleReqRespBatch(
 
 		if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
 			s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
-				"broker", s.nodeID,
+				"broker", logID(s.nodeID),
 				"topic", topic,
 				"partition", partition,
 				"producer_id", producerID,
@@ -776,7 +776,7 @@ func (s *sink) handleReqRespBatch(
 		// We should not be here, since this error occurs in the
 		// context of transactions, which are caught above.
 		s.cl.cfg.logger.Log(LogLevelInfo, fmt.Sprintf("batch errored with %s, failing the producer ID and resetting all sequence numbers", err.(*kerr.Error).Message),
-			"broker", s.nodeID,
+			"broker", logID(s.nodeID),
 			"topic", topic,
 			"partition", partition,
 			"producer_id", producerID,
@@ -796,7 +796,7 @@ func (s *sink) handleReqRespBatch(
 
 	case err == kerr.DuplicateSequenceNumber: // ignorable, but we should not get
 		s.cl.cfg.logger.Log(LogLevelInfo, "received unexpected duplicate sequence number, ignoring and treating batch as successful",
-			"broker", s.nodeID,
+			"broker", logID(s.nodeID),
 			"topic", topic,
 			"partition", partition,
 		)
@@ -805,7 +805,7 @@ func (s *sink) handleReqRespBatch(
 	default:
 		if err != nil {
 			s.cl.cfg.logger.Log(LogLevelInfo, "batch in a produce request failed",
-				"broker", s.nodeID,
+				"broker", logID(s.nodeID),
 				"topic", topic,
 				"partition", partition,
 				"err", err,
@@ -1133,7 +1133,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
 	if len(recBuf.batches) == 0 {
 		return
 	}
-	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
+	recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
 	batch0 := recBuf.batches[0]
 	batch0.tries++
 	failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 2353b397..b70b2832 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -623,15 +623,15 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 			// If the epoch was zero, the broker did not even
 			// establish a session for us (and thus is maxed on
 			// sessions). We stop trying.
-			s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", s.nodeID)
+			s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", logID(s.nodeID))
 			s.session.kill()
 		} else {
-			s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", s.nodeID)
+			s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", logID(s.nodeID))
 			s.session.reset()
 		}
 		return
 	case kerr.InvalidFetchSessionEpoch:
-		s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", s.nodeID, "err", err)
+		s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", logID(s.nodeID), "err", err)
 		s.session.reset()
 		return
 	}
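
Note on the new logID helper in broker.go: the subtraction by math.MinInt32 implies
seed brokers are numbered upward from math.MinInt32, so they now log as "seed 0",
"seed 1", and so on instead of as large negative int32s, while real node IDs (and
small negative sentinels such as -1) print unchanged. A standalone sketch of the
mapping, with logID copied from the patch:

    package main

    import (
        "fmt"
        "math"
        "strconv"
    )

    // logID is copied from the patch: IDs at or above -10 are real node IDs
    // (or small sentinels) and print as-is; anything lower is taken to be a
    // seed broker numbered upward from math.MinInt32.
    func logID(id int32) string {
        if id >= -10 {
            return strconv.FormatInt(int64(id), 10)
        }
        return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
    }

    func main() {
        fmt.Println(logID(3))                 // "3"
        fmt.Println(logID(-1))                // "-1"
        fmt.Println(logID(math.MinInt32))     // "seed 0"
        fmt.Println(logID(math.MinInt32 + 2)) // "seed 2"
    }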
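For context on the ApiVersions downgrade path in requestAPIVersions: the 10-byte
literal compared against srawResp starts with the big-endian int16 error code 35
(UNSUPPORTED_VERSION). Reading the remaining zeros as an empty versions array plus
a zero throttle field (the v1 shape that EventHubs erroneously replies with, per
the patch's comment) is an inference from the Kafka wire format, not something the
patch itself states. A minimal decode of that literal:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // The literal from the srawResp comparison in the patch.
        raw := []byte("\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00")

        errCode := int16(binary.BigEndian.Uint16(raw[0:2]))   // 35 = UNSUPPORTED_VERSION
        arrayLen := int32(binary.BigEndian.Uint32(raw[2:6]))  // 0: empty versions array (inferred)
        throttle := int32(binary.BigEndian.Uint32(raw[6:10])) // 0ms throttle, v1 only (inferred)

        fmt.Println(errCode, arrayLen, throttle) // 35 0 0
    }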
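To see the reworded "broker" key rendered, attach a debug-level logger; the
connection-lifecycle lines touched by this patch should then read "broker: seed 0"
for bootstrap dials rather than "broker: -2147483648". A hedged usage sketch (the
address is a placeholder; BasicLogger is franz-go's bundled Logger implementation):

    package main

    import (
        "context"
        "os"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        cl, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder address
            kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        // Pinging forces a broker dial, so the "opening connection to broker"
        // debug line fires with the new seed-aware broker ID.
        if err := cl.Ping(context.Background()); err != nil {
            panic(err)
        }
    }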