@@ -120,6 +120,14 @@ public static DBStore createDBStore(ConfigurationSource configuration,
return newBuilder(configuration, definition).build();
}

public static DBStore createDBStore(
ConfigurationSource configuration, DBDefinition definition,
RocksDBConfiguration rocksDBConfiguration) throws IOException {
DBStoreBuilder builder = newBuilder(configuration, rocksDBConfiguration);
builder.applyDBDefinition(definition);
return builder.build();
}

public static DBStoreBuilder newBuilder(ConfigurationSource configuration,
DBDefinition definition) {

@@ -408,6 +416,7 @@ protected void log(InfoLogLevel infoLogLevel, String s) {
// Apply WAL settings.
dbOptions.setWalTtlSeconds(rocksDBConfiguration.getWalTTL());
dbOptions.setWalSizeLimitMB(rocksDBConfiguration.getWalSizeLimit());
dbOptions.setManualWalFlush(rocksDBConfiguration.getManualWalFlush());

// Create statistics.
if (!rocksDbStat.equals(OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF)) {
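For context on the new three-argument createDBStore overload above, here is a minimal sketch of how a caller might use it to open a store with manual WAL flush enabled. It mirrors the SCM metadata store change later in this diff; the example class name and exact import paths are assumptions for illustration, not part of this patch.

```java
import java.io.IOException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;

public final class ManualWalFlushStoreExample {
  // Sketch only: opens an SCM-style store with manual WAL flushing turned on.
  public static DBStore openScmStore(OzoneConfiguration conf) throws IOException {
    // Read the RocksDB tuning object from the Ozone configuration.
    RocksDBConfiguration rocksDBConf = conf.getObject(RocksDBConfiguration.class);
    // Opt this store into manual WAL flushing; the caller then owns the
    // responsibility of invoking DBStore#flushLog(true) at durability points.
    rocksDBConf.setManualWalFlush(true);
    // Use the new three-argument overload introduced in this change.
    return DBStoreBuilder.createDBStore(conf, new SCMDBDefinition(), rocksDBConf);
  }
}
```

The point of the overload is that a caller can hand in a pre-tweaked RocksDBConfiguration instead of relying only on what the ConfigurationSource yields.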
@@ -71,6 +71,8 @@ public class RocksDBConfiguration {
+ "Default 0 means no limit.")
private long walSizeLimit = 0;

private boolean manualWalFlush = false;

public void setRocksdbLoggingEnabled(boolean enabled) {
this.rocksdbLogEnabled = enabled;
}
@@ -110,4 +112,12 @@ public void setWalSizeLimit(long limit) {
public long getWalSizeLimit() {
return walSizeLimit;
}

public void setManualWalFlush(boolean manualWalFlush) {
this.manualWalFlush = manualWalFlush;
}

public boolean getManualWalFlush() {
return manualWalFlush;
}
}
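As background on what the new manualWalFlush flag ultimately controls: with RocksDB's manual WAL flush enabled, individual writes stop flushing the write-ahead log automatically, and the application must flush (and optionally sync) the WAL itself. A minimal sketch against the raw org.rocksdb Java API follows; it is independent of the Ozone wrappers, and the database path and key/value are illustrative only.

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class ManualWalFlushSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options()
            .setCreateIfMissing(true)
            // WAL entries are no longer pushed to the log file on every write;
            // the application decides when they become durable.
            .setManualWalFlush(true);
         RocksDB db = RocksDB.open(options, "/tmp/manual-wal-demo")) {
      db.put("key".getBytes(), "value".getBytes());
      // Explicitly flush and fsync the WAL at a durability point,
      // analogous to DBStore#flushLog(true) in this change.
      db.flushWal(true);
    }
  }
}
```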
@@ -21,14 +21,14 @@
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;

import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;

@@ -39,9 +39,10 @@
* operation in DB.
*/
public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(SCMHADBTransactionBufferImpl.class);
private final StorageContainerManager scm;
private SCMMetadataStore metadataStore;
private BatchOperation currentBatchOperation;
private TransactionInfo latestTrxInfo;
private SnapshotInfo latestSnapshot;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -52,16 +53,12 @@ public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
init();
}

private BatchOperation getCurrentBatchOperation() {
return currentBatchOperation;
}

@Override
public <KEY, VALUE> void addToBuffer(
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
rwLock.readLock().lock();
try {
table.putWithBatch(getCurrentBatchOperation(), key, value);
table.put(key, value);
} finally {
rwLock.readLock().unlock();
}
@@ -72,7 +69,7 @@ public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
throws IOException {
rwLock.readLock().lock();
try {
table.deleteWithBatch(getCurrentBatchOperation(), key);
table.delete(key);
} finally {
rwLock.readLock().unlock();
}
@@ -110,14 +107,10 @@ public void flush() throws IOException {
// write latest trx info into trx table in the same batch
Table<String, TransactionInfo> transactionInfoTable
= metadataStore.getTransactionInfoTable();
transactionInfoTable.putWithBatch(currentBatchOperation,
TRANSACTION_INFO_KEY, latestTrxInfo);
transactionInfoTable.put(TRANSACTION_INFO_KEY, latestTrxInfo);

metadataStore.getStore().commitBatchOperation(currentBatchOperation);
currentBatchOperation.close();
metadataStore.getStore().flushLog(true);
this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
// reset batch operation
currentBatchOperation = metadataStore.getStore().initBatchOperation();

DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
.getDeletedBlockLog();
@@ -135,11 +128,6 @@ public void init() throws IOException {

rwLock.writeLock().lock();
try {
IOUtils.closeQuietly(currentBatchOperation);

// initialize a batch operation during construction time
currentBatchOperation = this.metadataStore.getStore().
initBatchOperation();
latestTrxInfo = this.metadataStore.getTransactionInfoTable()
.get(TRANSACTION_INFO_KEY);
if (latestTrxInfo == null) {
@@ -164,8 +152,15 @@ public String toString() {

@Override
public void close() throws IOException {
if (currentBatchOperation != null) {
currentBatchOperation.close();
try {
if (metadataStore.getStore() != null
&& !metadataStore.getStore().isClosed()) {
flush();
}
} catch (IOException ex) {
// Ignore as the buffer is getting closed
LOG.error("Ignore exception occurred in close of transaction buffer",
ex);
}
}
}
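The net effect of the changes above is that the transaction buffer no longer stages writes in an explicit BatchOperation: puts and deletes go straight to the tables, and durability is deferred to a single flushLog(true) call. A condensed sketch of that pattern is shown below, assuming a Table and DBStore are already open; it is an illustration of the write path, not the actual class.

```java
import java.io.IOException;

import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;

public final class DirectWriteBufferSketch {
  private final DBStore store;
  private final Table<String, String> table;

  DirectWriteBufferSketch(DBStore store, Table<String, String> table) {
    this.store = store;
    this.table = table;
  }

  void add(String key, String value) throws IOException {
    // With manual WAL flush enabled, this lands in the memtable and the
    // in-memory WAL buffer only; nothing is synced to disk yet.
    table.put(key, value);
  }

  void flush() throws IOException {
    // One explicit WAL flush+sync makes all accumulated puts and deletes
    // durable, replacing the previous commitBatchOperation / close /
    // initBatchOperation cycle.
    store.flushLog(true);
  }
}
```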
@@ -17,9 +17,7 @@
package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;

@@ -32,7 +30,6 @@
*/
public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
private DBStore dbStore;
private BatchOperation currentBatchOperation;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

public SCMHADBTransactionBufferStub() {
@@ -42,23 +39,12 @@ public SCMHADBTransactionBufferStub(DBStore store) {
this.dbStore = store;
}

private BatchOperation getCurrentBatchOperation() {
if (currentBatchOperation == null) {
if (dbStore != null) {
currentBatchOperation = dbStore.initBatchOperation();
} else {
currentBatchOperation = new RDBBatchOperation();
}
}
return currentBatchOperation;
}

@Override
public <KEY, VALUE> void addToBuffer(
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
rwLock.readLock().lock();
try {
table.putWithBatch(getCurrentBatchOperation(), key, value);
table.put(key, value);
} finally {
rwLock.readLock().unlock();
}
@@ -69,7 +55,7 @@ public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
throws IOException {
rwLock.readLock().lock();
try {
table.deleteWithBatch(getCurrentBatchOperation(), key);
table.delete(key);
} finally {
rwLock.readLock().unlock();
}
@@ -100,11 +86,7 @@ public void flush() throws IOException {
rwLock.writeLock().lock();
try {
if (dbStore != null) {
dbStore.commitBatchOperation(getCurrentBatchOperation());
}
if (currentBatchOperation != null) {
currentBatchOperation.close();
currentBatchOperation = null;
dbStore.flushLog(true);
}
} finally {
rwLock.writeLock().unlock();
@@ -40,6 +40,7 @@
import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;

@@ -138,7 +139,11 @@ public void start(OzoneConfiguration config)
}


this.store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
RocksDBConfiguration rocksdbConf
= configuration.getObject(RocksDBConfiguration.class);
rocksdbConf.setManualWalFlush(true);
this.store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition(),
rocksdbConf);

deletedBlocksTable =
DELETED_BLOCKS.getTable(this.store);
@@ -396,6 +396,9 @@ private StorageContainerManager(OzoneConfiguration conf,
// and then come back and enable it without any impact.
securityProtocolServer = null;
}

// flush db updates for the security or sequence generator upgrade
scmMetadataStore.getStore().flushLog(true);

scmStarterUser = UserGroupInformation.getCurrentUser().getShortUserName();
Collection<String> scmAdminUsernames =
@@ -1500,6 +1503,7 @@ public void start() throws IOException {
if (getSecurityProtocolServer() != null) {
getSecurityProtocolServer().start();
persistSCMCertificates();
scmMetadataStore.getStore().flushLog(true);
}

scmBlockManager.start();
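The two flushLog(true) calls added above follow from the same manual-WAL-flush contract: any component that writes to the SCM metadata store outside the transaction buffer (for example, persisting SCM certificates) must explicitly flush the WAL for those writes to become durable. A small sketch of that write-then-flush pattern follows; the helper name and generic table types are illustrative, not part of this patch.

```java
import java.io.IOException;

import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;

public final class OutOfBandWriteSketch {
  /**
   * Writes a value directly to a table and immediately makes it durable.
   * With manual WAL flush enabled, skipping flushLog(true) would leave the
   * write buffered in memory until some later flush point.
   */
  static <K, V> void putDurably(DBStore store, Table<K, V> table, K key, V value)
      throws IOException {
    table.put(key, value);
    store.flushLog(true); // flush and fsync the RocksDB WAL
  }
}
```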
@@ -45,6 +45,7 @@
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -316,9 +317,8 @@ public void testContainerManagerTransactionId() throws Exception {

// Create 30 TXs
addTransactions(generateData(30), false);
// Since transactions are not yet flushed deleteTransactionId should be
// 0 for all containers
Assertions.assertEquals(0, getAllTransactions().size());
// Transactions are obtained from the transaction buffer; 3 DNs, so the count is 90
Assertions.assertEquals(90, getAllTransactions().size());
for (ContainerInfo containerInfo : containerManager.getContainers()) {
Assertions.assertEquals(0, containerInfo.getDeleteTransactionId());
}
@@ -678,4 +678,116 @@ private void mockInadequateReplicaUnhealthyContainerInfo(long containerID)
ContainerID.valueOf(containerID)))
.thenReturn(replicaSet);
}


@Test
public void testSCMTransacationBufferPerfTest() throws Exception {
Table<ContainerID, ContainerInfo> tbl1
= scm.getScmMetadataStore().getContainerTable();

HddsProtos.ContainerInfoProto.Builder containerInfoBuilder
= HddsProtos.ContainerInfoProto
.newBuilder()
.setState(HddsProtos.LifeCycleState.OPEN)
.setPipelineID(PipelineID.randomId().getProtobuf())
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.now())
.setOwner("test")
.setContainerID(1)
.setDeleteTransactionId(0)
.setReplicationType(HddsProtos.ReplicationType.RATIS);
HddsProtos.ContainerInfoProto build = containerInfoBuilder.build();
long start = 0;
long end = 0;
long[] addTs = new long[8];
for (int i = 0; i < 8; ++i) {
addTs[i] = 0;
}
scm.getScmMetadataStore().getStore().flushLog(true);
scm.getScmMetadataStore().getStore().flushDB();
int itrLen = 10;
for (int k = 0; k < itrLen; ++k) {
start = System.nanoTime();
for (int i = 0; i < 100000; ++i) {
scm.getScmHAManager().getDBTransactionBuffer().addToBuffer(tbl1,
new ContainerID(i), ContainerInfo.fromProtobuf(build));
}
end = System.nanoTime();
System.out.println("Time taken Add: " + (end - start));
addTs[0] += (end - start);

start = System.nanoTime();
scm.getScmHAManager().getDBTransactionBuffer().close();
end = System.nanoTime();
System.out.println("Time taken Add Flush: " + (end - start));
addTs[1] += (end - start);

start = System.nanoTime();
for (int i = 0; i < 100000; ++i) {
scm.getScmHAManager().getDBTransactionBuffer().removeFromBuffer(tbl1,
new ContainerID(i));
}
end = System.nanoTime();
System.out.println("Time taken Delete: " + (end - start));
addTs[2] += (end - start);

start = System.nanoTime();
scm.getScmHAManager().getDBTransactionBuffer().close();
end = System.nanoTime();
System.out.println("Time taken Delete flush: " + (end - start));
addTs[3] += (end - start);

start = System.nanoTime();
for (int i = 0; i < 100000; ++i) {
scm.getScmHAManager().getDBTransactionBuffer().addToBuffer(tbl1,
new ContainerID(i), ContainerInfo.fromProtobuf(build));
scm.getScmHAManager().getDBTransactionBuffer().removeFromBuffer(tbl1,
new ContainerID(i));
}
end = System.nanoTime();
System.out.println("Time taken Add/Delete: " + (end - start));
addTs[4] += (end - start);

start = System.nanoTime();
scm.getScmHAManager().getDBTransactionBuffer().close();
end = System.nanoTime();
System.out.println("Time taken Add/Delete flush: " + (end - start));
addTs[5] += (end - start);

// Update test
start = System.nanoTime();
for (int i = 0; i < 100000; ++i) {
scm.getScmHAManager().getDBTransactionBuffer().addToBuffer(tbl1,
new ContainerID(i), ContainerInfo.fromProtobuf(build));
scm.getScmHAManager().getDBTransactionBuffer().addToBuffer(tbl1,
new ContainerID(i), ContainerInfo.fromProtobuf(build));
}
end = System.nanoTime();
System.out.println("Time taken Update: " + (end - start));
addTs[6] += (end - start);

start = System.nanoTime();
scm.getScmHAManager().getDBTransactionBuffer().close();
end = System.nanoTime();
System.out.println("Time taken Update Flush: " + (end - start));
addTs[7] += (end - start);

for (int i = 0; i < 100000; ++i) {
scm.getScmHAManager().getDBTransactionBuffer().removeFromBuffer(tbl1,
new ContainerID(i));
}
scm.getScmHAManager().getDBTransactionBuffer().close();
}

System.out.println("Average usages");
System.out.println("Time taken Add: " + addTs[0] / itrLen);
System.out.println("Time taken Add Flush: " + addTs[1] / itrLen);
System.out.println("Time taken Del: " + addTs[2] / itrLen);
System.out.println("Time taken Del Flush: " + addTs[3] / itrLen);
System.out.println("Time taken Add/Del: " + addTs[4] / itrLen);
System.out.println("Time taken Add/Del Flush: " + addTs[5] / itrLen);
System.out.println("Time taken Update: " + addTs[6] / itrLen);
System.out.println("Time taken Update Flush: " + addTs[7] / itrLen);
}
}