Skip to content

Commit 4b9e8f6

Browse files
committed
fix: restore (*OffsetCommitRequest) AddBlock func
In v1.34.0 a breaking API change to (*OffsetCommitRequest) AddBlock was inadvertently introduced by 59a3d39 We weren't aware that anyone was driving the offset commit protocol directly via the broker.go call rather than via offset_manager.go (or via a consumer client) For now we restore the old AddBlock signature and move the new one to AddBlockWithLeaderEpoch. This will unfortunately impact anyone who had updated their own code to call the new func signature, but it's probably more important that we restore the longer term backwards compatibility now until we move to a v2 release. Fixes #2358 Signed-off-by: Dominic Evans <[email protected]>
1 parent c2cab9d commit 4b9e8f6

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

offset_commit_request.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,11 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
220220
}
221221
}
222222

223-
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) {
223+
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
224+
r.AddBlockWithLeaderEpoch(topic, partitionID, offset, 0, timestamp, metadata)
225+
}
226+
227+
func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) {
224228
if r.blocks == nil {
225229
r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
226230
}

offset_commit_request_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestOffsetCommitRequestV0(t *testing.T) {
7070
request.ConsumerGroup = "foobar"
7171
testRequest(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)
7272

73-
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata")
73+
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
7474
testRequest(t, "one block v0", request, offsetCommitRequestOneBlockV0)
7575
}
7676

@@ -82,7 +82,7 @@ func TestOffsetCommitRequestV1(t *testing.T) {
8282
request.Version = 1
8383
testRequest(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)
8484

85-
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, ReceiveTime, "metadata")
85+
request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
8686
testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
8787
}
8888

@@ -96,7 +96,7 @@ func TestOffsetCommitRequestV2ToV4(t *testing.T) {
9696
request.Version = int16(version)
9797
testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2)
9898

99-
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, 0, "metadata")
99+
request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
100100
testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2)
101101
}
102102
}

offset_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {
304304
for _, pom := range topicManagers {
305305
pom.lock.Lock()
306306
if pom.dirty {
307-
r.AddBlock(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, perPartitionTimestamp, pom.metadata)
307+
r.AddBlockWithLeaderEpoch(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, perPartitionTimestamp, pom.metadata)
308308
}
309309
pom.lock.Unlock()
310310
}

0 commit comments

Comments
 (0)