Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ public class RDBSstFileWriter implements DumpFileWriter, Closeable {
private SstFileWriter sstFileWriter;
private File sstFile;
private AtomicLong keyCounter;
private Options emptyOption = new Options();

public RDBSstFileWriter() {
EnvOptions envOptions = new EnvOptions();
this.sstFileWriter = new SstFileWriter(envOptions, new Options());
this.sstFileWriter = new SstFileWriter(envOptions, emptyOption);
this.keyCounter = new AtomicLong(0);
}

Expand Down Expand Up @@ -83,6 +84,7 @@ public void close() throws IOException {
} finally {
sstFileWriter.close();
sstFileWriter = null;
emptyOption.close();
}

keyCounter.set(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
}
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
db.getUpdatesSince(sequenceNumber);
try (TransactionLogIterator transactionLogIterator =
db.getUpdatesSince(sequenceNumber)) {

// Only the first record needs to be checked if its seq number <
// ( 1 + passed_in_sequence_number). For example, if seqNumber passed
Expand All @@ -298,24 +297,28 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
while (transactionLogIterator.isValid()) {
TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
throw new SequenceNumberNotFoundException("Unable to read data from" +
" RocksDB wal to get delta updates. It may have already been" +
"flushed to SSTs.");
}
// If the above condition was not satisfied, then it is OK to reset
// the flag.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
transactionLogIterator.next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
try {
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
throw new SequenceNumberNotFoundException("Unable to read data from"
+ " RocksDB wal to get delta updates. It may have already been"
+ "flushed to SSTs.");
}
// If the above condition was not satisfied, then it is OK to reset
// the flag.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
transactionLogIterator.next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
} finally {
result.writeBatch().close();
}
transactionLogIterator.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ static IOException toIOException(Object name, String op, RocksDBException e) {
* Read DB and return existing column families.
*
* @return a list of column families.
* @see RocksDB#listColumnFamilies(Options, String)
*/
private static List<TableConfig> getColumnFamilies(File file)
throws RocksDBException {
final List<TableConfig> columnFamilies = RocksDB.listColumnFamilies(
new Options(), file.getAbsolutePath())
final List<TableConfig> columnFamilies = listColumnFamiliesEmptyOptions(
file.getAbsolutePath())
.stream()
.map(TableConfig::newTableConfig)
.collect(Collectors.toList());
Expand All @@ -88,6 +87,21 @@ private static List<TableConfig> getColumnFamilies(File file)
return columnFamilies;
}

/**
* Read DB column families without Options.
* @param path
* @return A list of column family names
* @throws RocksDBException
*
* @see RocksDB#listColumnFamilies(Options, String)
*/
public static List<byte[]> listColumnFamiliesEmptyOptions(final String path)
throws RocksDBException {
try (Options emptyOptions = new Options()) {
return RocksDB.listColumnFamilies(emptyOptions, path);
}
}

static RocksDatabase open(File dbFile, DBOptions dbOptions,
WriteOptions writeOptions, Set<TableConfig> families,
boolean readOnly) throws IOException {
Expand Down
Loading