Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ public final class ScmConfigKeys {
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
"ozone.scm.keyvalue.container.deletion-choosing.policy";

// Max timeout for pipeline to stay at ALLOCATED state before scrubbed.
public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
"ozone.scm.pipeline.allocated.timeout";

public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
"5m";

public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT =
"ozone.scm.container.creation.lease.timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public final class Pipeline {
private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
// Current reported Leader for the pipeline
private UUID leaderId;
// Timestamp for pipeline upon creation
private Long creationTimestamp;

/**
* The immutable properties of pipeline object is used in
Expand All @@ -69,6 +71,7 @@ private Pipeline(PipelineID id, ReplicationType type,
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = System.currentTimeMillis();
}

/**
Expand Down Expand Up @@ -107,6 +110,24 @@ public PipelineState getPipelineState() {
return state;
}

/**
* Return the creation time of pipeline.
*
* @return Creation Timestamp
*/
public Long getCreationTimestamp() {
return creationTimestamp;
}

/**
* Set the creation timestamp. Only for protobuf now.
*
* @param creationTimestamp
*/
void setCreationTimestamp(Long creationTimestamp) {
this.creationTimestamp = creationTimestamp;
}

/**
* Return the pipeline leader's UUID.
*
Expand Down Expand Up @@ -196,6 +217,7 @@ public HddsProtos.Pipeline getProtobufMessage()
.setFactor(factor)
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp)
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
Expand Down Expand Up @@ -274,6 +296,7 @@ public String toString() {
b.append(", Type:").append(getType());
b.append(", Factor:").append(getFactor());
b.append(", State:").append(getPipelineState());
b.append(", CreationTimestamp").append(getCreationTimestamp());
b.append("]");
return b.toString();
}
Expand All @@ -298,6 +321,7 @@ public static class Builder {
private List<Integer> nodeOrder = null;
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Long creationTimestamp = null;

public Builder() {}

Expand All @@ -309,6 +333,7 @@ public Builder(Pipeline pipeline) {
this.nodeStatus = pipeline.nodeStatus;
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
}

public Builder setId(PipelineID id1) {
Expand Down Expand Up @@ -355,6 +380,10 @@ public Pipeline build() {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
pipeline.setCreationTimestamp(creationTimestamp);
}

if (nodeOrder != null && !nodeOrder.isEmpty()) {
// This branch is for build from ProtoBuf
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/common/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ message Pipeline {
required PipelineID id = 5;
optional string leaderID = 6;
repeated uint32 memberOrders = 7;
optional uint64 creationTimeStamp = 8;
}

message KeyValue {
Expand Down
12 changes: 12 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,18 @@
of max amount of pipelines which are OPEN.
</description>
</property>
<property>
<name>ozone.scm.pipeline.allocated.timeout</name>
<value>5m</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>
Timeout for every pipeline to stay in ALLOCATED stage. When pipeline is created,
it should be at OPEN stage once pipeline report is successfully received by SCM.
If a pipeline stays at ALLOCATED for too long, it should be scrubbed so that new
pipeline can be created. This timeout is for how long pipeline can stay at ALLOCATED
stage until it gets scrubbed.
</description>
</property>
<property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,17 @@ private void createPipelines() {

for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
try {
pipelineManager.scrubPipeline(type, factor);
} catch (IOException e) {
LOG.error("Error while scrubbing pipelines {}", e);
}

while (true) {
try {
if (scheduler.isClosed()) {
break;
}

pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID)
void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
throws IOException;

void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;

void startPipelineCreator();

void triggerPipelineCreation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;

Expand Down Expand Up @@ -322,6 +323,31 @@ public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
}
}

@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException{
if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
// Only srub pipeline for RATIS THREE pipeline
return;
}
Long currentTime = System.currentTimeMillis();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
Pipeline.PipelineState.ALLOCATED).stream()
.filter(p -> (currentTime - p.getCreationTimestamp()
>= pipelineScrubTimeoutInMills))
.collect(Collectors.toList());
for (Pipeline p : needToSrubPipelines) {
LOG.info("srubbing pipeline: id: " + p.getId().toString() +
" since it stays at ALLOCATED stage for " +
(currentTime - p.getCreationTimestamp())/60000 + " mins.");
finalizeAndDestroyPipeline(p, false);
}
}

@Override
public Map<String, Integer> getPipelineInfo() {
final Map<String, Integer> pipelineInfo = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ protected void process(PipelineReportFromDatanode
getSafeModeMetrics().incCurrentHealthyPipelinesCount();
processedPipelineIDs.add(pipelineID);
}

}

if (scmInSafeMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,44 @@
*/
public class MockRatisPipelineProvider extends RatisPipelineProvider {

private boolean autoOpenPipeline;

public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
Configuration conf, boolean autoOpen) {
super(nodeManager, stateManager, conf, null);
autoOpenPipeline = autoOpen;
}

public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
Configuration conf) {
super(nodeManager, stateManager, conf, null);
autoOpenPipeline = true;
}

protected void initializePipeline(Pipeline pipeline) throws IOException {
// do nothing as the datanodes do not exists
}

@Override
public Pipeline create(HddsProtos.ReplicationFactor factor)
throws IOException {
if (autoOpenPipeline) {
return super.create(factor);
} else {
Pipeline initialPipeline = super.create(factor);
return Pipeline.newBuilder()
.setId(initialPipeline.getId())
// overwrite pipeline state to main ALLOCATED
.setState(Pipeline.PipelineState.ALLOCATED)
.setType(initialPipeline.getType())
.setFactor(factor)
.setNodes(initialPipeline.getNodes())
.build();
}
}

@Override
public void shutdown() {
// Do nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
Expand Down Expand Up @@ -53,16 +54,19 @@ public class TestRatisPipelineProvider {
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
stateManager = new PipelineStateManager(conf);
provider = new MockRatisPipelineProvider(nodeManager,
stateManager, new OzoneConfiguration());
stateManager, conf);
}

private void createPipelineAndAssertions(
HddsProtos.ReplicationFactor factor) throws IOException {
Pipeline pipeline = provider.create(factor);
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);

Pipeline pipeline1 = provider.create(factor);
assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
Expand Down Expand Up @@ -142,6 +146,8 @@ public void testCreatePipelinesDnExclude() throws IOException {
// only 2 healthy DNs left that are not part of any pipeline
Pipeline pipeline = provider.create(factor);
assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
nodeManager.addPipeline(pipeline);
stateManager.addPipeline(pipeline);

List<DatanodeDetails> nodes = pipeline.getNodes();

Expand Down Expand Up @@ -176,5 +182,6 @@ private void addPipeline(
.build();

stateManager.addPipeline(openPipeline);
nodeManager.addPipeline(openPipeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

Expand All @@ -28,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
Expand Down Expand Up @@ -205,7 +207,7 @@ public void testPipelineCreationFailedMetric() throws Exception {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
Assert.assertTrue(numPipelineCreateFailed == 1);

// clean up
pipelineManager.close();
}
Expand Down Expand Up @@ -309,6 +311,45 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
pipelineManager.close();
}

@Test
public void testScrubPipeline() throws IOException {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);

final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, false);

pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
ratisProvider);

Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
// At this point, pipeline is not at OPEN stage.
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);

// pipeline should be seen in pipelineManager as ALLOCATED.
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).contains(pipeline));
pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);

// pipeline should be scrubbed.
Assert.assertFalse(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).contains(pipeline));

pipelineManager.close();
}

private void sendPipelineReport(DatanodeDetails dn,
Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
boolean isLeader, EventQueue eventQueue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand All @@ -30,7 +29,6 @@
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
import static org.junit.Assert.assertEquals;

/**
Expand Down