diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index e3c903da2db0..52f42594429a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -61,7 +61,8 @@ abstract class AbstractContainerReportHandler { protected abstract Logger getLogger(); /** @return the container in SCM and the replica from a datanode details for logging. */ - static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica, DatanodeDetails datanode) { + protected static Object getDetailsForLogging(ContainerInfo container, ContainerReplicaProto replica, + DatanodeDetails datanode) { Objects.requireNonNull(replica, "replica == null"); Objects.requireNonNull(datanode, "datanode == null"); if (container != null) { @@ -93,21 +94,6 @@ public String toString() { }; } - /** - * Process the given ContainerReplica received from specified datanode. - * - * @param datanodeDetails DatanodeDetails for the DN - * @param replicaProto Protobuf representing the replicas - * @param publisher EventPublisher instance - */ - protected void processContainerReplica(final DatanodeDetails datanodeDetails, - final ContainerReplicaProto replicaProto, final EventPublisher publisher) - throws IOException, InvalidStateTransitionException { - ContainerInfo container = getContainerManager().getContainer( - ContainerID.valueOf(replicaProto.getContainerID())); - processContainerReplica( - datanodeDetails, container, replicaProto, publisher); - } /** * Process the given ContainerReplica received from specified datanode. @@ -120,18 +106,15 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails, */ protected void processContainerReplica(final DatanodeDetails datanodeDetails, final ContainerInfo containerInfo, - final ContainerReplicaProto replicaProto, final EventPublisher publisher) + final ContainerReplicaProto replicaProto, final EventPublisher publisher, Object detailsForLogging) throws IOException, InvalidStateTransitionException { - final ContainerID containerId = containerInfo.containerID(); - final Object detailsForLogging = getDetailsForLogging(containerInfo, replicaProto, datanodeDetails); - getLogger().debug("Processing replica {}", detailsForLogging); // Synchronized block should be replaced by container lock, // once we have introduced lock inside ContainerInfo. synchronized (containerInfo) { updateContainerStats(datanodeDetails, containerInfo, replicaProto, detailsForLogging); if (!updateContainerState(datanodeDetails, containerInfo, replicaProto, publisher, detailsForLogging)) { - updateContainerReplica(datanodeDetails, containerId, replicaProto); + updateContainerReplica(datanodeDetails, containerInfo.containerID(), replicaProto); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 288c479ba2d4..b632b1708a0c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -234,8 +234,7 @@ private void processSingleReplica(final DatanodeDetails datanodeDetails, return; } try { - processContainerReplica( - datanodeDetails, container, replicaProto, publisher); + processContainerReplica(datanodeDetails, container, replicaProto, publisher, detailsForLogging); } catch (IOException | InvalidStateTransitionException e) { getLogger().error("Failed to process {}", detailsForLogging, e); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 72722153b9a6..247e3667d9ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -36,8 +36,8 @@ /** * Handles incremental container reports from datanode. */ -public class IncrementalContainerReportHandler extends - AbstractContainerReportHandler +public class IncrementalContainerReportHandler + extends AbstractContainerReportHandler implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger( @@ -58,14 +58,25 @@ protected Logger getLogger() { @Override public void onMessage(final IncrementalContainerReportFromDatanode report, final EventPublisher publisher) { + final DatanodeDetails datanode = getDatanodeDetails(report); + if (datanode == null) { + return; + } + processICR(report, publisher, datanode); + } + + protected DatanodeDetails getDatanodeDetails(final IncrementalContainerReportFromDatanode report) { final DatanodeDetails dnFromReport = report.getDatanodeDetails(); getLogger().debug("Processing incremental container report from datanode {}", dnFromReport); final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID()); if (dd == null) { getLogger().warn("Datanode not found: {}", dnFromReport); - return; } + return dd; + } + protected void processICR(IncrementalContainerReportFromDatanode report, + EventPublisher publisher, DatanodeDetails dd) { boolean success = false; // HDDS-5249 - we must ensure that an ICR and FCR for the same datanode // do not run at the same time or it can result in a data consistency @@ -74,13 +85,15 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, synchronized (dd) { for (ContainerReplicaProto replicaProto : report.getReport().getReportList()) { + Object detailsForLogging = getDetailsForLogging(null, replicaProto, dd); ContainerID id = ContainerID.valueOf(replicaProto.getContainerID()); - ContainerInfo container = null; + final ContainerInfo container; try { try { container = getContainerManager().getContainer(id); // Ensure we reuse the same ContainerID instance in containerInfo id = container.containerID(); + detailsForLogging = getDetailsForLogging(container, replicaProto, dd); } finally { if (replicaProto.getState() == State.DELETED) { getNodeManager().removeContainer(dd, id); @@ -89,27 +102,23 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, } } if (ContainerReportValidator.validate(container, dd, replicaProto)) { - processContainerReplica(dd, container, replicaProto, publisher); + processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging); } success = true; } catch (ContainerNotFoundException e) { - getLogger().warn("Container {} not found!", replicaProto.getContainerID()); + getLogger().warn("Container not found: {}", detailsForLogging); } catch (NodeNotFoundException ex) { - getLogger().error("Datanode not found {}", report.getDatanodeDetails(), ex); + getLogger().error("{}: {}", ex, detailsForLogging); } catch (ContainerReplicaNotFoundException e) { - getLogger().warn("Container {} replica not found!", - replicaProto.getContainerID()); + getLogger().warn("Container replica not found: {}", detailsForLogging, e); } catch (SCMException ex) { if (ex.getResult() == SCMException.ResultCodes.SCM_NOT_LEADER) { - getLogger().info("Failed to process {} container {}: {}", - replicaProto.getState(), id, ex.getMessage()); + getLogger().info("SCM_NOT_LEADER: Failed to process {}", detailsForLogging); } else { - getLogger().error("Exception while processing ICR for container {}", - replicaProto.getContainerID(), ex); + getLogger().info("Failed to process {}", detailsForLogging, ex); } } catch (IOException | InvalidStateTransitionException e) { - getLogger().error("Exception while processing ICR for container {}", - replicaProto.getContainerID(), e); + getLogger().info("Failed to process {}", detailsForLogging, e); } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java index c6f10c9b5b91..85fc477cb8eb 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java @@ -17,19 +17,13 @@ package org.apache.hadoop.ozone.recon.scm; -import java.io.IOException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +36,7 @@ public class ReconIncrementalContainerReportHandler private static final Logger LOG = LoggerFactory.getLogger( ReconIncrementalContainerReportHandler.class); - public ReconIncrementalContainerReportHandler(NodeManager nodeManager, + ReconIncrementalContainerReportHandler(NodeManager nodeManager, ContainerManager containerManager, SCMContext scmContext) { super(nodeManager, containerManager, scmContext); } @@ -55,16 +49,8 @@ protected Logger getLogger() { @Override public void onMessage(final IncrementalContainerReportFromDatanode report, final EventPublisher publisher) { - final DatanodeDetails dnFromReport = report.getDatanodeDetails(); - if (LOG.isDebugEnabled()) { - LOG.debug("Processing incremental container report from data node {}", - dnFromReport); - } - - final DatanodeDetails dd = getNodeManager().getNode(dnFromReport.getID()); - if (dd == null) { - LOG.warn("Received container report from unknown datanode {}", - dnFromReport); + final DatanodeDetails datanode = getDatanodeDetails(report); + if (datanode == null) { return; } @@ -77,36 +63,6 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, LOG.error("Exception while checking and adding new container.", ioEx); return; } - boolean success = true; - for (ContainerReplicaProto replicaProto : - report.getReport().getReportList()) { - ContainerID id = ContainerID.valueOf(replicaProto.getContainerID()); - ContainerInfo container = null; - try { - try { - container = getContainerManager().getContainer(id); - // Ensure we reuse the same ContainerID instance in containerInfo - id = container.containerID(); - } finally { - if (replicaProto.getState().equals( - ContainerReplicaProto.State.DELETED)) { - getNodeManager().removeContainer(dd, id); - } else { - getNodeManager().addContainer(dd, id); - } - } - processContainerReplica(dd, replicaProto, publisher); - success = true; - } catch (NodeNotFoundException ex) { - success = false; - LOG.error("Received ICR from unknown datanode {}.", - report.getDatanodeDetails(), ex); - } catch (IOException | InvalidStateTransitionException e) { - success = false; - LOG.error("Exception while processing ICR for container {}", - replicaProto.getContainerID()); - } - } - containerManager.notifyContainerReportProcessing(false, success); + processICR(report, publisher, datanode); } }