@@ -78,6 +78,7 @@ public DatanodeDetails(DatanodeDetails datanodeDetails) {
this.hostName = datanodeDetails.hostName;
this.ports = datanodeDetails.ports;
this.setNetworkName(datanodeDetails.getNetworkName());
this.setParent(datanodeDetails.getParent());
}

/**
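This hunk is in the DatanodeDetails copy constructor. Without the added line, copying a DatanodeDetails silently dropped the network-topology parent that NetworkTopology#add attaches, so any code holding a copy (for example the node record kept by the node manager) saw getParent() == null. A minimal sketch of the behaviour the change guarantees; it assumes the default topology schema and the MockDatanodeDetails helper used elsewhere in this diff:

    OzoneConfiguration conf = new OzoneConfiguration();
    NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
    DatanodeDetails original = MockDatanodeDetails.randomDatanodeDetails();
    original.setNetworkLocation("/default-rack");
    clusterMap.add(original);                 // attaches the node and sets its parent
    DatanodeDetails copy = new DatanodeDetails(original);
    Assert.assertNotNull(copy.getParent());   // holds only with the parent copied here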
@@ -132,7 +132,7 @@ public void incrementCount(List<Long> txIDs) throws IOException {
if (block == null) {
// Should we make this an error ? How can we not find the deleted
// TXID?
LOG.warn("Deleted TXID not found.");
LOG.warn("Deleted TXID {} not found.", txID);
continue;
}
DeletedBlocksTransaction.Builder builder = block.toBuilder();
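Both logging fixes in this PR follow the same SLF4J pattern: values fill the {} placeholders, and a Throwable passed as the final argument is printed with its stack trace even though it has no placeholder of its own (the old call in the incremental-report handler further down dropped the stack trace entirely). A self-contained illustration; the class and values are made up for the demo:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class Slf4jDemo {
      private static final Logger LOG = LoggerFactory.getLogger(Slf4jDemo.class);

      public static void main(String[] args) {
        long txID = 42L;
        LOG.warn("Deleted TXID {} not found.", txID);
        try {
          throw new java.io.IOException("simulated failure");
        } catch (java.io.IOException e) {
          // The trailing Throwable is logged with its stack trace.
          LOG.error("Exception while processing ICR for container {}", 7L, e);
        }
      }
    }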
@@ -102,8 +102,15 @@ public ContainerReportHandler(final NodeManager nodeManager,
public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
final EventPublisher publisher) {

final DatanodeDetails datanodeDetails =
final DatanodeDetails dnFromReport =
reportFromDatanode.getDatanodeDetails();
DatanodeDetails datanodeDetails =
nodeManager.getNodeByUuid(dnFromReport.getUuidString());
if (datanodeDetails == null) {
LOG.warn("Received container report from unknown datanode {}",
dnFromReport);
return;
}
final ContainerReportsProto containerReport =
reportFromDatanode.getReport();

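In ContainerReportHandler#onMessage, the DatanodeDetails that arrives inside the report is deserialized from the protobuf payload and carries no topology information, so the handler now resolves the registered instance from the NodeManager (which has its parent set at registration time) and ignores reports from datanodes it does not know. For readability, the resulting code of this region, reconstructed from the interleaved old/new lines above:

    final DatanodeDetails dnFromReport =
        reportFromDatanode.getDatanodeDetails();
    DatanodeDetails datanodeDetails =
        nodeManager.getNodeByUuid(dnFromReport.getUuidString());
    if (datanodeDetails == null) {
      LOG.warn("Received container report from unknown datanode {}",
          dnFromReport);
      return;
    }
    final ContainerReportsProto containerReport =
        reportFromDatanode.getReport();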
@@ -54,16 +54,23 @@ public IncrementalContainerReportHandler(
@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
final DatanodeDetails dnFromReport = report.getDatanodeDetails();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing incremental container report from data node {}",
report.getDatanodeDetails().getUuid());
dnFromReport.getUuid());
}
DatanodeDetails dd =
nodeManager.getNodeByUuid(dnFromReport.getUuidString());
if (dd == null) {
LOG.warn("Received container report from unknown datanode {}",
dnFromReport);
return;
}

boolean success = true;
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
try {
final DatanodeDetails dd = report.getDatanodeDetails();
final ContainerID id = ContainerID.valueof(
replicaProto.getContainerID());
if (!replicaProto.getState().equals(
@@ -81,7 +88,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
} catch (IOException e) {
success = false;
LOG.error("Exception while processing ICR for container {}",
replicaProto.getContainerID());
replicaProto.getContainerID(), e);
}
}

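IncrementalContainerReportHandler#onMessage gets the same unknown-datanode guard, plus two smaller cleanups: the datanode lookup (dd) moves out of the per-replica loop, and the caught IOException is now passed to LOG.error so the stack trace is preserved. The reconstructed shape of the method after this hunk (replica processing in the middle of the loop elided):

    final DatanodeDetails dnFromReport = report.getDatanodeDetails();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing incremental container report from data node {}",
          dnFromReport.getUuid());
    }
    DatanodeDetails dd =
        nodeManager.getNodeByUuid(dnFromReport.getUuidString());
    if (dd == null) {
      LOG.warn("Received container report from unknown datanode {}",
          dnFromReport);
      return;
    }

    boolean success = true;
    for (ContainerReplicaProto replicaProto :
        report.getReport().getReportList()) {
      try {
        final ContainerID id = ContainerID.valueof(
            replicaProto.getContainerID());
        // ... replica state checks and container updates, unchanged ...
      } catch (IOException e) {
        success = false;
        LOG.error("Exception while processing ICR for container {}",
            replicaProto.getContainerID(), e);
      }
    }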
@@ -260,12 +260,21 @@ public RegisteredCommand register(
if (networkLocation != null) {
datanodeDetails.setNetworkLocation(networkLocation);
}
nodeStateManager.addNode(datanodeDetails);
clusterMap.add(datanodeDetails);
addEntryTodnsToUuidMap(dnsName, datanodeDetails.getUuidString());
// Updating Node Report, as registration is successful
processNodeReport(datanodeDetails, nodeReport);
LOG.info("Registered Data node : {}", datanodeDetails);
if (!isNodeRegistered(datanodeDetails)) {
clusterMap.add(datanodeDetails);
nodeStateManager.addNode(datanodeDetails);
// Check that datanode in nodeStateManager has topology parent set
DatanodeDetails dn =
getNodeByUuid(datanodeDetails.getUuid().toString());
Preconditions.checkState(dn.getParent() != null);
addEntryTodnsToUuidMap(dnsName, datanodeDetails.getUuidString());
// Updating Node Report, as registration is successful
processNodeReport(datanodeDetails, nodeReport);
LOG.info("Registered Data node : {}", datanodeDetails);
} else {
LOG.trace("Datanode is already registered. Datanode: {}",
datanodeDetails.toString());
}
} catch (NodeAlreadyExistsException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Datanode is already registered. Datanode: {}",
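This looks like SCMNodeManager#register. Two changes interact here: the registration work is now guarded by isNodeRegistered(), and clusterMap.add() runs before nodeStateManager.addNode(). The ordering appears to matter because NetworkTopology#add is what sets the datanode's parent, while addNode appears to store a copy made through the DatanodeDetails copy constructor patched above; with the old order the stored copy would still report getParent() == null, which is exactly what the new Preconditions check guards against. The changed block, reconstructed:

    if (!isNodeRegistered(datanodeDetails)) {
      clusterMap.add(datanodeDetails);           // sets the topology parent
      nodeStateManager.addNode(datanodeDetails); // stores a copy of the node
      // Check that datanode in nodeStateManager has topology parent set
      DatanodeDetails dn =
          getNodeByUuid(datanodeDetails.getUuid().toString());
      Preconditions.checkState(dn.getParent() != null);
      addEntryTodnsToUuidMap(dnsName, datanodeDetails.getUuidString());
      // Updating Node Report, as registration is successful
      processNodeReport(datanodeDetails, nodeReport);
      LOG.info("Registered Data node : {}", datanodeDetails);
    } else {
      LOG.trace("Datanode is already registered. Datanode: {}",
          datanodeDetails.toString());
    }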
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -26,18 +26,27 @@
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.UUID;

import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
@@ -55,9 +64,18 @@ public class TestIncrementalContainerReportHandler {

@Before
public void setup() throws IOException {
final ConfigurationSource conf = new OzoneConfiguration();
final OzoneConfiguration conf = new OzoneConfiguration();
final String path =
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
Path scmPath = Paths.get(path, "scm-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
this.containerManager = Mockito.mock(ContainerManager.class);
this.nodeManager = Mockito.mock(NodeManager.class);
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
this.nodeManager =
new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);

this.containerStateManager = new ContainerStateManager(conf);
this.publisher = Mockito.mock(EventPublisher.class);

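The fixture swaps the mocked NodeManager for a real SCMNodeManager backed by a real NetworkTopologyImpl, which is why the setup now needs an SCMStorageConfig and a temporary OZONE_METADATA_DIRS location. Each test below must also register its datanodes before sending reports, because the handler now drops reports from nodes the NodeManager does not know. A hypothetical helper (not part of this change) could cut the three-line registration repeated in every test:

    // Hypothetical convenience for the repeated registrations below.
    private void registerNodes(DatanodeDetails... nodes) {
      for (DatanodeDetails dn : nodes) {
        nodeManager.register(dn, null, null);
      }
    }

    // usage inside a test:
    //   registerNodes(datanodeOne, datanodeTwo, datanodeThree);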
@@ -105,6 +123,9 @@ public void testClosingToClosed() throws IOException {
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
final DatanodeDetails datanodeThree = randomDatanodeDetails();
nodeManager.register(datanodeOne, null, null);
nodeManager.register(datanodeTwo, null, null);
nodeManager.register(datanodeThree, null, null);
final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSING,
@@ -139,6 +160,9 @@ public void testClosingToQuasiClosed() throws IOException {
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
final DatanodeDetails datanodeThree = randomDatanodeDetails();
nodeManager.register(datanodeOne, null, null);
nodeManager.register(datanodeTwo, null, null);
nodeManager.register(datanodeThree, null, null);
final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSING,
@@ -174,6 +198,9 @@ public void testQuasiClosedToClosed() throws IOException {
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
final DatanodeDetails datanodeThree = randomDatanodeDetails();
nodeManager.register(datanodeOne, null, null);
nodeManager.register(datanodeTwo, null, null);
nodeManager.register(datanodeThree, null, null);
final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSING,
@@ -212,6 +239,9 @@ public void testDeleteContainer() throws IOException {
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
final DatanodeDetails datanodeThree = randomDatanodeDetails();
nodeManager.register(datanodeOne, null, null);
nodeManager.register(datanodeTwo, null, null);
nodeManager.register(datanodeThree, null, null);
final Set<ContainerReplica> containerReplicas = getReplicas(
container.containerID(),
ContainerReplicaProto.State.CLOSED,
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.Event;
@@ -56,9 +57,9 @@ public void resetEventCollector() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
SCMStorageConfig storageConfig = Mockito.mock(SCMStorageConfig.class);
Mockito.when(storageConfig.getClusterID()).thenReturn("cluster1");
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
nodeManager =
new SCMNodeManager(conf, storageConfig, new EventQueue(), Mockito.mock(
NetworkTopology.class));
new SCMNodeManager(conf, storageConfig, new EventQueue(), clusterMap);
nodeReportHandler = new NodeReportHandler(nodeManager);
}

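The mocked NetworkTopology is replaced with a real NetworkTopologyImpl. A Mockito mock is a no-op, so register() would never attach a parent to the node, and the new Preconditions check in SCMNodeManager#register would fail during registration. Illustrative only, using the mock helpers seen in the other tests:

    // add() on a mock does nothing, so the datanode never gets a parent;
    // the real NetworkTopologyImpl (as in the sketch near the top) does set it.
    NetworkTopology mocked = Mockito.mock(NetworkTopology.class);
    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
    mocked.add(dn);
    Assert.assertNull(dn.getParent());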
@@ -1133,6 +1133,8 @@ public void testScmRegisterNodeWith4LayerNetworkTopology()
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
nodeList.stream().forEach(node ->
Assert.assertTrue(node.getNetworkLocation().startsWith("/rack1/ng")));
nodeList.stream().forEach(node ->
Assert.assertTrue(node.getParent() != null));
}
}

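The added check asserts that every registered node ends up attached to the topology. Assert.assertNotNull gives a clearer failure than assertTrue(... != null); a possible variant, not part of the PR:

    nodeList.forEach(node ->
        Assert.assertNotNull("Topology parent not set for " + node,
            node.getParent()));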
@@ -30,7 +30,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -64,7 +64,7 @@ public static void setup() throws Exception {
SCMStorageConfig config =
new SCMStorageConfig(NodeType.DATANODE, new File("/tmp"), "storage");
nodeManager = new SCMNodeManager(source, config, publisher,
Mockito.mock(NetworkTopology.class));
new NetworkTopologyImpl(source));

registeredDatanode = DatanodeDetails.newBuilder()
.setHostName("localhost")
@@ -22,7 +22,9 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.*;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -32,6 +34,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -42,6 +45,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Set;

/**
* This class tests container report with DN container state info.
@@ -122,6 +126,12 @@ public void testContainerReportKeyWrite() throws Exception {


ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID());
Set<ContainerReplica> replicas =
scm.getContainerManager().getContainerReplicas(
new ContainerID(keyInfo.getContainerID()));
Assert.assertTrue(replicas.size() == 1);
replicas.stream().forEach(rp ->
Assert.assertTrue(rp.getDatanodeDetails().getParent() != null));

LOG.info("SCM Container Info keyCount: {} usedBytes: {}",
cinfo.getNumberOfKeys(), cinfo.getUsedBytes());
@@ -50,9 +50,18 @@ public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
@Override
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
final DatanodeDetails dnFromReport = report.getDatanodeDetails();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing incremental container report from data node {}",
report.getDatanodeDetails());
dnFromReport);
}

DatanodeDetails dd =
getNodeManager().getNodeByUuid(dnFromReport.getUuidString());
if (dd == null) {
LOG.warn("Received container report from unknown datanode {}",
dnFromReport);
return;
}

ReconContainerManager containerManager =
@@ -61,7 +70,6 @@ public void onMessage(final IncrementalContainerReportFromDatanode report,
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
try {
final DatanodeDetails dd = report.getDatanodeDetails();
final ContainerID id = ContainerID.valueof(
replicaProto.getContainerID());
try {
@@ -22,21 +22,30 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;

/**
@@ -59,17 +68,26 @@ public void testProcessICR() throws IOException, NodeNotFoundException {
datanodeDetails.getUuidString());
when(reportMock.getReport()).thenReturn(containerReport);

NodeManager nodeManagerMock = mock(NodeManager.class);
final String path =
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
Path scmPath = Paths.get(path, "scm-meta");
final OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
NodeManager nodeManager =
new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);
nodeManager.register(datanodeDetails, null, null);

ReconContainerManager containerManager = getContainerManager();
ReconIncrementalContainerReportHandler reconIcr =
new ReconIncrementalContainerReportHandler(nodeManagerMock,
new ReconIncrementalContainerReportHandler(nodeManager,
containerManager);
EventPublisher eventPublisherMock = mock(EventPublisher.class);

reconIcr.onMessage(reportMock, eventPublisherMock);
verify(nodeManagerMock, times(1))
.addContainer(datanodeDetails, containerID);
nodeManager.addContainer(datanodeDetails, containerID);
assertTrue(containerManager.exists(containerID));
assertEquals(1, containerManager.getContainerReplicas(containerID).size());
}
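With a real SCMNodeManager in place of the mock, the test registers the datanode and adds the container mapping directly rather than verifying a mock interaction. One extra assertion that could be added here (hypothetical, using the static imports already in this file) to confirm the registered datanode resolves with its topology parent set:

    DatanodeDetails resolved =
        nodeManager.getNodeByUuid(datanodeDetails.getUuidString());
    assertTrue(resolved != null && resolved.getParent() != null);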