Commit fce917a

This commit enables isolation level handling when fetching offsets.
It allows listing available offsets (OffsetRequest and OffsetResponse) to require the read_committed isolation level, and adds a RequireStable field to OffsetFetchRequest.
1 parent 9a7d94e · commit fce917a

11 files changed: +422 / -44 lines
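
For orientation, here is a hedged usage sketch, not part of the commit, of the new RequireStable flag end to end. The OffsetFetchRequest fields, AddPartition, and Broker.FetchOffset come from the diffs below; the fetchStableOffsets helper name and the github.com/Shopify/sarama import path are assumptions.

    import "github.com/Shopify/sarama"

    // fetchStableOffsets is a hypothetical helper: it issues an OffsetFetchRequest at
    // protocol v7 with RequireStable set, so the broker returns only offsets that are
    // not shadowed by open (uncommitted) transactions.
    func fetchStableOffsets(broker *sarama.Broker, group, topic string, partitions []int32) (*sarama.OffsetFetchResponse, error) {
        req := new(sarama.OffsetFetchRequest)
        req.Version = 7          // RequireStable needs protocol v7 (Kafka 2.5.0+)
        req.ConsumerGroup = group
        req.RequireStable = true // ask for a read_committed view of the committed offsets
        for _, p := range partitions {
            req.AddPartition(topic, p)
        }
        return broker.FetchOffset(req)
    }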

broker.go (+1)

@@ -376,6 +376,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
 //FetchOffset returns an offset fetch response or error
 func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
     response := new(OffsetFetchResponse)
+    response.Version = request.Version // needed to handle the two header versions

     err := b.sendAndReceive(request, response)
     if err != nil {

errors.go (+18)

@@ -208,6 +208,12 @@ const (
     ErrPreferredLeaderNotAvailable KError = 80
     ErrGroupMaxSizeReached         KError = 81
     ErrFencedInstancedId           KError = 82
+    ErrEligibleLeadersNotAvailable KError = 83
+    ErrElectionNotNeeded           KError = 84
+    ErrNoReassignmentInProgress    KError = 85
+    ErrGroupSubscribedToTopic      KError = 86
+    ErrInvalidRecord               KError = 87
+    ErrUnstableOffsetCommit        KError = 88
 )

 func (err KError) Error() string {
@@ -382,6 +388,18 @@ func (err KError) Error() string {
         return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
     case ErrFencedInstancedId:
         return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
+    case ErrEligibleLeadersNotAvailable:
+        return "kafka server: Eligible topic partition leaders are not available."
+    case ErrElectionNotNeeded:
+        return "kafka server: Leader election not needed for topic partition."
+    case ErrNoReassignmentInProgress:
+        return "kafka server: No partition reassignment is in progress."
+    case ErrGroupSubscribedToTopic:
+        return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."
+    case ErrInvalidRecord:
+        return "kafka server: This record has failed the validation on broker and hence will be rejected."
+    case ErrUnstableOffsetCommit:
+        return "kafka server: There are unstable offsets that need to be cleared."
     }

     return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
offset_fetch_request.go (+120 -17)

@@ -3,60 +3,155 @@ package sarama
 type OffsetFetchRequest struct {
     Version       int16
     ConsumerGroup string
+    RequireStable bool // requires v7+
     partitions    map[string][]int32
 }

 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-    if r.Version < 0 || r.Version > 5 {
+    if r.Version < 0 || r.Version > 7 {
         return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
     }

-    if err = pe.putString(r.ConsumerGroup); err != nil {
+    isFlexible := r.Version >= 6
+
+    if isFlexible {
+        err = pe.putCompactString(r.ConsumerGroup)
+    } else {
+        err = pe.putString(r.ConsumerGroup)
+    }
+    if err != nil {
         return err
     }

-    if r.Version >= 2 && r.partitions == nil {
-        pe.putInt32(-1)
-    } else {
-        if err = pe.putArrayLength(len(r.partitions)); err != nil {
-            return err
+    if isFlexible {
+        if r.partitions == nil {
+            pe.putUVarint(0)
+        } else {
+            pe.putCompactArrayLength(len(r.partitions))
         }
-        for topic, partitions := range r.partitions {
-            if err = pe.putString(topic); err != nil {
-                return err
-            }
-            if err = pe.putInt32Array(partitions); err != nil {
+    } else {
+        if r.partitions == nil && r.Version >= 2 {
+            pe.putInt32(-1)
+        } else {
+            if err = pe.putArrayLength(len(r.partitions)); err != nil {
                 return err
             }
         }
     }
+
+    for topic, partitions := range r.partitions {
+        if isFlexible {
+            err = pe.putCompactString(topic)
+        } else {
+            err = pe.putString(topic)
+        }
+        if err != nil {
+            return err
+        }
+
+        //
+
+        if isFlexible {
+            err = pe.putCompactInt32Array(partitions)
+        } else {
+            err = pe.putInt32Array(partitions)
+        }
+        if err != nil {
+            return err
+        }
+
+        if isFlexible {
+            pe.putEmptyTaggedFieldArray()
+        }
+    }
+
+    if r.RequireStable && r.Version < 7 {
+        return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
+    }
+
+    if r.Version >= 7 {
+        pe.putBool(r.RequireStable)
+    }
+
+    if isFlexible {
+        pe.putEmptyTaggedFieldArray()
+    }
+
     return nil
 }

 func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
     r.Version = version
-    if r.ConsumerGroup, err = pd.getString(); err != nil {
+    isFlexible := r.Version >= 6
+    if isFlexible {
+        r.ConsumerGroup, err = pd.getCompactString()
+    } else {
+        r.ConsumerGroup, err = pd.getString()
+    }
+    if err != nil {
         return err
     }
-    partitionCount, err := pd.getArrayLength()
+
+    var partitionCount int
+
+    if isFlexible {
+        partitionCount, err = pd.getCompactArrayLength()
+    } else {
+        partitionCount, err = pd.getArrayLength()
+    }
     if err != nil {
         return err
     }
+
     if (partitionCount == 0 && version < 2) || partitionCount < 0 {
         return nil
     }
-    r.partitions = make(map[string][]int32)
+
+    r.partitions = make(map[string][]int32, partitionCount)
     for i := 0; i < partitionCount; i++ {
-        topic, err := pd.getString()
+        var topic string
+        if isFlexible {
+            topic, err = pd.getCompactString()
+        } else {
+            topic, err = pd.getString()
+        }
         if err != nil {
             return err
         }
-        partitions, err := pd.getInt32Array()
+
+        var partitions []int32
+        if isFlexible {
+            partitions, err = pd.getCompactInt32Array()
+        } else {
+            partitions, err = pd.getInt32Array()
+        }
         if err != nil {
             return err
         }
+        if isFlexible {
+            _, err = pd.getEmptyTaggedFieldArray()
+            if err != nil {
+                return err
+            }
+        }
+
         r.partitions[topic] = partitions
     }
+
+    if r.Version >= 7 {
+        r.RequireStable, err = pd.getBool()
+        if err != nil {
+            return err
+        }
+    }
+
+    if isFlexible {
+        _, err = pd.getEmptyTaggedFieldArray()
+        if err != nil {
+            return err
+        }
+    }
+
     return nil
 }

@@ -69,6 +164,10 @@ func (r *OffsetFetchRequest) version() int16 {
 }

 func (r *OffsetFetchRequest) headerVersion() int16 {
+    if r.Version >= 6 {
+        return 2
+    }
+
     return 1
 }

@@ -84,6 +183,10 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
         return V2_0_0_0
     case 5:
         return V2_1_0_0
+    case 6:
+        return V2_4_0_0
+    case 7:
+        return V2_5_0_0
     default:
         return MinVersion
     }
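
The isFlexible branches above follow Kafka's flexible-version wire format (KIP-482): from v6 onward, strings and arrays become "compact" (the length is written as an unsigned varint of length+1, with 0 meaning null) and each structure is closed by a tagged-field section, which is also why headerVersion switches to 2. A standalone sketch of the compact-string rule, independent of Sarama's packetEncoder:

    import "encoding/binary"

    // appendCompactString illustrates the KIP-482 compact-string encoding: an
    // unsigned varint holding len(s)+1, followed by the raw bytes; a varint
    // value of 0 would denote a null string.
    func appendCompactString(buf []byte, s string) []byte {
        var tmp [binary.MaxVarintLen64]byte
        n := binary.PutUvarint(tmp[:], uint64(len(s)+1))
        buf = append(buf, tmp[:n]...)
        return append(buf, s...)
    }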

offset_fetch_request_test.go (+60)

@@ -10,6 +10,12 @@ var (
         0x00, 0x00,
         0x00, 0x00, 0x00, 0x00}

+    offsetFetchRequestNoPartitionsV6 = []byte{
+        0x05, 'b', 'l', 'a', 'h', 0x01, 0x00}
+
+    offsetFetchRequestNoPartitionsV7 = []byte{
+        0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00}
+
     offsetFetchRequestNoPartitions = []byte{
         0x00, 0x04, 'b', 'l', 'a', 'h',
         0x00, 0x00, 0x00, 0x00}
@@ -21,6 +27,20 @@ var (
         0x00, 0x00, 0x00, 0x01,
         0x4F, 0x4F, 0x4F, 0x4F}

+    offsetFetchRequestOnePartitionV6 = []byte{
+        0x05, 'b', 'l', 'a', 'h',
+        0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
+        0x02,
+        0x4F, 0x4F, 0x4F, 0x4F,
+        0x00, 0x00}
+
+    offsetFetchRequestOnePartitionV7 = []byte{
+        0x05, 'b', 'l', 'a', 'h',
+        0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
+        0x02,
+        0x4F, 0x4F, 0x4F, 0x4F,
+        0x00, 0x00, 0x00}
+
     offsetFetchRequestAllPartitions = []byte{
         0x00, 0x04, 'b', 'l', 'a', 'h',
         0xff, 0xff, 0xff, 0xff}
@@ -36,7 +56,29 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) {
         request.ConsumerGroup = "blah"
         testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
     }
+
+    { // v6
+        version := 6
+        request := new(OffsetFetchRequest)
+        request.Version = int16(version)
+        request.ConsumerGroup = "blah"
+        request.ZeroPartitions()
+
+        testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV6)
+    }
+
+    { // v7
+        version := 7
+        request := new(OffsetFetchRequest)
+        request.Version = int16(version)
+        request.ConsumerGroup = "blah"
+        request.RequireStable = true
+        request.ZeroPartitions()
+
+        testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV7)
+    }
 }
+
 func TestOffsetFetchRequest(t *testing.T) {
     for version := 0; version <= 5; version++ {
         request := new(OffsetFetchRequest)
@@ -45,6 +87,24 @@ func TestOffsetFetchRequest(t *testing.T) {
         request.AddPartition("topicTheFirst", 0x4F4F4F4F)
         testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
     }
+
+    { // v6
+        version := 6
+        request := new(OffsetFetchRequest)
+        request.Version = int16(version)
+        request.ConsumerGroup = "blah"
+        request.AddPartition("topicTheFirst", 0x4F4F4F4F)
+        testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6)
+    }
+
+    { // v7
+        version := 7
+        request := new(OffsetFetchRequest)
+        request.Version = int16(version)
+        request.ConsumerGroup = "blah"
+        request.AddPartition("topicTheFirst", 0x4F4F4F4F)
+        testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV7)
+    }
 }

 func TestOffsetFetchRequestAllPartitions(t *testing.T) {
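
Read against that compact encoding, the new fixtures decode cleanly. The annotated copy below is illustrative only and restates offsetFetchRequestNoPartitionsV7 byte by byte; it is not part of the commit.

    var offsetFetchRequestNoPartitionsV7Annotated = []byte{
        0x05, 'b', 'l', 'a', 'h', // ConsumerGroup as a compact string: uvarint(4+1), then "blah"
        0x01, // compact topic array: uvarint(0+1), i.e. no topics requested
        0x01, // RequireStable = true (v7 only)
        0x00, // empty tagged-field array closing the request
    }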
