@@ -343,6 +343,11 @@ public final class OzoneConfigKeys {
public static final double
HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.95;

public static final String HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE =
"hdds.datanode.metadata.rocksdb.cache.size";
public static final String
HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE_DEFAULT = "64MB";

public static final String OZONE_SECURITY_ENABLED_KEY =
"ozone.security.enabled";
public static final boolean OZONE_SECURITY_ENABLED_DEFAULT = false;
10 changes: 10 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1278,6 +1278,16 @@
</description>
</property>

<property>
<name>hdds.datanode.metadata.rocksdb.cache.size</name>
<value>64MB</value>
<tag>OZONE, DATANODE, MANAGEMENT</tag>
<description>
Size of the block metadata cache shared among RocksDB instances on each
datanode. All containers on a datanode will share this cache.
</description>
</property>

<property>
<name>hdds.command.status.report.interval</name>
<value>30s</value>
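As a rough illustration (not part of this patch), the property above is read as a storage size, so values such as "64MB" or "1GB" are accepted and converted to bytes before sizing the cache. A minimal sketch, assuming an OzoneConfiguration instance is available:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;

OzoneConfiguration conf = new OzoneConfiguration();
// Resolves to 64 * 1024 * 1024 bytes when the default is in effect.
long cacheSizeBytes = (long) conf.getStorageSize(
    "hdds.datanode.metadata.rocksdb.cache.size", "64MB", StorageUnit.BYTES);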
@@ -17,28 +17,41 @@
*/
package org.apache.hadoop.ozone.container.metadata;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.hdds.utils.db.*;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE_DEFAULT;

/**
* Implementation of the {@link DatanodeStore} interface that contains
@@ -55,10 +68,15 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
private Table<String, ChunkInfoList> deletedBlocksTable;

private static final Logger LOG =
LoggerFactory.getLogger(AbstractDatanodeStore.class);
LoggerFactory.getLogger(AbstractDatanodeStore.class);
private DBStore store;
private final AbstractDatanodeDBDefinition dbDef;
private final long containerID;
private final ColumnFamilyOptions cfOptions;

private static final DBProfile DEFAULT_PROFILE = DBProfile.DISK;
private static final Map<ConfigurationSource, ColumnFamilyOptions>
OPTIONS_CACHE = new ConcurrentHashMap<>();

/**
* Constructs the metadata store and starts the DB services.
@@ -67,18 +85,26 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
* @throws IOException - on Failure.
*/
protected AbstractDatanodeStore(ConfigurationSource config, long containerID,
AbstractDatanodeDBDefinition dbDef)
throws IOException {
AbstractDatanodeDBDefinition dbDef) throws IOException {

// The same config instance is used on each datanode, so we can share the
// corresponding column family options, providing a single shared cache
// for all containers on a datanode.
cfOptions = OPTIONS_CACHE.computeIfAbsent(config,
    AbstractDatanodeStore::buildColumnFamilyOptions);

this.dbDef = dbDef;
this.containerID = containerID;
start(config);
}

@Override
public void start(ConfigurationSource config)
throws IOException {
throws IOException {
if (this.store == null) {
DBOptions options = new DBOptions();
DBOptions options = DEFAULT_PROFILE.getDBOptions();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);

@@ -93,7 +119,8 @@ public void start(ConfigurationSource config)
}

this.store = DBStoreBuilder.newBuilder(config, dbDef)
.setDBOption(options)
.setDBOptions(options)
.setDefaultCFOptions(cfOptions)
.build();

// Use the DatanodeTable wrapper to disable the table iterator on
@@ -179,6 +206,12 @@ public void compactDB() throws IOException {
store.compactDB();
}

@VisibleForTesting
public static Map<ConfigurationSource, ColumnFamilyOptions>
getColumnFamilyOptionsCache() {
return Collections.unmodifiableMap(OPTIONS_CACHE);
}

private static void checkTableStatus(Table<?, ?> table, String name)
throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
@@ -191,6 +224,26 @@ private static void checkTableStatus(Table<?, ?> table, String name)
}
}

private static ColumnFamilyOptions buildColumnFamilyOptions(
ConfigurationSource config) {
long cacheSize = (long) config.getStorageSize(
HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE,
HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE_DEFAULT,
StorageUnit.BYTES);

// Load the RocksDB native library before statically creating RocksDB objects.
RocksDB.loadLibrary();

BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// cacheSize is already in bytes, so it is passed to the cache directly.
tableConfig.setBlockCache(new LRUCache(cacheSize))
.setPinL0FilterAndIndexBlocksInCache(true)
.setFilterPolicy(new BloomFilter());

return DEFAULT_PROFILE
.getColumnFamilyOptions()
.setTableFormatConfig(tableConfig);
}

/**
* Block Iterator for KeyValue Container. This block iterator returns blocks
* which match with the {@link MetadataKeyFilters.KeyPrefixFilter}. If no
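The sharing above relies on a standard RocksDB pattern: column families whose options point at the same BlockBasedTableConfig draw from the same LRUCache, so the combined block-cache footprint of all container DBs on a datanode is bounded by the single configured size. A standalone sketch of that pattern (illustrative only, with hypothetical paths and a hard-coded 64 MB cache; the patch itself wires the options through DBStoreBuilder instead):

import java.nio.charset.StandardCharsets;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class SharedBlockCacheSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();

    // One cache instance, bounded at 64 MB, shared through the table config.
    LRUCache sharedCache = new LRUCache(64L * 1024 * 1024);
    BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
        .setBlockCache(sharedCache);

    Options options = new Options()
        .setCreateIfMissing(true)
        .setTableFormatConfig(tableConfig);

    // Both databases populate and read the same block cache, so their
    // combined cache usage stays within the one configured limit.
    try (RocksDB db1 = RocksDB.open(options, "/tmp/container-1-db");
         RocksDB db2 = RocksDB.open(options, "/tmp/container-2-db")) {
      db1.put("a".getBytes(StandardCharsets.UTF_8),
          "1".getBytes(StandardCharsets.UTF_8));
      db2.put("b".getBytes(StandardCharsets.UTF_8),
          "2".getBytes(StandardCharsets.UTF_8));
    }
  }
}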
@@ -20,6 +20,7 @@

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

@@ -37,6 +38,7 @@
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
@@ -50,6 +52,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyOptions;

import java.io.File;

@@ -79,7 +82,6 @@ public class TestKeyValueContainer {
@Rule
public TemporaryFolder folder = new TemporaryFolder();

private OzoneConfiguration conf;
private String scmId = UUID.randomUUID().toString();
private VolumeSet volumeSet;
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
@@ -89,6 +91,11 @@ public class TestKeyValueContainer {

private final ChunkLayOutVersion layout;

// Use one configuration object across parameterized runs of tests.
// This preserves the column family options in the container options
// cache for testContainersShareColumnFamilyOptions.
private static final OzoneConfiguration CONF = new OzoneConfiguration();

public TestKeyValueContainer(ChunkLayOutVersion layout) {
this.layout = layout;
}
@@ -100,10 +107,9 @@ public static Iterable<Object[]> parameters() {

@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(conf).datanodeUuid(datanodeId
.getAbsolutePath()).conf(CONF).datanodeUuid(datanodeId
.toString()).build();

volumeSet = mock(MutableVolumeSet.class);
@@ -116,14 +122,14 @@ public void setUp() throws Exception {
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());

keyValueContainer = new KeyValueContainer(keyValueContainerData, conf);
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
}

private void addBlocks(int count) throws Exception {
long containerId = keyValueContainerData.getContainerID();

try(ReferenceCountedDB metadataStore = BlockUtils.getDB(keyValueContainer
.getContainerData(), conf)) {
.getContainerData(), CONF)) {
for (int i = 0; i < count; i++) {
// Creating BlockData
BlockID blockID = new BlockID(containerId, i);
@@ -180,7 +186,7 @@ public void testContainerImportExport() throws Exception {
long numberOfKeysToWrite = 12;
// Write a few keys to check the key count after import.
try(ReferenceCountedDB metadataStore =
BlockUtils.getDB(keyValueContainerData, conf)) {
BlockUtils.getDB(keyValueContainerData, CONF)) {
Table<String, BlockData> blockDataTable =
metadataStore.getStore().getBlockDataTable();

@@ -193,7 +199,7 @@ public void testContainerImportExport() throws Exception {
metadataStore.getStore().getMetadataTable()
.put(OzoneConsts.BLOCK_COUNT, numberOfKeysToWrite);
}
BlockUtils.removeDB(keyValueContainerData, conf);
BlockUtils.removeDB(keyValueContainerData, CONF);

Map<String, String> metadata = new HashMap<>();
metadata.put("key1", "value1");
@@ -219,7 +225,7 @@ public void testContainerImportExport() throws Exception {
keyValueContainerData.getLayOutVersion(),
keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
datanodeId.toString());
KeyValueContainer container = new KeyValueContainer(containerData, conf);
KeyValueContainer container = new KeyValueContainer(containerData, CONF);

HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), 1);
@@ -291,7 +297,7 @@ public void testDeleteContainer() throws Exception {
keyValueContainerData.setState(ContainerProtos.ContainerDataProto.State
.CLOSED);
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.delete();

@@ -370,7 +376,7 @@ public void testUpdateContainerUnsupportedRequest() throws Exception {
try {
keyValueContainerData.setState(
ContainerProtos.ContainerDataProto.State.CLOSED);
keyValueContainer = new KeyValueContainer(keyValueContainerData, conf);
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put(OzoneConsts.VOLUME, OzoneConsts.OZONE);
@@ -383,4 +389,32 @@ public void testUpdateContainerUnsupportedRequest() throws Exception {
.getResult());
}
}

@Test
public void testContainersShareColumnFamilyOptions() throws Exception {
// Get a read-only view (not a copy) of the options cache.
Map<ConfigurationSource, ColumnFamilyOptions> cachedOptions =
AbstractDatanodeStore.getColumnFamilyOptionsCache();

// Create Container 1
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Assert.assertEquals(1, cachedOptions.size());
ColumnFamilyOptions options1 = cachedOptions.get(CONF);
Assert.assertNotNull(options1);

// Create Container 2
keyValueContainerData = new KeyValueContainerData(2L,
layout,
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);

Assert.assertEquals(1, cachedOptions.size());
ColumnFamilyOptions options2 = cachedOptions.get(CONF);
Assert.assertNotNull(options2);

// Column family options object should be reused.
Assert.assertSame(options1, options2);
}
}