rewrite the consumer & source code; other bug fixes
This is a very large commit that aims to make a lot of things in the
consumer code simpler to reason about, and it bundles in a few other
bugfixes that I noticed along the way.

Before this, the consumer used a bunch of sequence numbers to track
changes across assignments and the like. These sequence numbers were
then used to knife out stale partitions and to ignore in-progress work
that was completing for stale partitions, while still accepting the
non-stale results.
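
To make the old pattern concrete, here is a rough, general sketch of
sequence-number invalidation (illustrative names only, not the removed kgo
code): async work records the sequence number it started under, and its
result is dropped if the number has since been bumped.

```go
package sketch

import "sync"

// Illustrative only; not the removed kgo code, just the general pattern.
type partition struct {
	mu  sync.Mutex
	seq uint64 // bumped on every reassignment / invalidation
}

// startWork captures the sequence number the work begins under.
func (p *partition) startWork() uint64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.seq
}

// finishWork applies the result only if no reassignment happened while the
// work was in flight; otherwise the result is for a stale partition and is
// dropped.
func (p *partition) finishWork(startSeq uint64, apply func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.seq != startSeq {
		return // stale: the partition was invalidated mid-flight
	}
	apply()
}
```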

This sequence number tracking and partition knifing was actually really
complicated and difficult to reason about, and I was never fully
confident in it. I knew there was some bug somewhere, and getting it to
work in the first place had me tracking down some really complicated
bugs to begin with.

This commit mostly rewrites the source and consumer code with the goal
of simplifying things. Partition state is no longer tracked through
sequence numbers that can be bumped at any time; instead, we use
consumer sessions that are stopped and started in full, and all
modifications happen while a session is stopped. This also moves state
changes from happening basically anywhere into fewer functions, and
adds a lot of long-winded documentation for why the state changes are
safe.
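
As a rough illustration of the stop / modify / start flow (the type and
method names here are assumptions for the sketch, not the actual kgo
identifiers):

```go
package sketch

import (
	"context"
	"sync"
)

// Illustrative shapes only; the real kgo consumer carries far more state.
type consumerSession struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup // tracks fetches and offset loads owned by this session
}

type consumer struct {
	mu         sync.Mutex
	session    *consumerSession
	assignment map[string][]int32 // topic => partitions
}

// stopSession cancels the live session and waits for its work to drain;
// after it returns, nothing concurrent can observe partition state.
func (c *consumer) stopSession() {
	if c.session == nil {
		return
	}
	c.session.cancel()
	c.session.wg.Wait()
	c.session = nil
}

// startNewSession begins a fresh session; any work launched afterwards
// registers itself on the new session's WaitGroup.
func (c *consumer) startNewSession() {
	ctx, cancel := context.WithCancel(context.Background())
	c.session = &consumerSession{ctx: ctx, cancel: cancel}
}

// assign shows the pattern: all modification happens between a full stop
// and a full start, so no sequence numbers are needed to invalidate stale
// in-flight work.
func (c *consumer) assign(newAssignment map[string][]int32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stopSession()
	c.assignment = newAssignment // safe: the session is fully stopped
	c.startNewSession()
}
```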

Some further simplification remains to be done, but this is passing
integration tests right now. I'd like to clarify usedCursors, and to
switch the useState in a cursor to have only two states rather than
three.

I'll shortly be following this with some consumer group API changes as
well, and will be simplifying the guts of a consumer group. Some changes
were already made to make things clearer, and to have fewer blocking
functions.

I noticed that fetch sessions had some lock inversion; that is now
fixed, and the mutex usage in a source / cursor has been drastically
simplified. The session also looked to have some flaws on unhappy-path
errors; those should now be fixed, but they are hard to test.
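
For reference, lock inversion means two code paths acquire the same two
locks in opposite orders, which can deadlock. A tiny sketch of the hazard
and the usual fix, with illustrative types rather than the real fetch
session / cursor types:

```go
package sketch

import "sync"

// Illustrative only; the real fetch session, source, and cursor types differ.
type fetchSession struct{ mu sync.Mutex }
type cursor struct{ mu sync.Mutex }

// Deadlock-prone pattern: one goroutine locks session.mu then cursor.mu
// while another locks cursor.mu then session.mu, and each waits on the
// other's lock forever. The fix is to pick one global order (session
// before cursor, here) and use it on every path.
func useCursorInSession(s *fetchSession, c *cursor, fn func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	c.mu.Lock()
	defer c.mu.Unlock()
	fn()
}
```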

I noticed that sinks and sources never updated their broker pointer,
which would be problematic if a broker changed but kept the same node
ID. There may have been some metadata handling that fixed this up, but
regardless, it was not the cleanest flow, and the broker pointer itself
was definitely never updated. This commit changes that by storing sinks
and sources in a dedicated nodeID => sink/source map in the client
itself; the sink/source then looks up the broker pointer to use
whenever it issues a request. This allows brokers to change at any
moment. This also moves sequenced async request handling from the
broker to the sink itself, since the sink is the only thing that needed
it, and putting it on the sink allows brokers to change between
requests.
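
A minimal sketch of the lookup-at-request-time idea, with deliberately
simplified, assumed type shapes (the real kgo sink, source, and client
carry much more state):

```go
package sketch

import (
	"context"
	"errors"
	"sync"
)

// Assumed, simplified shapes; illustrative only.
type broker struct{ nodeID int32 }

func (b *broker) do(ctx context.Context, req []byte) ([]byte, error) {
	return nil, nil // stand-in for issuing the request on this broker
}

type client struct {
	brokersMu sync.RWMutex
	brokers   map[int32]*broker // refreshed by metadata updates
}

type sink struct {
	cl     *client
	nodeID int32 // fixed for the sink's lifetime
}

var errUnknownBroker = errors.New("unknown broker")

// doRequest resolves the broker pointer at request time instead of caching
// it, so a broker that was replaced but kept its node ID is picked up
// automatically on the next request.
func (s *sink) doRequest(ctx context.Context, req []byte) ([]byte, error) {
	s.cl.brokersMu.RLock()
	br, ok := s.cl.brokers[s.nodeID]
	s.cl.brokersMu.RUnlock()
	if !ok {
		return nil, errUnknownBroker
	}
	return br.do(ctx, req)
}
```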

This also removes the potential for a recBuf's sink or a cursor's source
to ever be nil. Instead, we default to the first seed broker. This is an
OK compromise because it allows us to start sending (and receiving
errors) immediately, while we expect to always have the relevant leader
broker loaded. This simplifies using a sink or a source, and also allows
things to begin notifying of load errors.
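
A sketch of that never-nil defaulting, again with assumed helper names
(sinkFor and firstSeedID are illustrative, not real kgo identifiers; the
sinkAndSource map mirrors the one added in client.go below):

```go
package sketch

import "sync"

// Assumed, simplified shapes; illustrative only.
type sink struct{ nodeID int32 }
type source struct{ nodeID int32 }

type sinkAndSource struct {
	sink   *sink
	source *source
}

type client struct {
	sinksAndSourcesMu sync.Mutex
	sinksAndSources   map[int32]sinkAndSource
	firstSeedID       int32 // node ID used for the first seed broker
}

// sinkFor never returns nil: before the partition's leader is known, the
// first seed broker's sink is returned, so records can be buffered and load
// errors surfaced immediately; a later metadata update repoints the
// partition at its real leader's sink.
func (cl *client) sinkFor(leader int32, leaderKnown bool) *sink {
	cl.sinksAndSourcesMu.Lock()
	defer cl.sinksAndSourcesMu.Unlock()
	if leaderKnown {
		if sns, ok := cl.sinksAndSources[leader]; ok {
			return sns.sink
		}
	}
	return cl.sinksAndSources[cl.firstSeedID].sink
}
```
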
twmb committed Dec 31, 2020
1 parent 385cecb commit 539b06c
Showing 12 changed files with 1,923 additions and 1,698 deletions.
62 changes: 2 additions & 60 deletions pkg/kgo/broker.go
@@ -57,12 +57,6 @@ type promisedResp struct {
enqueue time.Time // used to calculate readWait
}

type waitingResp struct {
resp kmsg.Response
promise func(kmsg.Response, error)
err error
}

var unknownMetadata = BrokerMetadata{
NodeID: -1,
}
@@ -119,21 +113,6 @@ type broker struct {
cxnProduce *brokerCxn
cxnFetch *brokerCxn

// sink and source exist so that metadata updates can copy these
// pointers to a topicPartition's record's sink field and consumption's
// source field.
//
// Brokers are created with these two fields initialized; when a topic
// partition wants to use the broker, it copies these pointers.
sink *sink
source *source

// seqResps, guarded by seqRespsMu, contains responses that must be
// handled sequentially. These responses are handled asynchronously,
// but sequentially.
seqRespsMu sync.Mutex
seqResps []waitingResp

// dieMu guards sending to reqs in case the broker has been
// permanently stopped.
dieMu sync.RWMutex
@@ -166,8 +145,6 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string)

reqs: make(chan promisedReq, 10),
}
br.sink = newSink(cl, br)
br.source = newSource(cl, br)
go br.handleReqs()

return br
@@ -219,43 +196,6 @@ func (b *broker) do(
}
}

// doSequencedAsyncPromise is the same as do, but all requests using this
// function have their responses handled sequentially.
//
// This is important for example for ordering of produce requests.
//
// Note that the requests may finish out of order (e.g. dead connection kills
// latter request); this is handled appropriately in producing.
func (b *broker) doSequencedAsyncPromise(
ctx context.Context,
req kmsg.Request,
promise func(kmsg.Response, error),
) {
b.do(ctx, req, func(resp kmsg.Response, err error) {
b.seqRespsMu.Lock()
b.seqResps = append(b.seqResps, waitingResp{resp, promise, err})
if len(b.seqResps) == 1 {
go b.handleSeqResp(b.seqResps[0])
}
b.seqRespsMu.Unlock()
})
}

// handleSeqResp handles a sequenced response while there is one.
func (b *broker) handleSeqResp(wr waitingResp) {
more:
wr.promise(wr.resp, wr.err)

b.seqRespsMu.Lock()
b.seqResps = b.seqResps[1:]
if len(b.seqResps) > 0 {
wr = b.seqResps[0]
b.seqRespsMu.Unlock()
goto more
}
b.seqRespsMu.Unlock()
}

// waitResp runs a req, waits for the resp and returns the resp and err.
func (b *broker) waitResp(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
var resp kmsg.Response
@@ -734,6 +674,8 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, writeWait time.Duration,
}
}

// TODO: write in a goroutine, use ctx to allow for early cancel.

buf := cxn.cl.bufPool.get()
defer cxn.cl.bufPool.put(buf)
buf = cxn.cl.reqFormatter.AppendRequest(
29 changes: 25 additions & 4 deletions pkg/kgo/client.go
@@ -47,6 +47,15 @@ type Client struct {
anyBrokerIdx int
stopBrokers bool // set to true on close to stop updateBrokers

// A sink and a source are created once per node ID and persist
// forever. We expect the list to be small.
//
// The mutex only exists to allow consumer session stopping to read
// sources to notify when starting a session; all writes happen in the
// metadata loop.
sinksAndSourcesMu sync.Mutex
sinksAndSources map[int32]sinkAndSource

reqFormatter *kmsg.RequestFormatter
connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration)

@@ -88,6 +97,11 @@ type Client struct {
metadone chan struct{}
}

type sinkAndSource struct {
sink *sink
source *source
}

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
@@ -145,6 +159,8 @@ func NewClient(opts ...Opt) (*Client, error) {
controllerID: unknownControllerID,
brokers: make(map[int32]*broker),

sinksAndSources: make(map[int32]sinkAndSource),

reqFormatter: new(kmsg.RequestFormatter),
connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead),

@@ -255,7 +271,7 @@ func connTimeoutBuilder(defaultTimeout time.Duration) func(kmsg.Request) (time.D

// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock()
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
defer cl.brokersMu.Unlock()

if cl.anyBrokerIdx >= len(cl.anyBroker) { // metadata update lost us brokers
@@ -400,15 +416,19 @@ func (cl *Client) Close() {
cl.stopBrokers = true
for _, broker := range cl.brokers {
broker.stopForever()
broker.sink.maybeDrain() // awaken anything in backoff
broker.source.maybeConsume() // same
}
cl.brokersMu.Unlock()

// Wait for metadata to quit so we know no more erroring topic
// partitions will be created.
// partitions will be created. After metadata has quit, we can
// safely stop sinks and sources, as no more will be made.
<-cl.metadone

for _, sns := range cl.sinksAndSources {
sns.sink.maybeDrain() // awaken anything in backoff
sns.source.maybeConsume() // same
}

// We must manually fail all partitions that never had a sink.
for _, partitions := range cl.loadTopics() {
for _, partition := range partitions.load().all {
@@ -602,6 +622,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
}
// fetchMetadata does its own retrying, so we do not do
// retrying here.
// TODO also needs auto topic create
br, resp, err := cl.fetchMetadata(ctx, metaReq.Topics == nil, topics)
return shards(shard(br, req, resp, err)), nil

3 changes: 2 additions & 1 deletion pkg/kgo/config.go
@@ -389,7 +389,8 @@ func AutoTopicCreation() Opt {
// defaults to 100MiB.
//
The only Kafka request that could come reasonably close to hitting this
// limit should be produce requests.
// limit should be produce requests, and thus this limit is only enforced for
// produce requests.
func BrokerMaxWriteBytes(v int32) Opt {
return clientOpt{func(cfg *cfg) { cfg.maxBrokerWriteBytes = v }}
}