Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
Expand All @@ -46,30 +47,18 @@
/**
* Base class for all the container report handlers.
*/
public class AbstractContainerReportHandler {

abstract class AbstractContainerReportHandler {
private final NodeManager nodeManager;
private final ContainerManager containerManager;
private final SCMContext scmContext;
private final Logger logger;

/**
* Constructs AbstractContainerReportHandler instance with the
* given ContainerManager instance.
*
* @param containerManager ContainerManager
* @param logger Logger to be used for logging
*/
AbstractContainerReportHandler(final ContainerManager containerManager,
final SCMContext scmContext,
final Logger logger) {
AbstractContainerReportHandler(NodeManager nodeManager, ContainerManager containerManager, SCMContext scmContext) {
this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager == null");
this.containerManager = Objects.requireNonNull(containerManager, "containerManager == null");
this.scmContext = Objects.requireNonNull(scmContext, "scmContext == null");
this.logger = Objects.requireNonNull(logger, "logger == null");
}

/**
 * Returns the logger held in this handler's {@code logger} field.
 *
 * @return the {@link Logger} used for this handler's log output
 */
protected Logger getLogger() {
  return this.logger;
}
/**
* Supplies the logger that this handler writes its log messages to.
*
* @return the {@link Logger} chosen by the concrete handler
*/
protected abstract Logger getLogger();

/** @return the details of the container in SCM, the replica reported by a datanode, and that datanode, for logging. */
static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica, DatanodeDetails datanode) {
Expand Down Expand Up @@ -414,10 +403,10 @@ private boolean isHealthy(final State replicaState) {
&& replicaState != State.DELETED;
}

/**
 * Return NodeManager.
 * @return {@link NodeManager}
 */
protected NodeManager getNodeManager() {
  return this.nodeManager;
}

/**
 * Return ContainerManager.
 * @return {@link ContainerManager}
 */
protected ContainerManager getContainerManager() {
  return this.containerManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,15 @@ public class ContainerReportHandler extends AbstractContainerReportHandler
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);

private final NodeManager nodeManager;
private final ContainerManager containerManager;
private final String unknownContainerHandleAction;
/**
 * The action taken by ContainerReportHandler to handle
 * unknown containers.
 */
enum UnknownContainerAction {
  WARN, DELETE;

  /** String form of {@link #WARN}; same text as {@code WARN.name()}. */
  static final String UNKNOWN_CONTAINER_ACTION_WARN = "WARN";
  /** String form of {@link #DELETE}; same text as {@code DELETE.name()}. */
  static final String UNKNOWN_CONTAINER_ACTION_DELETE = "DELETE";

  /**
   * Converts a configured string into an action.
   *
   * <p>Only the exact, case-sensitive text {@code "DELETE"} selects
   * {@link #DELETE}. Every other value, including {@code null} and the
   * empty string, selects {@link #WARN}, so the destructive action is
   * never chosen by accident.
   *
   * @param s the configured action name; may be {@code null}
   * @return {@link #DELETE} if {@code s} equals {@code "DELETE"},
   *         otherwise {@link #WARN}
   */
  static UnknownContainerAction parse(String s) {
    // Constant-first comparison: a null argument falls through to WARN
    // instead of raising a NullPointerException.
    return DELETE.name().equals(s) ? DELETE : WARN;
  }
}

private final UnknownContainerAction unknownContainerHandleAction;

/**
* Constructs ContainerReportHandler instance with the
Expand All @@ -70,18 +69,21 @@ public ContainerReportHandler(final NodeManager nodeManager,
final ContainerManager containerManager,
final SCMContext scmContext,
OzoneConfiguration conf) {
super(containerManager, scmContext, LOG);
this.nodeManager = nodeManager;
this.containerManager = containerManager;
super(nodeManager, containerManager, scmContext);

if (conf != null) {
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
unknownContainerHandleAction = scmConfig.getUnknownContainerAction();
unknownContainerHandleAction = UnknownContainerAction.parse(scmConfig.getUnknownContainerAction());
} else {
unknownContainerHandleAction = UNKNOWN_CONTAINER_ACTION_WARN;
unknownContainerHandleAction = UnknownContainerAction.WARN;
}
}

/** @return the static {@code LOG} logger of {@link ContainerReportHandler}. */
@Override
protected Logger getLogger() {
  return ContainerReportHandler.LOG;
}

public ContainerReportHandler(final NodeManager nodeManager,
final ContainerManager containerManager) {
this(nodeManager, containerManager, SCMContext.emptyContext(), null);
Expand Down Expand Up @@ -142,10 +144,9 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,

final DatanodeDetails dnFromReport =
reportFromDatanode.getDatanodeDetails();
final DatanodeDetails datanodeDetails = nodeManager.getNode(dnFromReport.getID());
final DatanodeDetails datanodeDetails = getNodeManager().getNode(dnFromReport.getID());
if (datanodeDetails == null) {
LOG.warn("Received container report from unknown datanode {}",
dnFromReport);
getLogger().warn("Datanode not found: {}", dnFromReport);
return;
}
final ContainerReportsProto containerReport =
Expand All @@ -159,7 +160,7 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
final List<ContainerReplicaProto> replicas =
containerReport.getReportsList();
final Set<ContainerID> expectedContainersInDatanode =
nodeManager.getContainers(datanodeDetails);
getNodeManager().getContainers(datanodeDetails);

for (ContainerReplicaProto replica : replicas) {
ContainerID cid = ContainerID.valueOf(replica.getContainerID());
Expand All @@ -169,7 +170,7 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
// from protobuf. However we don't want to store that object if
// there is already an instance for the same ContainerID we can
// reuse.
container = containerManager.getContainer(cid);
container = getContainerManager().getContainer(cid);
cid = container.containerID();
} catch (ContainerNotFoundException e) {
// Ignore this for now. It will be handled later with a null check
Expand All @@ -181,7 +182,7 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
boolean alreadyInDn = expectedContainersInDatanode.remove(cid);
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
nodeManager.addContainer(datanodeDetails, cid);
getNodeManager().addContainer(datanodeDetails, cid);
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand All @@ -193,17 +194,16 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
// report, so it is now missing on the DN. We need to remove it from the
// list
processMissingReplicas(datanodeDetails, expectedContainersInDatanode);
containerManager.notifyContainerReportProcessing(true, true);
getContainerManager().notifyContainerReportProcessing(true, true);
if (reportFromDatanode.isRegister()) {
publisher.fireEvent(SCMEvents.CONTAINER_REGISTRATION_REPORT,
new SCMDatanodeProtocolServer.NodeRegistrationContainerReport(datanodeDetails,
reportFromDatanode.getReport()));
}
}
} catch (NodeNotFoundException ex) {
containerManager.notifyContainerReportProcessing(true, false);
LOG.error("Received container report from unknown datanode {}.",
datanodeDetails, ex);
getContainerManager().notifyContainerReportProcessing(true, false);
getLogger().warn("Datanode not found: {}", datanodeDetails, ex);
}

}
Expand All @@ -224,11 +224,9 @@ private void processSingleReplica(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {
final Object detailsForLogging = getDetailsForLogging(container, replicaProto, datanodeDetails);
if (container == null) {
if (unknownContainerHandleAction.equals(
UNKNOWN_CONTAINER_ACTION_WARN)) {
if (unknownContainerHandleAction == UnknownContainerAction.WARN) {
getLogger().error("CONTAINER_NOT_FOUND for {}", detailsForLogging);
} else if (unknownContainerHandleAction.equals(
UNKNOWN_CONTAINER_ACTION_DELETE)) {
} else if (unknownContainerHandleAction == UnknownContainerAction.DELETE) {
final ContainerID containerId = ContainerID
.valueOf(replicaProto.getContainerID());
deleteReplica(containerId, datanodeDetails, publisher, "CONTAINER_NOT_FOUND", true, detailsForLogging);
Expand All @@ -253,26 +251,26 @@ private void processMissingReplicas(final DatanodeDetails datanodeDetails,
final Set<ContainerID> missingReplicas) {
for (ContainerID id : missingReplicas) {
try {
nodeManager.removeContainer(datanodeDetails, id);
getNodeManager().removeContainer(datanodeDetails, id);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to remove container {} from a node which does not " +
"exist {}", id, datanodeDetails, e);
getLogger().warn("Failed to remove missing container {}: datanode {} not found",
id, datanodeDetails, e);
}
try {
containerManager.getContainerReplicas(id).stream()
getContainerManager().getContainerReplicas(id).stream()
.filter(replica -> replica.getDatanodeDetails()
.equals(datanodeDetails)).findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
getContainerManager().removeContainerReplica(id, replica);
} catch (ContainerNotFoundException |
ContainerReplicaNotFoundException ignored) {
// This should not happen, but even if it happens, not an issue
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Cannot remove container replica, container {} not found.",
id, e);
getLogger().warn("Failed to remove container replica: container {} not found in datanode {}",
id, datanodeDetails, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.container.report.ContainerReportValidator;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand All @@ -42,27 +43,26 @@ public class IncrementalContainerReportHandler extends
private static final Logger LOG = LoggerFactory.getLogger(
IncrementalContainerReportHandler.class);

private final NodeManager nodeManager;

public IncrementalContainerReportHandler(
final NodeManager nodeManager,
final ContainerManager containerManager,
final SCMContext scmContext) {
super(containerManager, scmContext, LOG);
this.nodeManager = nodeManager;
super(nodeManager, containerManager, scmContext);
}

/** @return the static {@code LOG} logger of {@link IncrementalContainerReportHandler}. */
@Override
protected Logger getLogger() {
  return IncrementalContainerReportHandler.LOG;
}

@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
final DatanodeDetails dnFromReport = report.getDatanodeDetails();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing incremental container report from data node {}", dnFromReport);
}
final DatanodeDetails dd = nodeManager.getNode(dnFromReport.getID());
getLogger().debug("Processing incremental container report from datanode {}", dnFromReport);
final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID());
if (dd == null) {
LOG.warn("Received container report from unknown datanode {}",
dnFromReport);
getLogger().warn("Datanode not found: {}", dnFromReport);
return;
}

Expand All @@ -82,44 +82,38 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
// Ensure we reuse the same ContainerID instance in containerInfo
id = container.containerID();
} finally {
if (replicaProto.getState().equals(
ContainerReplicaProto.State.DELETED)) {
nodeManager.removeContainer(dd, id);
if (replicaProto.getState() == State.DELETED) {
getNodeManager().removeContainer(dd, id);
} else {
nodeManager.addContainer(dd, id);
getNodeManager().addContainer(dd, id);
}
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher);
}
success = true;
} catch (ContainerNotFoundException e) {
LOG.warn("Container {} not found!", replicaProto.getContainerID());
getLogger().warn("Container {} not found!", replicaProto.getContainerID());
} catch (NodeNotFoundException ex) {
LOG.error("Received ICR from unknown datanode {}",
report.getDatanodeDetails(), ex);
getLogger().error("Datanode not found {}", report.getDatanodeDetails(), ex);
} catch (ContainerReplicaNotFoundException e) {
LOG.warn("Container {} replica not found!",
getLogger().warn("Container {} replica not found!",
replicaProto.getContainerID());
} catch (SCMException ex) {
if (ex.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) {
LOG.info("Failed to process {} container {}: {}",
getLogger().info("Failed to process {} container {}: {}",
replicaProto.getState(), id, ex.getMessage());
} else {
LOG.error("Exception while processing ICR for container {}",
getLogger().error("Exception while processing ICR for container {}",
replicaProto.getContainerID(), ex);
}
} catch (IOException | InvalidStateTransitionException e) {
LOG.error("Exception while processing ICR for container {}",
getLogger().error("Exception while processing ICR for container {}",
replicaProto.getContainerID(), e);
}
}
}

getContainerManager().notifyContainerReportProcessing(false, success);
}

/**
 * Returns the node manager held in the {@code nodeManager} field.
 *
 * @return {@link NodeManager}
 */
protected NodeManager getNodeManager() {
  return nodeManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testUnknownContainerDeleted() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(
ScmConfig.HDDS_SCM_UNKNOWN_CONTAINER_ACTION,
ContainerReportHandler.UNKNOWN_CONTAINER_ACTION_DELETE);
ContainerReportHandler.UnknownContainerAction.DELETE.name());

sendContainerReport(conf);
verify(publisher, times(1)).fireEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Recon's container report handler.
*/
public class ReconContainerReportHandler extends ContainerReportHandler {
private static final Logger LOG = LoggerFactory.getLogger(ReconContainerReportHandler.class);

/**
* Creates the Recon handler by passing both managers straight to the
* two-argument {@link ContainerReportHandler} constructor.
*
* @param nodeManager node manager forwarded to the superclass
* @param containerManager container manager forwarded to the superclass
*/
public ReconContainerReportHandler(NodeManager nodeManager,
ContainerManager containerManager) {
super(nodeManager, containerManager);
}

/** @return the static {@code LOG} logger of {@link ReconContainerReportHandler}. */
@Override
protected Logger getLogger() {
  return ReconContainerReportHandler.LOG;
}

@Override
public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
final EventPublisher publisher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
super(nodeManager, containerManager, scmContext);
}

/**
* Overrides the inherited accessor to return the {@code LOG} logger.
*
* @return the {@code LOG} instance
*/
@Override
protected Logger getLogger() {
return LOG;
}

@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
Expand Down