diff --git a/admin.go b/admin.go index a88fe6b06..8a24ad156 100644 --- a/admin.go +++ b/admin.go @@ -6,8 +6,10 @@ import ( "io" "maps" "math/rand" + "net" "strconv" "sync" + "syscall" "time" ) @@ -211,16 +213,23 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) { } // isRetriableControllerError returns `true` if the given error type unwraps to -// an `ErrNotController` or `EOF` response from Kafka +// an `ErrNotController`, `EOF` response from Kafka, or a network-level error. func isRetriableControllerError(err error) bool { - return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) + return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) || isNetworkError(err) } // isRetriableGroupCoordinatorError returns `true` if the given error type // unwraps to an `ErrNotCoordinatorForConsumer`, -// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka +// `ErrConsumerCoordinatorNotAvailable`, `EOF` response from Kafka, or a network-level error func isRetriableGroupCoordinatorError(err error) bool { - return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) + return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) || isNetworkError(err) +} + +// isNetworkError categorizes transient network failures that should trigger +// a reconnect/retry path (e.g. broken pipe, connection reset/timeout). +func isNetworkError(err error) bool { + var ne net.Error + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.As(err, &ne) } // retryOnError will repeatedly call the given (error-returning) func in the @@ -234,6 +243,13 @@ func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error if err == nil || attemptsRemaining <= 0 || !retryable(err) { return err } + if isNetworkError(err) { + // Close the cached controller connection so the next attempt will reopen it. + // Ref: https://github.com/IBM/sarama/issues/1162 — admin calls can get stuck on a broken controller connection. + if ctrl, e := ca.client.Controller(); e == nil { + _ = ctrl.Close() + } + } Logger.Printf( "admin/request retrying after %dms... (%d attempts remaining)\n", ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining) @@ -355,76 +371,92 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) { // DescribeConfigsRequest request. To avoid sending many requests to the // broker, we use a single DescribeConfigsRequest. - // Send the all-topic MetadataRequest - b, err := ca.findAnyBroker() - if err != nil { - return nil, err - } - _ = b.Open(ca.client.Config()) + var topicsDetailsMap map[string]TopicDetail - metadataReq := NewMetadataRequest(ca.conf.Version, nil) - metadataResp, err := b.GetMetadata(metadataReq) - if err != nil { - return nil, err - } + err := ca.retryOnError(isNetworkError, func() error { + // Send the all-topic MetadataRequest + b, err := ca.findAnyBroker() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - topicsDetailsMap := make(map[string]TopicDetail, len(metadataResp.Topics)) + metadataReq := NewMetadataRequest(ca.conf.Version, nil) + metadataResp, err := b.GetMetadata(metadataReq) + if err != nil { + if isNetworkError(err) { + _ = b.Close() + } + return err + } - var describeConfigsResources []*ConfigResource + localTopicsDetails := make(map[string]TopicDetail, len(metadataResp.Topics)) + var describeConfigsResources []*ConfigResource - for _, topic := range metadataResp.Topics { - topicDetails := TopicDetail{ - NumPartitions: int32(len(topic.Partitions)), - } - if len(topic.Partitions) > 0 { - topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions)) - for _, partition := range topic.Partitions { - topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas + for _, topic := range metadataResp.Topics { + topicDetails := TopicDetail{ + NumPartitions: int32(len(topic.Partitions)), } - topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas)) - } - topicsDetailsMap[topic.Name] = topicDetails + if len(topic.Partitions) > 0 { + topicDetails.ReplicaAssignment = make(map[int32][]int32, len(topic.Partitions)) + for _, partition := range topic.Partitions { + topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas + } + topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas)) + } + localTopicsDetails[topic.Name] = topicDetails - // we populate the resources we want to describe from the MetadataResponse - topicResource := ConfigResource{ - Type: TopicResource, - Name: topic.Name, + // we populate the resources we want to describe from the MetadataResponse + topicResource := ConfigResource{ + Type: TopicResource, + Name: topic.Name, + } + describeConfigsResources = append(describeConfigsResources, &topicResource) } - describeConfigsResources = append(describeConfigsResources, &topicResource) - } - // Send the DescribeConfigsRequest - describeConfigsReq := &DescribeConfigsRequest{ - Resources: describeConfigsResources, - } + // Send the DescribeConfigsRequest + describeConfigsReq := &DescribeConfigsRequest{ + Resources: describeConfigsResources, + } - if ca.conf.Version.IsAtLeast(V1_1_0_0) { - describeConfigsReq.Version = 1 - } + if ca.conf.Version.IsAtLeast(V1_1_0_0) { + describeConfigsReq.Version = 1 + } - if ca.conf.Version.IsAtLeast(V2_0_0_0) { - describeConfigsReq.Version = 2 - } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + describeConfigsReq.Version = 2 + } - describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq) - if err != nil { - return nil, err - } + describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq) + if err != nil { + if isNetworkError(err) { + _ = b.Close() + } + return err + } - for _, resource := range describeConfigsResp.Resources { - topicDetails := topicsDetailsMap[resource.Name] - topicDetails.ConfigEntries = make(map[string]*string) + for _, resource := range describeConfigsResp.Resources { + topicDetails := localTopicsDetails[resource.Name] + topicDetails.ConfigEntries = make(map[string]*string) - for _, entry := range resource.Configs { - // only include non-default non-sensitive config - // (don't actually think topic config will ever be sensitive) - if entry.Default || entry.Sensitive { - continue + for _, entry := range resource.Configs { + // only include non-default non-sensitive config + // (don't actually think topic config will ever be sensitive) + if entry.Default || entry.Sensitive { + continue + } + topicDetails.ConfigEntries[entry.Name] = &entry.Value } - topicDetails.ConfigEntries[entry.Name] = &entry.Value + + localTopicsDetails[resource.Name] = topicDetails } - topicsDetailsMap[resource.Name] = topicDetails + topicsDetailsMap = localTopicsDetails + return nil + }) + + if err != nil { + return nil, err } return topicsDetailsMap, nil @@ -711,23 +743,31 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, return nil, err } - _ = b.Open(ca.client.Config()) - rsp, err := b.DescribeConfigs(request) - if err != nil { - return nil, err - } - - for _, rspResource := range rsp.Resources { - if rspResource.Name == resource.Name { - if rspResource.ErrorCode != 0 { - return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} + err = ca.retryOnError(isNetworkError, func() error { + _ = b.Open(ca.client.Config()) + rsp, err := b.DescribeConfigs(request) + if err != nil { + if isNetworkError(err) { + _ = b.Close() } - for _, cfgEntry := range rspResource.Configs { - entries = append(entries, *cfgEntry) + return err + } + + entries = entries[:0] + for _, rspResource := range rsp.Resources { + if rspResource.Name == resource.Name { + if rspResource.ErrorCode != 0 { + return &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} + } + for _, cfgEntry := range rspResource.Configs { + entries = append(entries, *cfgEntry) + } } } - } - return entries, nil + return nil + }) + + return entries, err } func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error { @@ -766,20 +806,25 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string return err } - _ = b.Open(ca.client.Config()) - rsp, err := b.AlterConfigs(request) - if err != nil { - return err - } + return ca.retryOnError(isNetworkError, func() error { + _ = b.Open(ca.client.Config()) + rsp, err := b.AlterConfigs(request) + if err != nil { + if isNetworkError(err) { + _ = b.Close() + } + return err + } - for _, rspResource := range rsp.Resources { - if rspResource.Name == name { - if rspResource.ErrorCode != 0 { - return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} + for _, rspResource := range rsp.Resources { + if rspResource.Name == name { + if rspResource.ErrorCode != 0 { + return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} + } } } - } - return nil + return nil + }) } func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error { @@ -819,24 +864,29 @@ func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, return err } - _ = b.Open(ca.client.Config()) - rsp, err := b.IncrementalAlterConfigs(request) - if err != nil { - return err - } + return ca.retryOnError(isNetworkError, func() error { + _ = b.Open(ca.client.Config()) + rsp, err := b.IncrementalAlterConfigs(request) + if err != nil { + if isNetworkError(err) { + _ = b.Close() + } + return err + } - for _, rspResource := range rsp.Resources { - if rspResource.Name == name { - if rspResource.ErrorCode != int16(ErrNoError) { - err = KError(rspResource.ErrorCode) - if rspResource.ErrorMsg != "" { - err = fmt.Errorf("%w: %s", err, rspResource.ErrorMsg) + for _, rspResource := range rsp.Resources { + if rspResource.Name == name { + if rspResource.ErrorCode != int16(ErrNoError) { + err = KError(rspResource.ErrorCode) + if rspResource.ErrorMsg != "" { + err = fmt.Errorf("%w: %s", err, rspResource.ErrorMsg) + } + return err } - return err } } - } - return nil + return nil + }) } func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { @@ -848,13 +898,22 @@ func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { request.Version = 1 } - b, err := ca.Controller() - if err != nil { - return err - } + return ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - _, err = b.CreateAcls(request) - return err + _, err = b.CreateAcls(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + return err + }) } func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error { @@ -870,13 +929,22 @@ func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error { request.Version = 1 } - b, err := ca.Controller() - if err != nil { - return err - } + return ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - _, err = b.CreateAcls(request) - return err + _, err = b.CreateAcls(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + return err + }) } func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) { @@ -886,23 +954,36 @@ func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) { request.Version = 1 } - b, err := ca.Controller() - if err != nil { - return nil, err - } + var lAcls []ResourceAcls + if err := ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.DescribeAcls(request) - if err != nil { + rsp, err := b.DescribeAcls(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + if err != nil { + return err + } + + lAcls = lAcls[:0] + for _, rAcl := range rsp.ResourceAcls { + lAcls = append(lAcls, *rAcl) + } + return nil + }); err != nil { return nil, err } - var lAcls []ResourceAcls - for _, rAcl := range rsp.ResourceAcls { - lAcls = append(lAcls, *rAcl) - } return lAcls, nil } - func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) { var filters []*AclFilter filters = append(filters, &filter) @@ -912,25 +993,38 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi request.Version = 1 } - b, err := ca.Controller() - if err != nil { - return nil, err - } + var mAcls []MatchingAcl + if err := ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.DeleteAcls(request) - if err != nil { - return nil, err - } + rsp, err := b.DeleteAcls(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + if err != nil { + return err + } - var mAcls []MatchingAcl - for _, fr := range rsp.FilterResponses { - for _, mACL := range fr.MatchingAcls { - mAcls = append(mAcls, *mACL) + mAcls = mAcls[:0] + for _, fr := range rsp.FilterResponses { + for _, mACL := range fr.MatchingAcls { + mAcls = append(mAcls, *mACL) + } } + return nil + }); err != nil { + return nil, err } + return mAcls, nil } - func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) { request := &ElectLeadersRequest{ Type: electionType, @@ -999,12 +1093,31 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group // Version 1 is the same as version 0. describeReq.Version = 1 } - response, err := broker.DescribeGroups(describeReq) + + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + for _, g := range brokerGroups { + _ = ca.client.RefreshCoordinator(g) + } + } + }() + + _ = broker.Open(ca.conf) + response, err := broker.DescribeGroups(describeReq) + if err != nil { + if isNetworkError(err) { + _ = broker.Close() + } + return err + } + + result = append(result, response.Groups...) + return nil + }) if err != nil { return nil, err } - - result = append(result, response.Groups...) } return result, nil } @@ -1022,33 +1135,42 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e wg.Add(1) go func(b *Broker, conf *Config) { defer wg.Done() - _ = b.Open(conf) // Ensure that broker is opened - - request := &ListGroupsRequest{} - if ca.conf.Version.IsAtLeast(V3_8_0_0) { - // Version 5 adds the TypesFilter field (KIP-848). - request.Version = 5 - } else if ca.conf.Version.IsAtLeast(V2_6_0_0) { - // Version 4 adds the StatesFilter field (KIP-518). - request.Version = 4 - } else if ca.conf.Version.IsAtLeast(V2_4_0_0) { - // Version 3 is the first flexible version. - request.Version = 3 - } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { - // Version 2 is the same as version 0. - request.Version = 2 - } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { - // Version 1 is the same as version 0. - request.Version = 1 - } + err := ca.retryOnError(isNetworkError, func() error { + _ = b.Open(conf) // Ensure that broker is opened + + request := &ListGroupsRequest{} + if ca.conf.Version.IsAtLeast(V3_8_0_0) { + // Version 5 adds the TypesFilter field (KIP-848). + request.Version = 5 + } else if ca.conf.Version.IsAtLeast(V2_6_0_0) { + // Version 4 adds the StatesFilter field (KIP-518). + request.Version = 4 + } else if ca.conf.Version.IsAtLeast(V2_4_0_0) { + // Version 3 is the first flexible version. + request.Version = 3 + } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { + // Version 2 is the same as version 0. + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { + // Version 1 is the same as version 0. + request.Version = 1 + } + + response, err := b.ListGroups(request) + if err != nil { + if isNetworkError(err) { + _ = b.Close() + } + return err + } - response, err := b.ListGroups(request) + groupMaps <- maps.Clone(response.Groups) + return nil + }) if err != nil { errChan <- err return } - - groupMaps <- maps.Clone(response.Groups) }(b, ca.conf) } @@ -1191,28 +1313,36 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 wg.Add(1) go func(b *Broker, conf *Config) { defer wg.Done() - _ = b.Open(conf) // Ensure that broker is opened - - request := &DescribeLogDirsRequest{} - if ca.conf.Version.IsAtLeast(V3_3_0_0) { - request.Version = 4 - } else if ca.conf.Version.IsAtLeast(V3_2_0_0) { - request.Version = 3 - } else if ca.conf.Version.IsAtLeast(V2_6_0_0) { - request.Version = 2 - } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { - request.Version = 1 - } - response, err := b.DescribeLogDirs(request) + err := ca.retryOnError(isNetworkError, func() error { + _ = b.Open(conf) // Ensure each attempt uses a fresh/open connection + + request := &DescribeLogDirsRequest{} + if ca.conf.Version.IsAtLeast(V3_3_0_0) { + request.Version = 4 + } else if ca.conf.Version.IsAtLeast(V3_2_0_0) { + request.Version = 3 + } else if ca.conf.Version.IsAtLeast(V2_6_0_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } + + response, err := b.DescribeLogDirs(request) + if err != nil { + if isNetworkError(err) { + _ = b.Close() // drop broken connection before retry + } + return err + } + if !errors.Is(response.ErrorCode, ErrNoError) { + return response.ErrorCode + } + logDirsResults <- result{id: b.ID(), logdirs: response.LogDirs} + return nil + }) if err != nil { errChan <- err - return - } - if !errors.Is(response.ErrorCode, ErrNoError) { - errChan <- response.ErrorCode - return } - logDirsResults <- result{id: b.ID(), logdirs: response.LogDirs} }(broker, ca.conf) } @@ -1238,19 +1368,28 @@ func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*Describ }) } - b, err := ca.Controller() - if err != nil { - return nil, err - } + var rsp *DescribeUserScramCredentialsResponse + if err := ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.DescribeUserScramCredentials(req) - if err != nil { + rsp, err = b.DescribeUserScramCredentials(req) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + return err + }); err != nil { return nil, err } return rsp.Results, nil } - func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) { res, err := ca.AlterUserScramCredentials(upsert, nil) if err != nil { @@ -1302,13 +1441,23 @@ func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict, ) - b, err := ca.Controller() - if err != nil { - return nil, err - } + var rsp *DescribeClientQuotasResponse + if err := ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - rsp, err := b.DescribeClientQuotas(request) - if err != nil { + rsp, err = b.DescribeClientQuotas(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() + } + return err + }); err != nil { return nil, err } @@ -1321,7 +1470,6 @@ func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, return rsp.Entries, nil } - func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error { entry := AlterClientQuotasEntry{ Entity: entity, @@ -1333,28 +1481,36 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie ValidateOnly: validateOnly, } - b, err := ca.Controller() - if err != nil { - return err - } - - rsp, err := b.AlterClientQuotas(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableControllerError, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) - for _, entry := range rsp.Entries { - if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 { - return errors.New(*entry.ErrorMsg) + rsp, err := b.AlterClientQuotas(request) + if isNetworkError(err) { + _ = b.Close() + } + if isRetriableControllerError(err) { + _, _ = ca.refreshController() } - if !errors.Is(entry.ErrorCode, ErrNoError) { - return entry.ErrorCode + if err != nil { + return err } - } - return nil -} + for _, entry := range rsp.Entries { + if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 { + return errors.New(*entry.ErrorMsg) + } + if !errors.Is(entry.ErrorCode, ErrNoError) { + return entry.ErrorCode + } + } + return nil + }) +} func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) { if !ca.conf.Version.IsAtLeast(V2_4_0_0) { return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")