client: log "seed #" rather than a large negative for seed brokers
Previously, seed brokers were logged with large negative numbers, which was
confusing. Now, we log "seed 0", "seed 1", etc.

We still use negative node IDs in int32 returns, but that is documented:
BrokerMetadata.NodeID describes the negative ID usage for seeds.

Lastly, this cleans up some log wording when producing to an unknown topic
for the first time.
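For reference, here is a minimal, self-contained sketch of the new logID helper (added in pkg/kgo/broker.go below); the program and its printed values are illustrative, assuming seed brokers are numbered upward from math.MinInt32 as BrokerMetadata.NodeID documents:

package main

import (
	"fmt"
	"math"
	"strconv"
)

// logID mirrors the helper added in this commit: ordinary broker IDs
// (including Kafka's small negative sentinels such as -1) print as-is,
// while seed broker IDs, which count up from math.MinInt32, print as
// "seed N".
func logID(id int32) string {
	if id >= -10 {
		return strconv.FormatInt(int64(id), 10)
	}
	return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
}

func main() {
	fmt.Println(logID(3))                 // 3
	fmt.Println(logID(-1))                // -1
	fmt.Println(logID(math.MinInt32))     // seed 0
	fmt.Println(logID(math.MinInt32 + 1)) // seed 1
}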
twmb committed Jun 30, 2021
1 parent 1e8eeba commit e48c03c
Showing 4 changed files with 47 additions and 40 deletions.
45 changes: 26 additions & 19 deletions pkg/kgo/broker.go
@@ -62,6 +62,13 @@ type promisedResp struct {
readEnqueue time.Time
}

+func logID(id int32) string {
+	if id >= -10 {
+		return strconv.FormatInt(int64(id), 10)
+	}
+	return "seed " + strconv.FormatInt(int64(id)-math.MinInt32, 10)
+}

// BrokerMetadata is metadata for a broker.
//
// This struct mirrors kmsg.MetadataResponseBroker.
@@ -289,7 +296,7 @@ func (b *broker) handleReqs() {
// can only have an expiry if we went the authenticate
// flow, so we know we are authenticating again.
// For KIP-368.
-cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", cxn.b.meta.NodeID)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", logID(cxn.b.meta.NodeID))
if err := cxn.sasl(); err != nil {
pr.promise(nil, err)
cxn.die()
@@ -426,11 +433,11 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
deadCh: make(chan struct{}),
}
if err = cxn.init(isProduceCxn); err != nil {
-b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
cxn.closeConn()
return nil, err
}
-b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", b.meta.NodeID)
+b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "broker", logID(b.meta.NodeID))

b.reapMu.Lock()
defer b.reapMu.Unlock()
@@ -513,7 +520,7 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {

// connect connects to the broker's addr, returning the new connection.
func (b *broker) connect(ctx context.Context) (net.Conn, error) {
-b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", b.meta.NodeID)
+b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
start := time.Now()
conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr)
since := time.Since(start)
@@ -523,10 +530,10 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
}
})
if err != nil {
-b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
+b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
return nil, fmt.Errorf("unable to dial: %w", err)
} else {
-b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", b.meta.NodeID)
+b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", logID(b.meta.NodeID))
}
return conn, nil
}
@@ -574,13 +581,13 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {

if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
-cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", cxn.b.meta.NodeID, "err", err)
+cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
return err
}
}

if err := cxn.sasl(); err != nil {
-cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", cxn.b.meta.NodeID, "err", err)
+cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err)
return err
}

@@ -613,7 +620,7 @@ start:
ClientSoftwareName: cxn.cl.cfg.softwareName,
ClientSoftwareVersion: cxn.cl.cfg.softwareVersion,
}
-cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", cxn.b.meta.NodeID, "version", maxVersion)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", logID(cxn.b.meta.NodeID), "version", maxVersion)
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
if writeErr != nil {
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -646,7 +653,7 @@ start:
// EventHubs erroneously replies with v1, so we check
// for that as well.
srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
-cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", cxn.b.meta.NodeID)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
maxVersion = 0
goto start
}
@@ -682,7 +689,7 @@ start:
if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 {
req.Mechanism = mechanism.Name()
req.Version = cxn.versions[req.Key()]
-cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", cxn.b.meta.NodeID)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID))
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
if writeErr != nil {
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
@@ -716,7 +723,7 @@ start:
}
authenticate = req.Version == 1
}
-cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", cxn.b.meta.NodeID, "mechanism", mechanism.Name(), "authenticate", authenticate)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.mechanism = mechanism
return cxn.doSasl(authenticate)
}
@@ -751,7 +758,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
buf = append(buf, clientWrite...)

-cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", cxn.b.meta.NodeID, "step", step)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step)
_, err, _, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now())

cxn.cl.bufPool.put(buf)
@@ -770,7 +777,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
SASLAuthBytes: clientWrite,
}
req.Version = cxn.versions[req.Key()]
-cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", cxn.b.meta.NodeID, "version", req.Version, "step", step)
+cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step)

corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)

@@ -823,7 +830,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
}
now := time.Now()
cxn.expiry = now.Add(time.Duration(lifetimeMillis)*time.Millisecond - time.Second)
-cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", cxn.b.meta.NodeID, "reauthenticate_in", cxn.expiry.Sub(now))
+cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
}
return nil
}
@@ -871,7 +878,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
}
})
if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", cxn.b.meta.NodeID, "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
+logger.Log(LogLevelDebug, fmt.Sprintf("wrote %s v%d", kmsg.NameForKey(req.Key()), req.GetVersion()), "broker", logID(cxn.b.meta.NodeID), "bytes_written", bytesWritten, "write_wait", writeWait, "time_to_write", timeToWrite, "err", writeErr)
}

if writeErr != nil {
@@ -1054,7 +1061,7 @@ func (cxn *brokerCxn) readResponse(
}
})
if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
-logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
+logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", logID(cxn.b.meta.NodeID), "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
}

if readErr != nil {
@@ -1247,9 +1254,9 @@ func (cxn *brokerCxn) handleResps() {
rawResp, err := cxn.readResponse(pr.ctx, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader, pr.readTimeout, pr.bytesWritten, pr.writeWait, pr.timeToWrite, pr.readEnqueue)
if err != nil {
if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
-cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err)
+cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "successful_reads", successes, "err", err)
} else {
-cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "err", err)
+cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
}
pr.promise(nil, err)
return
8 changes: 4 additions & 4 deletions pkg/kgo/producer.go
@@ -616,7 +616,7 @@ func (cl *Client) waitUnknownTopic(
topic string,
unknown *unknownTopicProduces,
) {
-cl.cfg.logger.Log(LogLevelInfo, "waiting for metadata to produce to unknown topic", "topic", topic)
+cl.cfg.logger.Log(LogLevelInfo, "producing to a new topic for the first time, fetching metadata to learn its partitions", "topic", topic)
var after <-chan time.Time
if timeout := cl.cfg.recordTimeout; timeout > 0 {
timer := time.NewTimer(cl.cfg.recordTimeout)
@@ -633,10 +633,10 @@ func (cl *Client) waitUnknownTopic(
err = errRecordTimeout
case retriableErr, ok := <-unknown.wait:
if !ok {
-cl.cfg.logger.Log(LogLevelInfo, "done waiting for unknown topic", "topic", topic)
+cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic)
return // metadata was successful!
}
-cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", retriableErr)
+cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, retrying wait", "topic", topic, "err", retriableErr)
tries++
if int64(tries) >= cl.cfg.produceRetries {
err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
@@ -659,7 +659,7 @@ func (cl *Client) waitUnknownTopic(
if nowUnknown != unknown {
return
}
-cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, done retrying, failing all records", "topic", topic)
+cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, done retrying, failing all records", "topic", topic, "err", err)

delete(p.unknownTopics, topic)
cl.failUnknownTopicRecords(topic, unknown, err)
28 changes: 14 additions & 14 deletions pkg/kgo/sink.go
@@ -276,7 +276,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
s.cl.cfg.logger.Log(LogLevelWarn, "unable to load producer ID, bumping client's buffered record load errors by 1 and retrying")
return true // whatever caused our produce, we did nothing, so keep going
default:
-s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err)
+s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", logID(s.nodeID), "err", err)
fallthrough
case ErrClientClosed:
s.cl.failBufferedRecords(err)
@@ -321,7 +321,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// because that can lead to undesirable behavior
// with produce request vs. end txn (KAFKA-12671)
s.cl.failProducerID(id, epoch, err)
-s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", s.nodeID, "err", err)
+s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
}
return false
}
@@ -413,7 +413,7 @@ func (s *sink) issueTxnReq(
for _, topic := range resp.Topics {
topicBatches, ok := req.batches[topic.Topic]
if !ok {
-s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic)
+s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic)
continue
}
for _, partition := range topic.Partitions {
@@ -427,7 +427,7 @@ func (s *sink) issueTxnReq(

batch, ok := topicBatches[partition.Partition]
if !ok {
-s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", s.nodeID, "topic", topic.Topic, "partition", partition.Partition)
+s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "broker", logID(s.nodeID), "topic", topic.Topic, "partition", partition.Partition)
continue
}

@@ -501,7 +501,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
s.cl.failBufferedRecords(ErrClientClosed)

default:
-s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", s.nodeID, "err", err)
+s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", logID(s.nodeID), "err", err)
fallthrough

case isRetriableBrokerErr(err):
@@ -525,7 +525,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
defer func() {
update := b.String()
update = strings.TrimSuffix(update, ", ")
-s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", s.nodeID, "to", update)
+s.cl.cfg.logger.Log(LogLevelDebug, "produced", "broker", logID(s.nodeID), "to", update)
}()
}

@@ -589,7 +589,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
topic := rTopic.Topic
partitions, ok := req.batches[topic]
if !ok {
-s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", s.nodeID, "topic", topic)
+s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
delete(req.metrics, topic)
continue // should not hit this
}
@@ -603,7 +603,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
partition := rPartition.Partition
batch, ok := partitions[partition]
if !ok {
-s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", s.nodeID, "topic", rTopic.Topic, "partition", partition)
+s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
delete(tmetrics, partition)
continue // should not hit this
}
@@ -640,7 +640,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
}

if len(req.batches) > 0 {
-s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", s.nodeID)
+s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
s.handleRetryBatches(req.batches, 0, true, false)
}
if len(reqRetry) > 0 {
@@ -746,7 +746,7 @@ func (s *sink) handleReqRespBatch(

if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
-"broker", s.nodeID,
+"broker", logID(s.nodeID),
"topic", topic,
"partition", partition,
"producer_id", producerID,
@@ -776,7 +776,7 @@ func (s *sink) handleReqRespBatch(
// We should not be here, since this error occurs in the
// context of transactions, which are caught above.
s.cl.cfg.logger.Log(LogLevelInfo, fmt.Sprintf("batch errored with %s, failing the producer ID and resetting all sequence numbers", err.(*kerr.Error).Message),
-"broker", s.nodeID,
+"broker", logID(s.nodeID),
"topic", topic,
"partition", partition,
"producer_id", producerID,
@@ -796,7 +796,7 @@ func (s *sink) handleReqRespBatch(

case err == kerr.DuplicateSequenceNumber: // ignorable, but we should not get
s.cl.cfg.logger.Log(LogLevelInfo, "received unexpected duplicate sequence number, ignoring and treating batch as successful",
-"broker", s.nodeID,
+"broker", logID(s.nodeID),
"topic", topic,
"partition", partition,
)
@@ -805,7 +805,7 @@ func (s *sink) handleReqRespBatch(
default:
if err != nil {
s.cl.cfg.logger.Log(LogLevelInfo, "batch in a produce request failed",
-"broker", s.nodeID,
+"broker", logID(s.nodeID),
"topic", topic,
"partition", partition,
"err", err,
@@ -1133,7 +1133,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
if len(recBuf.batches) == 0 {
return
}
-recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
+recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
batch0 := recBuf.batches[0]
batch0.tries++
failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
6 changes: 3 additions & 3 deletions pkg/kgo/source.go
@@ -623,15 +623,15 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// If the epoch was zero, the broker did not even
// establish a session for us (and thus is maxed on
// sessions). We stop trying.
-s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", s.nodeID)
+s.cl.cfg.logger.Log(LogLevelInfo, "session failed with SessionIDNotFound while trying to establish a session; broker likely maxed on sessions; continuing on without using sessions", "broker", logID(s.nodeID))
s.session.kill()
} else {
-s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", s.nodeID)
+s.cl.cfg.logger.Log(LogLevelInfo, "received SessionIDNotFound from our in use session, our session was likely evicted; resetting session", "broker", logID(s.nodeID))
s.session.reset()
}
return
case kerr.InvalidFetchSessionEpoch:
-s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", s.nodeID, "err", err)
+s.cl.cfg.logger.Log(LogLevelInfo, "resetting fetch session", "broker", logID(s.nodeID), "err", err)
s.session.reset()
return
}
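For illustration, with a typical key/value logger, a debug line for the first seed broker changes roughly as below. The rendering and the address are hypothetical (the exact format depends on the Logger implementation you configure); the node ID -2147483648 is math.MinInt32, i.e. seed 0:

before: [DEBUG] opening connection to broker; addr: seed.example.com:9092; broker: -2147483648
after:  [DEBUG] opening connection to broker; addr: seed.example.com:9092; broker: seed 0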
