@@ -25,7 +25,6 @@ import org.apache.kafka.clients.admin._
25
25
import org .apache .kafka .clients .producer .ProducerRecord
26
26
import org .apache .kafka .common .TopicPartition
27
27
import org .apache .kafka .common .config .TopicConfig
28
- import org .apache .kafka .common .record .RecordBatch
29
28
import org .apache .kafka .common .requests .ListOffsetsResponse
30
29
import org .apache .kafka .common .utils .{MockTime , Time , Utils }
31
30
import org .apache .kafka .server .config .ServerLogConfigs
@@ -45,7 +44,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
45
44
private val topicNameWithCustomConfigs = " foo2"
46
45
private var adminClient : Admin = _
47
46
private val mockTime : Time = new MockTime (1 )
48
- private var version = RecordBatch .MAGIC_VALUE_V2
49
47
private val dataFolder = Seq (tempDir().getAbsolutePath, tempDir().getAbsolutePath)
50
48
51
49
@ BeforeEach
@@ -73,20 +71,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
73
71
assertEquals(ListOffsetsResponse .UNKNOWN_TIMESTAMP , maxTimestampOffset.timestamp())
74
72
}
75
73
76
- @ ParameterizedTest
77
- @ ValueSource (strings = Array (" zk" ))
78
- def testListVersion0 (quorum : String ): Unit = {
79
- // create records for version 0
80
- createMessageFormatBrokers(RecordBatch .MAGIC_VALUE_V0 )
81
- produceMessagesInSeparateBatch()
82
-
83
- // update version to version 1 to list offset for max timestamp
84
- createMessageFormatBrokers(RecordBatch .MAGIC_VALUE_V1 )
85
- // the offset of max timestamp is always -1 if the batch version is 0
86
- verifyListOffsets(expectedMaxTimestampOffset = - 1 )
87
- }
88
-
89
-
90
74
@ ParameterizedTest
91
75
@ ValueSource (strings = Array (" kraft" ))
92
76
def testThreeCompressedRecordsInOneBatch (quorum : String ): Unit = {
@@ -129,38 +113,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
129
113
verifyListOffsets(topic = topicNameWithCustomConfigs, 2 )
130
114
}
131
115
132
- // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
133
- @ ParameterizedTest
134
- @ ValueSource (strings = Array (" zk" ))
135
- def testThreeRecordsInOneBatchWithMessageConversion (quorum : String ): Unit = {
136
- createMessageFormatBrokers(RecordBatch .MAGIC_VALUE_V1 )
137
- produceMessagesInOneBatch()
138
- verifyListOffsets()
139
-
140
- // test LogAppendTime case
141
- setUpForLogAppendTimeCase()
142
- produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
143
- // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
144
- // So in this one batch test, it'll be the first offset 0
145
- verifyListOffsets(topic = topicNameWithCustomConfigs, 0 )
146
- }
147
-
148
- // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
149
- @ ParameterizedTest
150
- @ ValueSource (strings = Array (" zk" ))
151
- def testThreeRecordsInSeparateBatchWithMessageConversion (quorum : String ): Unit = {
152
- createMessageFormatBrokers(RecordBatch .MAGIC_VALUE_V1 )
153
- produceMessagesInSeparateBatch()
154
- verifyListOffsets()
155
-
156
- // test LogAppendTime case
157
- setUpForLogAppendTimeCase()
158
- produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
159
- // In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time
160
- // for each batch, So it'll be the last offset 2
161
- verifyListOffsets(topic = topicNameWithCustomConfigs, 2 )
162
- }
163
-
164
116
@ ParameterizedTest
165
117
@ ValueSource (strings = Array (" kraft" ))
166
118
def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer (quorum : String ): Unit = {
@@ -201,15 +153,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
201
153
createTopicWithConfig(topicNameWithCustomConfigs, props)
202
154
}
203
155
204
- private def createMessageFormatBrokers (recordVersion : Byte ): Unit = {
205
- version = recordVersion
206
- recreateBrokers(reconfigure = true , startup = true )
207
- Utils .closeQuietly(adminClient, " ListOffsetsAdminClient" )
208
- adminClient = Admin .create(Map [String , Object ](
209
- AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
210
- ).asJava)
211
- }
212
-
213
156
private def createTopicWithConfig (topic : String , props : Properties ): Unit = {
214
157
createTopic(topic, 1 , 1 .toShort, topicConfig = props)
215
158
}
@@ -224,12 +167,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
224
167
225
168
val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec .maxTimestamp(), topic)
226
169
assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
227
- if (version >= RecordBatch .MAGIC_VALUE_V2 )
228
- // the epoch is related to the returned offset.
229
- // Hence, it should be zero (the earliest leader epoch), regardless of new leader election
230
- assertEquals(Optional .of(0 ), maxTimestampOffset.leaderEpoch())
231
- else
232
- assertEquals(Optional .empty(), maxTimestampOffset.leaderEpoch())
170
+ // the epoch is related to the returned offset.
171
+ // Hence, it should be zero (the earliest leader epoch), regardless of new leader election
172
+ assertEquals(Optional .of(0 ), maxTimestampOffset.leaderEpoch())
233
173
}
234
174
235
175
// case 0: test the offsets from leader's append path
@@ -336,15 +276,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
336
276
}
337
277
338
278
def generateConfigs : Seq [KafkaConfig ] = {
339
- TestUtils .createBrokerConfigs(2 , zkConnectOrNull).zipWithIndex.map{ case (props, index) =>
340
- if (version == RecordBatch .MAGIC_VALUE_V0 ) {
341
- props.setProperty(" log.message.format.version" , " 0.9.0" )
342
- props.setProperty(" inter.broker.protocol.version" , " 0.9.0" )
343
- }
344
- if (version == RecordBatch .MAGIC_VALUE_V1 ) {
345
- props.setProperty(" log.message.format.version" , " 0.10.0" )
346
- props.setProperty(" inter.broker.protocol.version" , " 0.10.0" )
347
- }
279
+ TestUtils .createBrokerConfigs(2 , null ).zipWithIndex.map{ case (props, index) =>
348
280
// We use mock timer so the records can get removed if the test env is too busy to complete
349
281
// tests before kafka-log-retention. Hence, we disable the retention to avoid failed tests
350
282
props.setProperty(ServerLogConfigs .LOG_RETENTION_TIME_MILLIS_CONFIG , " -1" )
0 commit comments