Skip to content

Commit bc1f2d1

Browse files
committed
kversion: switchup Versions API, finalize 2.7.0
As usual, a new release added new behavior: 2.7.0 has a gap in API keys in the ApiVersions response. This commit switches up the Versions type to a struct, which is a breaking change but makes future changes unlikely to be breaking. The gap is indicated with negative values. This required small changes in the client guts, but I think the result is easier to read. Also removes an erroneous kversions bump for ApiVersions in tip.
1 parent b38befd commit bc1f2d1

File tree

3 files changed

+305
-229
lines changed

3 files changed

+305
-229
lines changed

Diff for: pkg/kgo/broker.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ func (b *broker) handleReqs() {
232232
}
233233

234234
if int(req.Key()) > len(cxn.versions[:]) ||
235-
b.cl.cfg.maxVersions != nil &&
236-
int(req.Key()) >= len(b.cl.cfg.maxVersions) {
235+
b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) {
237236
pr.promise(nil, ErrUnknownRequestKey)
238237
continue
239238
}
@@ -248,7 +247,7 @@ func (b *broker) handleReqs() {
248247

249248
ourMax := req.MaxVersion()
250249
if b.cl.cfg.maxVersions != nil {
251-
userMax := b.cl.cfg.maxVersions[req.Key()]
250+
userMax, _ := b.cl.cfg.maxVersions.LookupVersion(req.Key()) // we validated HasKey above
252251
if userMax < ourMax {
253252
ourMax = userMax
254253
}
@@ -265,11 +264,12 @@ func (b *broker) handleReqs() {
265264
// If the version now (after potential broker downgrading) is
266265
// lower than we desire, we fail the request for the broker is
267266
// too old.
268-
if b.cl.cfg.minVersions != nil &&
269-
int(req.Key()) < len(b.cl.cfg.minVersions) &&
270-
version < b.cl.cfg.minVersions[req.Key()] {
271-
pr.promise(nil, ErrBrokerTooOld)
272-
continue
267+
if b.cl.cfg.minVersions != nil {
268+
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupVersion(req.Key())
269+
if minVersionExists && version < minVersion {
270+
pr.promise(nil, ErrBrokerTooOld)
271+
continue
272+
}
273273
}
274274

275275
req.SetVersion(version) // always go for highest version
@@ -422,7 +422,7 @@ func (cxn *brokerCxn) init() error {
422422
cxn.versions[i] = -1
423423
}
424424

425-
if cxn.b.cl.cfg.maxVersions == nil || len(cxn.b.cl.cfg.maxVersions) >= 19 {
425+
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
426426
if err := cxn.requestAPIVersions(); err != nil {
427427
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "err", err)
428428
return err

Diff for: pkg/kgo/config.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ type cfg struct {
5555
logger Logger
5656

5757
seedBrokers []string
58-
maxVersions kversion.Versions
59-
minVersions kversion.Versions
58+
maxVersions *kversion.Versions
59+
minVersions *kversion.Versions
6060

6161
retryBackoff func(int) time.Duration
6262
retries int
@@ -142,7 +142,7 @@ func defaultCfg() cfg {
142142
logger: new(nopLogger),
143143

144144
seedBrokers: []string{"127.0.0.1"},
145-
maxVersions: kversion.Stable(),
145+
maxVersions: func() *kversion.Versions { s := kversion.Stable(); return &s }(),
146146

147147
retryBackoff: func() func(int) time.Duration {
148148
var rngMu sync.Mutex
@@ -300,7 +300,7 @@ func SeedBrokers(seeds ...string) Opt {
300300
// requests, it is recommended to pin versions so that new fields on requests
301301
// do not get invalid default zero values before you update your usage.
302302
func MaxVersions(versions kversion.Versions) Opt {
303-
return clientOpt{func(cfg *cfg) { cfg.maxVersions = versions }}
303+
return clientOpt{func(cfg *cfg) { cfg.maxVersions = &versions }}
304304
}
305305

306306
// MinVersions sets the minimum Kafka version a request can be downgraded to,
@@ -316,7 +316,7 @@ func MaxVersions(versions kversion.Versions) Opt {
316316
// versions, the request is allowed. It is assumed that there is no lower bound
317317
// for that request.
318318
func MinVersions(versions kversion.Versions) Opt {
319-
return clientOpt{func(cfg *cfg) { cfg.minVersions = versions }}
319+
return clientOpt{func(cfg *cfg) { cfg.minVersions = &versions }}
320320
}
321321

322322
// RetryBackoff sets the backoff strategy for how long to backoff for a given

0 commit comments

Comments
 (0)