Skip to content

Commit

Permalink
consumer group: document actual behavior for on revoked / lost
Browse files Browse the repository at this point in the history
The code explicitly set a no-op OnLost internally if there was no user
OnLost specified. We now formalize this in documentation and in the
proper section in code. Unlike the Java client, we are not starting from
a history of always calling OnRevoked, so we can explicitly document
that OnLost must be specified if users are interested.

We now also ensure we duplicate the map passed to the callbacks so that
users can own them and mutate them as they desire. We also document that
the callbacks are never called concurrently.
  • Loading branch information
twmb committed Sep 14, 2021
1 parent b87af2d commit 8f648e7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
25 changes: 17 additions & 8 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,9 @@ func RequireStableFetchOffsets() GroupOpt {
//
// The OnPartitionsAssigned function is passed the client's context, which is
// only canceled if the client is closed.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }}
}
Expand All @@ -1285,13 +1288,14 @@ func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][
// completes.
//
// The OnPartitionsRevoked function is passed the client's context, which is
// only canceled if the client is closed.
//
// OnPartitionsRevoked function is called at the end of a group session even if
// there are no partitions being revoked.
//
// If you are committing offsets manually (have disabled autocommitting), it is
// highly recommended to do a proper blocking commit in OnPartitionsRevoked.
// only canceled if the client is closed. OnPartitionsRevoked function is
// called at the end of a group session even if there are no partitions being
// revoked. If you are committing offsets manually (have disabled
// autocommitting), it is highly recommended to do a proper blocking commit in
// OnPartitionsRevoked.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.onRevoked, cfg.setRevoked = onRevoked, true }}
}
Expand All @@ -1302,7 +1306,12 @@ func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]i
// commits will succeed when partitions are outright lost, whereas commits
// likely will succeed when revoking partitions.
//
// If not set, OnPartitionsRevoked is used.
// If this is not set, you will not know when a group error occurs that
// forcefully loses all partitions. If you wish to use the same callback for
// lost and revoked, you can use OnPartitionsLostAsRevoked as a shortcut.
//
// This function is not called concurrent with any other On callback, and this
// function is given a new map that the user is free to modify.
func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.onLost, cfg.setLost = onLost, true }}
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,17 @@ func (c *consumer) initGroup() {
ctxExpired = true
default:
}
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m, "context_expired", ctxExpired)
if ctxExpired {
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m, "context_expired", ctxExpired)
} else {
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m)
}
if user != nil {
user(ctx, cl, m)
dup := make(map[string][]int32)
for k, vs := range m {
dup[k] = append([]int32(nil), vs...)
}
user(ctx, cl, dup)
}
}
}
Expand Down Expand Up @@ -256,15 +264,12 @@ func (g *groupConsumer) manage() {
// the cooperative consumer we may as well just also
// include the eager consumer.
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
} else if g.cfg.onLost != nil {
} else {
// Any other error is perceived as a fatal error,
// and we go into OnLost as appropriate.
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned)
hook()

} else if g.cfg.onRevoked != nil {
// If OnLost is not specified, we fallback to OnRevoked.
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
if g.cfg.onLost != nil {
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned)
}
hook()
}

Expand Down

0 comments on commit 8f648e7

Please sign in to comment.