@@ -26,6 +26,10 @@
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.slf4j.Logger;

import java.io.IOException;
@@ -68,7 +72,7 @@ public class AbstractContainerReportHandler {
* @throws IOException In case of any Exception while processing the report
*/
protected void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerReplicaProto replicaProto)
final ContainerReplicaProto replicaProto, final EventPublisher publisher)
throws IOException {
final ContainerID containerId = ContainerID
.valueof(replicaProto.getContainerID());
@@ -81,8 +85,10 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
// once we have introduced lock inside ContainerInfo.
synchronized (containerManager.getContainer(containerId)) {
updateContainerStats(datanodeDetails, containerId, replicaProto);
updateContainerState(datanodeDetails, containerId, replicaProto);
updateContainerReplica(datanodeDetails, containerId, replicaProto);
if (!updateContainerState(datanodeDetails, containerId, replicaProto,
publisher)) {
updateContainerReplica(datanodeDetails, containerId, replicaProto);
}
}
}

@@ -98,11 +104,10 @@ private void updateContainerStats(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException {
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);

if (isHealthy(replicaProto::getState)) {
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);

if (containerInfo.getSequenceId() <
replicaProto.getBlockCommitSequenceId()) {
containerInfo.updateSequenceId(
@@ -154,15 +159,18 @@ private List<ContainerReplica> getOtherReplicas(ContainerID containerId,
* @param datanode Datanode from which the report is received
* @param containerId ID of the container
* @param replica ContainerReplica
* @return true if the replica should be ignored by further processing
* @throws IOException In case of Exception
*/
private void updateContainerState(final DatanodeDetails datanode,
private boolean updateContainerState(final DatanodeDetails datanode,
final ContainerID containerId,
final ContainerReplicaProto replica)
final ContainerReplicaProto replica,
final EventPublisher publisher)
throws IOException {

final ContainerInfo container = containerManager
.getContainer(containerId);
boolean ignored = false;

switch (container.getState()) {
case OPEN:
@@ -244,20 +252,29 @@ private void updateContainerState(final DatanodeDetails datanode,
*/
break;
case DELETING:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETING'.");
/*
* The container is being deleted; do nothing.
*/
break;
case DELETED:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETED'.");
/*
* The container has already been deleted; delete the stale replica.
*/
deleteReplica(containerId, datanode, publisher, "DELETED");
ignored = true;
break;
default:
break;
}

return ignored;
}

private void updateContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException, ContainerReplicaNotFoundException {

final ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(containerId)
.setContainerState(replicaProto.getState())
@@ -297,4 +314,14 @@ protected ContainerManager getContainerManager() {
return containerManager;
}

protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
EventPublisher publisher, String reason) {
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(containerID.getId(), true);
final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
dn.getUuid(), deleteCommand);
publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
logger.info("Sending delete container command for " + reason +
" container {} to datanode {}", containerID.getId(), dn);
}
}
--- next file ---
@@ -33,8 +33,6 @@
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -159,7 +157,7 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {
for (ContainerReplicaProto replicaProto : replicas) {
try {
processContainerReplica(datanodeDetails, replicaProto);
processContainerReplica(datanodeDetails, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
if(unknownContainerHandleAction.equals(
UNKNOWN_CONTAINER_ACTION_WARN)) {
@@ -170,13 +168,7 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails,
UNKNOWN_CONTAINER_ACTION_DELETE)) {
final ContainerID containerId = ContainerID
.valueof(replicaProto.getContainerID());
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(containerId.getId(), true);
final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
datanodeDetails.getUuid(), deleteCommand);
publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
LOG.info("Sending delete container command for unknown container {}"
+ " to datanode {}", containerId.getId(), datanodeDetails);
deleteReplica(containerId, datanodeDetails, publisher, "unknown");
}
} catch (IOException e) {
LOG.error("Exception while processing container report for container" +
--- next file ---
@@ -77,7 +77,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
ContainerReplicaProto.State.DELETED)) {
nodeManager.addContainer(dd, id);
}
processContainerReplica(dd, replicaProto);
processContainerReplica(dd, replicaProto, publisher);
} catch (ContainerNotFoundException e) {
success = false;
LOG.warn("Container {} not found!", replicaProto.getContainerID());
--- next file ---
@@ -39,6 +39,7 @@
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
@@ -79,7 +80,7 @@
public class ReplicationManager
implements MetricsSource, EventHandler<SafeModeStatus> {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);

public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
@@ -312,6 +313,16 @@ private void processContainer(ContainerID id) {
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));

/*
* If the container is in DELETING state and all of its replicas have
* been deleted, transition the container to DELETED; otherwise resend
* the delete replica command if needed.
*/
if (state == LifeCycleState.DELETING) {
handleContainerUnderDelete(container, replicas);
return;
}

Review comment (Contributor):
Should we add another check here:

if (state == LifeCycleState.DELETED) {
  return;
}

This would avoid doing any further processing on a container that is expected to have zero replicas.
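To make the placement concrete, here is a rough sketch (illustrative only, not part of this patch) of how that early exit could sit next to the DELETING check added to processContainer, reusing the state, container, and replicas variables visible in the diff above:

if (state == LifeCycleState.DELETING) {
  handleContainerUnderDelete(container, replicas);
  return;
}
// Suggested addition: a DELETED container is expected to have zero
// replicas, so there is nothing further to process for it.
if (state == LifeCycleState.DELETED) {
  return;
}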

/*
* We don't have to take any action if the container is healthy.
*
@@ -320,6 +331,12 @@ private void processContainer(ContainerID id) {
* exact number of replicas in the same state.
*/
if (isContainerHealthy(container, replicas)) {
/*
* If the container is empty, schedule deletion of its replicas and the container.
*/
if (isContainerEmpty(container, replicas)) {
deleteContainerReplicas(container, replicas);
}
Review discussion (Contributors):
@ChenSammi, is there any specific reason we let ReplicationManager help clean up empty containers? After this change, ReplicationManager will additionally run a container-empty check for every healthy container. I am not sure this is an efficient place to put the logic.

I wonder if it would be simpler to remove empty containers as part of container report processing. In AbstractContainerReportHandler#updateContainerState, we could check the size and number of keys of the reported container in the CLOSED branch of the switch statement, and take action to delete an empty container there. I have a feeling it might be simpler, but I am not sure. The disadvantage of doing it in container report processing is that we are dealing with only a single replica at that stage. However, if the container is CLOSED in SCM and a report says it is empty, then we should be good to simply remove the container from SCM and issue the delete container command when processing the container report.

Actually I prefer this way, as @sodonnel mentioned:

"but I am not sure. The disadvantage of doing it in container report processing is that we are dealing with only a single replica at that stage"

We could also get all the replica info and check the state in ContainerReportHandler, and then send the delete container command there.

I'm okay with the current way, but just sharing my thoughts on this.
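A minimal sketch of the report-side alternative discussed above (hypothetical, not part of this patch; it reuses the datanode, containerId, replica, publisher, container, and ignored variables of AbstractContainerReportHandler#updateContainerState from the first file, and assumes the merged container stats are already up to date from updateContainerStats):

case CLOSED:
  /*
   * SCM already sees the container as CLOSED. If its stats say it is
   * empty and the reported replica is CLOSED, the replica could be
   * deleted while processing this report instead of waiting for
   * ReplicationManager to notice the empty container.
   */
  if (replica.getState() == State.CLOSED
      && container.getUsedBytes() == 0
      && container.getNumberOfKeys() == 0) {
    deleteReplica(containerId, datanode, publisher, "empty CLOSED");
    ignored = true;
  }
  break;

Either way, the container itself would still need a state transition (for example via the DELETE/CLEANUP lifecycle events used elsewhere in this patch) so that SCM eventually stops tracking it.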

return;
}

@@ -400,6 +417,21 @@ private boolean isContainerHealthy(final ContainerInfo container,
r -> compareState(container.getState(), r.getState()));
}

/**
* Returns true if the container is empty and CLOSED.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if the container is empty, false otherwise
*/
private boolean isContainerEmpty(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
return container.getState() == LifeCycleState.CLOSED &&
(container.getUsedBytes() == 0 && container.getNumberOfKeys() == 0) &&
replicas.stream().allMatch(r -> r.getState() == State.CLOSED &&
r.getBytesUsed() == 0 && r.getKeyCount() == 0);
}

/**
* Checks if the container is under replicated or not.
*
@@ -409,6 +441,10 @@ private boolean isContainerHealthy(final ContainerInfo container,
*/
private boolean isContainerUnderReplicated(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
if (container.getState() != LifeCycleState.CLOSED &&
container.getState() != LifeCycleState.QUASI_CLOSED) {
return false;
}
boolean misReplicated = !getPlacementStatus(
replicas, container.getReplicationFactor().getNumber())
.isPolicySatisfied();
@@ -465,6 +501,64 @@ private boolean canForceCloseContainer(final ContainerInfo container,
return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
}

/**
* Delete the container and its replicas.
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void deleteContainerReplicas(final ContainerInfo container,
final Set<ContainerReplica> replicas) throws IOException {
Preconditions.assertTrue(container.getState() ==
LifeCycleState.CLOSED);
Preconditions.assertTrue(container.getNumberOfKeys() == 0);
Preconditions.assertTrue(container.getUsedBytes() == 0);

replicas.stream().forEach(rp -> {
Preconditions.assertTrue(rp.getState() == State.CLOSED);
Preconditions.assertTrue(rp.getBytesUsed() == 0);
Preconditions.assertTrue(rp.getKeyCount() == 0);
sendDeleteCommand(container, rp.getDatanodeDetails(), false);
});
containerManager.updateContainerState(container.containerID(),
HddsProtos.LifeCycleEvent.DELETE);
LOG.debug("Deleting empty container {} replicas,", container.containerID());
}

/**
* Handle a container that is in DELETING state.
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void handleContainerUnderDelete(final ContainerInfo container,
final Set<ContainerReplica> replicas) throws IOException {
if (replicas.size() == 0) {
containerManager.updateContainerState(container.containerID(),
HddsProtos.LifeCycleEvent.CLEANUP);
LOG.debug("Container {} state changes to DELETED",
container.containerID());
} else {
// Check whether to resend the delete replica command
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(container.containerID(), Collections.emptyList())
.stream()
.map(action -> action.datanode)
.collect(Collectors.toList());
Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.collect(Collectors.toSet());
// Resend the delete command
if (filteredReplicas.size() > 0) {
filteredReplicas.stream().forEach(rp -> {
sendDeleteCommand(container, rp.getDatanodeDetails(), false);
});
LOG.debug("Resend delete Container {} command",
container.containerID());
}
}
}

/**
* Force close the container replica(s) with highest sequence Id.
*
--- next file ---
@@ -353,7 +353,11 @@ private HddsProtos.LifeCycleState updateContainerState(
containerID);
}
}
containerStore.put(containerID, container);
if (newState == LifeCycleState.DELETED) {
containerStore.delete(containerID);
} else {
containerStore.put(containerID, container);
}
return newState;
} catch (ContainerNotFoundException cnfe) {
throw new SCMException(
--- next file ---
@@ -70,8 +70,8 @@ public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> state != HddsProtos.LifeCycleState.OPEN)
.filter(state -> state != HddsProtos.LifeCycleState.CLOSING)
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED))
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
});
--- next file ---
@@ -551,6 +551,7 @@ public static ContainerReplica getReplicas(
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(originNodeId)
.setSequenceId(sequenceId)
.setBytesUsed(100)
.build();
}

--- next file ---
@@ -627,6 +627,39 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
assertEquals(11L, containerOne.getNumberOfKeys());
}

@Test
public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException,
SCMException, ContainerNotFoundException {

final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);

final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final ContainerInfo containerOne = getContainer(LifeCycleState.DELETED);

final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID())
.collect(Collectors.toSet());

nodeManager.setContainers(datanodeOne, containerIDSet);
containerStateManager.loadContainer(containerOne);

// Expect the stale replica to be deleted.
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);

Mockito.verify(publisher, Mockito.times(1));

Assert.assertEquals(0, containerManager.getContainerReplicas(
containerOne.containerID()).size());
}

private ContainerReportFromDatanode getContainerReportFromDatanode(
ContainerID containerId, ContainerReplicaProto.State state,
DatanodeDetails dn, long bytesUsed, long keyCount) {