Merged
@@ -416,8 +416,12 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT =
"ozone.scm.pipeline.destroy.timeout";

+ // We wait for 150s before closing containers
+ // OzoneConfigKeys#OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION.
+ // So, we are waiting for another 150s before deleting the pipeline
+ // (150 + 150) = 300s
public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT =
- "66s";
+ "300s";

public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL =
"ozone.scm.pipeline.creation.interval";
@@ -427,7 +431,7 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
"ozone.scm.pipeline.scrub.interval";
public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
- "5m";
+ "150s";


// Allow SCM to auto create factor ONE ratis pipeline.
@@ -87,10 +87,21 @@ public static Codec<Pipeline> getCodec() {
// suggested leader id with high priority
private final UUID suggestedLeaderId;

+ private final Instant stateEnterTime;
+
/**
* The immutable properties of pipeline object is used in
* ContainerStateManager#getMatchingContainerByPipeline to take a lock on
* the container allocations for a particular pipeline.
+ * <br><br>
+ * Since the Pipeline class is immutable, if we want to change the state of
+ * the Pipeline we should create a new Pipeline object with the new state.
+ * Make sure that you set the value of <i>creationTimestamp</i> properly while
+ * creating the new Pipeline object.
+ * <br><br>
+ * There is no need to worry about the value of <i>stateEnterTime</i> as it's
+ * set to <i>Instant.now</i> when you create the Pipeline object as part of
+ * state change.
*/
private Pipeline(PipelineID id,
ReplicationConfig replicationConfig, PipelineState state,
@@ -102,6 +113,7 @@ private Pipeline(PipelineID id,
this.creationTimestamp = Instant.now();
this.suggestedLeaderId = suggestedLeaderId;
this.replicaIndexes = new HashMap<>();
+ this.stateEnterTime = Instant.now();

Contributor @kerneltime, Nov 15, 2023:

This timestamp is compared against the clock configured in SCMContainerManager:

private void initializeSystemManagers(OzoneConfiguration conf,
      SCMConfigurator configurator) throws IOException {
    // Use SystemClock when data is persisted
    // and used again after system restarts.
    systemClock = Clock.system(ZoneOffset.UTC);

Would it be possible to get the creation timestamp from the same clock here? Currently it should be fine but it would be more resistant to bugs due to refactoring in the future.

Contributor Author:

Makes sense, let me create a separate Jira for handling this.
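
The clock suggestion in this thread can be sketched as follows. This is a minimal illustration, not the Ozone code: `ClockStampedState` and `hasBeenInStateFor` are hypothetical names, and the only real API used is `java.time.Clock`. The object reads its timestamp from an injected clock, so the stored value and the value it is later compared against come from the same source.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration; this is not the Ozone Pipeline class.
final class ClockStampedState {
  private final Instant stateEnterTime;

  ClockStampedState(Clock clock) {
    // Read "now" from the injected clock rather than calling Instant.now().
    this.stateEnterTime = clock.instant();
  }

  Instant getStateEnterTime() {
    return stateEnterTime;
  }

  // True once at least `timeout` has elapsed on the given clock
  // since this object was created.
  boolean hasBeenInStateFor(Duration timeout, Clock clock) {
    Duration elapsed = Duration.between(stateEnterTime, clock.instant());
    return elapsed.compareTo(timeout) >= 0;
  }
}
```

Under this assumption a production caller would pass something like `Clock.system(ZoneOffset.UTC)`, while a test could pass `Clock.fixed(...)` and `Clock.offset(...)` to move time forward deterministically.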

}

/**
@@ -140,6 +152,10 @@ public Instant getCreationTimestamp() {
return creationTimestamp;
}

+ public Instant getStateEnterTime() {

Contributor:

IMO, we need to set the closed-state time when the pipeline close is triggered; otherwise this time is the same as the creation time initialized in the constructor.
This is needed so that scrubbing is delayed relative to the close time.

Contributor Author:

@sumitagrawl the PipelineState inside Pipeline class is immutable.
If we want to change the state of a pipeline, we create a new Pipeline object with new state which will also update the stateEnterTime with current time.

Since PipelineState is immutable, stateEnterTime is also designed to be immutable.
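
The behavior described in this reply can be sketched as below. This is a hedged illustration under stated assumptions, not the real `Pipeline` class: `ImmutableStateHolder`, `create`, and `withState` are hypothetical names, and the instants are passed in explicitly (the PR uses `Instant.now()`) so the example is deterministic.

```java
import java.time.Instant;

// Hypothetical stand-in for an immutable, pipeline-like value object.
final class ImmutableStateHolder {
  enum State { ALLOCATED, OPEN, CLOSED }

  private final State state;
  private final Instant creationTimestamp;
  private final Instant stateEnterTime;

  private ImmutableStateHolder(State state, Instant creationTimestamp,
      Instant stateEnterTime) {
    this.state = state;
    this.creationTimestamp = creationTimestamp;
    this.stateEnterTime = stateEnterTime;
  }

  // A brand-new object: both timestamps start at the same instant.
  static ImmutableStateHolder create(Instant now) {
    return new ImmutableStateHolder(State.ALLOCATED, now, now);
  }

  // A state change returns a new object. The creation time is carried
  // over; the state-enter time is reset to the supplied instant.
  ImmutableStateHolder withState(State newState, Instant now) {
    return new ImmutableStateHolder(newState, creationTimestamp, now);
  }

  State getState() {
    return state;
  }

  Instant getCreationTimestamp() {
    return creationTimestamp;
  }

  Instant getStateEnterTime() {
    return stateEnterTime;
  }
}
```

Because the original object is never modified, a reference taken before the state change still reports the old state and the old state-enter time.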

Contributor:

@nandakumar131
We can add a comment in Pipeline build() noting that stateEnterTime is set to the current time when the pipeline enters the state, so there is no need to update it while building the object. This guards against later changes made without this context.

Contributor:

Yeah, it is a little strange that creationTime and stateEnterTime are both basically hardcoded to "now". Feels like we should only need one of them, as creation time isn't really "correct". If the pipeline changes state from open to closed, the creation time is going to change too, due to its immutable nature. I can see how this could be confusing to someone in the future.

Contributor Author:

@sodonnel, I completely understand the confusion here. We should refactor the code for better understandability.
Currently we don't have any functional issues as we update the creation time in the builder while we create the Pipeline object from existing pipeline reference.
If you look at Pipeline$Builder(Pipeline pipeline), it assigns the creation time from the given pipeline reference.

With the current code, it's easy to introduce bugs in the future. We should rewrite the Pipeline class.

Contributor Author:

The creationTimestamp in the Pipeline constructor is a fallback value. We update the creation time in the Pipeline$Builder#build. If the creation time is not set in the builder, we use the Instant#now value set in constructor.

Contributor @sodonnel, Nov 21, 2023:

Ah, I missed that the builder sets creation time. I guess this is OK as the constructor is private, so the only way to create the object is via the builder.

I noticed that the setter is public:

  public void setCreationTimestamp(Instant creationTimestamp) {
    this.creationTimestamp = creationTimestamp;
  }

We could make that private as the inner Builder class can call the private methods. This is not really in scope for this PR though.
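
A minimal sketch of the pattern discussed in this thread, assuming a hypothetical `TrackedItem` class rather than the real `Pipeline`: the private constructor assigns a fallback creation time, the nested builder overrides it when a value was supplied or copied from an existing object, and the setter is private because a nested class can still call it.

```java
import java.time.Instant;

// Hypothetical class; only the shape mirrors the discussion above.
final class TrackedItem {
  private Instant creationTimestamp;

  private TrackedItem() {
    // Fallback value, used only when the builder does not supply one.
    this.creationTimestamp = Instant.now();
  }

  Instant getCreationTimestamp() {
    return creationTimestamp;
  }

  // Private: only code inside TrackedItem, including Builder, can call it.
  private void setCreationTimestamp(Instant creationTimestamp) {
    this.creationTimestamp = creationTimestamp;
  }

  static final class Builder {
    private Instant creationTimestamp;

    Builder() {
    }

    // Copy constructor: carries the creation time over from an existing item.
    Builder(TrackedItem existing) {
      this.creationTimestamp = existing.getCreationTimestamp();
    }

    Builder setCreationTimestamp(Instant creationTimestamp) {
      this.creationTimestamp = creationTimestamp;
      return this;
    }

    TrackedItem build() {
      TrackedItem item = new TrackedItem();
      if (creationTimestamp != null) {
        // Override the constructor's fallback with the supplied value.
        item.setCreationTimestamp(creationTimestamp);
      }
      return item;
    }
  }
}
```

With this shape, code outside the class cannot change the creation time after construction, which is the property the reviewer is asking for.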

+ return stateEnterTime;
+ }
+
/**
* Return the suggested leaderId which has a high priority among DNs of the
* pipeline.
@@ -80,8 +80,8 @@ public void onMessage(final DatanodeDetails datanodeDetails,
* action.
*/
LOG.info("A dead datanode is detected. {}", datanodeDetails);
- destroyPipelines(datanodeDetails);
closeContainers(datanodeDetails, publisher);
+ destroyPipelines(datanodeDetails);

// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
@@ -122,8 +122,8 @@ private void destroyPipelines(final DatanodeDetails datanodeDetails) {
.ifPresent(pipelines ->
pipelines.forEach(id -> {
try {
- pipelineManager.closePipeline(
- pipelineManager.getPipeline(id), false);
+ pipelineManager.closePipeline(id);
+ pipelineManager.deletePipeline(id);
} catch (PipelineNotFoundException ignore) {
// Pipeline is not there in pipeline manager,
// should we care?
@@ -90,7 +90,7 @@ public void onMessage(DatanodeDetails datanodeDetails,
pipelineID, pipeline.getPipelineState(),
HddsProtos.NodeState.HEALTHY_READONLY,
datanodeDetails.getUuidString());
- pipelineManager.closePipeline(pipeline, true);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException ex) {
LOG.error("Failed to close pipeline {} which uses HEALTHY READONLY " +
"datanode {}: ", pipelineID, datanodeDetails, ex);
@@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
- import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -59,8 +58,7 @@ public void onMessage(DatanodeDetails datanodeDetails,
datanodeDetails, pipelineIds);
for (PipelineID pipelineID : pipelineIds) {
try {
- Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- pipelineManager.closePipeline(pipeline, true);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException e) {
LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
datanodeDetails);
@@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
- import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -57,8 +56,7 @@ public void onMessage(DatanodeDetails datanodeDetails,
datanodeDetails, pipelineIds);
for (PipelineID pipelineID : pipelineIds) {
try {
- Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- pipelineManager.closePipeline(pipeline, false);
+ pipelineManager.closePipeline(pipelineID);
} catch (IOException e) {
LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
datanodeDetails);
@@ -84,8 +84,7 @@ private void processPipelineAction(final DatanodeDetails datanode,
info.getDetailedReason());

if (action == PipelineAction.Action.CLOSE) {
- pipelineManager.closePipeline(
- pipelineManager.getPipeline(pid), false);
+ pipelineManager.closePipeline(pid);
} else {
LOG.error("unknown pipeline action:{}", action);
}
@@ -115,9 +115,14 @@ NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID)

void openPipeline(PipelineID pipelineId) throws IOException;

+ @Deprecated

Contributor:

Why leave this deprecated method around? This is not an external-facing API, so nothing should be using it except tests. It would probably be better to just remove it now.

Contributor Author:

True, we can remove this method completely. I just didn't want to have large changes in a single PR if possible, as that makes it hard for the reviewers.

We will have to change another 15+ classes to remove this method, I will create a follow-up Jira to make this change.
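
The interim approach described here, keeping the old overload but routing it to the new one, can be sketched as follows. `ResourceCloser` and its methods are hypothetical names, and the delegation is shown as an interface `default` method for brevity; in this PR the delegation lives in the implementing class instead.

```java
// Hypothetical interface; the names only mirror the shape of the change.
interface ResourceCloser {

  // Preferred entry point.
  void close(String id);

  /**
   * Kept only so that existing callers continue to compile.
   *
   * @deprecated the flag is ignored; call {@link #close(String)} instead.
   */
  @Deprecated
  default void close(String id, boolean onTimeout) {
    close(id);
  }
}
```

Existing call sites keep compiling (with a deprecation warning) and get the new behavior, so they can be migrated in a follow-up change before the old overload is deleted.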

void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException;

+ void closePipeline(PipelineID pipelineID) throws IOException;
+
+ void deletePipeline(PipelineID pipelineID) throws IOException;
+
void closeStalePipelines(DatanodeDetails datanodeDetails);

void scrubPipelines() throws IOException;
@@ -489,33 +489,46 @@ private void closeContainersForPipeline(final PipelineID pipelineId)
* put pipeline in CLOSED state.
* @param pipeline - ID of the pipeline.
* @param onTimeout - whether to remove pipeline after some time.
- * @throws IOException
+ * @throws IOException throws exception in case of failure
+ * @deprecated Do not use this method, onTimeout is not honored.
*/
@Override
+ @Deprecated
public void closePipeline(Pipeline pipeline, boolean onTimeout)
- throws IOException {
- PipelineID pipelineID = pipeline.getId();
+ throws IOException {
+ closePipeline(pipeline.getId());
+ }
+
+ /**
+ * Move the Pipeline to CLOSED state.
+ * @param pipelineID ID of the Pipeline to be closed
+ * @throws IOException In case of exception while closing the Pipeline
+ */
+ public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);

- if (!pipeline.isClosed()) {
+ if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
releaseWriteLock();
}
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}

metrics.removePipelineMetrics(pipelineID);

- if (!onTimeout) {
- // close pipeline right away.
- removePipeline(pipeline);
- }
}

+ /**
+ * Deletes the Pipeline for the given PipelineID.
+ * @param pipelineID ID of the Pipeline to be deleted
+ * @throws IOException In case of exception while deleting the Pipeline
+ */
+ public void deletePipeline(PipelineID pipelineID) throws IOException {
+ removePipeline(getPipeline(pipelineID));
+ }

/** close the pipelines whose nodes' IPs are stale.
@@ -535,9 +548,10 @@ public void closeStalePipelines(DatanodeDetails datanodeDetails) {
pipelinesWithStaleIpOrHostname.size());
pipelinesWithStaleIpOrHostname.forEach(p -> {
try {
- LOG.info("Closing the stale pipeline: {}", p.getId());
- closePipeline(p, false);
- LOG.info("Closed the stale pipeline: {}", p.getId());
+ final PipelineID id = p.getId();
+ LOG.info("Closing the stale pipeline: {}", id);

Contributor:

Nit: I am not sure we need this log message. We have a message below for "closed" and then exception handler has a message for "failed to close".

Contributor Author:

Fixed.

+ closePipeline(id);
+ deletePipeline(id);
} catch (IOException e) {
LOG.error("Closing the stale pipeline failed: {}", p, e);
}
@@ -568,26 +582,34 @@ public void scrubPipelines() throws IOException {
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
+ long pipelineDeleteTimoutInMills = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
List<Pipeline> candidates = stateManager.getPipelines();

for (Pipeline p : candidates) {
+ final PipelineID id = p.getId();
// scrub pipelines who stay ALLOCATED for too long.
if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
(currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMills)) {

LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
- "stage for {} mins.", p.getId(),
+ "stage for {} mins.", id,
Duration.between(currentTime, p.getCreationTimestamp())
.toMinutes());
- closePipeline(p, false);
+ closePipeline(id);
+ deletePipeline(id);
}
// scrub pipelines who stay CLOSED for too long.
- if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ if (p.getPipelineState() == Pipeline.PipelineState.CLOSED &&
+ (currentTime.toEpochMilli() - p.getStateEnterTime().toEpochMilli())
+ >= pipelineDeleteTimoutInMills) {
LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.",
p.getId());
- closeContainersForPipeline(p.getId());
- removePipeline(p);
+ deletePipeline(id);
}
// If a datanode is stopped and then SCM is restarted, a pipeline can get
// stuck in an open state. For Ratis, provided some other DNs that were
@@ -599,8 +621,7 @@ public void scrubPipelines() throws IOException {
if (isOpenWithUnregisteredNodes(p)) {
LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes",
p.getId());
- closeContainersForPipeline(p.getId());
- closePipeline(p, true);
+ closePipeline(id);
}
}
}
@@ -243,6 +243,16 @@ public void closePipeline(final Pipeline pipeline, final boolean onTimeout)
HddsProtos.PipelineState.PIPELINE_CLOSED);
}

+ @Override
+ public void closePipeline(PipelineID pipelineID) throws IOException {
+
+ }
+
+ @Override
+ public void deletePipeline(PipelineID pipelineID) throws IOException {
+
+ }
+
@Override
public void closeStalePipelines(DatanodeDetails datanodeDetails) {

@@ -26,10 +26,10 @@
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
- import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

+ import java.io.IOException;
import java.util.UUID;

/**
@@ -39,12 +39,12 @@ public class TestPipelineActionHandler {

@Test
public void testCloseActionForMissingPipeline()
- throws PipelineNotFoundException, NotLeaderException {
+ throws IOException {
final PipelineManager manager = Mockito.mock(PipelineManager.class);
final EventQueue queue = Mockito.mock(EventQueue.class);

- Mockito.when(manager.getPipeline(Mockito.any(PipelineID.class)))
- .thenThrow(new PipelineNotFoundException());
+ Mockito.doThrow(new PipelineNotFoundException())
+ .when(manager).closePipeline(Mockito.any(PipelineID.class));

final PipelineActionHandler actionHandler =
new PipelineActionHandler(manager, SCMContext.emptyContext(), null);