Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.slf4j.Logger;

import java.io.IOException;
Expand Down Expand Up @@ -68,7 +72,7 @@ public class AbstractContainerReportHandler {
* @throws IOException In case of any Exception while processing the report
*/
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerReplicaProto replicaProto)
final ContainerReplicaProto replicaProto, final EventPublisher publisher)
throws IOException {
final ContainerID containerId = ContainerID
.valueof(replicaProto.getContainerID());
Expand All @@ -80,7 +84,8 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
// Synchronized block should be replaced by container lock,
// once we have introduced lock inside ContainerInfo.
synchronized (containerManager.getContainer(containerId)) {
updateContainerStats(datanodeDetails, containerId, replicaProto);
updateContainerStats(datanodeDetails, containerId, replicaProto,
publisher);
updateContainerState(datanodeDetails, containerId, replicaProto);
updateContainerReplica(datanodeDetails, containerId, replicaProto);
}
Expand All @@ -96,13 +101,24 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
*/
private void updateContainerStats(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
final ContainerReplicaProto replicaProto,
final EventPublisher publisher)
throws ContainerNotFoundException {
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);

if (isHealthy(replicaProto::getState)) {
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);
if (containerInfo.getState() == HddsProtos.LifeCycleState.DELETED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem correct to put the logic to delete the replica for the DELETED container inside updateContainerStats.

In ContainerReportHandler#processContainerReplicas(..) there is logic to delete an unknown container in the exception handler.

Could we extract this into a new method, which is called from the exception handler. Then in AbstractContainerReportHandler#updateContainerState(...) handle the containers which should be deleted in the "case DELETED" branch of the swith statement. It could call that same extracted method - that way the logic to form the DeleteContainer command will be the same for both? It also seems more logical to put the delete inside UpdateContainerState rather than UpdateContainerStats.

final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(containerId.getId(), true);
final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
datanodeDetails.getUuid(), deleteCommand);
publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
logger.info("Sending delete container command for deleted container {}"
+ " to datanode {}", containerId.getId(), datanodeDetails);
return;
}

if (isHealthy(replicaProto::getState)) {
if (containerInfo.getSequenceId() <
replicaProto.getBlockCommitSequenceId()) {
containerInfo.updateSequenceId(
Expand Down Expand Up @@ -244,11 +260,15 @@ private void updateContainerState(final DatanodeDetails datanode,
*/
break;
case DELETING:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETING'.");
/*
* The container is under deleting. do nothing.
*/
break;
case DELETED:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETED'.");
/*
* The container is deleted. do nothing.
*/
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {
for (ContainerReplicaProto replicaProto : replicas) {
try {
processContainerReplica(datanodeDetails, replicaProto);
processContainerReplica(datanodeDetails, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
if(unknownContainerHandleAction.equals(
UNKNOWN_CONTAINER_ACTION_WARN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
ContainerReplicaProto.State.DELETED)) {
nodeManager.addContainer(dd, id);
}
processContainerReplica(dd, replicaProto);
processContainerReplica(dd, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
Expand Down Expand Up @@ -79,7 +80,7 @@
public class ReplicationManager
implements MetricsSource, EventHandler<SafeModeStatus> {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);

public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
Expand Down Expand Up @@ -312,6 +313,16 @@ private void processContainer(ContainerID id) {
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));

/*
* If container is under deleting and all it's replicas are deleted, then
* make the container as CLEANED, or resend the delete replica command if
* needed.
*/
if (state == LifeCycleState.DELETING) {
handleContainerUnderDelete(container, replicas);
return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add another check here:

if (state == LifeCycleState.DELETED) {
  return
}

This will avoid doing any further processing on a container which is expected to have zero replicas?

/*
* We don't have to take any action if the container is healthy.
*
Expand All @@ -320,6 +331,12 @@ private void processContainer(ContainerID id) {
* exact number of replicas in the same state.
*/
if (isContainerHealthy(container, replicas)) {
/*
* If container is empty, schedule task to delete the container.
*/
if (isContainerEmpty(container, replicas)) {
deleteContainerReplicas(container, replicas);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChenSammi , Is there any specific reason that we let ReplicationManager to help clean empty containers? After this, ReplicaManager will do additionally container empty check for all healthy containers. Not sure if this is an efficiency way to put logic here.

I wonder if it would be simpler to remove empty containers as part of Container Report processing? In AbstractContainerReportHandler#updateContainerState, we could check the size and number of keys of the reported containers in the CLOSED branch of the switch statement, and then take action to delete an empty container there? I have a feeling it might be simpler, but I am not sure. The disadvantage of doing it in the Container Report Processing, is that we are dealing with only a single replica at that stage. However if the container is CLOSED in SCM, and a report says it is empty then we should be good to simply remove the container from SCM and issue the delete container command when processing the container report.

Actually I prefer this way as @sodonnel mentioned.

but I am not sure. The disadvantage of doing it in the Container Report Processing, is that we are dealing with only a single replica at that stage

We could also get all replica info and check state in ContainerReportHandler, then send delete container command

I'm okay for current way but just share my thought for this.

return;
}

Expand Down Expand Up @@ -400,6 +417,21 @@ private boolean isContainerHealthy(final ContainerInfo container,
r -> compareState(container.getState(), r.getState()));
}

/**
* Returns true if the container is empty and CLOSED.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if the container is empty, false otherwise
*/
private boolean isContainerEmpty(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
return container.getState() == LifeCycleState.CLOSED &&
(container.getUsedBytes() == 0 && container.getNumberOfKeys() == 0) &&
replicas.stream().allMatch(r -> r.getState() == State.CLOSED &&
r.getBytesUsed() == 0 && r.getKeyCount() == 0);
}

/**
* Checks if the container is under replicated or not.
*
Expand All @@ -409,6 +441,10 @@ private boolean isContainerHealthy(final ContainerInfo container,
*/
private boolean isContainerUnderReplicated(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
if (container.getState() != LifeCycleState.CLOSED &&
container.getState() != LifeCycleState.QUASI_CLOSED) {
return false;
}
boolean misReplicated = !getPlacementStatus(
replicas, container.getReplicationFactor().getNumber())
.isPolicySatisfied();
Expand Down Expand Up @@ -465,6 +501,64 @@ private boolean canForceCloseContainer(final ContainerInfo container,
return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
}

/**
* Delete the container and its replicas.
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void deleteContainerReplicas(final ContainerInfo container,
final Set<ContainerReplica> replicas) throws IOException {
Preconditions.assertTrue(container.getState() ==
LifeCycleState.CLOSED);
Preconditions.assertTrue(container.getNumberOfKeys() == 0);
Preconditions.assertTrue(container.getUsedBytes() == 0);

replicas.stream().forEach(rp -> {
Preconditions.assertTrue(rp.getState() == State.CLOSED);
Preconditions.assertTrue(rp.getBytesUsed() == 0);
Preconditions.assertTrue(rp.getKeyCount() == 0);
sendDeleteCommand(container, rp.getDatanodeDetails(), false);
});
containerManager.updateContainerState(container.containerID(),
HddsProtos.LifeCycleEvent.DELETE);
LOG.debug("Deleting empty container {} replicas,", container.containerID());
}

/**
* Handle the container which is under delete.
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void handleContainerUnderDelete(final ContainerInfo container,
final Set<ContainerReplica> replicas) throws IOException {
if (replicas.size() == 0) {
containerManager.updateContainerState(container.containerID(),
HddsProtos.LifeCycleEvent.CLEANUP);
LOG.debug("Container {} state changes to DELETED",
container.containerID());
} else {
// Check whether to resend the delete replica command
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(container.containerID(), Collections.emptyList())
.stream()
.map(action -> action.datanode)
.collect(Collectors.toList());
Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.collect(Collectors.toSet());
// Resend the delete command
if (filteredReplicas.size() > 0) {
filteredReplicas.stream().forEach(rp -> {
sendDeleteCommand(container, rp.getDatanodeDetails(), false);
});
LOG.debug("Resend delete Container {} command",
container.containerID());
}
}
}

/**
* Force close the container replica(s) with highest sequence Id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ private HddsProtos.LifeCycleState updateContainerState(
containerID);
}
}
containerStore.put(containerID, container);
if (newState == LifeCycleState.DELETED) {
containerStore.delete(containerID);
} else {
containerStore.put(containerID, container);
}
return newState;
} catch (ContainerNotFoundException cnfe) {
throw new SCMException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> state != HddsProtos.LifeCycleState.OPEN)
.filter(state -> state != HddsProtos.LifeCycleState.CLOSING)
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED))
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ public static ContainerReplica getReplicas(
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(originNodeId)
.setSequenceId(sequenceId)
.setBytesUsed(100)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -99,6 +100,7 @@ public static void init() throws Exception {
conf = new OzoneConfiguration();
GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(ReplicationManager.LOG, Level.DEBUG);

String path =
GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName());
Expand All @@ -119,6 +121,8 @@ public static void init() throws Exception {
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
conf.setQuietMode(false);
conf.setTimeDuration("hdds.scm.replication.event.timeout", 100,
TimeUnit.MILLISECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.setHbInterval(200)
Expand Down Expand Up @@ -222,7 +226,7 @@ public void testBlockDeletion() throws Exception {
}

@Test
public void testContainerStatistics() throws Exception {
public void testContainerStatisticsAfterDelete() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();

Expand Down Expand Up @@ -277,6 +281,41 @@ public void testContainerStatistics() throws Exception {
Assert.assertEquals(0, container.getUsedBytes());
Assert.assertEquals(0, container.getNumberOfKeys());
});

cluster.shutdownHddsDatanode(0);
scm.getReplicationManager().processContainersNow();
// Wait for container state change to DELETING
Thread.sleep(100);
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container ->
Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
container.getState()));
LogCapturer logCapturer =
LogCapturer.captureLogs(ReplicationManager.LOG);
logCapturer.clearOutput();

scm.getReplicationManager().processContainersNow();
Thread.sleep(100);
// Wait for delete replica command resend
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Resend delete Container"), 500, 5000);
cluster.restartHddsDatanode(0, true);
Thread.sleep(100);

scm.getReplicationManager().processContainersNow();
// Wait for container state change to DELETED
Thread.sleep(100);
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container -> {
Assert.assertEquals(HddsProtos.LifeCycleState.DELETED,
container.getState());
try {
Assert.assertNull(scm.getScmMetadataStore().getContainerTable()
.get(container.containerID()));
} catch (IOException e) {
Assert.fail("Getting container from SCM DB should not fail");
}
});
}

private void waitForDatanodeBlockDeletionStart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
return;
}
getNodeManager().addContainer(dd, id);
processContainerReplica(dd, replicaProto);
processContainerReplica(dd, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
Expand Down