Skip to content

Commit

Permalink
kversion: switchup Versions API, finalize 2.7.0
Browse files Browse the repository at this point in the history
As usual, a new release added new behavior: 2.7.0 has a gap in API keys
in the ApiVersions response.

This commit switches up the Versions type to a struct, which is a
breaking change but makes future changes unlikely to be breaking. The
gap is indicated with negative values.

This required small changes in the client guts, but I think the result
is easier to read.

Also removes an erroneous kversions bump for ApiVersions in tip.
  • Loading branch information
twmb committed Jan 4, 2021
1 parent b38befd commit bc1f2d1
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 229 deletions.
18 changes: 9 additions & 9 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ func (b *broker) handleReqs() {
}

if int(req.Key()) > len(cxn.versions[:]) ||
b.cl.cfg.maxVersions != nil &&
int(req.Key()) >= len(b.cl.cfg.maxVersions) {
b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) {
pr.promise(nil, ErrUnknownRequestKey)
continue
}
Expand All @@ -248,7 +247,7 @@ func (b *broker) handleReqs() {

ourMax := req.MaxVersion()
if b.cl.cfg.maxVersions != nil {
userMax := b.cl.cfg.maxVersions[req.Key()]
userMax, _ := b.cl.cfg.maxVersions.LookupVersion(req.Key()) // we validated HasKey above
if userMax < ourMax {
ourMax = userMax
}
Expand All @@ -265,11 +264,12 @@ func (b *broker) handleReqs() {
// If the version now (after potential broker downgrading) is
// lower than we desire, we fail the request for the broker is
// too old.
if b.cl.cfg.minVersions != nil &&
int(req.Key()) < len(b.cl.cfg.minVersions) &&
version < b.cl.cfg.minVersions[req.Key()] {
pr.promise(nil, ErrBrokerTooOld)
continue
if b.cl.cfg.minVersions != nil {
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupVersion(req.Key())
if minVersionExists && version < minVersion {
pr.promise(nil, ErrBrokerTooOld)
continue
}
}

req.SetVersion(version) // always go for highest version
Expand Down Expand Up @@ -422,7 +422,7 @@ func (cxn *brokerCxn) init() error {
cxn.versions[i] = -1
}

if cxn.b.cl.cfg.maxVersions == nil || len(cxn.b.cl.cfg.maxVersions) >= 19 {
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "err", err)
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type cfg struct {
logger Logger

seedBrokers []string
maxVersions kversion.Versions
minVersions kversion.Versions
maxVersions *kversion.Versions
minVersions *kversion.Versions

retryBackoff func(int) time.Duration
retries int
Expand Down Expand Up @@ -142,7 +142,7 @@ func defaultCfg() cfg {
logger: new(nopLogger),

seedBrokers: []string{"127.0.0.1"},
maxVersions: kversion.Stable(),
maxVersions: func() *kversion.Versions { s := kversion.Stable(); return &s }(),

retryBackoff: func() func(int) time.Duration {
var rngMu sync.Mutex
Expand Down Expand Up @@ -300,7 +300,7 @@ func SeedBrokers(seeds ...string) Opt {
// requests, it is recommended to pin versions so that new fields on requests
// do not get invalid default zero values before you update your usage.
func MaxVersions(versions kversion.Versions) Opt {
return clientOpt{func(cfg *cfg) { cfg.maxVersions = versions }}
return clientOpt{func(cfg *cfg) { cfg.maxVersions = &versions }}
}

// MinVersions sets the minimum Kafka version a request can be downgraded to,
Expand All @@ -316,7 +316,7 @@ func MaxVersions(versions kversion.Versions) Opt {
// versions, the request is allowed. It is assumed that there is no lower bound
// for that request.
func MinVersions(versions kversion.Versions) Opt {
return clientOpt{func(cfg *cfg) { cfg.minVersions = versions }}
return clientOpt{func(cfg *cfg) { cfg.minVersions = &versions }}
}

// RetryBackoff sets the backoff strategy for how long to backoff for a given
Expand Down
Loading

0 comments on commit bc1f2d1

Please sign in to comment.