Skip to content

Commit

Permalink
breaking: rename ConnTimeoutOverhead to RequestTimeoutOverhead
Browse files Browse the repository at this point in the history
The concept of a request timeout overhead is much clearer than a conn
timeout overhead. The new name is clearer as to the purpose of the
function.
  • Loading branch information
twmb committed Sep 1, 2021
1 parent 98128ef commit 2109ed4
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 65 deletions.
10 changes: 5 additions & 5 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (b *broker) handleReqs() {
continue
}

rt, _ := cxn.cl.connTimeoutFn(req)
rt, _ := cxn.cl.connTimeouter.timeouts(req)

cxn.waitResp(promisedResp{
pr.ctx,
Expand Down Expand Up @@ -664,7 +664,7 @@ start:
return writeErr
}

rt, _ := cxn.cl.connTimeoutFn(req)
rt, _ := cxn.cl.connTimeouter.timeouts(req)
// api versions does *not* use flexible response headers; see comment in promisedResp
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, false, rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
if err != nil {
Expand Down Expand Up @@ -736,7 +736,7 @@ start:
return writeErr
}

rt, _ := cxn.cl.connTimeoutFn(req)
rt, _ := cxn.cl.connTimeouter.timeouts(req)
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, req.IsFlexible(), rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
if err != nil {
return err
Expand Down Expand Up @@ -781,7 +781,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {

// Even if we do not wrap our reads/writes in SASLAuthenticate, we
// still use the SASLAuthenticate timeouts.
rt, wt := cxn.cl.connTimeoutFn(kmsg.NewPtrSASLAuthenticateRequest())
rt, wt := cxn.cl.connTimeouter.timeouts(kmsg.NewPtrSASLAuthenticateRequest())

// We continue writing until both the challenging is done AND the
// responses are done. We can have an additional response once we
Expand Down Expand Up @@ -918,7 +918,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
cxn.corrID,
)

_, wt := cxn.cl.connTimeoutFn(req)
_, wt := cxn.cl.connTimeouter.timeouts(req)
bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue = cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt)

cxn.cl.bufPool.put(buf)
Expand Down
96 changes: 49 additions & 47 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Client struct {
sinksAndSources map[int32]sinkAndSource

reqFormatter *kmsg.RequestFormatter
connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration)
connTimeouter connTimeouter

bufPool bufPool // for to brokers to share underlying reusable request buffers
pnrPool pnrPool // for sinks to reuse []promisedNumberedRecord
Expand Down Expand Up @@ -154,7 +154,7 @@ func NewClient(opts ...Opt) (*Client, error) {
sinksAndSources: make(map[int32]sinkAndSource),

reqFormatter: kmsg.NewRequestFormatter(),
connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead),
connTimeouter: connTimeouter{def: cfg.requestTimeoutOverhead},

bufPool: newBufPool(),
pnrPool: newPnrPool(),
Expand Down Expand Up @@ -245,55 +245,57 @@ func parseBrokerAddr(addr string) (hostport, error) {
return hostport{h, int32(port)}, nil
}

func connTimeoutBuilder(def time.Duration) func(kmsg.Request) (time.Duration, time.Duration) {
var joinMu sync.Mutex
var lastRebalanceTimeout time.Duration
type connTimeouter struct {
def time.Duration
joinMu sync.Mutex
lastRebalanceTimeout time.Duration
}

return func(req kmsg.Request) (read, write time.Duration) {
millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond }
switch t := req.(type) {
default:
if timeoutRequest, ok := req.(kmsg.TimeoutRequest); ok {
timeoutMillis := timeoutRequest.Timeout()
return def + millis(timeoutMillis), def
}
return def, def

case *produceRequest:
return def + millis(t.timeout), def
case *fetchRequest:
return def + millis(t.maxWait), def
case *kmsg.FetchRequest:
return def + millis(t.MaxWaitMillis), def

// SASL may interact with an external system; we give each step
// of the read process 30s by default.

case *kmsg.SASLHandshakeRequest,
*kmsg.SASLAuthenticateRequest:
return 30 * time.Second, def

// Join and sync can take a long time. Sync has no notion of
// timeouts, but since the flow of requests should be first
// join, then sync, we can stash the timeout from the join.

case *kmsg.JoinGroupRequest:
joinMu.Lock()
lastRebalanceTimeout = millis(t.RebalanceTimeoutMillis)
joinMu.Unlock()

return def + millis(t.RebalanceTimeoutMillis), def
case *kmsg.SyncGroupRequest:
read := def
joinMu.Lock()
if lastRebalanceTimeout != 0 {
read = lastRebalanceTimeout
}
joinMu.Unlock()
func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
def := c.def
millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond }
switch t := req.(type) {
default:
if timeoutRequest, ok := req.(kmsg.TimeoutRequest); ok {
timeoutMillis := timeoutRequest.Timeout()
return def + millis(timeoutMillis), def
}
return def, def

case *produceRequest:
return def + millis(t.timeout), def
case *fetchRequest:
return def + millis(t.maxWait), def
case *kmsg.FetchRequest:
return def + millis(t.MaxWaitMillis), def

return read, def
// SASL may interact with an external system; we give each step
// of the read process 30s by default.

case *kmsg.SASLHandshakeRequest,
*kmsg.SASLAuthenticateRequest:
return 30 * time.Second, def

// Join and sync can take a long time. Sync has no notion of
// timeouts, but since the flow of requests should be first
// join, then sync, we can stash the timeout from the join.

case *kmsg.JoinGroupRequest:
c.joinMu.Lock()
c.lastRebalanceTimeout = millis(t.RebalanceTimeoutMillis)
c.joinMu.Unlock()

return def + millis(t.RebalanceTimeoutMillis), def
case *kmsg.SyncGroupRequest:
read := def
c.joinMu.Lock()
if c.lastRebalanceTimeout != 0 {
read = c.lastRebalanceTimeout
}
c.joinMu.Unlock()

return read, def

}
}

Expand Down
29 changes: 16 additions & 13 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type cfg struct {
// GENERAL SECTION //
/////////////////////

id *string // client ID
dialFn func(context.Context, string, string) (net.Conn, error)
connTimeoutOverhead time.Duration
connIdleTimeout time.Duration
id *string // client ID
dialFn func(context.Context, string, string) (net.Conn, error)
requestTimeoutOverhead time.Duration
connIdleTimeout time.Duration

softwareName string // KIP-511
softwareVersion string // KIP-511
Expand Down Expand Up @@ -263,9 +263,9 @@ func (cfg *cfg) validate() error {
// 0 <= allowed concurrency
{name: "max concurrent fetches", v: int64(cfg.maxConcurrentFetches), allowed: 0, badcmp: i64lt},

// 1s <= conn timeout overhead <= 15m
{name: "conn timeout max overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
{name: "conn timeout min overhead", v: int64(cfg.connTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},
// 1s <= request timeout overhead <= 15m
{name: "request timeout max overhead", v: int64(cfg.requestTimeoutOverhead), allowed: int64(15 * time.Minute), badcmp: i64gt, durs: true},
{name: "request timeout min overhead", v: int64(cfg.requestTimeoutOverhead), allowed: int64(time.Second), badcmp: i64lt, durs: true},

// 1s <= conn idle <= 15m
{name: "conn min idle timeout", v: int64(cfg.connIdleTimeout), allowed: int64(time.Second), badcmp: i64lt, durs: true},
Expand Down Expand Up @@ -368,8 +368,8 @@ func defaultCfg() cfg {
id: &defaultID,
dialFn: defaultDialer.DialContext,

connTimeoutOverhead: 20 * time.Second,
connIdleTimeout: 20 * time.Second,
requestTimeoutOverhead: 20 * time.Second,
connIdleTimeout: 20 * time.Second,

softwareName: "kgo",
softwareVersion: "0.1.0",
Expand Down Expand Up @@ -500,7 +500,7 @@ func WithLogger(l Logger) Opt {
return clientOpt{func(cfg *cfg) { cfg.logger = &wrappedLogger{l} }}
}

// ConnTimeoutOverhead uses the given time as overhead while deadlining
// RequestTimeoutOverhead uses the given time as overhead while deadlining
// requests, overriding the default overhead of 20s.
//
// For most requests, the overhead will simply be this timeout. However, for
Expand All @@ -512,10 +512,13 @@ func WithLogger(l Logger) Opt {
// For writes, the timeout is always the overhead. We buffer writes in our
// client before one quick flush, so we always expect the write to be fast.
//
// Note that hitting the timeout kills a connection, which will fail any other
// active writes or reads on the connection.
//
// This option is roughly equivalent to request.timeout.ms, but grants
// additional time to requests that have timeout fields.
func ConnTimeoutOverhead(overhead time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.connTimeoutOverhead = overhead }}
func RequestTimeoutOverhead(overhead time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.requestTimeoutOverhead = overhead }}
}

// ConnIdleTimeout is a rough amount of time to allow connections to idle
Expand All @@ -530,7 +533,7 @@ func ConnTimeoutOverhead(overhead time.Duration) Opt {
//
// Connections are not reaped if they are actively being written to or read
// from; thus, a request can take a really long time itself and not be reaped
// (however, this may lead to the ConnTimeoutOverhead).
// (however, this may lead to the RequestTimeoutOverhead).
func ConnIdleTimeout(timeout time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.connIdleTimeout = timeout }}
}
Expand Down

0 comments on commit 2109ed4

Please sign in to comment.