Skip to content

Commit b7b69dc

Browse files
summeriiiilhotari
authored andcommitted
[fix][client] Fix Reader.hasMessageAvailable return wrong value after seeking by timestamp with startMessageIdInclusive (#23502)
(cherry picked from commit fcb3592)
1 parent eddf395 commit b7b69dc

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,46 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
855855
}
856856
}
857857

858+
@Test
859+
public void testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive() throws Exception {
860+
final String topic = "persistent://my-property/my-ns/" +
861+
"testHasMessageAvailableAfterSeekTimestampWithMessageInclusive";
862+
863+
@Cleanup
864+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
865+
final long timestampBeforeSend = System.currentTimeMillis();
866+
final MessageId sentMsgId = producer.send("msg");
867+
868+
final List<MessageId> messageIds = new ArrayList<>();
869+
messageIds.add(MessageId.earliest);
870+
messageIds.add(sentMsgId);
871+
messageIds.add(MessageId.latest);
872+
873+
for (MessageId messageId : messageIds) {
874+
@Cleanup
875+
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
876+
.startMessageIdInclusive()
877+
.startMessageId(messageId).create();
878+
assertTrue(reader.hasMessageAvailable());
879+
880+
reader.seek(System.currentTimeMillis());
881+
assertFalse(reader.hasMessageAvailable());
882+
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
883+
assertNull(message);
884+
}
885+
886+
for (MessageId messageId : messageIds) {
887+
@Cleanup
888+
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
889+
.startMessageIdInclusive()
890+
.startMessageId(messageId).create();
891+
assertTrue(reader.hasMessageAvailable());
892+
893+
reader.seek(timestampBeforeSend);
894+
assertTrue(reader.hasMessageAvailable());
895+
}
896+
}
897+
858898
@Test
859899
public void testReaderBuilderStateOnRetryFailure() throws Exception {
860900
String ns = "my-property/my-ns";

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -2426,6 +2426,8 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
24262426
.result();
24272427
if (lastMessageId.getEntryId() < 0) {
24282428
completehasMessageAvailableWithValue(booleanFuture, false);
2429+
} else if (hasSoughtByTimestamp) {
2430+
completehasMessageAvailableWithValue(booleanFuture, result < 0);
24292431
} else {
24302432
completehasMessageAvailableWithValue(booleanFuture,
24312433
resetIncludeHead ? result <= 0 : result < 0);

0 commit comments

Comments
 (0)