Skip to content

Commit 7ac5aaf

Browse files
committed
kgo: add MinVersions
This allows ensuring that you will not be bumped down if you are talking to an old broker, which is necessary if you absolutely want to use some new features.
1 parent 73bd272 commit 7ac5aaf

File tree

3 files changed

+48
-5
lines changed

3 files changed

+48
-5
lines changed

Diff for: pkg/kgo/broker.go

+24-3
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,15 @@ func (b *broker) handleReqs() {
293293
pr.promise(nil, ErrUnknownRequestKey)
294294
continue
295295
}
296+
297+
// If cxn.versions[0] is non-negative, then we loaded API
298+
// versions. If the version for this request is negative, we
299+
// know the broker cannot handle this request.
300+
if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 {
301+
pr.promise(nil, ErrBrokerTooOld)
302+
continue
303+
}
304+
296305
ourMax := req.MaxVersion()
297306
if b.cl.cfg.maxVersions != nil {
298307
userMax := b.cl.cfg.maxVersions[req.Key()]
@@ -301,12 +310,24 @@ func (b *broker) handleReqs() {
301310
}
302311
}
303312

304-
// If brokerMax is negative, we have no api versions because
305-
// the client is pinned pre 0.10.0 and we stick with our max.
313+
// If brokerMax is negative at this point, we have no api
314+
// versions because the client is pinned pre 0.10.0 and we
315+
// stick with our max.
306316
version := ourMax
307317
if brokerMax := cxn.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax {
308318
version = brokerMax
309319
}
320+
321+
// If the version now (after potential broker downgrading) is
322+
// lower than we desire, we fail the request for the broker is
323+
// too old.
324+
if b.cl.cfg.minVersions != nil &&
325+
int(req.Key()) < len(b.cl.cfg.minVersions) &&
326+
version < b.cl.cfg.minVersions[req.Key()] {
327+
pr.promise(nil, ErrBrokerTooOld)
328+
continue
329+
}
330+
310331
req.SetVersion(version) // always go for highest version
311332

312333
if !cxn.expiry.IsZero() && time.Now().After(cxn.expiry) {
@@ -661,7 +682,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
661682

662683
if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
663684
if resp.ErrorMessage != nil {
664-
return fmt.Errorf("%s: %v", *resp.ErrorMessage, err)
685+
return fmt.Errorf("%s: %w", *resp.ErrorMessage, err)
665686
}
666687
return err
667688
}

Diff for: pkg/kgo/config.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type cfg struct {
5656

5757
seedBrokers []string
5858
maxVersions kversion.Versions
59+
minVersions kversion.Versions
5960

6061
retryBackoff func(int) time.Duration
6162
retries int
@@ -288,7 +289,7 @@ func SeedBrokers(seeds ...string) Opt {
288289
}
289290

290291
// MaxVersions sets the maximum Kafka version to try, overriding the
291-
// internal unbounded (latest) versions.
292+
// internal unbounded (latest stable) versions.
292293
//
293294
// Note that specific max version pinning is required if trying to interact
294295
// with versions pre 0.10.0. Otherwise, unless using more complicated requests
@@ -300,6 +301,22 @@ func MaxVersions(versions kversion.Versions) Opt {
300301
return clientOpt{func(cfg *cfg) { cfg.maxVersions = versions }}
301302
}
302303

304+
// MinVersions sets the minimum Kafka version a request can be downgraded to,
305+
// overriding the default of the lowest version.
306+
//
307+
// This option is useful if you are issuing requests that you absolutely do not
308+
// want to be downgraded; that is, if you are relying on features in newer
309+
// requests, and you are not sure if your brokers can handle those features.
310+
// By setting a min version, if the client detects it needs to downgrade past
311+
// the version, it will instead avoid issuing the request.
312+
//
313+
// Unlike MaxVersions, if a request is issued that is unknown to the min
314+
// versions, the request is allowed. It is assumed that there is no lower bound
315+
// for that request.
316+
func MinVersions(versions kversion.Versions) Opt {
317+
return clientOpt{func(cfg *cfg) { cfg.minVersions = versions }}
318+
}
319+
303320
// RetryBackoff sets the backoff strategy for how long to backoff for a given
304321
// amount of retries, overriding the default exponential backoff that ranges
305322
// from 100ms min to 1s max.

Diff for: pkg/kgo/errors.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ var (
1212

1313
// ErrClientTooOld is returned when issuing request that are unknown or
1414
// use an unknown version.
15-
ErrClientTooOld = errors.New("client is too old; this client does not know what to do with this")
15+
ErrClientTooOld = errors.New("client is too old; this client does not know what to do with this request")
16+
17+
// ErrBrokerTooOld is returned if a connection has loaded broker
18+
// ApiVersions and knows that a broker cannot handle the request that
19+
// is attempting to be issued.
20+
ErrBrokerTooOld = errors.New("broker is too old; the broker has already indicated it will not know how to handle the request")
1621

1722
// ErrNoResp is the error used if Kafka does not reply to a topic or
1823
// partition in a produce request. This error should never be seen.

0 commit comments

Comments
 (0)