Skip to content

Commit

Permalink
kgo: add min fetch bytes option
Browse files Browse the repository at this point in the history
Previously, this was hardcoded to be 1. May as well add it as an option.
  • Loading branch information
twmb committed Nov 16, 2020
1 parent 43cf65b commit 7f30228
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type cfg struct {

// ***CONSUMER SECTION***
maxWait int32
minBytes int32
maxBytes int32
maxPartBytes int32
resetOffset Offset
Expand Down Expand Up @@ -196,6 +197,7 @@ func defaultCfg() cfg {
partitioner: StickyKeyPartitioner(nil), // default to how Kafka partitions

maxWait: 5000,
minBytes: 1,
maxBytes: 50 << 20,
maxPartBytes: 10 << 20,
resetOffset: NewOffset().AtStart(),
Expand Down Expand Up @@ -662,6 +664,18 @@ func FetchMaxBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }}
}

// FetchMinBYtes sets the minimum amount of bytes a broker will try to send
// during a fetch, overriding the default 1 byte.
//
// With the default of 1, data is sent as soon as it is available. By bumping
// this, the broker will try to wait for more data, which may improve server
// throughput at the expense of added latency.
//
// This corresponds to the Java fetch.min.bytes setting.
func FetchMinBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.minBytes = b }}
}

// FetchMaxPartitionBytes sets the maximum amount of bytes that will be
// consumed for a single partition in a fetch request, overriding the default
// 10MiB. Note that if a single batch is larger than this number, that batch
Expand Down
4 changes: 3 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func (s *source) unuseAll(reqOffsets map[string]map[int32]*seqOffsetFrom) {
func (s *source) createReq() (req *fetchRequest, again bool) {
req = &fetchRequest{
maxWait: s.cl.cfg.maxWait,
minBytes: s.cl.cfg.minBytes,
maxBytes: s.cl.cfg.maxBytes,
maxPartBytes: s.cl.cfg.maxPartBytes,
rack: s.cl.cfg.rack,
Expand Down Expand Up @@ -1045,6 +1046,7 @@ func v1MessageToRecord(
type fetchRequest struct {
version int16
maxWait int32
minBytes int32
maxBytes int32
maxPartBytes int32
rack string
Expand Down Expand Up @@ -1094,7 +1096,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
Version: f.version,
ReplicaID: -1,
MaxWaitMillis: f.maxWait,
MinBytes: 1,
MinBytes: f.minBytes,
MaxBytes: f.maxBytes,
IsolationLevel: f.isolationLevel,
SessionID: f.session.id,
Expand Down

0 comments on commit 7f30228

Please sign in to comment.