Skip to content

Commit

Permalink
broker: bugfix e2e
Browse files Browse the repository at this point in the history
While `err` to `writeErr` for the e2e integration, I forgot to update
the error used in the promise in *one* location. This fixes that, and
makes it so that the err variable no longer exists at that point anyway
now by scoping it.
  • Loading branch information
twmb committed Jun 2, 2021
1 parent 5576dce commit a9691bd
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,13 @@ func (b *broker) handleReqs() {

for pr := range b.reqs {
req := pr.req
cxn, err := b.loadConnection(pr.ctx, req.Key())
if err != nil {
pr.promise(nil, err)
continue
var cxn *brokerCxn
{
var err error
if cxn, err = b.loadConnection(pr.ctx, req.Key()); err != nil {
pr.promise(nil, err)
continue
}
}

if int(req.Key()) > len(cxn.versions[:]) ||
Expand Down Expand Up @@ -287,7 +290,7 @@ func (b *broker) handleReqs() {
// flow, so we know we are authenticating again.
// For KIP-368.
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", cxn.b.meta.NodeID)
if err = cxn.sasl(); err != nil {
if err := cxn.sasl(); err != nil {
pr.promise(nil, err)
cxn.die()
continue
Expand Down Expand Up @@ -338,7 +341,7 @@ func (b *broker) handleReqs() {
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(pr.ctx, pr.enqueue, req)

if writeErr != nil {
pr.promise(nil, err)
pr.promise(nil, writeErr)
cxn.die()
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
continue
Expand Down

0 comments on commit a9691bd

Please sign in to comment.