Skip to content

KAFKA-15859: Make RemoteListOffsets call an async operation #16602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
import kafka.server.TopicPartitionOperationKey;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
Expand Down Expand Up @@ -132,11 +136,13 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Option;
import scala.collection.JavaConverters;
import scala.util.Either;

import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
Expand Down Expand Up @@ -200,6 +206,7 @@ public class RemoteLogManager implements Closeable {

private volatile boolean remoteLogManagerConfigured = false;
private final Timer remoteReadTimer;
private DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;

/**
* Creates RemoteLogManager instance with the given arguments.
Expand Down Expand Up @@ -263,6 +270,10 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
);
}

public void setDelayedOperationPurgatory(DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory) {
this.delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatory;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delayedRemoteListOffsetsPurgatory is written and read by different threads. Does it need to be volatile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other purgatories are also accessed by multiple threads but they don't have the volatile. So, followed the same approach. Let me know whether it is required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are talking about the purgatories in ReplicaManager? They are set during the creation of ReplicaManager. Here, delayedRemoteListOffsetsPurgatory is not set during the creation of RemoteLogManager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the volatile to the delayedRemoteListOffsetsPurgatory in RemoteLogManager. My understanding was that we instantiate the dataPlaneRequestProcessor after calling the ReplicaManager#startup, so there won't be an issue. It is good to be on the safer side.

}

public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}
Expand Down Expand Up @@ -620,6 +631,23 @@ private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
}

public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.TimestampAndOffset>>> asyncOffsetRead(
TopicPartition topicPartition,
Long timestamp,
Long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = new CompletableFuture<>();
Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
taskFuture.complete(result);
delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
})
);
return new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon me, why we need two futures here? Is CompletableFuture.supplyAsync unsuitable to this case? for example:

        CompletableFuture<Optional<FileRecords.TimestampAndOffset>> taskFuture = CompletableFuture.supplyAsync(() -> {
            try {
                // If it is not found in remote storage, then search in the local storage starting with local log start offset.
                Optional<FileRecords.TimestampAndOffset> rval = findOffsetByTimestamp(topicPartition, timestamp, startingOffset, leaderEpochCache);
                if (rval.isPresent()) return rval;
                return OptionConverters.toJava(searchLocalLog.get());
            } catch (Exception e) {
                // NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
                LOGGER.error("Error occurred while reading the remote log offset for {}", topicPartition, e);
                throw new RuntimeException(e);
            }
        }, remoteStorageReaderThreadPool);

Copy link
Contributor Author

@kamalcph kamalcph Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!

The reason for maintaining 2 futures: jobFuture and taskFuture. They are required to trigger the delayed operation completion (delayedRemoteListOffsetsPurgatory#checkAndComplete(key)) in the same remote-log-reader thread after the RemoteLogManager#findOffsetByTimestamp ++ searchLocalLog operation completes.

In DelayedRemoteListOffsets purgatory, we return the result when all the partitions results are received. Then, the delayedOperation gets completed.

We have ActionQueue to complete the pending actions but the LIST_OFFSETS request can be served by any replica (least loaded node). If the node serving the request doesn't have leadership for any of the partitions, then the result might not be complete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, but "trigger the delayed operation completion in the same thread" seems to work by CompletableFuture.supplyAsync, right?

        CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = CompletableFuture.supplyAsync(() -> {
            Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
            try {
                // If it is not found in remote storage, then search in the local storage starting with local log start offset.
                Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
                        OptionConverters.toScala(findOffsetByTimestamp(topicPartition, timestamp, startingOffset, leaderEpochCache))
                                .orElse(searchLocalLog::get);
                result = Right.apply(timestampAndOffsetOpt);
            } catch (Exception e) {
                // NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
                LOGGER.error("Error occurred while reading the remote log offset for {}", topicPartition, e);
                result = Left.apply(e);
            } finally {
                TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
                delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
            }
            return result;
        }, remoteStorageReaderThreadPool);

I notice there is another similar pattern DelayedRemoteFetch, so it is OK to keep current design for consistency. However, it would be great to let me known (for my own education) what the side-effect happens if using CompletableFuture.supplyAsync :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work. When delayedRemoteListOffsetsPurgatory.checkAndComplete(key) is invoked, it calls DelayedRemoteListOffset#tryComplete which checks whether the taskFuture gets completed or not. It is not completed so the delayedOperation won't complete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not completed so the delayedOperation won't complete.

oh, you are totally right, thanks!!!

}

/**
* Search the message offset in the remote storage based on timestamp and offset.
* <p>
Expand Down
79 changes: 79 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

public class RemoteLogOffsetReader implements Callable<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
private final RemoteLogManager rlm;
private final TopicPartition tp;
private final long timestamp;
private final long startingOffset;
private final LeaderEpochFileCache leaderEpochCache;
private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog;
private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback;

public RemoteLogOffsetReader(RemoteLogManager rlm,
TopicPartition tp,
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) {
this.rlm = rlm;
this.tp = tp;
this.timestamp = timestamp;
this.startingOffset = startingOffset;
this.leaderEpochCache = leaderEpochCache;
this.searchInLocalLog = searchInLocalLog;
this.callback = callback;
}

@Override
public Void call() throws Exception {
Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
try {
// If it is not found in remote storage, then search in the local storage starting with local log start offset.
Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache))
.orElse(searchInLocalLog::get);
result = Right.apply(timestampAndOffsetOpt);
} catch (Exception e) {
// NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e);
result = Left.apply(e);
}
callback.accept(result);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
Expand Down Expand Up @@ -210,6 +212,7 @@ public ReplicaManager build() {
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager),
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/server/share/ShareFetchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
// Isolation level is only required when reading from the latest offset hence use Option.empty() for now.
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true);
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
}
16 changes: 10 additions & 6 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ class Partition(val topicPartition: TopicPartition,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean,
remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)

Expand All @@ -1601,21 +1601,25 @@ class Partition(val topicPartition: TopicPartition,
s"high watermark (${localLog.highWatermark}) is lagging behind the " +
s"start offset from the beginning of this epoch ($epochStart)."))

def getOffsetByTimestamp: Option[TimestampAndOffset] = {
logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
def getOffsetByTimestamp: OffsetResultHolder = {
logManager.getLog(topicPartition)
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
.getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None))
}

// If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
// or for a timestamp lookup that is beyond the last fetchable offset.
timestamp match {
case ListOffsetsRequest.LATEST_TIMESTAMP =>
maybeOffsetsError.map(e => throw e)
.orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
.getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))))
case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
getOffsetByTimestamp
case _ =>
getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
.orElse(maybeOffsetsError.map(e => throw e))
val offsetResultHolder = getOffsetByTimestamp
offsetResultHolder.maybeOffsetsError = maybeOffsetsError
offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset)
offsetResultHolder
}
}

Expand Down
38 changes: 38 additions & 0 deletions core/src/main/scala/kafka/log/OffsetResultHolder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log

import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset

import java.util.concurrent.{CompletableFuture, Future}

case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {

var maybeOffsetsError: Option[ApiException] = None
var lastFetchableOffset: Option[Long] = None
}

/**
* A remote log offset read task future holder. It contains two futures:
* 1. JobFuture - Use this future to cancel the running job.
* 2. TaskFuture - Use this future to get the result of the job/computation.
*/
case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) {

}
37 changes: 21 additions & 16 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* None if no such message is found.
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change the description of the return value accordingly?

Copy link
Contributor Author

@kamalcph kamalcph Oct 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this in #17487.

maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")

Expand All @@ -1285,7 +1285,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)))
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()

Expand All @@ -1297,15 +1297,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val epoch = leaderEpochCache match {
case Some(cache) =>
val latestEpoch = cache.latestEpoch()
if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
case None => Optional.empty[Integer]()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
val curHighestRemoteOffset = highestOffsetInRemoteStorage()
Expand All @@ -1324,9 +1324,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)))
} else {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))))
}
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
Expand All @@ -1336,34 +1336,39 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
// lookup the position of batch to avoid extra I/O
val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
latestTimestampSegment.log.batchesFrom(position.position).asScala
val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
OffsetResultHolder(timestampAndOffsetOpt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon me, why MAX_TIMESTAMP does not consider the records in the remote storage?

Copy link
Contributor Author

@kamalcph kamalcph Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went over KIP-734, the purpose of MAX_TIMESTAMP is to get the offset of the record with highest timestamp in the partition:

Used to retrieve the offset with the largest timestamp of a partition as message timestamps can be specified client side this may not match the log end offset returned by LatestSpec

With remote storage enabled, all the passive segments might be uploaded to remote and removed from local-log. The local-log might contain only one empty active segment. We have to handle the MAX_TIMESTAMP case for remote storage. Thanks for catching this! Filed KAFKA-17552 to track this issue separately.

Copy link
Contributor Author

@kamalcph kamalcph Sep 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went over #15621 to see how do we handle the MAX_TIMESTAMP case for normal topics.

Now, we maintain the shallowOffsetOfMaxTimestampSoFar in LogSegment instead of the real-max-timestamp-offset. While uploading the LogSegment to remote, we create the RemoteLogSegmentMetadata event which holds the metadata information about the segment.

Even, if we pass the shallowOffsetOfMaxTimestampSoFar in the RemoteLogSegmentMetadata event, we have to download the remote-log-segment to find the real offsetOfMaxTimestampSoFar. This will increase the load on remote storage assuming that the original intention of the KIP-734 is to Confirming topic/partition "liveness" which means the Admin client will repeatedly invoke the list-offsets API for MAX_TIMESTAMP.

The predominant case to find the "Confirming topic/partition livness" is to query the local-log which will work as expected. For MAX_TIMESTAMP, when enabled with remote storage, the results can go wrong when:

  1. Assuming the timestamps are monotonic and all the passive segments are uploaded to remote and deleted from local. The only active segment in local disk is empty (post the log roll), then the max-timestamp will returned as "-1".
  2. Assuming the timestamps are non-monotonic, then the (timestamp, offset) returned by the API may not be "TRUE" (max-timestamp, max-timestamp-offset) as it considers the segments only in the local-log.

Should we handle/drop the MAX_TIMESTAMP case for topics enabled with remote storage? This can cause high load:

  1. On the RemoteLogMetadataManager, as we have to scan all the uploaded segment events to find the max-timestamp. Then, compare it with the max-timestamp computed from the local-log segments. And, the search should always proceed from remote to local storage.
  2. On the RemoteStorageManager, when there exists the MAX_TIMESTAMP record in the remote storage, then we have to download that segment (few bytes) repeatedly to serve the query.

In KIP-734, can we make a addendum to say that MAX_TIMESTAMP is not supported for topics enabled with remote storage? Note that when the KIP was proposed, the intention was not to read from the disk:

Snippet from KIP-734:

LogSegments track the highest timestamp and associated offset so we don't have to go to disk to fetch this

cc @satishd @showuon

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even, if we pass the shallowOffsetOfMaxTimestampSoFar in the RemoteLogSegmentMetadata event, we have to download the remote-log-segment to find the real offsetOfMaxTimestampSoFar.

IMHO, we don't need to pass shallowOffsetOfMaxTimestampSoFar to RemoteLogSegmentMetadata as RemoteLogSegmentMetadata has maxTimestampMs already, so the basic behavior is listed below (same as you described I'd say)

  1. find the max timestamp from local segments
  2. query findOffsetByTimestamp by max timestamp from step_1
  3. compare the timestamp of record from remote to local to pick up correct offset

This will increase the load on remote storage assuming that the original intention of the KIP-734 is to Confirming topic/partition "liveness" which means the Admin client will repeatedly invoke the list-offsets API for MAX_TIMESTAMP.

The impl of KIP-734 was wrong because we don't loop all records in all path (because of cost issue). Hence, we rename the offsetOfMaxTimestampSoFar to shallowOffsetOfMaxTimestampSoFar based on true story.

Should we handle/drop the MAX_TIMESTAMP case for topics enabled with remote storage? This can cause high load:

that is a acceptable approach. We can REJECT the MAX_TIMESTAMP request for now as it is rare operation. Or we can make the call an async op too as it needs to iterate all metadata of remote segments.

} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
if (remoteLogEnabled()) {
if (remoteLogEnabled() && !isEmpty) {
if (remoteLogManager.isEmpty) {
throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
}
if (recordVersion.value < RecordVersion.V2.value) {
throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
}

val remoteOffset = remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get)
if (remoteOffset.isPresent) {
remoteOffset.asScala
} else {
// If it is not found in remote log storage, search in the local log storage from local log start offset.
searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())
}
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder))
} else {
searchOffsetInLocalLog(targetTimestamp, logStartOffset)
OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
}
}
}
}

/**
* Checks if the log is empty.
* @return Returns True when the log is empty. Otherwise, false.
*/
private[log] def isEmpty = {
logStartOffset == logEndOffset
}

private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
Expand Down
Loading
Loading