Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Dockerfile.kafka
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb
&& for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \
&& rm -f jaxb-api-2.3.0.jar

# older kafka versions with the zookeeper 3.4.13 client aren't compatible with Java 17 so quietly bump them to 3.5.9
RUN [ -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" ] || exit 0 ; \
rm -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" \
# older kafka versions with the zookeeper 3.4.13/3.4.14 client aren't compatible with Java 17 so quietly bump them to 3.5.9
RUN if ! stat /opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.*.jar; then exit 0; fi ; \
rm -f /opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.*.jar \
&& curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.5.9/zookeeper-3.5.9.jar" \
&& curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-jute-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper-jute/3.5.9/zookeeper-jute-3.5.9.jar"

Expand Down
17 changes: 1 addition & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,22 +866,7 @@ func (client *client) getOffset(topic string, partitionID int32, timestamp int64
return -1, err
}

request := &OffsetRequest{}
if client.conf.Version.IsAtLeast(V2_1_0_0) {
// Version 4 adds the current leader epoch, which is used for fencing.
request.Version = 4
} else if client.conf.Version.IsAtLeast(V2_0_0_0) {
// Version 3 is the same as version 2.
request.Version = 3
} else if client.conf.Version.IsAtLeast(V0_11_0_0) {
// Version 2 adds the isolation level, which is used for transactional reads.
request.Version = 2
} else if client.conf.Version.IsAtLeast(V0_10_1_0) {
// Version 1 removes MaxNumOffsets. From this version forward, only a single
// offset can be returned.
request.Version = 1
}

request := NewOffsetRequest(client.conf.Version)
request.AddBlock(topic, partitionID, timestamp, 1)

response, err := broker.GetAvailableOffsets(request)
Expand Down
26 changes: 25 additions & 1 deletion offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,28 @@ type OffsetRequest struct {
blocks map[string]map[int32]*offsetRequestBlock
}

func NewOffsetRequest(version KafkaVersion) *OffsetRequest {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to export this? (I suspect yes, because we export a lot of request constructors).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah at the moment we export all the protocol, that may change when we move to a v2 api, but for now we keep them the same

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think mentioned elsewhere, for a v2 we probably want to split up the protocol from the API. That’s what I also did with sftp v2.

Building a transparent shim of the protocol, and then the complex API hiding implementation details really simplified a lot of the exposed API, while keeping the protocol open and standardized as well.

request := &OffsetRequest{}
if version.IsAtLeast(V2_2_0_0) {
// Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
request.Version = 5
} else if version.IsAtLeast(V2_1_0_0) {
// Version 4 adds the current leader epoch, which is used for fencing.
request.Version = 4
} else if version.IsAtLeast(V2_0_0_0) {
// Version 3 is the same as version 2.
request.Version = 3
} else if version.IsAtLeast(V0_11_0_0) {
// Version 2 adds the isolation level, which is used for transactional reads.
request.Version = 2
} else if version.IsAtLeast(V0_10_1_0) {
// Version 1 removes MaxNumOffsets. From this version forward, only a single
// offset can be returned.
request.Version = 1
}
return request
}

func (r *OffsetRequest) setVersion(v int16) {
r.Version = v
}
Expand Down Expand Up @@ -160,11 +182,13 @@ func (r *OffsetRequest) headerVersion() int16 {
}

func (r *OffsetRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *OffsetRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V2_2_0_0
case 4:
return V2_1_0_0
case 3:
Expand Down
4 changes: 3 additions & 1 deletion offset_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ func (r *OffsetResponse) headerVersion() int16 {
}

func (r *OffsetResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *OffsetResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V2_2_0_0
case 4:
return V2_1_0_0
case 3:
Expand Down
15 changes: 14 additions & 1 deletion request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
{
V2_2_0_0,
map[int16]int16{
// apiKeyListOffsets: 5, // up from 4, TODO: not supported by Sarama yet
apiKeyListOffsets: 5, // up from 4
apiKeyLeaderAndIsr: 2, // up from 1
apiKeyStopReplica: 1, // up from 0
apiKeyUpdateMetadata: 5, // up from 4
Expand All @@ -316,6 +316,19 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
apiKeyElectLeaders: 0, // new in 2.2
},
},
{
V2_3_0_0,
map[int16]int16{
apiKeyFetch: 11, // up from 10
apiKeyMetadata: 8, // up from 7
apiKeyOffsetCommit: 7, // up from 6
apiKeyJoinGroup: 5, // up from 4
apiKeyHeartbeat: 3, // up from 2
apiKeySyncGroup: 3, // up from 2
apiKeyDescribeGroups: 3, // up from 2
apiKeyIncrementalAlterConfigs: 0, // new in 2.3
},
},
{
saramaMaxVersions, // placeholder version for current maximums implemented by Sarama
map[int16]int16{
Expand Down
Loading