Skip to content

Commit 2608e65

Browse files
committed
fix the failing unit tests
1 parent 307eb37 commit 2608e65

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

core/src/main/scala/kafka/server/ReplicaManager.scala

+16-11
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import kafka.log.remote.RemoteLogManager
2323
import kafka.log.{LogManager, OffsetResultHolder, UnifiedLog}
2424
import kafka.server.HostedPartition.Online
2525
import kafka.server.QuotaFactory.QuotaManagers
26-
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult}
26+
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
2727
import kafka.server.metadata.ZkMetadataCache
2828
import kafka.utils.Implicits._
2929
import kafka.utils._
@@ -226,6 +226,14 @@ object ReplicaManager {
226226

227227
private[server] val MetricNames = GaugeMetricNames.union(MeterMetricNames)
228228

229+
private val timestampMinSupportedVersion: immutable.Map[Long, Short] = immutable.Map[Long, Short](
230+
ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
231+
ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
232+
ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
233+
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
234+
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
235+
)
236+
229237
def createLogReadResult(highWatermark: Long,
230238
leaderLogStartOffset: Long,
231239
leaderLogEndOffset: Long,
@@ -252,6 +260,11 @@ object ReplicaManager {
252260
lastStableOffset = None,
253261
exception = Some(e))
254262
}
263+
264+
private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
265+
timestamp < 0 &&
266+
(!timestampMinSupportedVersion.contains(timestamp) || version < timestampMinSupportedVersion(timestamp))
267+
}
255268
}
256269

257270
class ReplicaManager(val config: KafkaConfig,
@@ -1462,13 +1475,6 @@ class ReplicaManager(val config: KafkaConfig,
14621475
version: Short,
14631476
buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
14641477
responseCallback: List[ListOffsetsTopicResponse] => Unit): Unit = {
1465-
val timestampMinSupportedVersion = immutable.Map[Long, Short](
1466-
ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
1467-
ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
1468-
ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
1469-
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
1470-
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
1471-
)
14721478
val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
14731479
topics.foreach { topic =>
14741480
topic.partitions.asScala.foreach { partition =>
@@ -1477,9 +1483,8 @@ class ReplicaManager(val config: KafkaConfig,
14771483
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
14781484
s"failed because the partition is duplicated in the request.")
14791485
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.INVALID_REQUEST, partition)))
1480-
} else if (partition.timestamp() < 0 &&
1481-
(!timestampMinSupportedVersion.contains(partition.timestamp()) || version < timestampMinSupportedVersion(partition.timestamp()))) {
1482-
buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)
1486+
} else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) {
1487+
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)))
14831488
} else {
14841489
try {
14851490
val fetchOnlyFromLeader = replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+25
Original file line numberDiff line numberDiff line change
@@ -10165,6 +10165,31 @@ class KafkaApisTest extends Logging {
1016510165
.setPartitionIndex(tp.partition)
1016610166
.setTimestamp(timestamp)).asJava)).asJava
1016710167

10168+
when(replicaManager.fetchOffset(
10169+
ArgumentMatchers.any[Seq[ListOffsetsTopic]](),
10170+
ArgumentMatchers.eq(Set.empty[TopicPartition]),
10171+
ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED),
10172+
ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
10173+
ArgumentMatchers.eq[String](clientId),
10174+
ArgumentMatchers.anyInt(), // correlationId
10175+
ArgumentMatchers.anyShort(), // version
10176+
ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
10177+
ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]()
10178+
)).thenAnswer(ans => {
10179+
val version = ans.getArgument[Short](6)
10180+
val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
10181+
val errorCode = if (ReplicaManager.isListOffsetsTimestampUnsupported(timestamp, version))
10182+
Errors.UNSUPPORTED_VERSION.code()
10183+
else
10184+
Errors.INVALID_REQUEST.code()
10185+
val partitionResponse = new ListOffsetsPartitionResponse()
10186+
.setErrorCode(errorCode)
10187+
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
10188+
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
10189+
.setPartitionIndex(tp.partition())
10190+
callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
10191+
})
10192+
1016810193
val data = new ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
1016910194
val listOffsetRequest = ListOffsetsRequest.parse(MessageUtil.toByteBuffer(data, version), version)
1017010195
val request = buildRequest(listOffsetRequest)

0 commit comments

Comments
 (0)