@@ -18,42 +18,32 @@

package org.apache.hadoop.hdds.scm.pipeline;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationType.RATIS;

/**
* Test Node failure detection and handling in Ratis.
*/
@Ignore
public class TestNodeFailure {

private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static Pipeline ratisPipelineOne;
private static Pipeline ratisPipelineTwo;
private static ContainerManager containerManager;
private static List<Pipeline> ratisPipelines;
private static PipelineManager pipelineManager;
private static long timeForFailure;
private static int timeForFailure;

/**
* Create a MiniOzoneCluster for testing.
@@ -62,7 +52,7 @@ public class TestNodeFailure {
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
final OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
DatanodeRatisServerConfig.RATIS_FOLLOWER_SLOWNESS_TIMEOUT_KEY,
@@ -71,28 +61,22 @@ public static void init() throws Exception {
RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
DatanodeRatisServerConfig.RATIS_SERVER_NO_LEADER_TIMEOUT_KEY,
10, TimeUnit.SECONDS);
conf.setTimeDuration(
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
10, TimeUnit.SECONDS);
conf.set(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, "2s");

cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(6)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
StorageContainerManager scm = cluster.getStorageContainerManager();
containerManager = scm.getContainerManager();

final StorageContainerManager scm = cluster.getStorageContainerManager();
pipelineManager = scm.getPipelineManager();
ratisPipelineOne = pipelineManager.getPipeline(
containerManager.allocateContainer(
RATIS, THREE, "testOwner").getPipelineID());
ratisPipelineTwo = pipelineManager.getPipeline(
containerManager.allocateContainer(
RATIS, THREE, "testOwner").getPipelineID());
// At this stage, there should be two pipelines, each with one open container.
// Try closing both pipelines, one with a closed container and
// the other with an open container.
timeForFailure = conf.getObject(DatanodeRatisServerConfig.class)
ratisPipelines = pipelineManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);

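// The configured follower slowness timeout, reused below to size the polling
// interval and overall wait in testPipelineFail.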
timeForFailure = (int) conf.getObject(DatanodeRatisServerConfig.class)
.getFollowerSlownessTimeout();
}

@@ -106,35 +90,39 @@ public static void shutdown() {
}
}

@Ignore
// Enable this after we implement teardown pipeline logic once a datanode
// dies.
@Test(timeout = 300_000L)
public void testPipelineFail() throws InterruptedException, IOException,
TimeoutException {
Assert.assertEquals(ratisPipelineOne.getPipelineState(),
Pipeline.PipelineState.OPEN);
Pipeline pipelineToFail = ratisPipelineOne;
DatanodeDetails dnToFail = pipelineToFail.getFirstNode();
cluster.shutdownHddsDatanode(dnToFail);

// wait for sufficient time for the callback to be triggered
Thread.sleep(3 * timeForFailure);
@Test
public void testPipelineFail() {
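// For every RATIS three-replica pipeline, wait until it is OPEN, shut down its
// first datanode, and expect SCM to close (or remove) the pipeline afterwards.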
ratisPipelines.forEach(pipeline -> {
try {
waitForPipelineCreation(pipeline.getId());
cluster.shutdownHddsDatanode(pipeline.getFirstNode());
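// Poll until SCM marks the pipeline CLOSED; a PipelineNotFoundException means
// it has already been removed, which also counts as torn down.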
GenericTestUtils.waitFor(() -> {
try {
return pipelineManager.getPipeline(pipeline.getId())
.getPipelineState().equals(Pipeline.PipelineState.CLOSED);
} catch (PipelineNotFoundException ex) {
return true;
}
}, timeForFailure / 2, timeForFailure * 3);
} catch (Exception e) {
Assert.fail("Test Failed: " + e.getMessage());
}
});
}

Assert.assertEquals(Pipeline.PipelineState.CLOSED,
pipelineManager.getPipeline(ratisPipelineOne.getId())
.getPipelineState());
Assert.assertEquals(Pipeline.PipelineState.OPEN,
pipelineManager.getPipeline(ratisPipelineTwo.getId())
.getPipelineState());
// Now restart the datanode and make sure that a new pipeline is created.
cluster.setWaitForClusterToBeReadyTimeout(300000);
cluster.restartHddsDatanode(dnToFail, true);
Pipeline ratisPipelineThree = pipelineManager.getPipeline(
containerManager.allocateContainer(
RATIS, THREE, "testOwner").getPipelineID());
// Assert that the new container is not created on the ratisPipelineTwo pipeline.
Assert.assertNotEquals(ratisPipelineThree.getId(),
ratisPipelineTwo.getId());
/**
* Waits until the Pipeline is marked as OPEN.
* @param pipelineID Id of the pipeline
*/
private void waitForPipelineCreation(final PipelineID pipelineID)
throws Exception {
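// Check once a second, for up to one minute, until the pipeline reports OPEN.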
GenericTestUtils.waitFor(() -> {
try {
return pipelineManager.getPipeline(pipelineID)
.getPipelineState().equals(Pipeline.PipelineState.OPEN);
} catch (PipelineNotFoundException ex) {
return false;
}
}, 1000, 1000 * 60);
}
}