Skip to content

Commit

Permalink
GEARPUMP-17, fix KafkaStorage lookup timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang committed Apr 26, 2016
1 parent 4bde18c commit 678a509
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ class KafkaStorage private[kafka](
}
}


/**
* offsets with timestamp < `time` have already been processed by the system
* so we look up the storage for the first offset with timestamp >= `time` on replay
*
* @param time the timestamp to look up for the earliest unprocessed offset
* @return the earliest unprocessed offset if `time` is in the range, otherwise failure
*/
override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
if (dataByTime.isEmpty) {
Failure(StorageEmpty)
Expand All @@ -106,7 +112,7 @@ class KafkaStorage private[kafka](
} else if (time > max._1) {
Failure(Overflow(max._2))
} else {
Success(dataByTime.reverse.find(_._1 <= time).get._2)
Success(dataByTime.find(_._1 >= time).get._2)
}
}
}
Expand Down

0 comments on commit 678a509

Please sign in to comment.