@@ -31,7 +31,9 @@ public enum HDDSLayoutFeature implements LayoutFeature {
   INITIAL_VERSION(0, "Initial Layout Version"),
   DATANODE_SCHEMA_V2(1, "Datanode RocksDB Schema Version 2 (with column " +
       "families)"),
-  SCM_HA(2, "Storage Container Manager HA");
+  SCM_HA(2, "Storage Container Manager HA"),
+  DATANODE_SCHEMA_V3(3, "Datanode RocksDB Schema Version 3 (one rocksdb " +
+      "per disk)");
 
   ////////////////////////////// //////////////////////////////
 
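For readers new to Ozone upgrades: each constant above pairs a monotonically increasing layout version with a description, and a datanode may only use a feature once its finalized metadata layout version has reached that number. A minimal sketch of that contract in plain Java (simplified; the real LayoutFeature interface carries more than this):

enum SketchLayoutFeature {
  INITIAL_VERSION(0),
  DATANODE_SCHEMA_V2(1),
  SCM_HA(2),
  DATANODE_SCHEMA_V3(3);

  private final int layoutVersion;

  SketchLayoutFeature(int layoutVersion) {
    this.layoutVersion = layoutVersion;
  }

  // A feature is usable only after the node's finalized metadata
  // layout version has caught up with the feature's version.
  boolean isFinalized(int metadataLayoutVersion) {
    return layoutVersion <= metadataLayoutVersion;
  }
}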
@@ -328,6 +328,11 @@ public final class OzoneConfigKeys {
   public static final String
       HDDS_DATANODE_METADATA_ROCKSDB_CACHE_SIZE_DEFAULT = "1GB";
 
+  // Specifies the dedicated volumes for the per-disk db instances.
+  // For container schema v3 only.
+  public static final String HDDS_DATANODE_CONTAINER_DB_DIR =
+      "hdds.datanode.container.db.dir";
+
   public static final String OZONE_SECURITY_ENABLED_KEY =
       "ozone.security.enabled";
   public static final boolean OZONE_SECURITY_ENABLED_DEFAULT = false;
@@ -135,6 +135,7 @@ public final class OzoneConsts {
   public static final String SCM_DB_NAME = "scm.db";
   public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
   public static final String SCM_DB_BACKUP_PREFIX = "scm.db.backup.";
+  public static final String CONTAINER_DB_NAME = "container.db";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
hadoop-hdds/common/src/main/resources/ozone-default.xml (14 additions, 0 deletions)
@@ -144,6 +144,20 @@
       tagged explicitly.
     </description>
   </property>
+  <property>
+    <name>hdds.datanode.container.db.dir</name>
+    <value/>
+    <tag>OZONE, CONTAINER, STORAGE, MANAGEMENT</tag>
+    <description>Determines where the per-disk rocksdb instances will be
+      stored. This setting is optional. If unspecified, the rocksdb
+      instances are stored on the same disks as the HDDS data.
+      The directories should be tagged with their corresponding storage
+      types ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for storage policies; the
+      default storage type is DISK if a directory has no explicit storage
+      type tag. Ideally, this should be mapped to a fast disk such as an
+      SSD.
+    </description>
+  </property>
   <property>
     <name>hdds.datanode.dir.du.reserved</name>
     <value/>
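To make the description concrete, here is one way the new key could be set from code, for example in a test. The path is purely illustrative, and the constant is the one added to OzoneConfigKeys above; when the key is left unset, each rocksdb instance stays on its HDDS data disk as described:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;

public final class ContainerDbDirExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Dedicate a fast disk to the per-disk RocksDB instances
    // ("/ssd1/ozone/db" is a hypothetical mount point, not a default).
    conf.set(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR, "/ssd1/ozone/db");
    System.out.println(
        conf.get(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR));
  }
}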
@@ -57,7 +57,7 @@
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -319,8 +319,8 @@ private void startRatisForTest() throws IOException {
 
     for (Map.Entry<String, StorageVolume> entry : volumeMap.entrySet()) {
       HddsVolume hddsVolume = (HddsVolume) entry.getValue();
-      boolean result = HddsVolumeUtil.checkVolume(hddsVolume, clusterId,
-          clusterId, conf, LOG);
+      boolean result = StorageVolumeUtil.checkVolume(hddsVolume, clusterId,
+          clusterId, conf, LOG, null);
       if (!result) {
         volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
       }
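Note the new trailing argument: checkVolume now also takes the datanode's DbVolumeSet, and this test-only path passes null, which presumably skips the per-disk db wiring and preserves the old behavior.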
@@ -45,13 +45,17 @@ public class DatanodeConfiguration {
       "hdds.datanode.failed.data.volumes.tolerated";
   public static final String FAILED_METADATA_VOLUMES_TOLERATED_KEY =
       "hdds.datanode.failed.metadata.volumes.tolerated";
+  public static final String FAILED_DB_VOLUMES_TOLERATED_KEY =
+      "hdds.datanode.failed.db.volumes.tolerated";
   public static final String DISK_CHECK_MIN_GAP_KEY =
       "hdds.datanode.disk.check.min.gap";
   public static final String DISK_CHECK_TIMEOUT_KEY =
       "hdds.datanode.disk.check.timeout";
 
   public static final String WAIT_ON_ALL_FOLLOWERS =
       "hdds.datanode.wait.on.all.followers";
+  public static final String CONTAINER_SCHEMA_V3_ENABLED =
+      "hdds.datanode.container.schema.v3.enabled";
 
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
@@ -67,6 +71,8 @@
   static final long DISK_CHECK_TIMEOUT_DEFAULT =
       Duration.ofMinutes(10).toMillis();
 
+  static final boolean CONTAINER_SCHEMA_V3_ENABLED_DEFAULT = false;
+
   /**
    * Number of threads per volume that Datanode will use for chunk read.
    */
@@ -195,6 +201,17 @@ public void setBlockDeletionLimit(int limit) {
   )
   private int failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
 
+  @Config(key = "failed.db.volumes.tolerated",
+      defaultValue = "-1",
+      type = ConfigType.INT,
+      tags = { DATANODE },
+      description = "The number of db volumes that are allowed to fail "
+          + "before a datanode stops offering service. "
+          + "Setting this to -1 means unlimited, but at least one good "
+          + "volume must remain."
+  )
+  private int failedDbVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+
   @Config(key = "disk.check.min.gap",
       defaultValue = "15m",
       type = ConfigType.TIME,
@@ -245,6 +262,15 @@ public void setWaitOnAllFollowers(boolean val) {
     this.waitOnAllFollowers = val;
   }
 
+  @Config(key = "container.schema.v3.enabled",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE },
+      description = "Enable use of container schema v3 (one rocksdb per disk)."
+  )
+  private boolean containerSchemaV3Enabled =
+      CONTAINER_SCHEMA_V3_ENABLED_DEFAULT;
+
   @PostConstruct
   public void validate() {
     if (containerDeleteThreads < 1) {
@@ -277,6 +303,13 @@ public void validate() {
       failedMetadataVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
     }
 
+    if (failedDbVolumesTolerated < -1) {
+      LOG.warn(FAILED_DB_VOLUMES_TOLERATED_KEY +
+          " must be greater than or equal to -1 and was set to {}. Defaulting to {}",
+          failedDbVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
+      failedDbVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+    }
+
     if (diskCheckMinGap < 0) {
       LOG.warn(DISK_CHECK_MIN_GAP_KEY +
           " must be greater than zero and was set to {}. Defaulting to {}",
@@ -325,6 +358,14 @@ public void setFailedMetadataVolumesTolerated(int failedVolumesTolerated) {
     this.failedMetadataVolumesTolerated = failedVolumesTolerated;
   }
 
+  public int getFailedDbVolumesTolerated() {
+    return failedDbVolumesTolerated;
+  }
+
+  public void setFailedDbVolumesTolerated(int failedVolumesTolerated) {
+    this.failedDbVolumesTolerated = failedVolumesTolerated;
+  }
+
   public Duration getDiskCheckMinGap() {
     return Duration.ofMillis(diskCheckMinGap);
   }
@@ -372,4 +413,12 @@ public void setNumReadThreadPerVolume(int threads) {
   public int getNumReadThreadPerVolume() {
     return numReadThreadPerVolume;
   }
+
+  public boolean getContainerSchemaV3Enabled() {
+    return this.containerSchemaV3Enabled;
+  }
+
+  public void setContainerSchemaV3Enabled(boolean containerSchemaV3Enabled) {
+    this.containerSchemaV3Enabled = containerSchemaV3Enabled;
+  }
 }
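Typed access to the new settings goes through Ozone's configuration injection. A short usage sketch, assuming only the getters added in this diff and the standard getObject entry point:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

public final class DatanodeConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // getObject(...) materializes the @Config-annotated fields and runs
    // the @PostConstruct validate() shown above.
    DatanodeConfiguration dnConf =
        conf.getObject(DatanodeConfiguration.class);

    boolean v3Enabled = dnConf.getContainerSchemaV3Enabled(); // default false
    int dbFailures = dnConf.getFailedDbVolumesTolerated();    // -1 = unlimited
    System.out.println("schema v3: " + v3Enabled
        + ", tolerated db volume failures: " + dbFailures);
  }
}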
@@ -316,8 +316,8 @@ private void startStateMachineThread() throws IOException {
   public void handleFatalVolumeFailures() {
     LOG.error("DatanodeStateMachine Shutdown due to too many bad volumes, "
         + "check " + DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY
-        + " and "
-        + DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY);
+        + " and " + DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY
+        + " and " + DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY);
     hddsDatanodeStopService.stopService();
   }
 
@@ -17,18 +17,17 @@
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -78,36 +77,17 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
       String scmId = response.getValue(OzoneConsts.SCM_ID);
       String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
 
-      // Check volumes
-      MutableVolumeSet volumeSet = ozoneContainer.getVolumeSet();
-      volumeSet.writeLock();
-      try {
-        Map<String, StorageVolume> volumeMap = volumeSet.getVolumeMap();
-
-        Preconditions.checkNotNull(scmId,
-            "Reply from SCM: scmId cannot be null");
-        Preconditions.checkNotNull(clusterId,
-            "Reply from SCM: clusterId cannot be null");
-
-        // If version file does not exist
-        // create version file and also set scm ID or cluster ID.
-        for (Map.Entry<String, StorageVolume> entry
-            : volumeMap.entrySet()) {
-          StorageVolume volume = entry.getValue();
-          boolean result = HddsVolumeUtil.checkVolume((HddsVolume) volume,
-              scmId, clusterId, configuration, LOG);
-          if (!result) {
-            volumeSet.failVolume(volume.getStorageDir().getPath());
-          }
-        }
-        if (volumeSet.getVolumesList().size() == 0) {
-          // All volumes are in inconsistent state
-          throw new DiskOutOfSpaceException(
-              "All configured Volumes are in Inconsistent State");
-        }
-      } finally {
-        volumeSet.writeUnlock();
-      }
+      Preconditions.checkNotNull(scmId,
+          "Reply from SCM: scmId cannot be null");
+      Preconditions.checkNotNull(clusterId,
+          "Reply from SCM: clusterId cannot be null");
+
+      // Check DbVolumes
+      if (SchemaV3.isFinalizedAndEnabled(configuration)) {
+        checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
+      }
+      // Check HddsVolumes
+      checkVolumeSet(ozoneContainer.getVolumeSet(), scmId, clusterId);
 
       // Start the container services after getting the version information
       ozoneContainer.start(clusterId);
@@ -129,4 +109,32 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
     }
     return rpcEndPoint.getState();
   }
+
+  private void checkVolumeSet(MutableVolumeSet volumeSet,
+      String scmId, String clusterId) throws DiskOutOfSpaceException {
+    if (volumeSet == null) {
+      return;
+    }
+
+    volumeSet.writeLock();
+    try {
+      // If version file does not exist
+      // create version file and also set scm ID or cluster ID.
+      for (StorageVolume volume : volumeSet.getVolumeMap().values()) {
+        boolean result = StorageVolumeUtil.checkVolume(volume,
+            scmId, clusterId, configuration, LOG,
+            ozoneContainer.getDbVolumeSet());
+        if (!result) {
+          volumeSet.failVolume(volume.getStorageDir().getPath());
+        }
+      }
+      if (volumeSet.getVolumesList().size() == 0) {
+        // All volumes are in inconsistent state
+        throw new DiskOutOfSpaceException(
+            "All configured Volumes are in Inconsistent State");
+      }
+    } finally {
+      volumeSet.writeUnlock();
+    }
+  }
 }
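Note the ordering in call(): db volumes are verified before the HddsVolumes, presumably because formatting an HddsVolume under schema v3 needs a healthy DbVolumeSet to place its per-disk db on; threading ozoneContainer.getDbVolumeSet() into StorageVolumeUtil.checkVolume appears to serve exactly that purpose.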
@@ -58,7 +58,17 @@ public RawDB getDB(String containerDBPath) {
   }
 
   public void removeDB(String containerDBPath) {
-    datanodeStoreMap.remove(containerDBPath);
+    RawDB db = datanodeStoreMap.remove(containerDBPath);
+    if (db == null) {
+      LOG.debug("DB {} already removed", containerDBPath);
+      return;
+    }
+
+    try {
+      db.getStore().stop();
+    } catch (Exception e) {
+      LOG.error("Failed to stop DatanodeStore {}", containerDBPath, e);
+    }
   }
 
   public void shutdownCache() {
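The new removeDB body closes a likely resource leak: evicting the handle without stopping the store would leave the underlying RocksDB open. If datanodeStoreMap is a concurrent map, remove returns the previous value atomically, so exactly one caller wins the handle and performs the shutdown while later calls become no-ops. A minimal sketch of the same remove-then-stop pattern with hypothetical names (the real cache class is only partially visible in this diff):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class StoreCacheSketch {
  private final ConcurrentMap<String, AutoCloseable> stores =
      new ConcurrentHashMap<>();

  void put(String path, AutoCloseable store) {
    stores.put(path, store);
  }

  void remove(String path) {
    AutoCloseable store = stores.remove(path); // atomic: one winner per key
    if (store == null) {
      return; // already removed and stopped by another caller
    }
    try {
      store.close(); // stop the underlying db exactly once
    } catch (Exception e) {
      System.err.println("Failed to stop store " + path + ": " + e);
    }
  }
}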