Skip to content

Commit

Permalink
kfake: support committing to non-existing groups
Browse files Browse the repository at this point in the history
Used to test the prior commit
  • Loading branch information
twmb committed May 26, 2024
1 parent a7caf20 commit 18e2cc3
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 36 deletions.
10 changes: 2 additions & 8 deletions pkg/kfake/08_offset_commit.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package kfake

import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(8, 0, 8) }

func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

if c.groups.handleOffsetCommit(creq) {
return nil, nil
}

fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp, nil
c.groups.handleOffsetCommit(creq)
return nil, nil
}
105 changes: 77 additions & 28 deletions pkg/kfake/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string {
// GROUPS //
////////////

func (gs *groups) newGroup(name string) *group {
return &group{
c: gs.c,
gs: gs,
name: name,
members: make(map[string]*groupMember),
pending: make(map[string]*groupMember),
protocols: make(map[string]int),
reqCh: make(chan *clientReq),
controlCh: make(chan func()),
quitCh: make(chan struct{}),
}
}

// handleJoin completely hijacks the incoming request.
func (gs *groups) handleJoin(creq *clientReq) {
if gs.gs == nil {
Expand All @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) {
start:
g := gs.gs[req.Group]
if g == nil {
g = &group{
c: gs.c,
gs: gs,
name: req.Group,
members: make(map[string]*groupMember),
pending: make(map[string]*groupMember),
protocols: make(map[string]int),
reqCh: make(chan *clientReq),
controlCh: make(chan func()),
quitCh: make(chan struct{}),
}
g = gs.newGroup(req.Group)
waitJoin := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitJoin) })
Expand Down Expand Up @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq)
}

func (gs *groups) handleOffsetCommit(creq *clientReq) bool {
return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq)
func (gs *groups) handleOffsetCommit(creq *clientReq) {
if gs.gs == nil {
gs.gs = make(map[string]*group)
}
req := creq.kreq.(*kmsg.OffsetCommitRequest)
start:
g := gs.gs[req.Group]
if g == nil {
g = gs.newGroup(req.Group)
waitCommit := make(chan struct{})
gs.gs[req.Group] = g
go g.manage(func() { close(waitCommit) })
defer func() { <-waitCommit }()
}
select {
case g.reqCh <- creq:
case <-g.quitCh:
goto start
}
}

func (gs *groups) handleOffsetDelete(creq *clientReq) bool {
Expand Down Expand Up @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) {
case *kmsg.LeaveGroupRequest:
kresp = g.handleLeave(creq)
case *kmsg.OffsetCommitRequest:
kresp = g.handleOffsetCommit(creq)
var ok bool
kresp, ok = g.handleOffsetCommit(creq)
firstJoin(ok)
case *kmsg.OffsetDeleteRequest:
kresp = g.handleOffsetDelete(creq)
}
Expand Down Expand Up @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp
}

// Handles a commit.
func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

if kerr := g.c.validateGroup(creq, req.Group); kerr != nil {
fillOffsetCommit(req, resp, kerr.Code)
return resp
return resp, false
}
if req.InstanceID != nil {
fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code)
return resp
}
m, ok := g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp

var m *groupMember
if len(g.members) > 0 {
var ok bool
m, ok = g.members[req.MemberID]
if !ok {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != g.generation {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
} else {
if req.MemberID != "" {
fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code)
return resp, false
}
if req.Generation != -1 {
fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code)
return resp, false
}
if g.state != groupEmpty {
panic("invalid state: no members, but group not empty")
}
}

switch g.state {
default:
fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code)
return resp
return resp, true
case groupEmpty:
// for when we support empty group commits
for _, t := range req.Topics {
for _, p := range t.Partitions {
g.commits.set(t.Topic, p.Partition, offsetCommit{
offset: p.Offset,
leaderEpoch: p.LeaderEpoch,
metadata: p.Metadata,
})
}
}
fillOffsetCommit(req, resp, 0)
case groupPreparingRebalance, groupStable:
for _, t := range req.Topics {
for _, p := range t.Partitions {
Expand All @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code)
g.updateHeartbeat(m)
}
return resp
return resp, true
}

// Transitions the group to the preparing rebalance state. We first need to
Expand Down

0 comments on commit 18e2cc3

Please sign in to comment.