Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public byte[] toPersistedFormat(Pipeline object) throws IOException {
public Pipeline fromPersistedFormat(byte[] rawData) throws IOException {
HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
.newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(rawData));
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.build());
// When SCM is restarted, set Creation time with current time.
pipeline.setCreationTimestamp(Instant.now());
Preconditions.checkNotNull(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ protected void processPipelineReport(PipelineReport report,
}
if (pipeline.isHealthy()) {
pipelineManager.openPipeline(pipelineID);
if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
}
}
if (pipeline.isHealthy()) {
if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ public Pipeline createPipeline(ReplicationType type,
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getId(), pipeline);
if (pipelineStore != null) {
pipelineStore.put(pipeline.getId(), pipeline);
}
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
recordMetricsForPipeline(pipeline);
Expand Down Expand Up @@ -405,6 +407,23 @@ public void addContainerToPipeline(PipelineID pipelineID,
}
}

private void updatePipelineStateInDb(PipelineID pipelineId,
Pipeline.PipelineState state)
throws IOException {
// null check is here to prevent the case where SCM store
// is closed but the staleNode handlers/pipleine creations
// still try to access it.
if (pipelineStore != null) {
try {
pipelineStore.put(pipelineId, getPipeline(pipelineId));
} catch (IOException ex) {
LOG.info("Pipeline {} state update failed", pipelineId);
// revert back to old state in memory
stateManager.updatePipelineState(pipelineId, state);
}
}
}

@Override
public void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
Expand Down Expand Up @@ -436,7 +455,10 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
public void openPipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineId).getPipelineState();
Pipeline pipeline = stateManager.openPipeline(pipelineId);
updatePipelineStateInDb(pipelineId, state);
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
Expand Down Expand Up @@ -535,7 +557,10 @@ public void triggerPipelineCreation() {
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineID).getPipelineState();
stateManager.activatePipeline(pipelineID);
updatePipelineStateInDb(pipelineID, state);
}

/**
Expand All @@ -547,7 +572,10 @@ public void activatePipeline(PipelineID pipelineID)
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineID).getPipelineState();
stateManager.deactivatePipeline(pipelineID);
updatePipelineStateInDb(pipelineID, state);
}

/**
Expand Down Expand Up @@ -600,7 +628,10 @@ public void waitPipelineReady(PipelineID pipelineID, long timeout)
private void finalizePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineId).getPipelineState();
stateManager.finalizePipeline(pipelineId);
updatePipelineStateInDb(pipelineId, state);
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
Expand Down Expand Up @@ -669,6 +700,15 @@ public void close() throws IOException {

// shutdown pipeline provider.
pipelineFactory.shutdown();
lock.writeLock().lock();
try {
pipelineStore.close();
pipelineStore = null;
} catch (Exception ex) {
LOG.error("Pipeline store close failed", ex);
} finally {
lock.writeLock().unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
// We want to wait for RATIS THREE factor write pipelines
int pipelineCount = pipelineManager.getPipelines(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).size();
Pipeline.PipelineState.OPEN).size();

// This value will be zero when pipeline count is 0.
// On a fresh installed cluster, there will be zero pipelines in the SCM
Expand Down Expand Up @@ -118,7 +118,6 @@ protected void process(Pipeline pipeline) {
Preconditions.checkNotNull(pipeline);
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.isHealthy() &&
!processedPipelineIDs.contains(pipeline.getId())) {
getSafeModeMetrics().incCurrentHealthyPipelinesCount();
currentHealthyPipelineCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger;
Expand All @@ -37,11 +40,11 @@

/**
* This rule covers whether we have at least one datanode is reported for each
* pipeline. This rule is for all open containers, we have at least one
* open pipeline. This rule is for all open containers, we have at least one
* replica available for read when we exit safe mode.
*/
public class OneReplicaPipelineSafeModeRule extends
SafeModeExitRule<Pipeline> {
SafeModeExitRule<PipelineReportFromDatanode> {

private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
Expand All @@ -50,6 +53,7 @@ public class OneReplicaPipelineSafeModeRule extends
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private Set<PipelineID> oldPipelineIDSet;
private int currentReportedPipelineCount = 0;
private PipelineManager pipelineManager;


public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
Expand All @@ -68,9 +72,10 @@ public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
" value should be >= 0.0 and <= 1.0");

this.pipelineManager = pipelineManager;
oldPipelineIDSet = pipelineManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE)
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.stream().map(p -> p.getId()).collect(Collectors.toSet());
int totalPipelineCount = oldPipelineIDSet.size();

Expand All @@ -85,8 +90,8 @@ public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
}

@Override
protected TypedEvent<Pipeline> getEventType() {
return SCMEvents.OPEN_PIPELINE;
protected TypedEvent<PipelineReportFromDatanode> getEventType() {
return SCMEvents.PIPELINE_REPORT;
}

@Override
Expand All @@ -95,16 +100,26 @@ protected boolean validate() {
}

@Override
protected void process(Pipeline pipeline) {
Preconditions.checkNotNull(pipeline);
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
!reportedPipelineIDSet.contains(pipeline.getId())) {
if (oldPipelineIDSet.contains(pipeline.getId())) {
getSafeModeMetrics()
.incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
currentReportedPipelineCount++;
reportedPipelineIDSet.add(pipeline.getId());
protected void process(PipelineReportFromDatanode report) {
Preconditions.checkNotNull(report);
for (PipelineReport report1 : report.getReport().getPipelineReportList()) {
Pipeline pipeline;
try {
pipeline = pipelineManager.getPipeline(
PipelineID.getFromProtobuf(report1.getPipelineID()));
} catch (PipelineNotFoundException pnfe) {
continue;
}
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.isOpen() &&
!reportedPipelineIDSet.contains(pipeline.getId())) {
if (oldPipelineIDSet.contains(pipeline.getId())) {
getSafeModeMetrics().
incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
currentReportedPipelineCount++;
reportedPipelineIDSet.add(pipeline.getId());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void testPipelineReload() throws IOException {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
}
pipelineManager.close();
Expand All @@ -146,7 +147,8 @@ public void testPipelineReload() throws IOException {
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
for (Pipeline p : pipelines) {
pipelineManager.openPipeline(p.getId());
// After reload, pipelines should be in open state
Assert.assertTrue(pipelineManager.getPipeline(p.getId()).isOpen());
}
List<Pipeline> pipelineList =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,15 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
Pipeline pipeline1 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline1.getId());
Pipeline pipeline2 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline2.getId());
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline3.getId());

SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
config, containers, pipelineManager, eventQueue);
Expand Down Expand Up @@ -204,12 +207,15 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
Pipeline pipeline1 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);
pipelineManager.openPipeline(pipeline1.getId());
Pipeline pipeline2 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline2.getId());
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline3.getId());


SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
Expand Down
Loading