@@ -132,6 +132,20 @@ func generateMemberID(clientID string, instanceID *string) string {
132
132
// GROUPS //
133
133
////////////
134
134
135
+ func (gs * groups ) newGroup (name string ) * group {
136
+ return & group {
137
+ c : gs .c ,
138
+ gs : gs ,
139
+ name : name ,
140
+ members : make (map [string ]* groupMember ),
141
+ pending : make (map [string ]* groupMember ),
142
+ protocols : make (map [string ]int ),
143
+ reqCh : make (chan * clientReq ),
144
+ controlCh : make (chan func ()),
145
+ quitCh : make (chan struct {}),
146
+ }
147
+ }
148
+
135
149
// handleJoin completely hijacks the incoming request.
136
150
func (gs * groups ) handleJoin (creq * clientReq ) {
137
151
if gs .gs == nil {
@@ -141,17 +155,7 @@ func (gs *groups) handleJoin(creq *clientReq) {
141
155
start:
142
156
g := gs .gs [req .Group ]
143
157
if g == nil {
144
- g = & group {
145
- c : gs .c ,
146
- gs : gs ,
147
- name : req .Group ,
148
- members : make (map [string ]* groupMember ),
149
- pending : make (map [string ]* groupMember ),
150
- protocols : make (map [string ]int ),
151
- reqCh : make (chan * clientReq ),
152
- controlCh : make (chan func ()),
153
- quitCh : make (chan struct {}),
154
- }
158
+ g = gs .newGroup (req .Group )
155
159
waitJoin := make (chan struct {})
156
160
gs .gs [req .Group ] = g
157
161
go g .manage (func () { close (waitJoin ) })
@@ -194,8 +198,25 @@ func (gs *groups) handleLeave(creq *clientReq) bool {
194
198
return gs .handleHijack (creq .kreq .(* kmsg.LeaveGroupRequest ).Group , creq )
195
199
}
196
200
197
- func (gs * groups ) handleOffsetCommit (creq * clientReq ) bool {
198
- return gs .handleHijack (creq .kreq .(* kmsg.OffsetCommitRequest ).Group , creq )
201
+ func (gs * groups ) handleOffsetCommit (creq * clientReq ) {
202
+ if gs .gs == nil {
203
+ gs .gs = make (map [string ]* group )
204
+ }
205
+ req := creq .kreq .(* kmsg.OffsetCommitRequest )
206
+ start:
207
+ g := gs .gs [req .Group ]
208
+ if g == nil {
209
+ g = gs .newGroup (req .Group )
210
+ waitCommit := make (chan struct {})
211
+ gs .gs [req .Group ] = g
212
+ go g .manage (func () { close (waitCommit ) })
213
+ defer func () { <- waitCommit }()
214
+ }
215
+ select {
216
+ case g .reqCh <- creq :
217
+ case <- g .quitCh :
218
+ goto start
219
+ }
199
220
}
200
221
201
222
func (gs * groups ) handleOffsetDelete (creq * clientReq ) bool {
@@ -551,7 +572,9 @@ func (g *group) manage(detachNew func()) {
551
572
case * kmsg.LeaveGroupRequest :
552
573
kresp = g .handleLeave (creq )
553
574
case * kmsg.OffsetCommitRequest :
554
- kresp = g .handleOffsetCommit (creq )
575
+ var ok bool
576
+ kresp , ok = g .handleOffsetCommit (creq )
577
+ firstJoin (ok )
555
578
case * kmsg.OffsetDeleteRequest :
556
579
kresp = g .handleOffsetDelete (creq )
557
580
}
@@ -807,34 +830,60 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp
807
830
}
808
831
809
832
// Handles a commit.
810
- func (g * group ) handleOffsetCommit (creq * clientReq ) * kmsg.OffsetCommitResponse {
833
+ func (g * group ) handleOffsetCommit (creq * clientReq ) ( * kmsg.OffsetCommitResponse , bool ) {
811
834
req := creq .kreq .(* kmsg.OffsetCommitRequest )
812
835
resp := req .ResponseKind ().(* kmsg.OffsetCommitResponse )
813
836
814
837
if kerr := g .c .validateGroup (creq , req .Group ); kerr != nil {
815
838
fillOffsetCommit (req , resp , kerr .Code )
816
- return resp
839
+ return resp , false
817
840
}
818
841
if req .InstanceID != nil {
819
842
fillOffsetCommit (req , resp , kerr .InvalidGroupID .Code )
820
- return resp
821
- }
822
- m , ok := g .members [req .MemberID ]
823
- if ! ok {
824
- fillOffsetCommit (req , resp , kerr .UnknownMemberID .Code )
825
- return resp
843
+ return resp , false
826
844
}
827
- if req .Generation != g .generation {
828
- fillOffsetCommit (req , resp , kerr .IllegalGeneration .Code )
829
- return resp
845
+
846
+ var m * groupMember
847
+ if len (g .members ) > 0 {
848
+ var ok bool
849
+ m , ok = g .members [req .MemberID ]
850
+ if ! ok {
851
+ fillOffsetCommit (req , resp , kerr .UnknownMemberID .Code )
852
+ return resp , false
853
+ }
854
+ if req .Generation != g .generation {
855
+ fillOffsetCommit (req , resp , kerr .IllegalGeneration .Code )
856
+ return resp , false
857
+ }
858
+ } else {
859
+ if req .MemberID != "" {
860
+ fillOffsetCommit (req , resp , kerr .UnknownMemberID .Code )
861
+ return resp , false
862
+ }
863
+ if req .Generation != - 1 {
864
+ fillOffsetCommit (req , resp , kerr .IllegalGeneration .Code )
865
+ return resp , false
866
+ }
867
+ if g .state != groupEmpty {
868
+ panic ("invalid state: no members, but group not empty" )
869
+ }
830
870
}
831
871
832
872
switch g .state {
833
873
default :
834
874
fillOffsetCommit (req , resp , kerr .GroupIDNotFound .Code )
835
- return resp
875
+ return resp , true
836
876
case groupEmpty :
837
- // for when we support empty group commits
877
+ for _ , t := range req .Topics {
878
+ for _ , p := range t .Partitions {
879
+ g .commits .set (t .Topic , p .Partition , offsetCommit {
880
+ offset : p .Offset ,
881
+ leaderEpoch : p .LeaderEpoch ,
882
+ metadata : p .Metadata ,
883
+ })
884
+ }
885
+ }
886
+ fillOffsetCommit (req , resp , 0 )
838
887
case groupPreparingRebalance , groupStable :
839
888
for _ , t := range req .Topics {
840
889
for _ , p := range t .Partitions {
@@ -851,7 +900,7 @@ func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse {
851
900
fillOffsetCommit (req , resp , kerr .RebalanceInProgress .Code )
852
901
g .updateHeartbeat (m )
853
902
}
854
- return resp
903
+ return resp , true
855
904
}
856
905
857
906
// Transitions the group to the preparing rebalance state. We first need to
0 commit comments