@@ -84,11 +84,6 @@ private DFSConfigKeysLegacy() {
public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
"10m";

-public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY =
-"dfs.datanode.failed.volumes.tolerated";
-
-public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;

public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY =
"dfs.metrics.percentiles.intervals";

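Note that the legacy dfs.* key deleted above is removed outright, so deployments that still set it will silently lose the setting once HddsVolumeChecker switches to the new key (see below). A minimal migration sketch, assuming the standard Hadoop Configuration setters; the setup lines are illustrative only:

```java
// Illustrative migration: the removed legacy key vs. the new HDDS key.
OzoneConfiguration conf = new OzoneConfiguration();
// conf.setInt("dfs.datanode.failed.volumes.tolerated", 1); // removed above, no longer read
conf.setInt("hdds.datanode.failed.volumes.tolerated", 1);   // new key added below
```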
@@ -43,11 +43,15 @@ public class DatanodeConfiguration {
"hdds.datanode.container.delete.threads.max";
static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
"hdds.datanode.periodic.disk.check.interval.minutes";
public static final String FAILED_VOLUMES_TOLERATED_KEY =
"hdds.datanode.failed.volumes.tolerated";

static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;

static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 15;

static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;

/**
* The maximum number of replication commands a single datanode can execute
* simultaneously.
@@ -123,6 +127,17 @@ public void setBlockDeletionLimit(int limit) {
private long periodicDiskCheckIntervalMinutes =
PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;

@Config(key = "failed.volumes.tolerated",
defaultValue = "-1",
type = ConfigType.INT,
tags = { DATANODE },
description = "The number of volumes that are allowed to fail "
+ "before a datanode stops offering service. "
+ "Setting this to -1 means unlimited, but at least "
+ "one good volume must remain."
)
private int failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;

@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
@@ -147,6 +162,13 @@ public void validate() {
periodicDiskCheckIntervalMinutes =
PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
}

if (failedVolumesTolerated < -1) {
LOG.warn(FAILED_VOLUMES_TOLERATED_KEY +
" must be greater than or equal to -1 but was set to {}. Defaulting to {}",
failedVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
}
}

public void setReplicationMaxStreams(int replicationMaxStreams) {
@@ -173,4 +195,12 @@ public void setPeriodicDiskCheckIntervalMinutes(
long periodicDiskCheckIntervalMinutes) {
this.periodicDiskCheckIntervalMinutes = periodicDiskCheckIntervalMinutes;
}

public int getFailedVolumesTolerated() {
return failedVolumesTolerated;
}

public void setFailedVolumesTolerated(int failedVolumesTolerated) {
this.failedVolumesTolerated = failedVolumesTolerated;
}
}
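For reference, the new field is consumed through Ozone's typed-config mechanism (conf.getObject), as the HddsVolumeChecker change below shows. A short usage sketch mirroring TestDatanodeConfiguration at the bottom of this diff; the value 2 is arbitrary:

```java
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY, 2);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
assert dnConf.getFailedVolumesTolerated() == 2;

// Values below -1 are rejected by validate() and fall back to the
// default of -1 (unlimited, as long as one good volume remains).
conf.setInt(DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY, -2);
assert conf.getObject(DatanodeConfiguration.class)
    .getFailedVolumesTolerated() == -1;
```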
@@ -262,6 +262,12 @@ private void start() throws IOException {
}
}

public void handleFatalVolumeFailures() {
LOG.error("DatanodeStateMachine Shutdown due to too many bad volumes, "
+ "check " + DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY);
hddsDatanodeStopService.stopService();
}

/**
* Gets the current context.
*
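For orientation, the failure path wired up in this PR is: HddsVolumeChecker detects failed volumes → MutableVolumeSet.handleVolumeFailures(...) → StateContext.getParent().handleFatalVolumeFailures() → HddsDatanodeStopService.stopService(). A minimal stand-in sketch of the last hop; the StopService interface below is hypothetical, standing in for the real HddsDatanodeStopService:

```java
// Stand-in types; only the call shape mirrors the diff above.
interface StopService {
  void stopService();
}

final class FatalVolumeFailureHandler {
  private final StopService stopService;

  FatalVolumeFailureHandler(StopService stopService) {
    this.stopService = stopService;
  }

  void handleFatalVolumeFailures() {
    System.err.println("Shutting down: too many failed volumes, check "
        + "hdds.datanode.failed.volumes.tolerated");
    stopService.stopService(); // same terminal call as in the diff
  }
}
```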
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.volume;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -36,6 +37,7 @@
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;

@@ -106,13 +108,12 @@ public HddsVolumeChecker(ConfigurationSource conf, Timer timer)

this.timer = timer;

+DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
/**
* Maximum number of volume failures that can be tolerated without
* declaring a fatal error.
*/
-int maxVolumeFailuresTolerated = conf.getInt(
-DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
-DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();

minDiskCheckGapMs = conf.getTimeDuration(
DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
@@ -140,7 +141,7 @@ public HddsVolumeChecker(ConfigurationSource conf, Timer timer)

if (maxVolumeFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
throw new DiskErrorException("Invalid value configured for "
-+ DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY
++ DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY
+ " - "
+ maxVolumeFailuresTolerated + " "
+ DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
@@ -239,7 +240,7 @@ public interface Callback {
* @param failedVolumes set of volumes that failed disk checks.
*/
void call(Set<HddsVolume> healthyVolumes,
-Set<HddsVolume> failedVolumes);
+Set<HddsVolume> failedVolumes) throws IOException;
}

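Because Callback#call now declares throws IOException, a checker callback can fail the disk-check pipeline directly. A hedged sketch of an implementation against the widened signature; the fail-fast body is illustrative, not the actual MutableVolumeSet wiring:

```java
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.HddsVolumeChecker;

final class FailFastCallback {
  // Callback is a single-abstract-method interface, so a lambda suffices.
  static final HddsVolumeChecker.Callback ON_CHECKED =
      (Set<HddsVolume> healthyVolumes, Set<HddsVolume> failedVolumes) -> {
        if (healthyVolumes.isEmpty()) {
          throw new IOException(failedVolumes.size()
              + " volume(s) failed and no healthy volume remains");
        }
      };
}
```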
@@ -40,6 +40,7 @@
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
import org.apache.hadoop.util.DiskChecker;
Expand Down Expand Up @@ -102,15 +103,17 @@ public class MutableVolumeSet implements VolumeSet {
private Runnable shutdownHook;
private final HddsVolumeChecker volumeChecker;
private Runnable failedVolumeListener;
+private StateContext context;

-public MutableVolumeSet(String dnUuid, ConfigurationSource conf)
-throws IOException {
-this(dnUuid, null, conf);
+public MutableVolumeSet(String dnUuid, ConfigurationSource conf,
+StateContext context) throws IOException {
+this(dnUuid, null, conf, context);
}

public MutableVolumeSet(String dnUuid, String clusterID,
-ConfigurationSource conf)
+ConfigurationSource conf, StateContext context)
throws IOException {
+this.context = context;
this.datanodeUuid = dnUuid;
this.clusterID = clusterID;
this.conf = conf;
@@ -267,14 +270,30 @@ void checkAllVolumes() throws IOException {
* Handle one or more failed volumes.
* @param failedVolumes
*/
-private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
+private void handleVolumeFailures(Set<HddsVolume> failedVolumes)
+throws IOException {
this.writeLock();
try {
for (HddsVolume v : failedVolumes) {
// Immediately mark the volume as failed so it is unavailable
// for new containers.
failVolume(v.getHddsRootDir().getPath());
}

+// Check whether the failed volumes are still within the tolerated limit.
+if (!hasEnoughVolumes()) {
+// On startup the other services are not initialized yet, so there is
+// nothing to stop; fail fast with an exception instead.
+if (shutdownHook == null) {
+DatanodeConfiguration dnConf =
+conf.getObject(DatanodeConfiguration.class);
+throw new IOException("Not enough good volumes at startup;"
++ " bad volumes detected: " + failedVolumes.size()
++ ", max tolerated: " + dnConf.getFailedVolumesTolerated());
+}
+if (context != null) {
+context.getParent().handleFatalVolumeFailures();
+}
+}
} finally {
this.writeUnlock();
}
@@ -522,6 +541,19 @@ public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
return ImmutableMap.copyOf(volumeStateMap);
}

public boolean hasEnoughVolumes() {
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();

// If maxVolumeFailuresTolerated is -1 (MAX_VOLUME_FAILURE_TOLERATED_LIMIT),
// any number of failed volumes is tolerated as long as at least one good
// volume remains; otherwise the number of failed volumes must not exceed
// the configured maximum.
if (maxVolumeFailuresTolerated ==
HddsVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
return getVolumesList().size() >= 1;
} else {
return getFailedVolumesList().size() <= maxVolumeFailuresTolerated;
[Review comment — Contributor]
What's the difference between the if and the else branch here? Could you explain?

[Reply — Contributor Author]
This logic is borrowed from HDFS: setting 'tolerated' to -1 means an unlimited number of bad volumes is allowed, but at least one good volume must still remain. Otherwise, the number of bad volumes must be less than or equal to the configured value.

[Review comment — Contributor]
Can you add some comments here?

[Reply — Contributor Author]
@bshashikant sure, will copy the lines above into code comments.

}
}

public StorageLocationReport[] getStorageReport()
throws IOException {
boolean failed;
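To make the review discussion above concrete, here is a self-contained sketch of the tolerance rule implemented by hasEnoughVolumes(); the class and method below are stand-ins, with -1 mirroring HddsVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT:

```java
// Self-contained illustration of the volume-failure tolerance rule.
final class VolumeTolerancePolicy {
  private final int tolerated;

  VolumeTolerancePolicy(int tolerated) {
    this.tolerated = tolerated;
  }

  boolean hasEnoughVolumes(int goodVolumes, int failedVolumes) {
    if (tolerated == -1) {
      // Unlimited failures allowed, but keep at least one good volume.
      return goodVolumes >= 1;
    }
    // Hard cap on the number of failed volumes.
    return failedVolumes <= tolerated;
  }

  public static void main(String[] args) {
    VolumeTolerancePolicy unlimited = new VolumeTolerancePolicy(-1);
    System.out.println(unlimited.hasEnoughVolumes(1, 9)); // true: one good volume left
    System.out.println(unlimited.hasEnoughVolumes(0, 9)); // false: no good volumes

    VolumeTolerancePolicy capped = new VolumeTolerancePolicy(2);
    System.out.println(capped.hasEnoughVolumes(5, 3));    // false: 3 > 2
  }
}
```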
@@ -92,6 +92,7 @@ public class OzoneContainer {
private final AtomicReference<InitializingStatus> initializingStatus;
private final ReplicationServer replicationServer;
private DatanodeDetails datanodeDetails;
private StateContext context;

enum InitializingStatus {
UNINITIALIZED, INITIALIZING, INITIALIZED
@@ -113,7 +114,9 @@ public OzoneContainer(
throws IOException {
config = conf;
this.datanodeDetails = datanodeDetails;
-volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf);
+this.context = context;
+volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+context);
volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
containerSet = new ContainerSet();
metadataScanner = null;
@@ -158,7 +158,7 @@ public static void init() throws IOException {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
datanodeUuid = UUID.randomUUID().toString();
-volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
}

@AfterClass
@@ -238,7 +238,7 @@ public void testDelete() throws Exception {
final long numBlocksToDelete = TestDB.NUM_PENDING_DELETION_BLOCKS;
String datanodeUuid = UUID.randomUUID().toString();
ContainerSet containerSet = makeContainerSet();
-VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
@@ -305,7 +305,7 @@ public void testDelete() throws Exception {
public void testReadDeletedBlockChunkInfo() throws Exception {
String datanodeUuid = UUID.randomUUID().toString();
ContainerSet containerSet = makeContainerSet();
-VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
@@ -141,7 +141,7 @@ public static void shutdown() throws IOException {
@Before
public void setupPaths() throws IOException {
containerSet = new ContainerSet();
-volumeSet = new MutableVolumeSet(DATANODE_UUID, conf);
+volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null);
blockManager = new BlockManagerImpl(conf);
chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager,
null);
@@ -101,7 +101,8 @@ public void testContainerCloseActionWhenFull() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
-MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf);
+MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf,
+null);

try {
UUID scmId = UUID.randomUUID();
@@ -277,7 +278,7 @@ public void testWriteChunkWithCreateContainerFailure() throws IOException {
private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
OzoneConfiguration conf) throws IOException {
ContainerSet containerSet = new ContainerSet();
-VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf);
+VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null);
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
@@ -26,6 +26,8 @@
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT;

import static org.junit.Assert.assertEquals;

@@ -40,11 +42,13 @@ public void acceptsValidValues() {
int validReplicationLimit = 123;
int validDeleteThreads = 42;
long validDiskCheckIntervalMinutes = 60;
int validFailedVolumesTolerated = 10;
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
validDiskCheckIntervalMinutes);
conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, validFailedVolumesTolerated);

// WHEN
DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -54,6 +58,8 @@ public void acceptsValidValues() {
assertEquals(validDeleteThreads, subject.getContainerDeleteThreads());
assertEquals(validDiskCheckIntervalMinutes,
subject.getPeriodicDiskCheckIntervalMinutes());
assertEquals(validFailedVolumesTolerated,
subject.getFailedVolumesTolerated());
}

@Test
@@ -62,11 +68,13 @@ public void overridesInvalidValues() {
int invalidReplicationLimit = -5;
int invalidDeleteThreads = 0;
long invalidDiskCheckIntervalMinutes = -1;
int invalidFailedVolumesTolerated = -2;
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
invalidDiskCheckIntervalMinutes);
conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, invalidFailedVolumesTolerated);

// WHEN
DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -78,6 +86,8 @@ public void overridesInvalidValues() {
subject.getContainerDeleteThreads());
assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
subject.getPeriodicDiskCheckIntervalMinutes());
assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
subject.getFailedVolumesTolerated());
}

@Test
@@ -95,6 +105,8 @@ public void isCreatedWitDefaultValues() {
subject.getContainerDeleteThreads());
assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
subject.getPeriodicDiskCheckIntervalMinutes());
assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
subject.getFailedVolumesTolerated());
}

}
@@ -64,7 +64,7 @@ public class TestVolumeSet {
private static final String DUMMY_IP_ADDR = "0.0.0.0";

private void initializeVolumeSet() throws Exception {
-volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
}

@Rule
@@ -230,7 +230,8 @@ public void testFailVolumes() throws Exception{
OzoneConfiguration ozoneConfig = new OzoneConfiguration();
ozoneConfig.set(HDDS_DATANODE_DIR_KEY, readOnlyVolumePath.getAbsolutePath()
+ "," + volumePath.getAbsolutePath());
-volSet = new MutableVolumeSet(UUID.randomUUID().toString(), ozoneConfig);
+volSet = new MutableVolumeSet(UUID.randomUUID().toString(), ozoneConfig,
+null);
assertEquals(1, volSet.getFailedVolumesList().size());
assertEquals(readOnlyVolumePath, volSet.getFailedVolumesList().get(0)
.getHddsRootDir());