From 303186a3a720c45a2e71333a6310b06ff4aa097d Mon Sep 17 00:00:00 2001 From: Alex Kesler Date: Fri, 4 Dec 2020 16:04:42 -0700 Subject: [PATCH] Add BrokerThrottleHook --- pkg/kgo/broker.go | 5 +++++ pkg/kgo/hooks.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 331563d5..991741c1 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -850,6 +850,11 @@ func (cxn *brokerCxn) handleResps() { if throttleUntil > cxn.throttleUntil { atomic.StoreInt64(&cxn.throttleUntil, throttleUntil) } + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerThrottleHook); ok { + h.OnThrottle(cxn.b.meta, time.Duration(millis)*time.Millisecond, throttlesAfterResp) + } + }) } } } diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go index 4ce45b9d..863e965b 100644 --- a/pkg/kgo/hooks.go +++ b/pkg/kgo/hooks.go @@ -70,3 +70,19 @@ type BrokerReadHook interface { // The bytes read does not count any tls overhead. OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) } + +// BrokerThrottleHook is called after a response to a request is read +// from a broker, and the response identifies throttling in effect. +type BrokerThrottleHook interface { + // OnThrottle is passed the broker metadata, the imposed throttling + // interval, and whether the throttle was applied before Kafka + // responded to them request or after. + // + // For Kafka < 2.0.0, the throttle is applied before issuing a response. + // For Kafka >= 2.0.0, the throttle is applied after issuing a response. + // + // If throttledAfterResponse is false, then Kafka already applied the + // throttle. If it is true, the client internally will not send another + // request until the throttle deadline has passed. + OnThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool) +}