Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,14 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException;

/**
* Get limited data written to DB since a specific sequence number.
* @param sequenceNumber
* @param limitCount
* @return
* @throws SequenceNumberNotFoundException
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,15 @@ public CodecRegistry getCodecRegistry() {
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
}

@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException {
if (limitCount <= 0) {
throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
}
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
Expand Down Expand Up @@ -415,6 +423,9 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
transactionLogIterator.next();
}
} catch (RocksDBException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,30 @@ public void testGetDBUpdatesSince() throws Exception {
}
}

@Test
public void testGetDBUpdatesSinceWithLimitCount() throws Exception {

try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, configSet)) {

try (Table firstTable = newStore.getTable(families.get(1))) {
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value1"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value2"));
}
Assert.assertTrue(
newStore.getDb().getLatestSequenceNumber() == 2);

DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1);
Assert.assertEquals(1, dbUpdatesSince.getData().size());
}
}

@Test
public void testDowngrade() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,7 @@ message ServiceListRequest {

message DBUpdatesRequest {
required uint64 sequenceNumber = 1;
optional uint64 limitCount = 2;
}

message ServiceListResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3489,8 +3489,12 @@ public boolean isRatisEnabled() {
public DBUpdates getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {
long limitCount = Long.MAX_VALUE;
if (dbUpdatesRequest.hasLimitCount()) {
limitCount = dbUpdatesRequest.getLimitCount();
}
DBUpdatesWrapper updatesSince = metadataManager.getStore()
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount);
DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
return dbUpdates;
Expand Down