Skip to content

Commit

Permalink
Seek to the first one >= timestamp (apache#6393)
Browse files Browse the repository at this point in the history
The current logic for `resetCursor` by timestamp is odd. The first message it returns is the last message earlier or equal to the designated timestamp. This "earlier" message should be avoided to emit.

(cherry picked from commit 81f8afd)
  • Loading branch information
yjshen authored and tuteng committed Mar 21, 2020
1 parent eaba596 commit 3313e6c
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
MessageImpl msg = null;
try {
msg = MessageImpl.deserialize(entry.getDataBuffer());
return msg.getPublishTime() <= timestamp;
return msg.getPublishTime() < timestamp;
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ public void findEntryComplete(Position position, Object ctx) {
"[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger",
topicName, subName, timestamp, finalPosition);
} else {
finalPosition = position;
finalPosition = position.getNext();
}
resetCursor(finalPosition, future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1593,14 +1593,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

int receivedAfterReset = 0;

for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

consumer.close();

Expand Down Expand Up @@ -1652,29 +1652,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

int receivedAfterReset = 0;

// Should received messages from 4-9
for (int i = 4; i < 10; i++) {
// Should received messages from 5-9
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Reset at 2nd timestamp
receivedAfterReset = 0;
admin.topics().resetCursor(topicName, "my-sub", secondTimestamp);

// Should received messages from 7-9
for (int i = 7; i < 10; i++) {
// Should received messages from 8-9
for (int i = 8; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 3);
assertEquals(receivedAfterReset, 2);

consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
Expand Down Expand Up @@ -1722,14 +1722,14 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

int receivedAfterReset = 0;
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Closing consumerA activates consumerB
consumerA.close();
Expand Down Expand Up @@ -1785,7 +1785,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

Set<String> expectedMessages = Sets.newHashSet();
Set<String> receivedMessages = Sets.newHashSet();
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
expectedMessages.add("message-" + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,14 +1487,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

int receivedAfterReset = 0;

for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

consumer.close();

Expand Down Expand Up @@ -1546,29 +1546,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

int receivedAfterReset = 0;

// Should received messages from 4-9
for (int i = 4; i < 10; i++) {
// Should received messages from 5-9
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Reset at 2nd timestamp
receivedAfterReset = 0;
admin.topics().resetCursor(topicName, "my-sub", secondTimestamp);

// Should received messages from 7-9
for (int i = 7; i < 10; i++) {
// Should received messages from 8-9
for (int i = 8; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 3);
assertEquals(receivedAfterReset, 2);

consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
Expand Down Expand Up @@ -1611,7 +1611,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

Set<String> expectedMessages = Sets.newHashSet();
Set<String> receivedMessages = Sets.newHashSet();
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
expectedMessages.add("message-" + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testSeekTime() throws Exception {

long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 1);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);

// Wait for consumer to reconnect
Thread.sleep(1000);
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog(false);
}
assertEquals(backlogs, 2);
assertEquals(backlogs, 0);

// Wait for consumer to reconnect
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
reader.seek(l + plusTime);

Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages; i < numOfMessage; i++) {
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
String expectedMessage = String.format("msg num %d", i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ public void testReaderWithTimeLong() throws Exception {
receivedMessageIds.add(msg.getMessageId());
}

assertEquals(receivedMessageIds.size(), totalMsg + 1);
assertEquals(receivedMessageIds.get(0), lastMsgId);
assertEquals(receivedMessageIds.size(), totalMsg);
assertEquals(receivedMessageIds.get(0), firstMsgId);

restartBroker();

Expand Down

0 comments on commit 3313e6c

Please sign in to comment.