From f35ef66a290c3e5169aa08d27b60508212ba56f3 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Thu, 6 Oct 2022 11:19:16 -0600
Subject: [PATCH] make errUnknown{Controller,Coordinator} retryable, improve error wording

The two errors were not retryable, so if they were encountered, we failed
requests immediately. We also improve the wording for errUnknownController
to indicate that -1 means the controller is not ready, and we change all
"Kafka" to "broker" to be more broker-implementation agnostic
---
 pkg/kgo/broker.go         |  4 ++--
 pkg/kgo/consumer_group.go |  6 +++---
 pkg/kgo/errors.go         | 21 +++++++++++++++++----
 pkg/kgo/group_balancer.go |  2 +-
 pkg/kgo/sink.go           | 12 ++++++------
 pkg/kgo/txn.go            |  2 +-
 6 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index 9b65572f..6b2cba80 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -696,14 +696,14 @@ start:
 	// Post, Kafka replies with all versions.
 	if rawResp[1] == 35 {
 		if maxVersion == 0 {
-			return errors.New("Kafka replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0")
+			return errors.New("broker replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0")
 		}
 		srawResp := string(rawResp)
 		if srawResp == "\x00\x23\x00\x00\x00\x00" ||
 			// EventHubs erroneously replies with v1, so we check
 			// for that as well.
 			srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
-			cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
+			cxn.cl.cfg.logger.Log(LogLevelDebug, "broker does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
 			maxVersion = 0
 			goto start
 		}
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index abdf443a..2ddc546a 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -1785,7 +1785,7 @@ func (g *groupConsumer) updateCommitted(
 	}
 	if g.uncommitted == nil || // just in case
 		len(req.Topics) != len(resp.Topics) { // bad kafka
-		g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)), "group", g.cfg.group)
+		g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)), "group", g.cfg.group)
 		return
 	}
 
@@ -1806,7 +1806,7 @@ func (g *groupConsumer) updateCommitted(
 		if topic == nil || // just in case
 			reqTopic.Topic != respTopic.Topic || // bad kafka
 			len(reqTopic.Partitions) != len(respTopic.Partitions) { // same
-			g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)), "group", g.cfg.group)
+			g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)), "group", g.cfg.group)
 			continue
 		}
 
@@ -1828,7 +1828,7 @@ func (g *groupConsumer) updateCommitted(
 				continue
 			}
 			if reqPart.Partition != respPart.Partition { // bad kafka
-				g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition), "group", g.cfg.group)
+				g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition), "group", g.cfg.group)
 				continue
 			}
 			if respPart.ErrorCode != 0 {
diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go
index e9e78516..fad5a49c 100644
--- a/pkg/kgo/errors.go
+++ b/pkg/kgo/errors.go
@@ -71,6 +71,16 @@ func isRetriableBrokerErr(err error) bool {
 	if errors.Is(err, errCorrelationIDMismatch) {
 		return true
 	}
+	// We sometimes load the controller before issuing requests, and the
+	// cluster may not yet be ready and will return -1 for the controller.
+	// We can backoff and retry and hope the cluster has stabilized.
+	if ce := (*errUnknownController)(nil); errors.As(err, &ce) {
+		return true
+	}
+	// Same thought for a non-existing coordinator.
+	if ce := (*errUnknownCoordinator)(nil); errors.As(err, &ce) {
+		return true
+	}
 	var tempErr interface{ Temporary() bool }
 	if errors.As(err, &tempErr) {
 		return tempErr.Temporary()
@@ -213,7 +223,10 @@ type errUnknownController struct {
 }
 
 func (e *errUnknownController) Error() string {
-	return fmt.Sprintf("Kafka replied that the controller broker is %d,"+
+	if e.id == -1 {
+		return "broker replied that the controller broker is not available"
+	}
+	return fmt.Sprintf("broker replied that the controller broker is %d,"+
 		" but did not reply with that broker in the broker list", e.id)
 }
 
@@ -225,15 +238,15 @@ type errUnknownCoordinator struct {
 func (e *errUnknownCoordinator) Error() string {
 	switch e.key.typ {
 	case coordinatorTypeGroup:
-		return fmt.Sprintf("Kafka replied that group %s has broker coordinator %d,"+
+		return fmt.Sprintf("broker replied that group %s has broker coordinator %d,"+
 			" but did not reply with that broker in the broker list",
 			e.key.name, e.coordinator)
 	case coordinatorTypeTxn:
-		return fmt.Sprintf("Kafka replied that txn id %s has broker coordinator %d,"+
+		return fmt.Sprintf("broker replied that txn id %s has broker coordinator %d,"+
 			" but did not reply with that broker in the broker list",
 			e.key.name, e.coordinator)
 	default:
-		return fmt.Sprintf("Kafka replied to an unknown coordinator key %s (type %d) that it has a broker coordinator %d,"+
+		return fmt.Sprintf("broker replied to an unknown coordinator key %s (type %d) that it has a broker coordinator %d,"+
 			" but did not reply with that broker in the broker list", e.key.name, e.key.typ, e.coordinator)
 	}
 }
diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go
index 37da30f5..6c6e8313 100644
--- a/pkg/kgo/group_balancer.go
+++ b/pkg/kgo/group_balancer.go
@@ -327,7 +327,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
 	for _, b := range g.cfg.balancers {
 		ours = append(ours, b.ProtocolName())
 	}
-	g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find Kafka-chosen balancer", from), "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
+	g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find broker-chosen balancer", from), "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
 	return nil, fmt.Errorf("unable to balance: none of our balancers have a name equal to the balancer chosen for balancing (%s)", proto)
 }
 
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index c8c42cf8..a74e015d 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -455,7 +455,7 @@ func (s *sink) issueTxnReq(
 	for _, topic := range resp.Topics {
 		topicBatches, ok := req.batches[topic.Topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic)
+			s.cl.cfg.logger.Log(LogLevelError, "broker replied with topic in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic)
 			continue
 		}
 		for _, partition := range topic.Partitions {
@@ -470,7 +470,7 @@ func (s *sink) issueTxnReq(
 
 			batch, ok := topicBatches[partition.Partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic, "partition", partition.Partition)
+				s.cl.cfg.logger.Log(LogLevelError, "broker replied with partition in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic, "partition", partition.Partition)
 				continue
 			}
 
@@ -604,7 +604,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 		topic := rTopic.Topic
 		partitions, ok := req.batches[topic]
 		if !ok {
-			s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
+			s.cl.cfg.logger.Log(LogLevelError, "broker erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
 			delete(req.metrics, topic)
 			continue // should not hit this
 		}
@@ -618,7 +618,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 			partition := rPartition.Partition
 			batch, ok := partitions[partition]
 			if !ok {
-				s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
+				s.cl.cfg.logger.Log(LogLevelError, "broker erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
 				delete(tmetrics, partition)
 				continue // should not hit this
 			}
@@ -655,8 +655,8 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
 	}
 
 	if len(req.batches) > 0 {
-		s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
-		s.handleRetryBatches(req.batches, 0, true, false, "kafka did not reply to all topics in produce request")
+		s.cl.cfg.logger.Log(LogLevelError, "broker did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
+		s.handleRetryBatches(req.batches, 0, true, false, "broker did not reply to all topics in produce request")
 	}
 	if len(reqRetry) > 0 {
 		s.handleRetryBatches(reqRetry, 0, true, true, "produce request had retry batches")
diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go
index bb12e54d..aed72ea9 100644
--- a/pkg/kgo/txn.go
+++ b/pkg/kgo/txn.go
@@ -342,7 +342,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 	} else {
 		defer func() {
 			if committed {
-				s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give Kafka a chance to write txn markers and avoid duplicates")
+				s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give the brokers a chance to write txn markers and avoid duplicates")
 				go func() {
 					time.Sleep(200 * time.Millisecond)
 					s.failMu.Unlock()
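
Reviewer note (not part of the patch): the sketch below is a minimal,
standalone illustration of the errors.As pattern the patch relies on in
isRetriableBrokerErr. It copies the errUnknownController type from
pkg/kgo/errors.go for self-containment; the isRetriable helper and the main
function are hypothetical, for illustration only, not franz-go API. The typed
nil pointer passed to errors.As matches a *errUnknownController anywhere in a
wrapped error chain, which is what lets the client back off and retry instead
of failing the request immediately.

	package main

	import (
		"errors"
		"fmt"
	)

	// errUnknownController mirrors the shape of the kgo error type for
	// illustration; the real type lives in pkg/kgo/errors.go.
	type errUnknownController struct{ id int32 }

	func (e *errUnknownController) Error() string {
		if e.id == -1 {
			return "broker replied that the controller broker is not available"
		}
		return fmt.Sprintf("broker replied that the controller broker is %d,"+
			" but did not reply with that broker in the broker list", e.id)
	}

	// isRetriable is a hypothetical stand-in for the classification added to
	// isRetriableBrokerErr: a typed nil *errUnknownController is a valid
	// errors.As target, so any such error in the chain means "retry".
	func isRetriable(err error) bool {
		if ce := (*errUnknownController)(nil); errors.As(err, &ce) {
			return true
		}
		return false
	}

	func main() {
		// A cluster that is still stabilizing reports controller id -1.
		err := fmt.Errorf("loading controller: %w", &errUnknownController{id: -1})
		fmt.Println(err)              // loading controller: broker replied that the controller broker is not available
		fmt.Println(isRetriable(err)) // true
	}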