Skip to content

Commit

Permalink
[Broker]Reset cursor with a non-exists position (#6120)
Browse files Browse the repository at this point in the history
`ManagedCursorImpl.asyncResetCursor` is used in three kinds of circumstances:
- REST API: create a subscription with messageId. Per the document: Reset subscription to message position closest to given position.
- REST API: reset subscription to a given position: Per the document: Reset subscription to message position closest to given position.
- Consumer seek command.

In all the cases above, when the user provides a MessageId, we should make the best effort to find the closest position, instead of throwing an InvalidCursorPosition Exception. 

This is because if a user provids an invalid position, it's not possible for he or she gets a valid position, since ledger ids for a given topic may not be continuous and only brokers are aware of the order. Therefore, we should avoid throw invalid cursor position but find the nearest position and do the reset stuff.

(cherry picked from commit d2f37a7)
  • Loading branch information
yjshen authored and tuteng committed Apr 13, 2020
1 parent fc51a13 commit b40b7f5
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -948,14 +948,21 @@ public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback

// order trim and reset operations on a ledger
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest)
|| newPosition.equals(PositionImpl.latest)) {
internalResetCursor(newPosition, callback);
} else {
// caller (replay) should handle this error and retry cursor reset
callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()),
newPosition);
PositionImpl actualPosition = newPosition;

if (!ledger.isValidPosition(actualPosition) &&
!actualPosition.equals(PositionImpl.earliest) &&
!actualPosition.equals(PositionImpl.latest)) {
actualPosition = ledger.getNextValidPosition(actualPosition);

if (actualPosition == null) {
// next valid position would only return null when newPos
// is larger than all available positions, then it's latest in effect.
actualPosition = PositionImpl.latest;
}
}

internalResetCursor(actualPosition, callback);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,77 @@ void testResetCursor() throws Exception {
ledger.close();
}

@Test(timeOut = 20000)
void testResetCursor1() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("trc1");
PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
PositionImpl lastInPrev = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));
PositionImpl firstInNext = (PositionImpl) ledger.addEntry("dummy-entry-5".getBytes(Encoding));
ledger.addEntry("dummy-entry-6".getBytes(Encoding));
ledger.addEntry("dummy-entry-7".getBytes(Encoding));
ledger.addEntry("dummy-entry-8".getBytes(Encoding));
ledger.addEntry("dummy-entry-9".getBytes(Encoding));
PositionImpl last = (PositionImpl) ledger.addEntry("dummy-entry-10".getBytes(Encoding));

final AtomicBoolean moveStatus = new AtomicBoolean(false);

// reset to earliest
PositionImpl earliest = PositionImpl.earliest;
try {
cursor.resetCursor(earliest);
moveStatus.set(true);
} catch (Exception e) {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
assertEquals(earliestPos, cursor.getReadPosition());
moveStatus.set(false);

// reset to one after last entry in a ledger should point to the first entry in the next ledger
PositionImpl resetPosition = new PositionImpl(lastInPrev.getLedgerId(), lastInPrev.getEntryId() + 1);
try {
cursor.resetCursor(resetPosition);
moveStatus.set(true);
} catch (Exception e) {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
assertEquals(firstInNext, cursor.getReadPosition());
moveStatus.set(false);

// reset to a non exist larger ledger should point to the first non-exist entry in the last ledger
PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
try {
cursor.resetCursor(latest);
moveStatus.set(true);
} catch (Exception e) {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1);
assertEquals(lastPos, cursor.getReadPosition());
moveStatus.set(false);

// reset to latest should point to the first non-exist entry in the last ledger
PositionImpl anotherLast = PositionImpl.latest;
try {
cursor.resetCursor(anotherLast);
moveStatus.set(true);
} catch (Exception e) {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
assertEquals(lastPos, cursor.getReadPosition());

cursor.close();
ledger.close();
}

@Test(timeOut = 20000)
void testasyncResetCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,8 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
try {
messageId = new MessageIdImpl(0, 0, -1);
admin.topics().resetCursor(topicName, "my-sub", messageId);
fail("It should have failed due to invalid subscription name");
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
fail("It shouldn't fail for a invalid position");
}

consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,8 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
try {
messageId = new MessageIdImpl(0, 0, -1);
admin.topics().resetCursor(topicName, "my-sub", messageId);
fail("It should have failed due to invalid subscription name");
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
fail("It shouldn't fail for a invalid position");
}

consumer.close();
Expand Down

0 comments on commit b40b7f5

Please sign in to comment.