diff --git a/pkg/kfake/08_offset_commit.go b/pkg/kfake/08_offset_commit.go index 692f4cc0..b82400e0 100644 --- a/pkg/kfake/08_offset_commit.go +++ b/pkg/kfake/08_offset_commit.go @@ -1,7 +1,6 @@ package kfake import ( - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" ) @@ -9,16 +8,11 @@ func init() { regKey(8, 0, 8) } func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetCommitRequest) - resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if err := checkReqVersion(req.Key(), req.Version); err != nil { return nil, err } - if c.groups.handleOffsetCommit(creq) { - return nil, nil - } - - fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp, nil + c.groups.handleOffsetCommit(creq) + return nil, nil } diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go index a0e5a98e..eec79391 100644 --- a/pkg/kfake/groups.go +++ b/pkg/kfake/groups.go @@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string { // GROUPS // //////////// +func (gs *groups) newGroup(name string) *group { + return &group{ + c: gs.c, + gs: gs, + name: name, + members: make(map[string]*groupMember), + pending: make(map[string]*groupMember), + protocols: make(map[string]int), + reqCh: make(chan *clientReq), + controlCh: make(chan func()), + quitCh: make(chan struct{}), + } +} + // handleJoin completely hijacks the incoming request. func (gs *groups) handleJoin(creq *clientReq) { if gs.gs == nil { @@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) { start: g := gs.gs[req.Group] if g == nil { - g = &group{ - c: gs.c, - gs: gs, - name: req.Group, - members: make(map[string]*groupMember), - pending: make(map[string]*groupMember), - protocols: make(map[string]int), - reqCh: make(chan *clientReq), - controlCh: make(chan func()), - quitCh: make(chan struct{}), - } + g = gs.newGroup(req.Group) waitJoin := make(chan struct{}) gs.gs[req.Group] = g go g.manage(func() { close(waitJoin) }) @@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq) } -func (gs *groups) handleOffsetCommit(creq *clientReq) bool { - return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq) +func (gs *groups) handleOffsetCommit(creq *clientReq) { + if gs.gs == nil { + gs.gs = make(map[string]*group) + } + req := creq.kreq.(*kmsg.OffsetCommitRequest) +start: + g := gs.gs[req.Group] + if g == nil { + g = gs.newGroup(req.Group) + waitCommit := make(chan struct{}) + gs.gs[req.Group] = g + go g.manage(func() { close(waitCommit) }) + defer func() { <-waitCommit }() + } + select { + case g.reqCh <- creq: + case <-g.quitCh: + goto start + } } func (gs *groups) handleOffsetDelete(creq *clientReq) bool { @@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) { case *kmsg.LeaveGroupRequest: kresp = g.handleLeave(creq) case *kmsg.OffsetCommitRequest: - kresp = g.handleOffsetCommit(creq) + var ok bool + kresp, ok = g.handleOffsetCommit(creq) + firstJoin(ok) case *kmsg.OffsetDeleteRequest: kresp = g.handleOffsetDelete(creq) } @@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp } // Handles a commit. -func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { +func (g *group) handleOffsetCommit(creq *clientReq) (*kmsg.OffsetCommitResponse, bool) { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) if kerr := g.c.validateGroup(creq, req.Group); kerr != nil { fillOffsetCommit(req, resp, kerr.Code) - return resp + return resp, false } if req.InstanceID != nil { fillOffsetCommit(req, resp, kerr.InvalidGroupID.Code) - return resp - } - m, ok := g.members[req.MemberID] - if !ok { - fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) - return resp + return resp, false } - if req.Generation != g.generation { - fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) - return resp + + var m *groupMember + if len(g.members) > 0 { + var ok bool + m, ok = g.members[req.MemberID] + if !ok { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != g.generation { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + } else { + if req.MemberID != "" { + fillOffsetCommit(req, resp, kerr.UnknownMemberID.Code) + return resp, false + } + if req.Generation != -1 { + fillOffsetCommit(req, resp, kerr.IllegalGeneration.Code) + return resp, false + } + if g.state != groupEmpty { + panic("invalid state: no members, but group not empty") + } } switch g.state { default: fillOffsetCommit(req, resp, kerr.GroupIDNotFound.Code) - return resp + return resp, true case groupEmpty: - // for when we support empty group commits + for _, t := range req.Topics { + for _, p := range t.Partitions { + g.commits.set(t.Topic, p.Partition, offsetCommit{ + offset: p.Offset, + leaderEpoch: p.LeaderEpoch, + metadata: p.Metadata, + }) + } + } + fillOffsetCommit(req, resp, 0) case groupPreparingRebalance, groupStable: for _, t := range req.Topics { for _, p := range t.Partitions { @@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { fillOffsetCommit(req, resp, kerr.RebalanceInProgress.Code) g.updateHeartbeat(m) } - return resp + return resp, true } // Transitions the group to the preparing rebalance state. We first need to