Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ private void createPipelines() {
continue;
}

try {
pipelineManager.scrubPipeline(type, factor);
} catch (IOException e) {
LOG.error("Error while scrubbing pipelines {}", e);
// Scrub pipelines only when the pipeline manager reports that it is not in
// safe mode.
if (!pipelineManager.getSafeModeStatus()) {
  try {
    pipelineManager.scrubPipeline(type, factor);
  } catch (IOException e) {
    // The exception is passed as the trailing argument so SLF4J logs it with
    // its stack trace; no "{}" placeholder is needed (a lone placeholder
    // would be printed literally).
    LOG.error("Error while scrubbing pipelines", e);
  }
}

while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,17 @@ void scrubPipeline(ReplicationType type, ReplicationFactor factor)
default void waitPipelineReady(PipelineID pipelineID, long timeout)
throws IOException {
  // Default implementation intentionally does nothing; implementing classes
  // may override it.
}

/**
 * Set SafeMode status.
 *
 * @param safeModeStatus the safe mode status to record; presumably
 *                       {@code true} means safe mode is active (confirm
 *                       against the implementations)
 */
void setSafeModeStatus(boolean safeModeStatus);

/**
 * Get SafeMode status.
 *
 * @return the safe mode status; presumably {@code true} while safe mode is
 *         active (confirm against the implementations)
 */
boolean getSafeModeStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -85,6 +86,8 @@ public class SCMPipelineManager implements PipelineManager {
// Pipeline Manager MXBean
private ObjectName pmInfoBean;

private final AtomicBoolean isInSafeMode;

public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher)
throws IOException {
Expand Down Expand Up @@ -127,6 +130,9 @@ protected SCMPipelineManager(Configuration conf, NodeManager nodeManager,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
}

public PipelineStateManager getStateManager() {
Expand Down Expand Up @@ -414,7 +420,7 @@ public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
.toEpochMilli() >= pipelineScrubTimeoutInMills)
.collect(Collectors.toList());
for (Pipeline p : needToSrubPipelines) {
LOG.info("srubbing pipeline: id: " + p.getId().toString() +
LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
" since it stays at ALLOCATED stage for " +
Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
" mins.");
Expand Down Expand Up @@ -618,4 +624,15 @@ protected MetadataStore getPipelineStore() {
protected NodeManager getNodeManager() {
return nodeManager;
}

/**
 * Stores the given value in the {@code isInSafeMode} flag.
 *
 * @param safeModeStatus value to store
 */
@Override
public void setSafeModeStatus(boolean safeModeStatus) {
  isInSafeMode.set(safeModeStatus);
}

/**
 * Reads the current value of the {@code isInSafeMode} flag.
 *
 * @return the value currently held by the flag
 */
@Override
public boolean getSafeModeStatus() {
  final boolean inSafeMode = isInSafeMode.get();
  return inSafeMode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public SCMSafeModeManager(Configuration conf,
exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
oneReplicaPipelineSafeModeRule);
}
emitSafeModeStatus();
boolean createPipelineInSafemode = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
Expand All @@ -30,8 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -94,8 +91,8 @@ public SafeModeHandler(Configuration configuration,
* Set SafeMode status based on
* {@link org.apache.hadoop.hdds.scm.events.SCMEvents#SAFE_MODE_STATUS}.
*
* Inform BlockManager, ScmClientProtocolServer and replicationAcitivity
* status about safeMode status.
* Inform BlockManager, ScmClientProtocolServer, ScmPipeline Manager and
* Replication Manager status about safeMode status.
*
* @param safeModeStatus
* @param publisher
Expand All @@ -114,8 +111,9 @@ public void onMessage(SafeModeStatus safeModeStatus,
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scmPipelineManager.setSafeModeStatus(isInSafeMode.get());
replicationManager.start();
cleanupPipelines();
scmPipelineManager.triggerPipelineCreation();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bharatviswa504 for the detailed explanation.
When adding triggerPipelineCreation() here, we'd better remove the pipelineManager.startPipelineCreator(); call in SCMSafeModeManager#exitSafeMode; otherwise, the wait time doesn't actually have any effect on the pipeline scrubber in PipelineManager.
Also, I would suggest moving the scmPipelineManager.setSafeModeStatus(isInSafeMode.get()); call into the safeModeExitThread run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineManager.startPipelineCreator(); is in exitSafeMode so that, once we are out of safeMode, a periodic task is scheduled to create pipelines.

triggerPipelineCreation will schedule a task if no pipeline creator is running when the call happens. Previously we used to destroy pipelines here; that has now been replaced with a call to trigger pipeline creation.

We also want to run the scrubber after safe mode exit, but I think it should be fine to scrub pipelines after an additional safe mode wait time. Addressed this.

});

safeModeExitThread.setDaemon(true);
Expand All @@ -124,21 +122,6 @@ public void onMessage(SafeModeStatus safeModeStatus,

}

/**
 * Walks every pipeline returned by the pipeline manager and calls
 * {@code finalizeAndDestroyPipeline(pipeline, false)} on each one that is in
 * the ALLOCATED state and for which {@code isAllocationTimeout()} is true.
 * An {@link IOException} raised for one pipeline is logged and the loop
 * continues with the next pipeline.
 */
private void cleanupPipelines() {
  for (Pipeline candidate : scmPipelineManager.getPipelines()) {
    try {
      // Guard clause: skip anything that is not a timed-out ALLOCATED pipeline.
      if (candidate.getPipelineState() != Pipeline.PipelineState.ALLOCATED
          || !candidate.isAllocationTimeout()) {
        continue;
      }
      scmPipelineManager.finalizeAndDestroyPipeline(candidate, false);
    } catch (IOException ex) {
      LOG.error("Finalize and destroy pipeline failed for pipeline "
          + candidate.toString(), ex);
    }
  }
}

/**
 * Reads the current value of the {@code isInSafeMode} flag.
 *
 * @return the value currently held by the flag
 */
public boolean getSafeModeStatus() {
  final boolean current = isInSafeMode.get();
  return current;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ public StorageContainerManager(OzoneConfiguration conf,
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler);

// Emit initial safe mode status, as now handlers are registered.
scmSafeModeManager.emitSafeModeStatus();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip this step, since it is most likely still in safe mode at this time.

Copy link
Contributor Author

@bharatviswa504 bharatviswa504 Mar 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is here so that all the Managers and the protocolServer that need safeModeStatus get the value accordingly.

    isInSafeMode.set(safeModeStatus.getSafeModeStatus());
    scmClientProtocolServer.setSafeModeStatus(isInSafeMode.get());
    scmBlockManager.setSafeModeStatus(isInSafeMode.get());
    scmPipelineManager.setSafeModeStatus(isInSafeMode.get());

With the current code we don't need it, but in the future it will help if some manager does not read HDDS_SCM_SAFEMODE_ENABLED and instead waits for this to set its initial safe mode status before taking specific actions.

registerMXBean();
registerMetricsSource(this);
}
Expand Down