Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -509,22 +509,52 @@ public void scrubPipelines() throws IOException, TimeoutException {
if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
(currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMills)) {
LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
" since it stays at ALLOCATED stage for " +
LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
"stage for {} mins.", p.getId(),
Duration.between(currentTime, p.getCreationTimestamp())
.toMinutes() + " mins.");
.toMinutes());
closePipeline(p, false);
}
// scrub pipelines who stay CLOSED for too long.
if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
" since it stays at CLOSED stage.");
LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.",
p.getId());
closeContainersForPipeline(p.getId());
removePipeline(p);
}
// If a datanode is stopped and then SCM is restarted, a pipeline can get
// stuck in an open state. For Ratis, provided some other DNs that were
// part of the open pipeline register to SCM after the restart, the Ratis
// pipeline close will get triggered by the DNs. For EC that will never
// happen, as the DNs are not aware of the pipeline. Therefore we should
// close any pipelines in the scrubber if they have nodes which are not
// registered
if (isOpenWithUnregisteredNodes(p)) {
LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes",
p.getId());
closeContainersForPipeline(p.getId());
closePipeline(p, true);
}
}
}

/**
* @param pipeline The pipeline to check
* @return True if the pipeline is open and contains unregistered nodes. False
* otherwise.
*/
private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
if (!pipeline.isOpen()) {
return false;
}
for (DatanodeDetails dn : pipeline.getNodes()) {
if (nodeManager.getNodeByUuid(dn.getUuidString()) == null) {
return true;
}
}
return false;
}

/**
* Schedules a fixed interval job to create pipelines.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -29,12 +30,14 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
Expand Down Expand Up @@ -125,6 +128,10 @@ public void init() throws Exception {
GenericTestUtils.getRandomizedTempPath());
scm = HddsTestUtils.getScm(conf);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
// Mock Node Manager is not able to correctly set up things for the EC
// placement policy (Rack Scatter), so just use the random one.
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY,
SCMContainerPlacementRandom.class.getName());
dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
nodeManager = new MockNodeManager(true, 20);
maxPipelineCount = nodeManager.getNodeCount(
Expand All @@ -151,7 +158,7 @@ private PipelineManagerImpl createPipelineManager(boolean isLeader)
throws IOException {
return PipelineManagerImpl.newPipelineManager(conf,
SCMHAManagerStub.getInstance(isLeader),
new MockNodeManager(true, 20),
nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
scmContext,
Expand All @@ -163,7 +170,7 @@ private PipelineManagerImpl createPipelineManager(
boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException {
return PipelineManagerImpl.newPipelineManager(conf,
SCMHAManagerStub.getInstance(isLeader, buffer),
new MockNodeManager(true, 20),
nodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
SCMContext.emptyContext(),
Expand Down Expand Up @@ -341,7 +348,6 @@ public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
@Test
public void testRemovePipeline() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
// Create a pipeline
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
Expand Down Expand Up @@ -391,7 +397,6 @@ public void testRemovePipeline() throws Exception {
@Test
public void testClosePipelineShouldFailOnFollower() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
Assertions.assertEquals(1, pipelineManager.getPipelines().size());
Expand All @@ -413,7 +418,6 @@ public void testClosePipelineShouldFailOnFollower() throws Exception {
@Test
public void testPipelineReport() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager,
new EventQueue(), serviceManager, scmContext);
Expand Down Expand Up @@ -571,7 +575,6 @@ public void testScrubPipelines() throws Exception {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS);

PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
Pipeline allocatedPipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
Expand Down Expand Up @@ -628,13 +631,36 @@ public void testScrubPipelines() throws Exception {
pipelineManager.close();
}

@Test
public void testScrubOpenWithUnregisteredNodes() throws Exception {
PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager
.createPipeline(new ECReplicationConfig(3, 2));
pipelineManager.openPipeline(pipeline.getId());

// Scrubbing the pipelines should not affect this pipeline
pipelineManager.scrubPipelines();
pipeline = pipelineManager.getPipeline(pipeline.getId());
Assertions.assertEquals(Pipeline.PipelineState.OPEN,
pipeline.getPipelineState());

// Now, "unregister" one of the nodes in the pipeline
DatanodeDetails firstDN = nodeManager.getNodeByUuid(
pipeline.getNodes().get(0).getUuidString());
nodeManager.getClusterNetworkTopologyMap().remove(firstDN);

pipelineManager.scrubPipelines();
pipeline = pipelineManager.getPipeline(pipeline.getId());
Assertions.assertEquals(Pipeline.PipelineState.CLOSED,
pipeline.getPipelineState());
}

@Test
public void testScrubPipelinesShouldFailOnFollower() throws Exception {
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 10, TimeUnit.SECONDS);

PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
Expand Down Expand Up @@ -765,7 +791,6 @@ public void testPipelineCloseFlow() throws IOException, TimeoutException {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(LoggerFactory.getLogger(PipelineManagerImpl.class));
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
Pipeline pipeline = pipelineManager.createPipeline(
RatisReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE));
Expand Down