From 3313e6cc85d05e60451d1791a333851ad7eecfe3 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 24 Feb 2020 12:25:52 +0800 Subject: [PATCH] Seek to the first one >= timestamp (#6393) The current logic for `resetCursor` by timestamp is odd. The first message it returns is the last message earlier or equal to the designated timestamp. This "earlier" message should be avoided to emit. (cherry picked from commit 81f8afd18d4f7a097e650997efc3d86e1920fb88) --- .../persistent/PersistentMessageFinder.java | 2 +- .../persistent/PersistentSubscription.java | 2 +- .../pulsar/broker/admin/AdminApiTest.java | 22 +++++++++---------- .../broker/admin/v1/V1_AdminApiTest.java | 18 +++++++-------- .../broker/service/SubscriptionSeekTest.java | 4 ++-- .../pulsar/client/api/TopicReaderTest.java | 2 +- .../apache/pulsar/client/impl/ReaderTest.java | 4 ++-- 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 9e7514987bf56..c90b01f497058 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -66,7 +66,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback MessageImpl msg = null; try { msg = MessageImpl.deserialize(entry.getDataBuffer()); - return msg.getPublishTime() <= timestamp; + return msg.getPublishTime() < timestamp; } catch (Exception e) { log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1b0ae9f77ca07..5e6a2135fdd92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -654,7 +654,7 @@ public void findEntryComplete(Position position, Object ctx) { "[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger", topicName, subName, timestamp, finalPosition); } else { - finalPosition = position; + finalPosition = position.getNext(); } resetCursor(finalPosition, future); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 55cfc13b22ea7..61d8cd0d58df3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1593,14 +1593,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); consumer.close(); @@ -1652,29 +1652,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep int receivedAfterReset = 0; - // Should received messages from 4-9 - for (int i = 4; i < 10; i++) { + // Should received messages from 5-9 + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Reset at 2nd timestamp receivedAfterReset = 0; admin.topics().resetCursor(topicName, "my-sub", secondTimestamp); - // Should received messages from 7-9 - for (int i = 7; i < 10; i++) { + // Should received messages from 8-9 + for (int i = 8; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 3); + assertEquals(receivedAfterReset, 2); consumer.close(); admin.topics().deleteSubscription(topicName, "my-sub"); @@ -1722,14 +1722,14 @@ public void persistentTopicsCursorResetAndFailover() throws Exception { .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumerA.receive(5, TimeUnit.SECONDS); consumerA.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Closing consumerA activates consumerB consumerA.close(); @@ -1785,7 +1785,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { Set expectedMessages = Sets.newHashSet(); Set receivedMessages = Sets.newHashSet(); - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); expectedMessages.add("message-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index eec4e0865dc8b..9d765ec405b1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1487,14 +1487,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); consumer.close(); @@ -1546,29 +1546,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep int receivedAfterReset = 0; - // Should received messages from 4-9 - for (int i = 4; i < 10; i++) { + // Should received messages from 5-9 + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Reset at 2nd timestamp receivedAfterReset = 0; admin.topics().resetCursor(topicName, "my-sub", secondTimestamp); - // Should received messages from 7-9 - for (int i = 7; i < 10; i++) { + // Should received messages from 8-9 + for (int i = 8; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 3); + assertEquals(receivedAfterReset, 2); consumer.close(); admin.topics().deleteSubscription(topicName, "my-sub"); @@ -1611,7 +1611,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { Set expectedMessages = Sets.newHashSet(); Set receivedMessages = Sets.newHashSet(); - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); expectedMessages.add("message-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index c9cc07ccaed00..9ea1e0cca07a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -135,7 +135,7 @@ public void testSeekTime() throws Exception { long currentTimestamp = System.currentTimeMillis(); consumer.seek(currentTimestamp); - assertEquals(sub.getNumberOfEntriesInBacklog(false), 1); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); // Wait for consumer to reconnect Thread.sleep(1000); @@ -187,7 +187,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { for (PersistentSubscription sub : subs) { backlogs += sub.getNumberOfEntriesInBacklog(false); } - assertEquals(backlogs, 2); + assertEquals(backlogs, 0); // Wait for consumer to reconnect Thread.sleep(1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 8aa1bcdaa917c..552f69db57609 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -754,7 +754,7 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { reader.seek(l + plusTime); Set messageSet = Sets.newHashSet(); - for (int i = halfMessages; i < numOfMessage; i++) { + for (int i = halfMessages + 1; i < numOfMessage; i++) { Message message = reader.readNext(); String receivedMessage = new String(message.getData()); String expectedMessage = String.format("msg num %d", i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index e0a0fe28a78c1..0f801d7356d71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -245,8 +245,8 @@ public void testReaderWithTimeLong() throws Exception { receivedMessageIds.add(msg.getMessageId()); } - assertEquals(receivedMessageIds.size(), totalMsg + 1); - assertEquals(receivedMessageIds.get(0), lastMsgId); + assertEquals(receivedMessageIds.size(), totalMsg); + assertEquals(receivedMessageIds.get(0), firstMsgId); restartBroker();