diff --git a/Dockerfile.kafka b/Dockerfile.kafka index 5b387b9ed..91d41d8a1 100644 --- a/Dockerfile.kafka +++ b/Dockerfile.kafka @@ -51,9 +51,9 @@ RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \ && rm -f jaxb-api-2.3.0.jar -# older kafka versions with the zookeeper 3.4.13 client aren't compatible with Java 17 so quietly bump them to 3.5.9 -RUN [ -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" ] || exit 0 ; \ - rm -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" \ +# older kafka versions with the zookeeper 3.4.13/3.4.14 client aren't compatible with Java 17 so quietly bump them to 3.5.9 +RUN if ! stat /opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.*.jar; then exit 0; fi ; \ + rm -f /opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.*.jar \ && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.5.9/zookeeper-3.5.9.jar" \ && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-jute-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper-jute/3.5.9/zookeeper-jute-3.5.9.jar" diff --git a/client.go b/client.go index 740e5f990..0dc29e225 100644 --- a/client.go +++ b/client.go @@ -866,22 +866,7 @@ func (client *client) getOffset(topic string, partitionID int32, timestamp int64 return -1, err } - request := &OffsetRequest{} - if client.conf.Version.IsAtLeast(V2_1_0_0) { - // Version 4 adds the current leader epoch, which is used for fencing. - request.Version = 4 - } else if client.conf.Version.IsAtLeast(V2_0_0_0) { - // Version 3 is the same as version 2. - request.Version = 3 - } else if client.conf.Version.IsAtLeast(V0_11_0_0) { - // Version 2 adds the isolation level, which is used for transactional reads. - request.Version = 2 - } else if client.conf.Version.IsAtLeast(V0_10_1_0) { - // Version 1 removes MaxNumOffsets. From this version forward, only a single - // offset can be returned. - request.Version = 1 - } - + request := NewOffsetRequest(client.conf.Version) request.AddBlock(topic, partitionID, timestamp, 1) response, err := broker.GetAvailableOffsets(request) diff --git a/offset_request.go b/offset_request.go index 82ac5bbf5..01fbb339e 100644 --- a/offset_request.go +++ b/offset_request.go @@ -52,6 +52,28 @@ type OffsetRequest struct { blocks map[string]map[int32]*offsetRequestBlock } +func NewOffsetRequest(version KafkaVersion) *OffsetRequest { + request := &OffsetRequest{} + if version.IsAtLeast(V2_2_0_0) { + // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE. + request.Version = 5 + } else if version.IsAtLeast(V2_1_0_0) { + // Version 4 adds the current leader epoch, which is used for fencing. + request.Version = 4 + } else if version.IsAtLeast(V2_0_0_0) { + // Version 3 is the same as version 2. + request.Version = 3 + } else if version.IsAtLeast(V0_11_0_0) { + // Version 2 adds the isolation level, which is used for transactional reads. + request.Version = 2 + } else if version.IsAtLeast(V0_10_1_0) { + // Version 1 removes MaxNumOffsets. From this version forward, only a single + // offset can be returned. + request.Version = 1 + } + return request +} + func (r *OffsetRequest) setVersion(v int16) { r.Version = v } @@ -160,11 +182,13 @@ func (r *OffsetRequest) headerVersion() int16 { } func (r *OffsetRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 } func (r *OffsetRequest) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V2_2_0_0 case 4: return V2_1_0_0 case 3: diff --git a/offset_response.go b/offset_response.go index 3bbcf9a66..2dc50e0ce 100644 --- a/offset_response.go +++ b/offset_response.go @@ -190,11 +190,13 @@ func (r *OffsetResponse) headerVersion() int16 { } func (r *OffsetResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 } func (r *OffsetResponse) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V2_2_0_0 case 4: return V2_1_0_0 case 3: diff --git a/request_test.go b/request_test.go index 6d120cb9c..e38fc838b 100644 --- a/request_test.go +++ b/request_test.go @@ -306,7 +306,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { { V2_2_0_0, map[int16]int16{ - // apiKeyListOffsets: 5, // up from 4, TODO: not supported by Sarama yet + apiKeyListOffsets: 5, // up from 4 apiKeyLeaderAndIsr: 2, // up from 1 apiKeyStopReplica: 1, // up from 0 apiKeyUpdateMetadata: 5, // up from 4 @@ -316,6 +316,19 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { apiKeyElectLeaders: 0, // new in 2.2 }, }, + { + V2_3_0_0, + map[int16]int16{ + apiKeyFetch: 11, // up from 10 + apiKeyMetadata: 8, // up from 7 + apiKeyOffsetCommit: 7, // up from 6 + apiKeyJoinGroup: 5, // up from 4 + apiKeyHeartbeat: 3, // up from 2 + apiKeySyncGroup: 3, // up from 2 + apiKeyDescribeGroups: 3, // up from 2 + apiKeyIncrementalAlterConfigs: 0, // new in 2.3 + }, + }, { saramaMaxVersions, // placeholder version for current maximums implemented by Sarama map[int16]int16{