diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala new file mode 100644 index 0000000000000..d594f188f4644 --- /dev/null +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.{File, RandomAccessFile} +import java.nio.{ByteBuffer, MappedByteBuffer} +import java.nio.channels.FileChannel +import java.util.concurrent.locks.{Lock, ReentrantLock} + +import kafka.log.IndexSearchType.IndexSearchEntity +import kafka.utils.CoreUtils.inLock +import kafka.utils.{CoreUtils, Logging, Os} +import org.apache.kafka.common.utils.Utils +import sun.nio.ch.DirectBuffer + +import scala.math.ceil + +/** + * The abstract index class which holds entry format agnostic methods. + * + * @param _file The index file + * @param baseOffset the base offset of the segment that this index is corresponding to. + * @param maxIndexSize The maximum index size in bytes. + */ +abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) + extends Logging { + + protected def entrySize: Int + + protected val lock = new ReentrantLock + + @volatile + protected var mmap: MappedByteBuffer = { + val newlyCreated = _file.createNewFile() + val raf = new RandomAccessFile(_file, "rw") + try { + /* pre-allocate the file if necessary */ + if(newlyCreated) { + if(maxIndexSize < entrySize) + throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) + raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize)) + } + + /* memory-map the file */ + val len = raf.length() + val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) + + /* set the position in the index for the next entry */ + if(newlyCreated) + idx.position(0) + else + // if this is a pre-existing index, assume it is valid and set position to last entry + idx.position(roundDownToExactMultiple(idx.limit, entrySize)) + idx + } finally { + CoreUtils.swallow(raf.close()) + } + } + + /** + * The maximum number of entries this index can hold + */ + @volatile + private[this] var _maxEntries = mmap.limit / entrySize + + /** The number of entries in this index */ + @volatile + protected var _entries = mmap.position / entrySize + + /** + * True iff there are no more slots available in this index + */ + def isFull: Boolean = _entries >= _maxEntries + + def maxEntries: Int = _maxEntries + + def entries: Int = _entries + + /** + * The index file + */ + def file: File = _file + + /** + * Reset the size of the memory map and the underneath file. 
This is used in two kinds of cases: (1) in + * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at + * loading segments from disk or truncating back to an old segment where a new log segment became active; + * we want to reset the index size to maximum index size to avoid rolling new segment. + */ + def resize(newSize: Int) { + inLock(lock) { + val raf = new RandomAccessFile(_file, "rw") + val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) + val position = mmap.position + + /* Windows won't let us modify the file length while the file is mmapped :-( */ + if(Os.isWindows) + forceUnmap(mmap) + try { + raf.setLength(roundedNewSize) + mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + _maxEntries = mmap.limit / entrySize + mmap.position(position) + } finally { + CoreUtils.swallow(raf.close()) + } + } + } + + /** + * Rename the file that backs this offset index + * + * @throws IOException if rename fails + */ + def renameTo(f: File) { + try Utils.atomicMoveWithFallback(_file.toPath, f.toPath) + finally _file = f + } + + /** + * Flush the data in the index to disk + */ + def flush() { + inLock(lock) { + mmap.force() + } + } + + /** + * Delete this index file + */ + def delete(): Boolean = { + info(s"Deleting index ${_file.getAbsolutePath}") + if(Os.isWindows) + CoreUtils.swallow(forceUnmap(mmap)) + _file.delete() + } + + /** + * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from + * the file. + */ + def trimToValidSize() { + inLock(lock) { + resize(entrySize * _entries) + } + } + + /** + * The number of bytes actually used by this index + */ + def sizeInBytes = entrySize * _entries + + /** Close the index */ + def close() { + trimToValidSize() + } + + /** + * Do a basic sanity check on this index to detect obvious problems + * + * @throws IllegalArgumentException if any problems are found + */ + def sanityCheck(): Unit + + /** + * Remove all the entries from the index. + */ + def truncate(): Unit + + /** + * Remove all entries from the index which have an offset greater than or equal to the given offset. + * Truncating to an offset larger than the largest in the index has no effect. + */ + def truncateTo(offset: Long): Unit + + /** + * Forcefully free the buffer's mmap. We do this only on windows. + */ + protected def forceUnmap(m: MappedByteBuffer) { + try { + m match { + case buffer: DirectBuffer => + val bufferCleaner = buffer.cleaner() + /* cleaner can be null if the mapped region has size 0 */ + if (bufferCleaner != null) + bufferCleaner.clean() + case _ => + } + } catch { + case t: Throwable => error("Error when freeing index buffer", t) + } + } + + /** + * Execute the given function in a lock only if we are running on windows. We do this + * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it + * and this requires synchronizing reads. + */ + protected def maybeLock[T](lock: Lock)(fun: => T): T = { + if(Os.isWindows) + lock.lock() + try { + fun + } finally { + if(Os.isWindows) + lock.unlock() + } + } + + /** + * To parse an entry in the index. + * + * @param buffer the buffer of this memory mapped index. + * @param n the slot + * @return the index entry stored in the given slot. + */ + protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry + + /** + * Find the slot in which the largest entry less than or equal to the given target key or value is stored. 
+ * The comparison is made using the `IndexEntry.compareTo()` method. + * + * @param idx The index buffer + * @param target The index key to look for + * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty + */ + protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = { + // check if the index is empty + if(_entries == 0) + return -1 + + // check if the target offset is smaller than the least offset + if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) + return -1 + + // binary search for the entry + var lo = 0 + var hi = _entries - 1 + while(lo < hi) { + val mid = ceil(hi/2.0 + lo/2.0).toInt + val found = parseEntry(idx, mid) + val compareResult = compareIndexEntry(found, target, searchEntity) + if(compareResult > 0) + hi = mid - 1 + else if(compareResult < 0) + lo = mid + else + return mid + } + lo + } + + private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = { + searchEntity match { + case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target) + case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target) + } + } + + /** + * Round a number to the greatest exact multiple of the given factor less than the given number. + * E.g. roundDownToExactMultiple(67, 8) == 64 + */ + private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor) + +} + +object IndexSearchType extends Enumeration { + type IndexSearchEntity = Value + val KEY, VALUE = Value +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 8e92f954fed43..57630429e341d 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File, new FileMessageSet(file, channel, start = this.start + position, - end = math.min(this.start + position + size, sizeInBytes())) + end = { + // Handle the integer overflow + if (this.start + position + size < 0) + sizeInBytes() + else + math.min(this.start + position + size, sizeInBytes()) + }) } /** @@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File, * @param targetOffset The offset to search for. * @param startingPosition The starting position in the file to begin searching from. */ - def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { + def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = { var position = startingPosition val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) val size = sizeInBytes() @@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File, channel.read(buffer, position) if(buffer.hasRemaining) throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s" - .format(targetOffset, startingPosition, file.getAbsolutePath)) + .format(targetOffset, startingPosition, file.getAbsolutePath)) buffer.rewind() val offset = buffer.getLong() if(offset >= targetOffset) @@ -148,6 +154,72 @@ class FileMessageSet private[kafka](@volatile var file: File, null } + /** + * Search forward for the message whose timestamp is greater than or equals to the target timestamp. + * + * The search will stop immediately when it sees a message in format version before 0.10.0. 
This is to avoid + * scanning the entire log when all the messages are still in old format. + * + * @param targetTimestamp The timestamp to search for. + * @param startingPosition The starting position to search. + * @return None, if no message exists at or after the starting position. + * Some(the_next_offset_to_read) otherwise. + */ + def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = { + var maxTimestampChecked = Message.NoTimestamp + var lastOffsetChecked = -1L + val messagesToSearch = read(startingPosition, sizeInBytes) + for (messageAndOffset <- messagesToSearch) { + val message = messageAndOffset.message + lastOffsetChecked = messageAndOffset.offset + // Stop searching once we see message format before 0.10.0. + // This equivalent as treating message without timestamp has the largest timestamp. + // We do this to avoid scanning the entire log if no message has a timestamp. + if (message.magic == Message.MagicValue_V0) + return Some(messageAndOffset.offset) + else if (message.timestamp >= targetTimestamp) { + // We found a message + message.compressionCodec match { + case NoCompressionCodec => + return Some(messageAndOffset.offset) + case _ => + // Iterate over the inner messages to get the exact offset. + for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) { + val timestamp = innerMessage.message.timestamp + if (timestamp >= targetTimestamp) + return Some(innerMessage.offset) + } + throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" + + s" should contain target timestamp $targetTimestamp but it does not.") + } + } else + maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp) + } + + if (lastOffsetChecked >= 0) + Some(lastOffsetChecked + 1) + else + None + } + + /** + * Return the largest timestamp of the messages after a given position in this file message set. + * @param startingPosition The starting position. + * @return The largest timestamp of the messages after the given position. + */ + def largestTimestampAfter(startingPosition: Int): TimestampOffset = { + var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L + val messagesToSearch = read(startingPosition, Int.MaxValue) + for (messageAndOffset <- messagesToSearch) { + if (messageAndOffset.message.timestamp > maxTimestamp) { + maxTimestamp = messageAndOffset.message.timestamp + offsetOfMaxTimestamp = messageAndOffset.offset + } + } + TimestampOffset(maxTimestamp, offsetOfMaxTimestamp) + } + /** * Write some of this set to the given channel. * @param destChannel The channel to write to. diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/IndexEntry.scala similarity index 57% rename from core/src/main/scala/kafka/log/OffsetPosition.scala rename to core/src/main/scala/kafka/log/IndexEntry.scala index 24b6dcf0bb789..2f5a6a79693e8 100644 --- a/core/src/main/scala/kafka/log/OffsetPosition.scala +++ b/core/src/main/scala/kafka/log/IndexEntry.scala @@ -17,9 +17,30 @@ package kafka.log +sealed trait IndexEntry { + // We always use Long for both key and value to avoid boxing. + def indexKey: Long + def indexValue: Long +} + /** * The mapping between a logical log offset and the physical position * in some log file of the beginning of the message set entry with the * given offset. 
*/ -case class OffsetPosition(offset: Long, position: Int) +case class OffsetPosition(offset: Long, position: Int) extends IndexEntry { + override def indexKey = offset + override def indexValue = position.toLong +} + + +/** + * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater + * than that timestamp must be at or after that offset. + * @param timestamp The max timestamp before the given offset. + * @param offset The message offset. + */ +case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry { + override def indexKey = timestamp + override def indexValue = offset +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 1a7719a67650c..b4aa470fa28bf 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -17,6 +17,7 @@ package kafka.log +import kafka.api.OffsetRequest import kafka.utils._ import kafka.message._ import kafka.common._ @@ -30,19 +31,22 @@ import java.text.NumberFormat import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import org.apache.kafka.common.record.TimestampType -import scala.collection.JavaConversions +import scala.collection.{Seq, JavaConversions} import com.yammer.metrics.core.Gauge import org.apache.kafka.common.utils.Utils object LogAppendInfo { - val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) } /** * Struct to hold various quantities we compute about each message set before appending to the log + * * @param firstOffset The first offset in the message set * @param lastOffset The last offset in the message set - * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param maxTimestamp The maximum timestamp of the message set. + * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. 
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param sourceCodec The source codec used in the message set (send by the producer) * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any) * @param shallowCount The number of shallow messages @@ -51,7 +55,9 @@ object LogAppendInfo { */ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, - var timestamp: Long, + var maxTimestamp: Long, + var offsetOfMaxTimestamp: Long, + var logAppendTime: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, @@ -95,7 +101,7 @@ class Log(val dir: File, else 0 } - + val t = time.milliseconds /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() @@ -105,7 +111,8 @@ class Log(val dir: File, val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir) - info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) + info("Completed load of log %s with %d log segments and log end offset %d in %d ms" + .format(name, segments.size(), logEndOffset, time.milliseconds - t)) val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString) @@ -167,12 +174,17 @@ class Log(val dir: File, } } - // now do a second pass and load all the .log and .index files + // now do a second pass and load all the .log and all index files for(file <- dir.listFiles if file.isFile) { val filename = file.getName - if(filename.endsWith(IndexFileSuffix)) { + if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) { // if it is an index file, make sure it has a corresponding .log file - val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) + val logFile = + if (filename.endsWith(TimeIndexFileSuffix)) + new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix)) + else + new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) + if(!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) file.delete() @@ -181,6 +193,9 @@ class Log(val dir: File, // if its a log file, load the corresponding log segment val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong val indexFile = Log.indexFilename(dir, start) + val timeIndexFile = Log.timeIndexFilename(dir, start) + + val indexFileExists = indexFile.exists() val segment = new LogSegment(dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, @@ -189,20 +204,23 @@ class Log(val dir: File, time = time, fileAlreadyExists = true) - if(indexFile.exists()) { + if (indexFileExists) { try { - segment.index.sanityCheck() + segment.index.sanityCheck() + segment.timeIndex.sanityCheck() } catch { case e: java.lang.IllegalArgumentException => - warn("Found a corrupted index file, %s, deleting and rebuilding index. Error Message: %s".format(indexFile.getAbsolutePath, e.getMessage)) + warn(s"Found a corrupted index file due to ${e.getMessage}}. 
deleting ${timeIndexFile.getAbsolutePath}, " + + s"${indexFile.getAbsolutePath} and rebuilding index...") indexFile.delete() + timeIndexFile.delete() segment.recover(config.maxMessageSize) } - } - else { + } else { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } + segments.put(start, segment) } } @@ -216,8 +234,11 @@ class Log(val dir: File, val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) + val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) + val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), index = index, + timeIndex = timeIndex, baseOffset = startOffset, indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, @@ -243,6 +264,7 @@ class Log(val dir: File, recoverLog() // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) + activeSegment.timeIndex.resize(config.maxIndexSize) } } @@ -298,8 +320,7 @@ class Log(val dir: File, def close() { debug("Closing log " + name) lock synchronized { - for(seg <- logSegments) - seg.close() + logSegments.foreach(_.close()) } } @@ -311,9 +332,7 @@ class Log(val dir: File, * * @param messages The message set to append * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given - * * @throws KafkaStorageException If the append fails due to an I/O error. - * * @return Information about the appended messages including the first and last offset. 
*/ def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = { @@ -335,7 +354,7 @@ class Log(val dir: File, val offset = new LongRef(nextOffsetMetadata.messageOffset) appendInfo.firstOffset = offset.value val now = time.milliseconds - val (validatedMessages, messageSizesMaybeChanged) = try { + val validateAndOffsetAssignResult = try { validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, @@ -347,14 +366,16 @@ class Log(val dir: File, } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } - validMessages = validatedMessages + validMessages = validateAndOffsetAssignResult.validatedMessages + appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp + appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp appendInfo.lastOffset = offset.value - 1 if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) - appendInfo.timestamp = now + appendInfo.logAppendTime = now // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message // format conversion) - if (messageSizesMaybeChanged) { + if (validateAndOffsetAssignResult.messageSizeMaybeChanged) { for (messageAndOffset <- validMessages.shallowIterator) { if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { // we record the original message set size instead of the trimmed size @@ -383,7 +404,8 @@ class Log(val dir: File, val segment = maybeRoll(validMessages.sizeInBytes) // now append to the log - segment.append(appendInfo.firstOffset, validMessages) + segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp, + offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages) // increment the log end offset updateLogEndOffset(appendInfo.lastOffset + 1) @@ -424,6 +446,8 @@ class Log(val dir: File, var firstOffset, lastOffset = -1L var sourceCodec: CompressionCodec = NoCompressionCodec var monotonic = true + var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L for(messageAndOffset <- messages.shallowIterator) { // update the first offset if on the first message if(firstOffset < 0) @@ -447,7 +471,10 @@ class Log(val dir: File, // check the validity of the message by checking CRC m.ensureValid() - + if (m.timestamp > maxTimestamp) { + maxTimestamp = m.timestamp + offsetOfMaxTimestamp = lastOffset + } shallowMessageCount += 1 validBytesCount += messageSize @@ -459,11 +486,12 @@ class Log(val dir: File, // Apply broker-side compression if any val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) - LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) } /** * Trim any invalid bytes from the end of this message set (if there are any) + * * @param messages The message set to trim * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. 
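The hunk that follows adds `Log.fetchOffsetsByTimestamp`, which picks a segment by its largest timestamp and then delegates to the segment-level lookup (`LogSegment.findOffsetByTimestamp`, further down), which in turn chains the sparse time index, the sparse offset index and a linear scan of the log. As a rough, self-contained sketch of that composition — not code from this patch; the case classes and in-memory Vectors are illustrative stand-ins, not Kafka's classes:

// Sketch of: timeIndex.lookup(ts) -> offset, index.lookup(offset) -> file position,
// log.searchForTimestamp(ts, position) -> first offset with timestamp >= ts.
object TimestampLookupSketch {
  case class TimestampOffset(timestamp: Long, offset: Long)  // a time index entry
  case class OffsetPosition(offset: Long, position: Int)     // an offset index entry
  case class MessageStub(offset: Long, timestamp: Long, position: Int)

  // Largest time index entry with timestamp <= target; (-1, 0) stands in for "no entry".
  def timeIndexLookup(timeIndex: Vector[TimestampOffset], target: Long): TimestampOffset =
    timeIndex.takeWhile(_.timestamp <= target).lastOption.getOrElse(TimestampOffset(-1L, 0L))

  // Largest offset index entry with offset <= target; (0, 0) stands in for "no entry".
  def offsetIndexLookup(offsetIndex: Vector[OffsetPosition], target: Long): OffsetPosition =
    offsetIndex.takeWhile(_.offset <= target).lastOption.getOrElse(OffsetPosition(0L, 0))

  // Linear scan from a starting file position for the first message with timestamp >= target.
  def searchForTimestamp(log: Vector[MessageStub], target: Long, startingPosition: Int): Option[Long] =
    log.dropWhile(_.position < startingPosition).find(_.timestamp >= target).map(_.offset)

  def findOffsetByTimestamp(timeIndex: Vector[TimestampOffset],
                            offsetIndex: Vector[OffsetPosition],
                            log: Vector[MessageStub],
                            target: Long): Option[Long] = {
    val timestampOffset = timeIndexLookup(timeIndex, target)
    val position = offsetIndexLookup(offsetIndex, timestampOffset.offset).position
    searchForTimestamp(log, target, position)
  }

  def main(args: Array[String]): Unit = {
    val log = Vector(MessageStub(0L, 100L, 0), MessageStub(1L, 110L, 50), MessageStub(2L, 120L, 100))
    val timeIndex = Vector(TimestampOffset(110L, 1L))
    val offsetIndex = Vector(OffsetPosition(1L, 50))
    println(findOffsetByTimestamp(timeIndex, offsetIndex, log, 115L)) // prints Some(2)
  }
}

Both indexes only narrow the search range; the final linear scan is what guarantees the returned offset is the first message at or after the target timestamp.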
@@ -543,6 +571,71 @@ class Log(val dir: File, FetchDataInfo(nextOffsetMetadata, MessageSet.Empty) } + /** + * Get an offset based on the given timestamp + * The offset returned is the offset of the first message whose timestamp is greater than or equals to the + * given timestamp. + * + * If no such message is found, the log end offset is returned. + * + * `NOTE:` OffsetRequest V0 does not use this method, the behavior of OffsetRequest V0 remains the same as before + * , i.e. it only gives back the timestamp based on the last modification time of the log segments. + * + * @param timestamp The given timestamp for offset fetching. + * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp. + */ + def fetchOffsetsByTimestamp(timestamp: Long): Long = { + debug(s"Searching offset for timestamp $timestamp") + val segsArray = logSegments.toArray + if (timestamp == OffsetRequest.EarliestTime) + return segsArray(0).baseOffset + + // set the target timestamp to be Long.MaxValue if we need to find from the latest. + val targetTimestamp = timestamp match { + case OffsetRequest.LatestTime => Long.MaxValue + case _ => timestamp + } + + var foundOffset: Long = -1L + // We have this while loop here to make sure we are returning the valid offsets to our best knowledge. + // This while loop is to handle the case where the log is truncated during the timestamp search and we did not + // find any message. In this case, we need to retry the search. + do { + val targetSeg = { + // Get all the segments whose largest timestamp is smaller than target timestamp + val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp) + // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. + if (earlierSegs.length < segsArray.length) + segsArray(earlierSegs.length) + else + earlierSegs.last + } + + // First cache the current log end offset + val leo = logEndOffset + foundOffset = { + // Use the cached log end offsets if + // 1. user is asking for latest messages, or, + // 2. we are searching on the active segment and the target timestamp is greater than the largestTimestamp + // after we cached the log end offset. (We have to use the cached log end offsets because it is possible that + // some messages with a larger timestamp are appended after we check the largest timestamp. Using log end offset + // after the timestamp check might skip those messages.) + if (targetTimestamp == Long.MaxValue + || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == activeSegment)) + leo + else + // The findOffsetByTimestamp() method may return None when the log is truncated during the timestamp search. + // In that case we simply set the foundOffset to -1 so that we will search the timestamp again in the + // while loop. + targetSeg.findOffsetByTimestamp(targetTimestamp) match { + case Some(offset) => offset + case None => -1L + } + } + } while (foundOffset < 0) + foundOffset + } + /** * Given a message offset, find its corresponding offset metadata in the log. * If the message offset is out of range, return unknown offset metadata @@ -559,6 +652,7 @@ class Log(val dir: File, /** * Delete any log segments matching the given predicate function, * starting with the oldest segment and moving forward until a segment doesn't match. 
+ * * @param predicate A function that takes in a single log segment and returns true iff it is deletable * @return The number of segments deleted */ @@ -609,24 +703,22 @@ class Log(val dir: File, * logSegment will be rolled if one of the following conditions met *
<ol>
   * <li> The logSegment is full
-  * <li> The maxTime has elapsed
+  * <li> The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if
+  *      the first message does not have a timestamp)
   * <li> The index is full
   * </ol>
* @return The currently active segment after (perhaps) rolling to a new segment */ private def maybeRoll(messagesSize: Int): LogSegment = { val segment = activeSegment + val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs if (segment.size > config.segmentSize - messagesSize || - segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs || - segment.index.isFull) { - debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." - .format(name, - segment.size, - config.segmentSize, - segment.index.entries, - segment.index.maxEntries, - time.milliseconds - segment.created, - config.segmentMs - segment.rollJitterMs)) + (segment.size > 0 && reachedRollMs) || + segment.index.isFull || segment.timeIndex.isFull) { + debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " + + s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " + + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + + s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).") roll() } else { segment @@ -636,6 +728,7 @@ class Log(val dir: File, /** * Roll the log over to a new active segment starting with the current logEndOffset. * This will trim the index to the exact size of the number of entries it currently contains. + * * @return The newly rolled segment */ def roll(): LogSegment = { @@ -644,7 +737,8 @@ class Log(val dir: File, val newOffset = logEndOffset val logFile = logFilename(dir, newOffset) val indexFile = indexFilename(dir, newOffset) - for(file <- List(logFile, indexFile); if file.exists) { + val timeIndexFile = timeIndexFilename(dir, newOffset) + for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) { warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") file.delete() } @@ -652,8 +746,11 @@ class Log(val dir: File, segments.lastEntry() match { case null => case entry => { - entry.getValue.index.trimToValidSize() - entry.getValue.log.trim() + val seg = entry.getValue + seg.onBecomeInactiveSegment() + seg.index.trimToValidSize() + seg.timeIndex.trimToValidSize() + seg.log.trim() } } val segment = new LogSegment(dir, @@ -692,6 +789,7 @@ class Log(val dir: File, /** * Flush log segments for all offsets up to offset-1 + * * @param offset The offset to flush up to (non-inclusive); the new recovery point */ def flush(offset: Long) : Unit = { @@ -723,6 +821,7 @@ class Log(val dir: File, /** * Truncate this log so that it ends with the greatest offset < targetOffset. + * * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. */ private[log] def truncateTo(targetOffset: Long) { @@ -748,6 +847,7 @@ class Log(val dir: File, /** * Delete all data in the log and start at the new offset + * * @param newOffset The new offset to start the log with */ private[log] def truncateFullyAndStartAt(newOffset: Long) { @@ -826,6 +926,7 @@ class Log(val dir: File, /** * Perform an asynchronous delete on the given file if it exists (otherwise do nothing) + * * @throws KafkaStorageException if the file can't be renamed and still exists */ private def asyncDeleteSegment(segment: LogSegment) { @@ -893,6 +994,7 @@ class Log(val dir: File, } /** * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it. 
+ * * @param segment The segment to add */ def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment) @@ -910,6 +1012,9 @@ object Log { /** an index file */ val IndexFileSuffix = ".index" + /** a time index file */ + val TimeIndexFileSuffix = ".timeindex" + /** a file that is scheduled to be deleted */ val DeletedFileSuffix = ".deleted" @@ -920,13 +1025,14 @@ object Log { val SwapFileSuffix = ".swap" /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility - * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */ + * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */ /** TODO: Get rid of CleanShutdownFile in 0.8.2 */ val CleanShutdownFile = ".kafka_cleanshutdown" /** * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros * so that ls sorts the files numerically. + * * @param offset The offset to use in the file name * @return The filename */ @@ -940,6 +1046,7 @@ object Log { /** * Construct a log file name in the given dir with the given base offset + * * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ @@ -948,12 +1055,21 @@ object Log { /** * Construct an index file name in the given dir using the given base offset + * * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ def indexFilename(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) + /** + * Construct a time index file name in the given dir using the given base offset + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ + def timeIndexFilename(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) /** * Parse the topic and partition out of the directory name of a log diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 25c36e72ff512..d4bb1f2c6eff7 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -334,7 +334,7 @@ private[log] class Cleaner(val id: Int, val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L - case Some(seg) => seg.lastModified - log.config.deleteRetentionMs + case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs } // group the segments and clean the groups @@ -366,23 +366,32 @@ private[log] class Cleaner(val id: Int, val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix) logFile.delete() val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix) + val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix) indexFile.delete() + timeIndexFile.delete() val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate) val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) - val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) + val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize) + val cleaned = new LogSegment(messages, index, 
timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { // clean segments into the new destination segment for (old <- segments) { - val retainDeletes = old.lastModified > deleteHorizonMs - info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." - .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) + val retainDeletes = old.largestTimestamp > deleteHorizonMs + info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." + .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion) } // trim excess index index.trimToValidSize() + // Append the last index entry + cleaned.onBecomeInactiveSegment() + + // trim time index + timeIndex.trimToValidSize() + // flush new segment to disk before swap cleaned.flush() @@ -422,6 +431,8 @@ private[log] class Cleaner(val id: Int, // read a chunk of messages and copy any that are to be retained to the write buffer to be written out readBuffer.clear() writeBuffer.clear() + var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position)) throttler.maybeThrottle(messages.sizeInBytes) // check each message to see if it is to be retained @@ -433,6 +444,10 @@ private[log] class Cleaner(val id: Int, if (shouldRetainMessage(source, map, retainDeletes, entry)) { ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) stats.recopyMessage(size) + if (entry.message.timestamp > maxTimestamp) { + maxTimestamp = entry.message.timestamp + offsetOfMaxTimestamp = entry.offset + } } messagesRead += 1 } else { @@ -443,12 +458,16 @@ private[log] class Cleaner(val id: Int, val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset] messages.foreach { messageAndOffset => messagesRead += 1 - if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) + if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) { retainedMessages += messageAndOffset + // We need the max timestamp and last offset for time index + if (messageAndOffset.message.timestamp > maxTimestamp) + maxTimestamp = messageAndOffset.message.timestamp + } else writeOriginalMessageSet = false } - - // There are no messages compacted out, write the original message set back + offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L + // There are no messages compacted out and no message format conversion, write the original message set back if (writeOriginalMessageSet) ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) else @@ -461,7 +480,8 @@ private[log] class Cleaner(val id: Int, if (writeBuffer.position > 0) { writeBuffer.flip() val retained = new ByteBufferMessageSet(writeBuffer) - dest.append(retained.head.offset, retained) + dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp, + offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained) throttler.maybeThrottle(writeBuffer.limit) } @@ -569,14 +589,17 @@ private[log] class Cleaner(val id: Int, var group = List(segs.head) var logSize = segs.head.size var indexSize = segs.head.index.sizeInBytes + var timeIndexSize = segs.head.timeIndex.sizeInBytes segs = 
segs.tail while(segs.nonEmpty && logSize + segs.head.size <= maxSize && indexSize + segs.head.index.sizeInBytes <= maxIndexSize && + timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) { group = segs.head :: group logSize += segs.head.size indexSize += segs.head.index.sizeInBytes + timeIndexSize += segs.head.timeIndex.sizeInBytes segs = segs.tail } grouped ::= group.reverse diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4357ef4c5d36e..e6c60b9bf96c0 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -108,7 +108,7 @@ class LogManager(val logDirs: Array[File], */ private def loadLogs(): Unit = { info("Loading logs.") - + val startMs = time.milliseconds val threadPools = mutable.ArrayBuffer.empty[ExecutorService] val jobs = mutable.Map.empty[File, Seq[Future[_]]] @@ -177,7 +177,7 @@ class LogManager(val logDirs: Array[File], threadPools.foreach(_.shutdown()) } - info("Logs loading complete.") + info(s"Logs loading complete in ${time.milliseconds - startMs} ms.") } /** @@ -423,7 +423,7 @@ class LogManager(val logDirs: Array[File], if (log.config.retentionMs < 0) return 0 val startMs = time.milliseconds - log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) + log.deleteOldSegments(startMs - _.largestTimestamp > log.config.retentionMs) } /** diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 6bbc50c7c3c06..d894020d50134 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -36,6 +36,7 @@ import java.io.{IOException, File} * * @param log The message set containing log entries * @param index The offset index + * @param timeIndex The timestamp index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param time The time instance @@ -43,6 +44,7 @@ import java.io.{IOException, File} @nonthreadsafe class LogSegment(val log: FileMessageSet, val index: OffsetIndex, + val timeIndex: TimeIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, @@ -53,9 +55,17 @@ class LogSegment(val log: FileMessageSet, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 + /* The timestamp we used for time based log rolling */ + private var rollingBasedTimestamp: Option[Long] = None + + /* The maximum timestamp we see so far */ + @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp + @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), + new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, rollJitterMs, @@ -70,21 +80,33 @@ class LogSegment(val log: FileMessageSet, * * It is 
assumed this method is being called from within a lock. * - * @param offset The first offset in the message set. + * @param firstOffset The first offset in the message set. + * @param largestTimestamp The largest timestamp in the message set. + * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param messages The messages to append. */ @nonthreadsafe - def append(offset: Long, messages: ByteBufferMessageSet) { + def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { - trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes())) + trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d" + .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp)) + val physicalPosition = log.sizeInBytes() + if (physicalPosition == 0) + rollingBasedTimestamp = Some(largestTimestamp) + // append the messages + log.append(messages) + // Update the in memory max timestamp and corresponding offset. + if (largestTimestamp > maxTimestampSoFar) { + maxTimestampSoFar = largestTimestamp + offsetOfMaxTimestamp = offsetOfLargestTimestamp + } // append an entry to the index (if needed) if(bytesSinceLastIndexEntry > indexIntervalBytes) { - index.append(offset, log.sizeInBytes()) - this.bytesSinceLastIndexEntry = 0 + index.append(firstOffset, physicalPosition) + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) + bytesSinceLastIndexEntry = 0 } - // append the messages - log.append(messages) - this.bytesSinceLastIndexEntry += messages.sizeInBytes + bytesSinceLastIndexEntry += messages.sizeInBytes } } @@ -97,13 +119,12 @@ class LogSegment(val log: FileMessageSet, * @param offset The offset we want to translate * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and * when omitted, the search will begin at the position in the offset index. - * * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria. */ @threadsafe private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = { val mapping = index.lookup(offset) - log.searchFor(offset, max(mapping.position, startingFilePosition)) + log.searchForOffset(offset, max(mapping.position, startingFilePosition)) } /** @@ -165,30 +186,34 @@ class LogSegment(val log: FileMessageSet, * * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this * is corrupt. - * * @return The number of bytes truncated from the log */ @nonthreadsafe def recover(maxMessageSize: Int): Int = { index.truncate() index.resize(index.maxIndexSize) + timeIndex.truncate() + timeIndex.resize(timeIndex.maxIndexSize) var validBytes = 0 var lastIndexEntry = 0 val iter = log.iterator(maxMessageSize) + maxTimestampSoFar = Message.NoTimestamp try { while(iter.hasNext) { val entry = iter.next entry.message.ensureValid() + + // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages. 
+ if (entry.message.timestamp > maxTimestampSoFar) { + maxTimestampSoFar = entry.message.timestamp + offsetOfMaxTimestamp = entry.offset + } + + // Build offset index if(validBytes - lastIndexEntry > indexIntervalBytes) { - // we need to decompress the message, if required, to get the offset of the first uncompressed message - val startOffset = - entry.message.compressionCodec match { - case NoCompressionCodec => - entry.offset - case _ => - ByteBufferMessageSet.deepIterator(entry).next().offset - } + val startOffset = entry.firstOffset index.append(startOffset, validBytes) + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp) lastIndexEntry = validBytes } validBytes += MessageSet.entrySize(entry.message) @@ -200,14 +225,35 @@ class LogSegment(val log: FileMessageSet, val truncated = log.sizeInBytes - validBytes log.truncateTo(validBytes) index.trimToValidSize() + // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true) + timeIndex.trimToValidSize() truncated } + def loadLargestTimestamp(readToLogEnd: Boolean = false) { + // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset) + val lastTimeIndexEntry = timeIndex.lastEntry + maxTimestampSoFar = lastTimeIndexEntry.timestamp + offsetOfMaxTimestamp = lastTimeIndexEntry.offset + if (readToLogEnd) { + val offsetPosition = index.lookup(lastTimeIndexEntry.offset) + // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry. + val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position) + if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) { + maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp + offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset + } + } + } + + override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" /** * Truncate off all index and log entries with offsets >= the given offset. * If the given offset is larger than the largest message in this segment, do nothing. + * * @param offset The offset to truncate to * @return The number of log bytes truncated */ @@ -217,12 +263,19 @@ class LogSegment(val log: FileMessageSet, if(mapping == null) return 0 index.truncateTo(offset) + timeIndex.truncateTo(offset) // after truncation, reset and allocate more space for the (new currently active) index index.resize(index.maxIndexSize) + timeIndex.resize(timeIndex.maxIndexSize) val bytesTruncated = log.truncateTo(mapping.position) - if(log.sizeInBytes == 0) + if(log.sizeInBytes == 0) { created = time.milliseconds + rollingBasedTimestamp = None + } bytesSinceLastIndexEntry = 0 + // We may need to reload the max timestamp after truncation. + if (maxTimestampSoFar >= 0) + loadLargestTimestamp(readToLogEnd = true) bytesTruncated } @@ -251,6 +304,7 @@ class LogSegment(val log: FileMessageSet, LogFlushStats.logFlushTimer.time { log.flush() index.flush() + timeIndex.flush() } } @@ -270,27 +324,96 @@ class LogSegment(val log: FileMessageSet, catch { case e: IOException => throw kafkaStorageException("index", e) } + try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("timeindex", e) + } + } + + /** + * Append the largest time index entry to the time index when this log segment become inactive segment. 
+ * This entry will be used to decide when to delete the segment. + */ + def onBecomeInactiveSegment() { + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true) + } + + /** + * The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp, + * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message. + */ + def timeWaitedForRoll(now: Long) : Long= { + // Load the timestamp of the first message into memory + if (!rollingBasedTimestamp.isDefined) { + val iter = log.iterator + if (iter.hasNext) + rollingBasedTimestamp = Some(iter.next.message.timestamp) + else + // If the log is empty, we return 0 as time waited. + return now - created + } + now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created} + } + + /** + * Search the message offset based on timestamp. + * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is + * greater than or equals to the target timestamp. + * + * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the + * timestamp will be max timestamp in the segment. + * + * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp, + * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp. + * + * This methods only returns None when the log is not empty but we did not see any messages when scanning the log + * from the indexed position. This could happen if the log is truncated after we get the indexed position but + * before we scan the log from there. In this case we simply return None and the caller will need to check on + * the truncated log and maybe retry or even do the search on another log segment. + * + * @param timestamp The timestamp to search for. + * @return an offset which points to the first message whose timestamp is larger than or equals to the + * target timestamp. + * None maybe returned when the log is truncated. + */ + def findOffsetByTimestamp(timestamp: Long): Option[Long] = { + if (log.end == log.start) { + // The log segment is empty, just return base offset with no timestamp. + Some(baseOffset) + } else { + // Get the index entry with a timestamp less than or equal to the target timestamp + val timestampOffset = timeIndex.lookup(timestamp) + val position = index.lookup(timestampOffset.offset).position + // Search the timestamp + log.searchForTimestamp(timestamp, position) + } } /** * Close this log segment */ def close() { + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true) CoreUtils.swallow(index.close) + CoreUtils.swallow(timeIndex.close()) CoreUtils.swallow(log.close) } /** * Delete this log segment from the filesystem. + * * @throws KafkaStorageException if the delete fails. 
*/ def delete() { val deletedLog = log.delete() val deletedIndex = index.delete() + val deletedTimeIndex = timeIndex.delete() if(!deletedLog && log.file.exists) throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") if(!deletedIndex && index.file.exists) throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + if(!deletedTimeIndex && timeIndex.file.exists) + throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.") } /** @@ -298,11 +421,17 @@ class LogSegment(val log: FileMessageSet, */ def lastModified = log.file.lastModified + /** + * The largest timestamp this segment contains. + */ + def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified + /** * Change the last modified time for this log segment */ def lastModified_=(ms: Long) = { log.file.setLastModified(ms) index.file.setLastModified(ms) + timeIndex.file.setLastModified(ms) } } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 848fe3b53e7c4..ad1b1969c234b 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -17,18 +17,11 @@ package kafka.log -import org.apache.kafka.common.utils.Utils +import java.io.File +import java.nio.ByteBuffer -import scala.math._ -import java.io._ -import java.nio._ -import java.nio.channels._ -import java.util.concurrent.locks._ - -import kafka.utils._ import kafka.utils.CoreUtils.inLock import kafka.common.InvalidOffsetException -import sun.nio.ch.DirectBuffer /** * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: @@ -55,137 +48,58 @@ import sun.nio.ch.DirectBuffer * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal * storage format. 
*/ -class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { - - private val lock = new ReentrantLock - - /* initialize the memory mapping for this index */ - @volatile - private[this] var mmap: MappedByteBuffer = { - val newlyCreated = _file.createNewFile() - val raf = new RandomAccessFile(_file, "rw") - try { - /* pre-allocate the file if necessary */ - if (newlyCreated) { - if (maxIndexSize < 8) - throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) - raf.setLength(roundToExactMultiple(maxIndexSize, 8)) - } - - /* memory-map the file */ - val len = raf.length() - val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) - - /* set the position in the index for the next entry */ - if (newlyCreated) - idx.position(0) - else - // if this is a pre-existing index, assume it is all valid and set position to last entry - idx.position(roundToExactMultiple(idx.limit, 8)) - idx - } finally { - CoreUtils.swallow(raf.close()) - } - } +class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1) + extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize) { - /* the number of eight-byte entries currently in the index */ - @volatile - private[this] var _entries = mmap.position / 8 - - /* The maximum number of eight-byte entries this index can hold */ - @volatile - private[this] var _maxEntries = mmap.limit / 8 - - @volatile - private[this] var _lastOffset = readLastEntry.offset + override def entrySize = 8 + + /* the last offset in the index */ + private[this] var _lastOffset = lastEntry.offset debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" - .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position)) - - /** The maximum number of entries this index can hold */ - def maxEntries: Int = _maxEntries - - /** The last offset in the index */ - def lastOffset: Long = _lastOffset - - /** The index file */ - def file: File = _file + .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position)) /** * The last entry in the index */ - def readLastEntry(): OffsetPosition = { + private def lastEntry: OffsetPosition = { inLock(lock) { _entries match { case 0 => OffsetPosition(baseOffset, 0) - case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1)) + case s => parseEntry(mmap, s - 1).asInstanceOf[OffsetPosition] } } } + def lastOffset: Long = _lastOffset + /** * Find the largest offset less than or equal to the given targetOffset * and return a pair holding this offset and its corresponding physical file position. * * @param targetOffset The offset to look up. - * - * @return The offset found and the corresponding file position for this offset. - * If the target offset is smaller than the least entry in the index (or the index is empty), - * the pair (baseOffset, 0) is returned. + * @return The offset found and the corresponding file position for this offset + * If the target offset is smaller than the least entry in the index (or the index is empty), + * the pair (baseOffset, 0) is returned. 
*/ def lookup(targetOffset: Long): OffsetPosition = { maybeLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetOffset) + val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY) if(slot == -1) OffsetPosition(baseOffset, 0) else - OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) - } - } - - /** - * Find the slot in which the largest offset less than or equal to the given - * target offset is stored. - * - * @param idx The index buffer - * @param targetOffset The offset to look for - * - * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty - */ - private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = { - // we only store the difference from the base offset so calculate that - val relOffset = targetOffset - baseOffset - - // check if the index is empty - if (_entries == 0) - return -1 - - // check if the target offset is smaller than the least offset - if (relativeOffset(idx, 0) > relOffset) - return -1 - - // binary search for the entry - var lo = 0 - var hi = _entries - 1 - while (lo < hi) { - val mid = ceil(hi/2.0 + lo/2.0).toInt - val found = relativeOffset(idx, mid) - if (found == relOffset) - return mid - else if (found < relOffset) - lo = mid - else - hi = mid - 1 + parseEntry(idx, slot).asInstanceOf[OffsetPosition] } - lo } - - /* return the nth offset relative to the base offset */ - private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) - - /* return the nth physical position */ - private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) + + private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize) + + private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4) + + override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = { + OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) + } /** * Get the nth offset mapping from the index @@ -208,37 +122,25 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, inLock(lock) { require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") if (_entries == 0 || offset > _lastOffset) { - debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName)) + debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) mmap.putInt((offset - baseOffset).toInt) mmap.putInt(position) _entries += 1 _lastOffset = offset - require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".") + require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." - .format(offset, _entries, _lastOffset, _file.getAbsolutePath)) + .format(offset, entries, _lastOffset, file.getAbsolutePath)) } } } - - /** - * True iff there are no more slots available in this index - */ - def isFull: Boolean = _entries >= _maxEntries - - /** - * Truncate the entire index, deleting all entries - */ - def truncate() = truncateToEntries(0) - - /** - * Remove all entries from the index which have an offset greater than or equal to the given offset. - * Truncating to an offset larger than the largest in the index has no effect. 
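As a reading aid, a self-contained Scala sketch of the 8-byte entry layout that relativeOffset and physical above decode: a 4-byte offset relative to baseOffset followed by a 4-byte file position. This is an illustrative model, not the index implementation itself.

    import java.nio.ByteBuffer

    object OffsetEntrySketch {
      val EntrySize = 8  // 4-byte relative offset + 4-byte physical position

      // Write slot n with an absolute offset and a file position, storing the offset relative to baseOffset.
      def write(buf: ByteBuffer, n: Int, baseOffset: Long, offset: Long, position: Int): Unit = {
        buf.putInt(n * EntrySize, (offset - baseOffset).toInt)
        buf.putInt(n * EntrySize + 4, position)
      }

      // Read slot n back into an absolute (offset, position) pair, as parseEntry does for OffsetPosition.
      def read(buf: ByteBuffer, n: Int, baseOffset: Long): (Long, Int) = {
        val relativeOffset = buf.getInt(n * EntrySize)
        val position = buf.getInt(n * EntrySize + 4)
        (baseOffset + relativeOffset, position)
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(16 * EntrySize)
        write(buf, 0, baseOffset = 1000L, offset = 1007L, position = 4096)
        println(read(buf, 0, baseOffset = 1000L))  // (1007,4096)
      }
    }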
- */ - def truncateTo(offset: Long) { + + override def truncate() = truncateToEntries(0) + + override def truncateTo(offset: Long) { inLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, offset) + val slot = indexSlotFor(idx, offset, IndexSearchType.KEY) /* There are 3 cases for choosing the new size * 1) if there is no entry in the index <= the offset, delete everything @@ -262,139 +164,19 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, private def truncateToEntries(entries: Int) { inLock(lock) { _entries = entries - mmap.position(_entries * 8) - _lastOffset = readLastEntry.offset - } - } - - /** - * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from - * the file. - */ - def trimToValidSize() { - inLock(lock) { - resize(_entries * 8) + mmap.position(_entries * entrySize) + _lastOffset = lastEntry.offset } } - /** - * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in - * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at - * loading segments from disk or truncating back to an old segment where a new log segment became active; - * we want to reset the index size to maximum index size to avoid rolling new segment. - */ - def resize(newSize: Int) { - inLock(lock) { - val raf = new RandomAccessFile(_file, "rw") - val roundedNewSize = roundToExactMultiple(newSize, 8) - val position = mmap.position - - /* Windows won't let us modify the file length while the file is mmapped :-( */ - if (Os.isWindows) - forceUnmap(mmap) - try { - raf.setLength(roundedNewSize) - mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) - _maxEntries = mmap.limit / 8 - mmap.position(position) - } finally { - CoreUtils.swallow(raf.close()) - } - } - } - - /** - * Forcefully free the buffer's mmap. We do this only on windows. 
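Both lookup and truncateTo above lean on indexSlotFor, a "largest entry less than or equal to the target" search. A minimal, dependency-free Scala sketch of that search over plain keys (illustrative names, not the patch's code):

    object LargestLowerBoundSketch {
      // Returns the slot of the largest key <= target, or -1 if the sequence is empty
      // or every key is larger than the target.
      def slotFor(keys: IndexedSeq[Long], target: Long): Int = {
        if (keys.isEmpty || keys.head > target) return -1
        var lo = 0
        var hi = keys.length - 1
        while (lo < hi) {
          val mid = lo + (hi - lo + 1) / 2  // round up so the loop always makes progress
          if (keys(mid) <= target) lo = mid else hi = mid - 1
        }
        lo
      }

      def main(args: Array[String]): Unit = {
        val keys = IndexedSeq(10L, 20L, 30L, 50L)
        println(slotFor(keys, 35L))  // 2  (key 30 is the largest one <= 35)
        println(slotFor(keys, 5L))   // -1 (every key is larger than the target)
      }
    }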
- */ - private def forceUnmap(m: MappedByteBuffer) { - try { - m match { - case buffer: DirectBuffer => - val bufferCleaner = buffer.cleaner() - /* cleaner can be null if the mapped region has size 0 */ - if (bufferCleaner != null) - bufferCleaner.clean() - case _ => - } - } catch { - case t: Throwable => warn("Error when freeing index buffer", t) - } - } - - /** - * Flush the data in the index to disk - */ - def flush() { - inLock(lock) { - mmap.force() - } - } - - /** - * Delete this index file - */ - def delete(): Boolean = { - info("Deleting index " + _file.getAbsolutePath) - if (Os.isWindows) - CoreUtils.swallow(forceUnmap(mmap)) - _file.delete() - } - - /** The number of entries in this index */ - def entries = _entries - - /** - * The number of bytes actually used by this index - */ - def sizeInBytes() = 8 * _entries - - /** Close the index */ - def close() { - trimToValidSize() - } - - /** - * Rename the file that backs this offset index - * @throws IOException if rename fails - */ - def renameTo(f: File) { - try Utils.atomicMoveWithFallback(_file.toPath, f.toPath) - finally _file = f - } - - /** - * Do a basic sanity check on this index to detect obvious problems - * @throws IllegalArgumentException if any problems are found - */ - def sanityCheck() { - require(_entries == 0 || lastOffset > baseOffset, - "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(_file.getAbsolutePath, lastOffset, baseOffset)) - val len = _file.length() - require(len % 8 == 0, - "Index file " + _file.getName + " is corrupt, found " + len + + override def sanityCheck() { + require(_entries == 0 || _lastOffset > baseOffset, + s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " + + s"is ${_lastOffset} which is no larger than the base offset $baseOffset.") + val len = file.length() + require(len % entrySize == 0, + "Index file " + file.getAbsolutePath + " is corrupt, found " + len + " bytes which is not positive or not a multiple of 8.") } - - /** - * Round a number to the greatest exact multiple of the given factor less than the given number. - * E.g. roundToExactMultiple(67, 8) == 64 - */ - private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor) - - /** - * Execute the given function in a lock only if we are running on windows. We do this - * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it - * and this requires synchronizing reads. - */ - private def maybeLock[T](lock: Lock)(fun: => T): T = { - if(Os.isWindows) - lock.lock() - try { - fun - } finally { - if(Os.isWindows) - lock.unlock() - } - } + } diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala new file mode 100644 index 0000000000000..7f24081721954 --- /dev/null +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import kafka.common.InvalidOffsetException
+import kafka.message.Message
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+
+/**
+ * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
+ * sparse, i.e. it may not hold an entry for every message in the segment.
+ *
+ * The index is stored in a file that is preallocated to hold a fixed maximum number of 12-byte time index entries.
+ * The file format is a series of time index entries. The physical format is an 8-byte timestamp followed by a 4-byte
+ * "relative" offset, as used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest
+ * timestamp seen before OFFSET is TIMESTAMP, i.e. any message whose timestamp is greater than TIMESTAMP must come
+ * after OFFSET.
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
+ * storage format.
+ *
+ * The timestamps in the same time index file are guaranteed to be monotonically increasing.
+ *
+ * The index supports timestamp lookup over a memory map of this file. The lookup is done with a binary search that
+ * finds the offset of the message whose indexed timestamp is the largest one less than or equal to the target timestamp.
+ *
+ * Time index files can be opened in two ways: either as an empty, mutable index that allows appends, or as an
+ * immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file
+ * into an immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
+ *
+ * No attempt is made to checksum the contents of this file; in the event of a crash it is rebuilt.
+ */
+class TimeIndex(file: File,
+                baseOffset: Long,
+                maxIndexSize: Int = -1)
+    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {
+
+  override def entrySize = 12
+
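For illustration, a self-contained Scala sketch of the 12-byte entry layout described in the class comment above: an 8-byte timestamp followed by a 4-byte offset relative to the segment's base offset. The names are stand-ins, not the TimeIndex implementation.

    import java.nio.ByteBuffer

    object TimeIndexEntrySketch {
      val EntrySize = 12  // 8-byte timestamp + 4-byte relative offset

      def write(buf: ByteBuffer, n: Int, baseOffset: Long, timestamp: Long, offset: Long): Unit = {
        buf.putLong(n * EntrySize, timestamp)
        buf.putInt(n * EntrySize + 8, (offset - baseOffset).toInt)
      }

      // Decode slot n back into an absolute (timestamp, offset) pair.
      def read(buf: ByteBuffer, n: Int, baseOffset: Long): (Long, Long) = {
        val timestamp = buf.getLong(n * EntrySize)
        val offset = baseOffset + buf.getInt(n * EntrySize + 8)
        (timestamp, offset)
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(8 * EntrySize)
        write(buf, 0, baseOffset = 500L, timestamp = 1460000000000L, offset = 542L)
        println(read(buf, 0, baseOffset = 500L))  // (1460000000000,542)
      }
    }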
+  // We override the full check to reserve the last time index entry slot for the entry written when the segment rolls.
+  override def isFull: Boolean = entries >= maxEntries - 1
+
+  private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize)
+
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
+
+  /**
+   * The last entry in the index
+   */
+  def lastEntry: TimestampOffset = {
+    inLock(lock) {
+      _entries match {
+        case 0 => TimestampOffset(Message.NoTimestamp, baseOffset)
+        case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
+      }
+    }
+  }
+
+  /**
+   * Get the nth timestamp mapping from the time index
+   * @param n The entry number in the time index
+   * @return The timestamp/offset pair at that entry
+   */
+  def entry(n: Int): TimestampOffset = {
+    maybeLock(lock) {
+      if(n >= _entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+      val idx = mmap.duplicate
+      TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
+    }
+  }
+
+  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+    TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
+  }
+
+  /**
+   * Attempt to append a time index entry to the time index.
+   * The new entry is appended only if both the timestamp and the offset are greater than those of the last
+   * appended entry.
+   *
+   * @param timestamp The timestamp of the new time index entry
+   * @param offset The offset of the new time index entry
+   * @param skipFullCheck To skip checking whether the segment is full or not. We only skip the check when the segment
+   *                      gets rolled or the segment is closed.
+   */
+  def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
+    inLock(lock) {
+      if (!skipFullCheck)
+        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
+      // We do not throw an exception when the offset equals the offset of the last entry. That means we are trying
+      // to insert the same time index entry as the last entry.
+      // If the time index entry to be inserted is the same as the last entry, we simply ignore the insertion
+      // because that can happen in the following two scenarios:
+      // 1. A log segment is closed.
+      // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
+      if (_entries != 0 && offset < lastEntry.offset)
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
+          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+      if (_entries != 0 && timestamp < lastEntry.timestamp)
+        throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
+          .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+      // We only append to the time index when the timestamp is greater than the last inserted timestamp.
+      // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
+      // index will be empty.
+      if (timestamp > lastEntry.timestamp) {
+        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+        mmap.putLong(timestamp)
+        mmap.putInt((offset - baseOffset).toInt)
+        _entries += 1
+        require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+      }
+    }
+  }
+
+  /**
+   * Find the time index entry whose timestamp is less than or equal to the given timestamp.
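A simplified, dependency-free Scala sketch of the gating rules maybeAppend applies, assuming the last entry's timestamp and offset are already known; the ADT and names are illustrative stand-ins, not the patch's types.

    sealed trait AppendDecision
    case object Append extends AppendDecision
    case object Ignore extends AppendDecision            // same or older timestamp, offset not smaller
    final case class Reject(reason: String) extends AppendDecision

    object TimeIndexAppendSketch {
      // lastTimestamp/lastOffset describe the last entry already in the index
      // (or the sentinel values for an empty index, in which case entries == 0).
      def decide(lastTimestamp: Long, lastOffset: Long, entries: Int,
                 timestamp: Long, offset: Long): AppendDecision = {
        if (entries != 0 && offset < lastOffset)
          Reject(s"offset $offset is smaller than the last indexed offset $lastOffset")
        else if (entries != 0 && timestamp < lastTimestamp)
          Reject(s"timestamp $timestamp is smaller than the last indexed timestamp $lastTimestamp")
        else if (timestamp > lastTimestamp)
          Append                                          // strictly larger timestamp -> new entry
        else
          Ignore                                          // e.g. re-appending the last entry on close or roll
      }

      def main(args: Array[String]): Unit = {
        println(decide(lastTimestamp = 100L, lastOffset = 50L, entries = 1, timestamp = 120L, offset = 60L)) // Append
        println(decide(lastTimestamp = 100L, lastOffset = 50L, entries = 1, timestamp = 100L, offset = 50L)) // Ignore
        println(decide(lastTimestamp = 100L, lastOffset = 50L, entries = 1, timestamp = 90L,  offset = 60L)) // Reject(...)
      }
    }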
+ * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is + * returned. + * + * @param targetTimestamp The timestamp to look up. + * @return The time index entry found. + */ + def lookup(targetTimestamp: Long): TimestampOffset = { + maybeLock(lock) { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY) + if (slot == -1) + TimestampOffset(Message.NoTimestamp, baseOffset) + else { + val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset] + TimestampOffset(entry.timestamp, entry.offset) + } + } + } + + override def truncate() = truncateToEntries(0) + + /** + * Remove all entries from the index which have an offset greater than or equal to the given offset. + * Truncating to an offset larger than the largest in the index has no effect. + */ + override def truncateTo(offset: Long) { + inLock(lock) { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE) + + /* There are 3 cases for choosing the new size + * 1) if there is no entry in the index <= the offset, delete everything + * 2) if there is an entry for this exact offset, delete it and everything larger than it + * 3) if there is no entry for this offset, delete everything larger than the next smallest + */ + val newEntries = + if(slot < 0) + 0 + else if(relativeOffset(idx, slot) == offset - baseOffset) + slot + else + slot + 1 + truncateToEntries(newEntries) + } + } + + /** + * Truncates index to a known number of entries. + */ + private def truncateToEntries(entries: Int) { + inLock(lock) { + _entries = entries + mmap.position(_entries * entrySize) + } + } + + override def sanityCheck() { + val entry = lastEntry + val lastTimestamp = entry.timestamp + val lastOffset = entry.offset + require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)), + s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " + + s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}") + require(_entries == 0 || lastOffset >= baseOffset, + s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " + + s"is $lastOffset which is smaller than the first offset $baseOffset") + val len = file.length() + require(len % entrySize == 0, + "Time index file " + file.getAbsolutePath + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 12.") + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 15d4eea455e3b..aadda86cf0e2c 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -400,7 +400,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi * If no format conversion or value overwriting is required for messages, this method will perform in-place * operations and avoid re-compression. * - * Returns the message set and a boolean indicating whether the message sizes may have changed. + * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset + * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. 
*/ private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef, now: Long, @@ -409,18 +410,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi compactedTopic: Boolean = false, messageFormatVersion: Byte = Message.CurrentMagicValue, messageTimestampType: TimestampType, - messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = { + messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // check the magic value - if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) { - // Message format conversion - (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, - messageFormatVersion), true) - } else { + if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) + convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, + messageFormatVersion) + else // Do in-place validation, offset assignment and maybe set timestamp - (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, - messageTimestampDiffMaxMs), false) - } + validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, + messageTimestampDiffMaxMs) } else { // Deal with compressed messages // We cannot do in place assignment in one of the following situations: @@ -433,6 +432,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0 var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L val expectedInnerOffset = new LongRef(0) val validatedMessages = new mutable.ArrayBuffer[Message] this.internalIterator(isShallow = false).foreach { messageAndOffset => @@ -446,7 +446,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi // Check if we need to overwrite offset if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement()) inPlaceAssignment = false - maxTimestamp = math.max(maxTimestamp, message.timestamp) + if (message.timestamp > maxTimestamp) { + maxTimestamp = message.timestamp + offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1 + } } if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec) @@ -462,20 +465,23 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi if (!inPlaceAssignment) { // Cannot do in place assignment. 
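For illustration, a minimal Scala sketch of the bookkeeping this hunk adds: while consecutive offsets are assigned, remember the largest timestamp seen and the offset it was assigned to. The types and names are stand-ins, not ByteBufferMessageSet's internals.

    object MaxTimestampTrackingSketch {
      val NoTimestamp = -1L
      final case class Msg(timestamp: Long)

      // Assign consecutive offsets starting at firstOffset and return the max timestamp
      // together with the offset that carried it (NoTimestamp/-1 if nothing had a timestamp).
      def assign(messages: Seq[Msg], firstOffset: Long): (Long, Long) = {
        var maxTimestamp = NoTimestamp
        var offsetOfMaxTimestamp = -1L
        var offset = firstOffset
        for (m <- messages) {
          if (m.timestamp > maxTimestamp) {
            maxTimestamp = m.timestamp
            offsetOfMaxTimestamp = offset
          }
          offset += 1
        }
        (maxTimestamp, offsetOfMaxTimestamp)
      }

      def main(args: Array[String]): Unit = {
        println(assign(Seq(Msg(10L), Msg(30L), Msg(20L)), firstOffset = 100L))  // (30,101)
      }
    }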
- val wrapperMessageTimestamp = { + val (largestTimestampOfMessageSet, offsetOfMaxTimestampInMessageSet) = { if (messageFormatVersion == Message.MagicValue_V0) - Some(Message.NoTimestamp) - else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME) - Some(maxTimestamp) + (Some(Message.NoTimestamp), -1L) + else if (messageTimestampType == TimestampType.CREATE_TIME) + (Some(maxTimestamp), {if (targetCodec == NoCompressionCodec) offsetOfMaxTimestamp else offsetCounter.value + validatedMessages.length - 1}) else // Log append time - Some(now) + (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1}) } - (new ByteBufferMessageSet(compressionCodec = targetCodec, - offsetCounter = offsetCounter, - wrapperMessageTimestamp = wrapperMessageTimestamp, - timestampType = messageTimestampType, - messages = validatedMessages: _*), true) + ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec, + offsetCounter = offsetCounter, + wrapperMessageTimestamp = largestTimestampOfMessageSet, + timestampType = messageTimestampType, + messages = validatedMessages: _*), + maxTimestamp = largestTimestampOfMessageSet.get, + offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet, + messageSizeMaybeChanged = true) } else { // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message. buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1) @@ -487,6 +493,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset val timestamp = buffer.getLong(timestampOffset) val attributes = buffer.get(attributeOffset) + buffer.putLong(timestampOffset, maxTimestamp) if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp) // We don't need to recompute crc if the timestamp is not updated. 
crcUpdateNeeded = false @@ -503,7 +510,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum) } buffer.rewind() - (this, false) + // For compressed messages, + ValidationAndOffsetAssignResult(validatedMessages = this, + maxTimestamp = buffer.getLong(timestampOffset), + offsetOfMaxTimestamp = buffer.getLong(0), + messageSizeMaybeChanged = false) } } } @@ -516,12 +527,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi now: Long, timestampType: TimestampType, messageTimestampDiffMaxMs: Long, - toMagicValue: Byte): ByteBufferMessageSet = { + toMagicValue: Byte): ValidationAndOffsetAssignResult = { val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset => Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue) }.sum val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) var newMessagePosition = 0 + var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) => validateMessageKey(message, compactedTopic) validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs) @@ -532,20 +545,31 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi val newMessageBuffer = newBuffer.slice() newMessageBuffer.limit(newMessageSize) message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType) - + if (toMagicValue > Message.MagicValue_V0) { + val timestamp = newMessageBuffer.getLong(Message.TimestampOffset) + if (maxTimestamp < timestamp) { + maxTimestamp = timestamp + offsetOfMaxTimestamp = offsetCounter.value - 1 + } + } newMessagePosition += MessageSet.LogOverhead + newMessageSize } newBuffer.rewind() - new ByteBufferMessageSet(newBuffer) + new ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(newBuffer), + maxTimestamp = maxTimestamp, + offsetOfMaxTimestamp = offsetOfMaxTimestamp, + messageSizeMaybeChanged = true) } private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef, now: Long, compactedTopic: Boolean, timestampType: TimestampType, - timestampDiffMaxMs: Long): ByteBufferMessageSet = { + timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { // do in-place validation and offset assignment var messagePosition = 0 + var maxTimestamp = Message.NoTimestamp + var offsetOfMaxTimestamp = -1L buffer.mark() while (messagePosition < sizeInBytes - MessageSet.LogOverhead) { buffer.position(messagePosition) @@ -562,11 +586,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes)) Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum) } + if (message.timestamp > maxTimestamp) { + maxTimestamp = message.timestamp + offsetOfMaxTimestamp = offsetCounter.value - 1 + } } + messagePosition += MessageSet.LogOverhead + messageSize } buffer.reset() - this + ValidationAndOffsetAssignResult(validatedMessages = this, + maxTimestamp = maxTimestamp, + offsetOfMaxTimestamp = offsetOfMaxTimestamp, + messageSizeMaybeChanged = false) } private def validateMessageKey(message: Message, compactedTopic: Boolean) { @@ -614,3 +646,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi override 
def hashCode: Int = buffer.hashCode } + +case class ValidationAndOffsetAssignResult(validatedMessages: ByteBufferMessageSet, + maxTimestamp: Long, + offsetOfMaxTimestamp: Long, + messageSizeMaybeChanged: Boolean) diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala index 51edf9f5da9d1..fab6898298f89 100644 --- a/core/src/main/scala/kafka/message/MessageAndOffset.scala +++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala @@ -24,5 +24,13 @@ case class MessageAndOffset(message: Message, offset: Long) { * Compute the offset of the next message in the log */ def nextOffset: Long = offset + 1 + + /** + * We need to decompress the message, if required, to get the offset of the first uncompressed message. + */ + def firstOffset: Long = message.compressionCodec match { + case NoCompressionCodec => offset + case _ => ByteBufferMessageSet.deepIterator(this).next().offset + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb219ca2f0c94..6eb574f575dbc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -38,10 +38,11 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} -import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse} +import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.requests.SaslHandshakeResponse import scala.collection._ import scala.collection.JavaConverters._ diff --git 
a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 2b977839e1c35..f94cbf998aba1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -335,7 +335,7 @@ class ReplicaManager(val config: KafkaConfig, topicPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset - new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status + new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.logAppendTime)) // response status } if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) { diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index dc99672136b2b..0a659f49db55f 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -25,12 +25,13 @@ import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey} import kafka.log._ import kafka.message._ import kafka.serializer.Decoder -import kafka.utils.{VerifiableProperties, _} +import kafka.utils._ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.KafkaException import org.apache.kafka.common.utils.Utils import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer object DumpLogSegments { @@ -85,6 +86,7 @@ object DumpLogSegments { } val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]] + val timeIndexDumpErrors = new TimeIndexDumpErrors val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]] for(arg <- files) { @@ -95,8 +97,12 @@ object DumpLogSegments { } else if(file.getName.endsWith(Log.IndexFileSuffix)) { println("Dumping " + file) dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) + } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) { + println("Dumping " + file) + dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) } } + misMatchesForIndexFilesMap.foreach { case (fileName, listOfMismatches) => { System.err.println("Mismatches in :" + fileName) @@ -105,6 +111,9 @@ object DumpLogSegments { }) } } + + timeIndexDumpErrors.printErrors() + nonConsecutivePairsForLogFilesMap.foreach { case (fileName, listOfNonConsecutivePairs) => { System.err.println("Non-secutive offsets in :" + fileName) @@ -150,6 +159,58 @@ object DumpLogSegments { } } + private def dumpTimeIndex(file: File, + indexSanityOnly: Boolean, + verifyOnly: Boolean, + timeIndexDumpErrors: TimeIndexDumpErrors, + maxMessageSize: Int) { + val startOffset = file.getName().split("\\.")(0).toLong + val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix) + val messageSet = new FileMessageSet(logFile, false) + val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix) + val index = new OffsetIndex(indexFile, baseOffset = startOffset) + val timeIndex = new TimeIndex(file, baseOffset = startOffset) + + //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not. 
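A simplified, self-contained Scala sketch of the two per-entry checks the new dumpTimeIndex routine performs (its body continues below): the indexed timestamp should match the largest message timestamp found at or before the indexed offset, and indexed timestamps should be strictly increasing. The types and the linear scan are illustrative stand-ins for the FileMessageSet iteration in the tool.

    object TimeIndexVerifySketch {
      final case class IndexEntry(timestamp: Long, offset: Long)
      final case class LogMsg(offset: Long, timestamp: Long)

      def verify(entries: Seq[IndexEntry], log: Seq[LogMsg]): Seq[String] = {
        val errors = scala.collection.mutable.ArrayBuffer[String]()
        var prevTimestamp = Long.MinValue
        for (entry <- entries) {
          // Largest message timestamp at or before the indexed offset.
          val maxSoFar = log.filter(_.offset <= entry.offset)
                            .map(_.timestamp)
                            .foldLeft(Long.MinValue)((a, b) => math.max(a, b))
          if (maxSoFar != entry.timestamp)
            errors += s"Index timestamp ${entry.timestamp} does not match log timestamp $maxSoFar at offset ${entry.offset}"
          if (entry.timestamp <= prevTimestamp)
            errors += s"Out of order index timestamp ${entry.timestamp} (previous was $prevTimestamp)"
          prevTimestamp = entry.timestamp
        }
        errors.toSeq
      }

      def main(args: Array[String]): Unit = {
        val log = Seq(LogMsg(0, 10L), LogMsg(1, 30L), LogMsg(2, 20L))
        val idx = Seq(IndexEntry(10L, 0), IndexEntry(30L, 1))
        verify(idx, log).foreach(println)  // prints nothing: the index is consistent
      }
    }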
+ if (indexSanityOnly) { + timeIndex.sanityCheck + println(s"$file passed sanity check.") + return + } + + var prevTimestamp = Message.NoTimestamp + for(i <- 0 until timeIndex.entries) { + val entry = timeIndex.entry(i) + val position = index.lookup(entry.offset + timeIndex.baseOffset).position + val partialFileMessageSet: FileMessageSet = messageSet.read(position, Int.MaxValue) + val shallowIter = partialFileMessageSet.iterator + var maxTimestamp = Message.NoTimestamp + // We first find the message by offset then check if the timestamp is correct. + val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset) + if (!wrapperMessageOpt.isDefined || wrapperMessageOpt.get.offset != entry.offset + timeIndex.baseOffset) { + timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, + {if (wrapperMessageOpt.isDefined) wrapperMessageOpt.get.offset else -1}) + } else { + val deepIter = getIterator(wrapperMessageOpt.get, isDeepIteration = true) + for (messageAndOffset <- deepIter) + maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp) + + if (maxTimestamp != entry.timestamp) + timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp) + + if (prevTimestamp >= entry.timestamp) + timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, entry.timestamp, prevTimestamp) + + // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one + if (entry.offset == 0 && i > 0) + return + } + if (!verifyOnly) + println("timestamp: %s offset: %s".format(entry.timestamp, timeIndex.baseOffset + entry.offset)) + prevTimestamp = entry.timestamp + } + } + private trait MessageParser[K, V] { def parse(message: Message): (Option[K], Option[V]) } @@ -261,7 +322,8 @@ object DumpLogSegments { } lastOffset = messageAndOffset.offset - print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid + + print("offset: " + messageAndOffset.offset + " position: " + validBytes + + " " + msg.timestampType + ": " + msg.timestamp + " isvalid: " + msg.isValid + " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum) if(msg.hasKey) @@ -307,4 +369,60 @@ object DumpLogSegments { } } + class TimeIndexDumpErrors { + val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]] + val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]] + val shallowOffsetNotFound = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]] + + def recordMismatchTimeIndex(file: File, indexTimestamp: Long, logTimestamp: Long) { + var misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) + if (misMatchesSeq.isEmpty) + misMatchesForTimeIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq) + misMatchesSeq += ((indexTimestamp, logTimestamp)) + } + + def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) { + var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) + if (outOfOrderSeq.isEmpty) + outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq) + outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp)) + } + + def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) { + var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) 
+ if (shallowOffsetNotFoundSeq.isEmpty) + shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq) + shallowOffsetNotFoundSeq += ((indexOffset, logOffset)) + } + + def printErrors() { + misMatchesForTimeIndexFilesMap.foreach { + case (fileName, listOfMismatches) => { + System.err.println("Found timestamp mismatch in :" + fileName) + listOfMismatches.foreach(m => { + System.err.println(" Index timestamp: %d, log timestamp: %d".format(m._1, m._2)) + }) + } + } + + outOfOrderTimestamp.foreach { + case (fileName, outOfOrderTimestamps) => { + System.err.println("Found out of order timestamp in :" + fileName) + outOfOrderTimestamps.foreach(m => { + System.err.println(" Index timestamp: %d, Previously indexed timestamp: %d".format(m._1, m._2)) + }) + } + } + + shallowOffsetNotFound.foreach { + case (fileName, listOfShallowOffsetNotFound) => { + System.err.println("The following indexed offsets are not found in the log.") + listOfShallowOffsetNotFound.foreach(m => { + System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2)) + }) + } + } + } + } + } diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 82121218928a8..15920ad8b8c81 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -233,7 +233,7 @@ class CleanerTest extends JUnitSuite { assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) // check grouping by index size - val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1 + val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1 groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize) checkSegmentOrder(groups) assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize)) @@ -391,8 +391,9 @@ class CleanerTest extends JUnitSuite { for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } + System.out.println("here") log = recoverAndCheck(config, cleanedKeys) - + // add some more messages and clean the log again while(log.numberOfSegments < 10) { log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index a64454d3165f3..82496f2a8498c 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -111,21 +111,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases { var position = 0 assertEquals("Should be able to find the first message by its offset", OffsetPosition(0L, position), - messageSet.searchFor(0, 0)) + messageSet.searchForOffset(0, 0)) position += MessageSet.entrySize(messageSet.head.message) assertEquals("Should be able to find second message when starting from 0", OffsetPosition(1L, position), - messageSet.searchFor(1, 0)) + messageSet.searchForOffset(1, 0)) assertEquals("Should be able to find second message starting from its offset", OffsetPosition(1L, position), - messageSet.searchFor(1, position)) + messageSet.searchForOffset(1, position)) position += MessageSet.entrySize(messageSet.tail.head.message) + 
MessageSet.entrySize(messageSet.tail.tail.head.message) assertEquals("Should be able to find fourth message from a non-existant offset", OffsetPosition(50L, position), - messageSet.searchFor(3, position)) + messageSet.searchForOffset(3, position)) assertEquals("Should be able to find fourth message by correct offset", OffsetPosition(50L, position), - messageSet.searchFor(50, position)) + messageSet.searchForOffset(50, position)) } /** @@ -134,7 +134,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @Test def testIteratorWithLimits() { val message = messageSet.toList(1) - val start = messageSet.searchFor(1, 0).position + val start = messageSet.searchForOffset(1, 0).position val size = message.message.size + 12 val slice = messageSet.read(start, size) assertEquals(List(message), slice.toList) @@ -148,7 +148,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @Test def testTruncate() { val message = messageSet.toList.head - val end = messageSet.searchFor(1, 0).position + val end = messageSet.searchForOffset(1, 0).position messageSet.truncateTo(end) assertEquals(List(message), messageSet.toList) assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) @@ -272,7 +272,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @Test def testFormatConversionWithPartialMessage() { val message = messageSet.toList(1) - val start = messageSet.searchFor(1, 0).position + val start = messageSet.searchForOffset(1, 0).position val size = message.message.size + 12 val slice = messageSet.read(start, size - 1) val messageV0 = slice.toMessageFormat(Message.MagicValue_V0) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7b52a094c6c20..dc4cc7974fdfc 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -100,7 +100,7 @@ class LogManagerTest { time.sleep(maxLogAgeMs + 1) assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) - assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) + assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes) try { @@ -146,7 +146,7 @@ class LogManagerTest { time.sleep(logManager.InitialTaskDelayMs) assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) - assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) + assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index edbfd99482d03..64140e824ed68 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -26,19 +26,23 @@ import kafka.message._ import kafka.utils.SystemTime import scala.collection._ + import scala.collection.mutable.ListBuffer class LogSegmentTest { val segments = mutable.ArrayBuffer[LogSegment]() /* create a segment with the given base offset */ - def createSegment(offset: 
Long): LogSegment = { + def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = { val msFile = TestUtils.tempFile() val ms = new FileMessageSet(msFile) val idxFile = TestUtils.tempFile() + val timeIdxFile = TestUtils.tempFile() idxFile.delete() + timeIdxFile.delete() val idx = new OffsetIndex(idxFile, offset, 1000) - val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime) + val timeIdx = new TimeIndex(timeIdxFile, offset, 1500) + val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, SystemTime) segments += seg seg } @@ -47,7 +51,7 @@ class LogSegmentTest { def messages(offset: Long, messages: String*): ByteBufferMessageSet = { new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, offsetCounter = new LongRef(offset), - messages = messages.map(s => new Message(s.getBytes)):_*) + messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*) } @After @@ -76,7 +80,7 @@ class LogSegmentTest { def testReadBeforeFirstOffset() { val seg = createSegment(40) val ms = messages(50, "hello", "there", "little", "bee") - seg.append(50, ms) + seg.append(50, Message.NoTimestamp, -1L, ms) val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet assertEquals(ms.toList, read.toList) } @@ -90,7 +94,7 @@ class LogSegmentTest { val baseOffset = 50 val seg = createSegment(baseOffset) val ms = messages(baseOffset, "hello", "there", "beautiful") - seg.append(baseOffset, ms) + seg.append(baseOffset, Message.NoTimestamp, -1L, ms) def validate(offset: Long) = assertEquals(ms.filter(_.offset == offset).toList, seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList) @@ -106,7 +110,7 @@ class LogSegmentTest { def testReadAfterLast() { val seg = createSegment(40) val ms = messages(50, "hello", "there") - seg.append(50, ms) + seg.append(50, Message.NoTimestamp, -1L, ms) val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) assertNull("Read beyond the last offset in the segment should give null", read) } @@ -119,9 +123,9 @@ class LogSegmentTest { def testReadFromGap() { val seg = createSegment(40) val ms = messages(50, "hello", "there") - seg.append(50, ms) + seg.append(50, Message.NoTimestamp, -1L, ms) val ms2 = messages(60, "alpha", "beta") - seg.append(60, ms2) + seg.append(60, Message.NoTimestamp, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) assertEquals(ms2.toList, read.messageSet.toList) } @@ -136,9 +140,9 @@ class LogSegmentTest { var offset = 40 for(i <- 0 until 30) { val ms1 = messages(offset, "hello") - seg.append(offset, ms1) - val ms2 = messages(offset+1, "hello") - seg.append(offset+1, ms2) + seg.append(offset, Message.NoTimestamp, -1L, ms1) + val ms2 = messages(offset + 1, "hello") + seg.append(offset + 1, Message.NoTimestamp, -1L, ms2) // check that we can read back both messages val read = seg.read(offset, None, 10000) assertEquals(List(ms1.head, ms2.head), read.messageSet.toList) @@ -150,7 +154,25 @@ class LogSegmentTest { offset += 1 } } - + + @Test + def testReloadLargestTimestampAfterTruncation() { + val numMessages = 30 + val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1) + var offset = 40 + for (i <- 0 until numMessages) { + seg.append(offset, offset, offset, messages(offset, "hello")) + offset += 1 + } + val expectedNumEntries = numMessages / 2 - 1 + assertEquals(s"Should have $expectedNumEntries time indexes", expectedNumEntries, seg.timeIndex.entries) + + seg.truncateTo(41) + 
assertEquals(s"Should have 0 time indexes", 0, seg.timeIndex.entries) + assertEquals(s"Largest timestamp should be 400", 400L, seg.largestTimestamp) + + } + /** * Test truncating the whole segment, and check that we can reappend with the original offset. */ @@ -158,12 +180,38 @@ class LogSegmentTest { def testTruncateFull() { // test the case where we fully truncate the log val seg = createSegment(40) - seg.append(40, messages(40, "hello", "there")) + seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there")) seg.truncateTo(0) assertNull("Segment should be empty.", seg.read(0, None, 1024)) - seg.append(40, messages(40, "hello", "there")) + seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there")) } - + + /** + * Append messages with timestamp and search message by timestamp. + */ + @Test + def testFindOffsetByTimestamp() { + val messageSize = messages(0, s"msg00").sizeInBytes + val seg = createSegment(40, messageSize * 2 - 1) + // Produce some messages + for (i <- 40 until 50) + seg.append(i, i * 10, i, messages(i, s"msg$i")) + + assertEquals(490, seg.largestTimestamp) + // Search for an indexed timestamp + assertEquals(42, seg.findOffsetByTimestamp(420).get) + assertEquals(43, seg.findOffsetByTimestamp(421).get) + // Search for an un-indexed timestamp + assertEquals(43, seg.findOffsetByTimestamp(430).get) + assertEquals(44, seg.findOffsetByTimestamp(431).get) + // Search beyond the last timestamp + assertEquals(50, seg.findOffsetByTimestamp(491).get) + // Search before the first indexed timestamp + assertEquals(41, seg.findOffsetByTimestamp(401).get) + // Search before the first timestamp + assertEquals(40, seg.findOffsetByTimestamp(399).get) + } + /** * Test that offsets are assigned sequentially and that the nextOffset variable is incremented */ @@ -171,7 +219,7 @@ class LogSegmentTest { def testNextOffsetCalculation() { val seg = createSegment(40) assertEquals(40, seg.nextOffset) - seg.append(50, messages(50, "hello", "there", "you")) + seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you")) assertEquals(53, seg.nextOffset()) } @@ -198,13 +246,31 @@ class LogSegmentTest { def testRecoveryFixesCorruptIndex() { val seg = createSegment(0) for(i <- 0 until 100) - seg.append(i, messages(i, i.toString)) + seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString)) val indexFile = seg.index.file TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset) } + + /** + * Create a segment with some data and an index. Then corrupt the index, + * and recover the segment, the entries should all be readable. + */ + @Test + def testRecoveryFixesCorruptTimeIndex() { + val seg = createSegment(0) + for(i <- 0 until 100) + seg.append(i, i * 10, i, messages(i, i.toString)) + val timeIndexFile = seg.timeIndex.file + TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt) + seg.recover(64*1024) + for(i <- 0 until 100) { + assertEquals(i, seg.findOffsetByTimestamp(i * 10).get) + assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get) + } + } /** * Randomly corrupt a log a number of times and attempt recovery. 
@@ -215,10 +281,10 @@ class LogSegmentTest { for(iteration <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) - seg.append(i, messages(i, i.toString)) + seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString)) val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) + val position = seg.log.searchForOffset(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) @@ -227,7 +293,7 @@ class LogSegmentTest { } /* create a segment with pre allocate */ - def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = { + def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = { val tempDir = TestUtils.tempDir() val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) segments += seg @@ -239,9 +305,9 @@ class LogSegmentTest { def testCreateWithInitFileSizeAppendMessage() { val seg = createSegment(40, false, 512*1024*1024, true) val ms = messages(50, "hello", "there") - seg.append(50, ms) + seg.append(50, Message.NoTimestamp, -1L, ms) val ms2 = messages(60, "alpha", "beta") - seg.append(60, ms2) + seg.append(60, Message.NoTimestamp, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) assertEquals(ms2.toList, read.messageSet.toList) } @@ -253,9 +319,9 @@ class LogSegmentTest { val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true) val ms = messages(50, "hello", "there") - seg.append(50, ms) + seg.append(50, Message.NoTimestamp, -1L, ms) val ms2 = messages(60, "alpha", "beta") - seg.append(60, ms2) + seg.append(60, Message.NoTimestamp, -1L, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) assertEquals(ms2.toList, read.messageSet.toList) val oldSize = seg.log.sizeInBytes() diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 33dd68ef3ab42..2466ef265b138 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -35,7 +35,7 @@ class LogTest extends JUnitSuite { val tmpDir = TestUtils.tempDir() val logDir = TestUtils.randomPartitionLogDir(tmpDir) - val time = new MockTime(0) + val time = new MockTime(100) var config: KafkaConfig = null val logConfig = LogConfig() @@ -88,6 +88,20 @@ class LogTest extends JUnitSuite { assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) } + time.sleep(log.config.segmentMs + 1) + val setWithTimestamp = + TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1) + log.append(setWithTimestamp) + assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments) + + time.sleep(log.config.segmentMs + 1) + log.append(set) + assertEquals("Log should not roll because the roll should depend on the index of the 
first time index entry.", 5, log.numberOfSegments) + + time.sleep(log.config.segmentMs + 1) + log.append(set) + assertEquals("Log should roll because the time since the timestamp of first time index entry has expired.", 6, log.numberOfSegments) + val numSegments = log.numberOfSegments time.sleep(log.config.segmentMs + 1) log.append(new ByteBufferMessageSet()) @@ -457,27 +471,63 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) + log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize), + timestamp = time.milliseconds + i * 10)) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) val lastIndexOffset = log.activeSegment.index.lastOffset val numIndexEntries = log.activeSegment.index.entries val lastOffset = log.logEndOffset + // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset). + val lastTimeIndexOffset = log.logEndOffset - 1 + val lastTimeIndexTimestamp = log.activeSegment.largestTimestamp + // Depending on when the last time index entry is inserted, an entry may or may not be inserted into the time index. + val numTimeIndexEntries = log.activeSegment.timeIndex.entries + { + if (log.activeSegment.timeIndex.lastEntry.offset == log.logEndOffset - 1) 0 else 1 + } log.close() + def verifyRecoveredLog(log: Log) { + assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset) + assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) + assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) + assertEquals("Should have same last time index timestamp", lastTimeIndexTimestamp, log.activeSegment.timeIndex.lastEntry.timestamp) + assertEquals("Should have same last time index offset", lastTimeIndexOffset, log.activeSegment.timeIndex.lastEntry.offset) + assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) + } + log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time) - assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) + verifyRecoveredLog(log) log.close() // test recovery case log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) + verifyRecoveredLog(log) log.close() } + /** + * Test building the time index on the follower by setting assignOffsets to false. 
+ */ + @Test + def testBuildTimeIndexWhenNotAssigningOffsets() { + val numMessages = 100 + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 10000: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + + val config = LogConfig(logProps) + val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + + val messages = (0 until numMessages).map { i => + new ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1)) + } + messages.foreach(log.append(_, assignOffsets = false)) + val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries } + assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries) + assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}", + time.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp) + } + /** * Test that if we manually delete an index segment it is rebuilt when the log is re-opened */ @@ -492,19 +542,58 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) + log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) + val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() // delete all the index files indexFiles.foreach(_.delete()) + timeIndexFiles.foreach(_.delete()) // reopen the log log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) - for(i <- 0 until numMessages) + assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) + assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) + for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).messageSet.head.offset) + if (i == 0) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + else + assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + } + log.close() + } + + /** + * Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty. + */ + @Test + def testRebuildTimeIndexForOldMessages() { + val numMessages = 200 + val segmentSize = 200 + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0") + + val config = LogConfig(logProps) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) + for(i <- 0 until numMessages) + log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) + val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() + + // Delete the time index. 
+ timeIndexFiles.foreach(_.delete()) + + // The rebuilt time index should be empty + log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time) + val segArray = log.logSegments.toArray + for (i <- 0 until segArray.size - 1) + assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) + } /** @@ -521,8 +610,9 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) + log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) + val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() // corrupt all the index files @@ -532,11 +622,23 @@ class LogTest extends JUnitSuite { bw.close() } + // corrupt all the index files + for( file <- timeIndexFiles) { + val bw = new BufferedWriter(new FileWriter(file)) + bw.write(" ") + bw.close() + } + // reopen the log log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) - for(i <- 0 until numMessages) + for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).messageSet.head.offset) + if (i == 0) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + else + assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + } log.close() } @@ -602,27 +704,37 @@ class LogTest extends JUnitSuite { */ @Test def testIndexResizingAtTruncation() { - val set = TestUtils.singleMessageSet("test".getBytes) - val setSize = set.sizeInBytes + val setSize = TestUtils.singleMessageSet(payload = "test".getBytes).sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer) val config = LogConfig(logProps) val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) + for (i<- 1 to msgPerSeg) - log.append(set) + log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) + + time.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.append(set) + log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) - assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList.head.index.maxEntries) + val expectedEntries = msgPerSeg - 1 + + assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.index.maxEntries) + assertEquals(s"The time index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.timeIndex.maxEntries) + log.truncateTo(0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, 
log.logSegments.toList.head.index.maxEntries) + assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries) + + time.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.append(set) + log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) } @@ -632,7 +744,9 @@ class LogTest extends JUnitSuite { @Test def testBogusIndexSegmentsAreRemoved() { val bogusIndex1 = Log.indexFilename(logDir, 0) + val bogusTimeIndex1 = Log.timeIndexFilename(logDir, 0) val bogusIndex2 = Log.indexFilename(logDir, 5) + val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5) val set = TestUtils.singleMessageSet("test".getBytes) val logProps = new Properties() @@ -646,7 +760,9 @@ class LogTest extends JUnitSuite { time) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) + assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) assertFalse("The second index file should have been deleted.", bogusIndex2.exists) + assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists) // check that we can append to the log for(i <- 0 until 10) diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala new file mode 100644 index 0000000000000..bc60c7236da22 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.File + +import kafka.common.InvalidOffsetException +import kafka.utils.TestUtils +import org.junit.{Test, After, Before} +import org.junit.Assert.{assertEquals} +import org.scalatest.junit.JUnitSuite + +/** + * Unit test for time index. + */ +class TimeIndexTest extends JUnitSuite { + var idx: TimeIndex = null + val maxEntries = 30 + val baseOffset = 45L + + @Before + def setup() { + this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12) + } + + @After + def teardown() { + if(this.idx != null) + this.idx.file.delete() + } + + @Test + def testLookUp() { + // Empty time index + assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(100L)) + + // Add several time index entries. + appendEntries(maxEntries - 1) + + // look for timestamp smaller than the earliest entry + assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(9)) + // look for timestamp in the middle of two entries. 
+ assertEquals(TimestampOffset(20L, 65L), idx.lookup(25)) + // look for timestamp same as the one in the entry + assertEquals(TimestampOffset(30L, 75L), idx.lookup(30)) + } + + @Test + def testTruncate() { + appendEntries(maxEntries - 1) + idx.truncate() + assertEquals(0, idx.entries) + + appendEntries(maxEntries - 1) + idx.truncateTo(10 + baseOffset) + assertEquals(0, idx.entries) + } + + @Test + def testAppend() { + appendEntries(maxEntries - 1) + intercept[IllegalArgumentException] { + idx.maybeAppend(10000L, 1000L) + } + intercept[InvalidOffsetException] { + idx.maybeAppend(10000L, (maxEntries - 2) * 10, true) + } + idx.maybeAppend(10000L, 1000L, true) + } + + private def appendEntries(numEntries: Int) { + for (i <- 1 to numEntries) + idx.maybeAppend(i * 10, i * 10 + baseOffset) + } + + def nonExistantTempFile(): File = { + val file = TestUtils.tempFile() + file.delete() + file + } +} + diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 481000946a453..39eb84c8fd48a 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -152,56 +152,69 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { @Test def testLogAppendTime() { - val startTime = System.currentTimeMillis() + val now = System.currentTimeMillis() // The timestamps should be overwritten val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec) val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec) val compressedMessagesWithoutRecompression = - getMessages(magicValue = Message.MagicValue_V1, timestamp = -1L, codec = DefaultCompressionCodec) - - val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L) - - val (validatedCompressedMessages, _) = + getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = DefaultCompressionCodec) + + val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), + now = now, + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = 1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedMessages = validatingResults.validatedMessages + + val validatingCompressedMessagesResults = compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), + now = now, sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, messageFormatVersion = 1, messageTimestampType = TimestampType.LOG_APPEND_TIME, messageTimestampDiffMaxMs = 1000L) + val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages - val (validatedCompressedMessagesWithoutRecompression, _) = + val validatingCompressedMessagesWithoutRecompressionResults = compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), + now = now, sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, 
messageFormatVersion = 1, messageTimestampType = TimestampType.LOG_APPEND_TIME, messageTimestampDiffMaxMs = 1000L) - val now = System.currentTimeMillis() + val validatedCompressedMessagesWithoutRecompression = validatingCompressedMessagesWithoutRecompressionResults.validatedMessages + assertEquals("message set size should not change", messages.size, validatedMessages.size) validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) + assertEquals(s"Max timestamp should be $now", now, validatingResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be 0", 0, validatingResults.offsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size) validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid) + assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithRecompresion.size - 1}", + compressedMessagesWithRecompresion.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp) + assertTrue("Message size may have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged) assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size, validatedCompressedMessagesWithoutRecompression.size) validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid) + assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesWithoutRecompressionResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithoutRecompression.size - 1}", + compressedMessagesWithoutRecompression.size - 1, validatingCompressedMessagesWithoutRecompressionResults.offsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatingCompressedMessagesWithoutRecompressionResults.messageSizeMaybeChanged) def validateLogAppendTime(message: Message) { message.ensureValid() - assertTrue(s"Timestamp of message $message should be between $startTime and $now", - message.timestamp >= startTime && message.timestamp <= now) + assertEquals(s"Timestamp of message $message should be $now", now, message.timestamp) assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType) } } @@ -209,18 +222,28 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { @Test def testCreateTime() { val now = System.currentTimeMillis() - val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec) - val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec) - - val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L) - - val 
(validatedCompressedMessages, _) = + val timestampSeq = Seq(now - 1, now + 1, now) + val messages = + new ByteBufferMessageSet(NoCompressionCodec, + new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1), + new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1), + new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1)) + val compressedMessages = + new ByteBufferMessageSet(DefaultCompressionCodec, + new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1), + new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1), + new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1)) + + val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = 1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedMessages = validatingResults.validatedMessages + + val validatingCompressedMessagesResults = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, @@ -228,17 +251,29 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { messageFormatVersion = 1, messageTimestampType = TimestampType.CREATE_TIME, messageTimestampDiffMaxMs = 1000L) + val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages + var i = 0 for (messageAndOffset <- validatedMessages) { messageAndOffset.message.ensureValid() - assertEquals(messageAndOffset.message.timestamp, now) + assertEquals(messageAndOffset.message.timestamp, timestampSeq(i)) assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME) + i += 1 } + assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) + assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.offsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) + i = 0 for (messageAndOffset <- validatedCompressedMessages) { messageAndOffset.message.ensureValid() - assertEquals(messageAndOffset.message.timestamp, now) + assertEquals(messageAndOffset.message.timestamp, timestampSeq(i)) assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME) + i += 1 } + assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) + assertEquals(s"Offset of max timestamp should be ${validatedCompressedMessages.size - 1}", + validatedCompressedMessages.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged) } @Test @@ -287,7 +322,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = NoCompressionCodec, messageFormatVersion = 0, messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L)._1, offset) + messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) // check compressed messages checkOffsets(compressedMessages, 0) @@ -297,7 +332,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = DefaultCompressionCodec, 
messageFormatVersion = 0, messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L)._1, offset) + messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) } @@ -310,22 +345,22 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { // check uncompressed offsets checkOffsets(messages, 0) val offset = 1234567 - val (messageWithOffset, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L) + val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedMessages checkOffsets(messageWithOffset, offset) // check compressed messages checkOffsets(compressedMessages, 0) - val (compressedMessagesWithOffset, _) = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L) + val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedMessages checkOffsets(compressedMessagesWithOffset, offset) } @@ -343,7 +378,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = NoCompressionCodec, messageFormatVersion = 1, messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L)._1, offset) + messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) // check compressed messages checkOffsets(compressedMessagesV0, 0) @@ -353,7 +388,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = DefaultCompressionCodec, messageFormatVersion = 1, messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L)._1, offset) + messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) // Check down conversion val now = System.currentTimeMillis() @@ -368,7 +403,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = NoCompressionCodec, messageFormatVersion = 0, messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L)._1, offset) + messageTimestampDiffMaxMs = 5000L).validatedMessages, offset) // check compressed messages checkOffsets(compressedMessagesV1, 0) @@ -378,7 +413,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { targetCodec = DefaultCompressionCodec, messageFormatVersion = 0, messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L)._1, offset) + messageTimestampDiffMaxMs = 5000L).validatedMessages, offset) } @Test diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 05b84eff38e51..131a24a7f75fc 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -274,8 +274,9 @@ object TestUtils extends Logging { def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec, key: Array[Byte] = null, + timestamp: Long = Message.NoTimestamp, magicValue: Byte = Message.CurrentMagicValue) = - new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, Message.NoTimestamp, magicValue)) + new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, timestamp, magicValue)) /** * Generate an array of random bytes diff --git a/docs/upgrade.html b/docs/upgrade.html index dfd20f446b9d5..eef21cf3b6a8b 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -15,10 +15,24 @@ limitations under the License. --> + +

1.5 Upgrading From Previous Versions

-Notable changes in 0.10.1.0
+Upgrading from 0.10.0.X to 0.10.1.0
+
+0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade can be done one broker at a time by
+simply bringing the broker down, updating the code, and restarting it. However, please review the potential
+breaking changes in 0.10.1.0 below before upgrading.
+
+Potential breaking changes in 0.10.1.0
+
+Notable changes in 0.10.1.0
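
The TimeIndexTest and LogTest hunks above pin down the contract of the new time index: each entry occupies 12 bytes (an 8-byte timestamp plus a 4-byte offset stored relative to the segment's base offset, hence the maxEntries * 12 sizing and the maxIndexSize/12 assertion), entries must be strictly increasing in both timestamp and offset, and lookup(t) returns the last entry whose timestamp is at most t, falling back to TimestampOffset(-1, baseOffset) when no such entry exists. The sketch below is only a minimal in-memory model of that observable behaviour, written to match the expectations in TimeIndexTest; InMemoryTimeIndexModel and TimeIndexModelExample are illustrative names invented here, not the mmap-backed TimeIndex that this patch adds.

// Illustrative model only: mirrors the lookup/maybeAppend/truncateTo semantics
// that TimeIndexTest exercises, without the memory-mapped file machinery.
case class TimestampOffset(timestamp: Long, offset: Long)

class InMemoryTimeIndexModel(val baseOffset: Long, val maxEntries: Int) {
  private var entries = Vector.empty[TimestampOffset]

  def isFull: Boolean = entries.size >= maxEntries

  // Entries must be appended in strictly increasing timestamp and offset order.
  def maybeAppend(timestamp: Long, offset: Long): Unit = {
    require(!isFull, s"Attempt to append to a full time index ($maxEntries entries)")
    entries.lastOption.foreach { last =>
      require(timestamp > last.timestamp && offset > last.offset,
        s"Entry ($timestamp, $offset) must be larger than the last entry $last")
    }
    entries = entries :+ TimestampOffset(timestamp, offset)
  }

  // Return the largest entry whose timestamp is <= targetTimestamp, or
  // (-1, baseOffset) if the index is empty or the target precedes all entries.
  def lookup(targetTimestamp: Long): TimestampOffset =
    entries.takeWhile(_.timestamp <= targetTimestamp).lastOption
      .getOrElse(TimestampOffset(-1L, baseOffset))

  // Drop every entry whose offset is >= the given offset.
  def truncateTo(offset: Long): Unit =
    entries = entries.takeWhile(_.offset < offset)
}

object TimeIndexModelExample extends App {
  val idx = new InMemoryTimeIndexModel(baseOffset = 45L, maxEntries = 30)
  // Mirrors appendEntries(maxEntries - 1) in TimeIndexTest: entries (10, 55), (20, 65), ...
  for (i <- 1 to 29) idx.maybeAppend(i * 10, i * 10 + 45L)
  println(idx.lookup(9L))   // TimestampOffset(-1, 45): target precedes the first entry
  println(idx.lookup(25L))  // TimestampOffset(20, 65): between two entries, the floor entry wins
  println(idx.lookup(30L))  // TimestampOffset(30, 75): exact timestamp match
}

In the real index the same floor semantics are backed by a binary search over the memory-mapped 12-byte entries, which is why the tests only ever append strictly increasing timestamps and offsets.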