diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 991741c1..58f8fa67 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -246,7 +246,7 @@ func (b *broker) handleReqs() { ourMax := req.MaxVersion() if b.cl.cfg.maxVersions != nil { - userMax, _ := b.cl.cfg.maxVersions.LookupVersion(req.Key()) // we validated HasKey above + userMax, _ := b.cl.cfg.maxVersions.LookupMaxKeyVersion(req.Key()) // we validated HasKey above if userMax < ourMax { ourMax = userMax } @@ -264,7 +264,7 @@ func (b *broker) handleReqs() { // lower than we desire, we fail the request for the broker is // too old. if b.cl.cfg.minVersions != nil { - minVersion, minVersionExists := b.cl.cfg.minVersions.LookupVersion(req.Key()) + minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key()) if minVersionExists && version < minVersion { pr.promise(nil, ErrBrokerTooOld) continue diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 97af6781..83702b5a 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -14,6 +14,13 @@ import ( // Versions is a list of versions, with each item corresponding to a Kafka key // and each item's value corresponding to the max version supported. +// +// Minimum versions are not currently tracked because all keys have a minimum +// version of zero. The internals of a Versions may change in the future to +// support minimum versions; the outward facing API of Versions should not +// change to support this. +// +// As well, supported features may be added in the future. type Versions struct { // If any version is -1, then it is left out in that version. // This was first done in version 2.7.0, where Kafka added support @@ -22,15 +29,24 @@ type Versions struct { k2v []int16 } +// FromApiVersionsResponse returns a Versions from a kmsg.ApiVersionsResponse. +func FromApiVersionsResponse(r *kmsg.ApiVersionsResponse) Versions { + var v Versions + for _, key := range r.ApiKeys { + v.SetMaxKeyVersion(key.ApiKey, key.MaxVersion) + } + return v +} + // HasKey returns true if the versions contains the given key. func (vs Versions) HasKey(k int16) bool { - _, has := vs.LookupVersion(k) + _, has := vs.LookupMaxKeyVersion(k) return has } -// LookupVersion returns the version for the given key and whether the key -// exists. If the key does not exist, this returns (-1, false). -func (vs Versions) LookupVersion(k int16) (int16, bool) { +// LookupMaxKeyVersion returns the version for the given key and whether the +// key exists. If the key does not exist, this returns (-1, false). +func (vs Versions) LookupMaxKeyVersion(k int16) (int16, bool) { if k < 0 { return -1, false } @@ -44,6 +60,23 @@ func (vs Versions) LookupVersion(k int16) (int16, bool) { return version, true } +// SetMaxKeyVersion sets the max version for the given key. +// +// Setting a version to -1 unsets the key. +// +// Versions are backed by a slice; if the slice is not long enough, it is +// extended to fit the key. +func (vs *Versions) SetMaxKeyVersion(k, v int16) { + if k < 0 { + return + } + needLen := int(k + 1) + for len(vs.k2v) < needLen { + vs.k2v = append(vs.k2v, -1) + } + vs.k2v[k] = v +} + // Returns whether two versions are equal. func (vs Versions) Equal(other Versions) bool { // We allow the version slices to be of different lengths, so long as @@ -67,23 +100,7 @@ func (vs Versions) Equal(other Versions) bool { return true } -// SetKeyVersion sets the version for the given key. -// -// Setting a version to -1 unsets the key. -// -// Versions are backed by a slice; if the slice is not long enough, it is -// extended to fit the key. -func (vs *Versions) SetKeyVersion(k, v int16) { - if k < 0 { - return - } - needLen := int(k + 1) - for len(vs.k2v) < needLen { - vs.k2v = append(vs.k2v, -1) - } - vs.k2v[k] = v -} - +// Returns a string representation of the versions; the format may change. func (vs Versions) String() string { var buf bytes.Buffer w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)