Merged

33 commits
84b56f8
HDDS-5916: DNs in pipeline raft group get stuck in infinite leader el…
Mar 11, 2022
cb4adb9
fix rat and checkstyle errors
Mar 14, 2022
e51aa4b
fix findbugs errors
Mar 15, 2022
50ded1c
solve an edge case bug
Mar 30, 2022
5c9addf
fix test
Mar 31, 2022
a7e47b7
fix check style error
Mar 31, 2022
71f1b28
address PR comments
Apr 1, 2022
d7dbf36
address PR comments
Apr 1, 2022
9452d81
resolve conflicts
May 7, 2022
b89eed1
fix checkstyle error
May 7, 2022
e346e8b
trigger new CI check
adoroszlai May 9, 2022
878f989
Avoid new static import for easier merge from master
adoroszlai May 18, 2022
b807db6
Merge remote-tracking branch 'origin/master' into HDDS-5916-support-d…
adoroszlai May 18, 2022
424c788
address PR comments
May 20, 2022
e6156e6
fix compilation error
May 20, 2022
f94bb3d
Test read/write after restart
adoroszlai May 23, 2022
c7993ae
Remove ozone-dn-restart env
adoroszlai May 23, 2022
52d20d2
Fix smoketest path
adoroszlai May 23, 2022
aa2fb97
Default values for variables
adoroszlai May 23, 2022
627a568
Skip chunk generator/validator in kubernetes
adoroszlai May 23, 2022
4404106
merge ozone and ozone-dn-restart environments
May 30, 2022
8bbca39
delete unnecessary dn env setting
May 30, 2022
aeeee27
turn two logs from debug to info
May 31, 2022
db20d84
make block allocation more reliable when there are pipelines in a…
Jun 1, 2022
62c46eb
address PR comments
Jun 1, 2022
1a81703
fix checkstyle error
Jun 2, 2022
2c5f812
fix a comment
Jun 11, 2022
27f2d76
add more unit test
Jun 16, 2022
7941838
resolve conflict
Jun 16, 2022
7ce0077
fix test failure
Jun 17, 2022
e742158
working
Jun 17, 2022
d8f4822
cleanup
Jun 17, 2022
801e856
Merge remote-tracking branch 'origin/master' into HDDS-5916-support-d…
adoroszlai Jun 20, 2022
@@ -31,6 +31,7 @@

import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -39,6 +40,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
@@ -65,6 +67,8 @@ public final class RatisHelper {

private static final Logger LOG = LoggerFactory.getLogger(RatisHelper.class);

private static final OzoneConfiguration CONF = new OzoneConfiguration();

// Prefix for Ratis Server GRPC and Ratis client conf.
public static final String HDDS_DATANODE_RATIS_PREFIX_KEY = "hdds.ratis";

@@ -97,7 +101,18 @@ public static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) {
}

private static String toRaftPeerAddress(DatanodeDetails id, Port.Name port) {
return id.getIpAddress() + ":" + id.getPort(port).getValue();
if (datanodeUseHostName()) {
final String address =
id.getHostName() + ":" + id.getPort(port).getValue();
LOG.debug("Datanode is using hostname for raft peer address: {}",
address);
return address;
} else {
final String address =
id.getIpAddress() + ":" + id.getPort(port).getValue();
LOG.debug("Datanode is using IP for raft peer address: {}", address);
return address;
}
}

public static RaftPeerId toRaftPeerId(DatanodeDetails id) {
@@ -369,6 +384,12 @@ public static Long getMinReplicatedIndex(
.min(Long::compareTo).orElse(null);
}

private static boolean datanodeUseHostName() {
return CONF.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
}

private static <U> Class<? extends U> getClass(String name,
Class<U> xface) {
try {
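For illustration, here is a minimal, self-contained sketch (not part of the diff; the class name and the datanode identity values below are made up) of how the dfs.datanode.use.datanode.hostname flag flips the raft peer address between hostname and IP, mirroring toRaftPeerAddress above:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public final class RaftPeerAddressDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // The default is false, i.e. RatisHelper keeps building addresses
    // from the IP, as before this change.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, true);

    boolean useHostName = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);

    // Stand-ins for DatanodeDetails#getHostName, #getIpAddress and
    // #getPort(Port.Name.RATIS) used by the method above.
    String hostName = "datanode-0.datanode.default.svc.cluster.local";
    String ip = "10.42.0.36";
    int ratisPort = 9858;

    String address = (useHostName ? hostName : ip) + ":" + ratisPort;
    System.out.println("raft peer address: " + address);
  }
}

Building the address from the stable Kubernetes DNS name is what lets a restarted pod rejoin its Ratis group even though its pod IP has changed.
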
@@ -39,6 +39,14 @@ public InvalidTopologyException(String msg) {
*/
void add(Node node);

/**
* Update a node. This will be called when a datanode needs to be updated.
* If the old datanode does not exist, then just add the new datanode.
* @param oldNode node to be updated; can be null
* @param newNode node to update to; cannot be null
*/
void update(Node oldNode, Node newNode);

/**
* Remove a node from the network topology. This will be called when an
* existing datanode is removed from the system.
@@ -116,6 +116,59 @@ public void add(Node node) {
}
}

/**
* Update a leaf node. It is called when a datanode needs to be updated.
* If the old datanode does not exist, then just add the new datanode.
* @param oldNode node to be updated; can be null
* @param newNode node to update to; cannot be null
*/
@Override
public void update(Node oldNode, Node newNode) {
Preconditions.checkArgument(newNode != null, "newNode cannot be null");
if (oldNode != null && oldNode instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allowed to update an inner node: "
+ oldNode.getNetworkFullPath());
}

if (newNode instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allowed to update a leaf node to an inner node: "
+ newNode.getNetworkFullPath());
}

int newDepth = NetUtils.locationToDepth(newNode.getNetworkLocation()) + 1;
// Check depth
if (maxLevel != newDepth) {
throw new InvalidTopologyException("Failed to update to " +
newNode.getNetworkFullPath()
+ ": Its path depth is not "
+ maxLevel);
}

netlock.writeLock().lock();
boolean add;
try {
boolean exist = false;
if (oldNode != null) {
exist = containsNode(oldNode);
}
if (exist) {
clusterTree.remove(oldNode);
}

add = clusterTree.add(newNode);
} finally {
netlock.writeLock().unlock();
}
if (add) {
LOG.info("Updated to the new node: {}", newNode.getNetworkFullPath());
if (LOG.isDebugEnabled()) {
LOG.debug("NetworkTopology became:\n{}", this);
}
}
}

/**
* Remove a node from the network topology. This will be called when an
* existing datanode is removed from the system.
@@ -150,16 +203,20 @@ public boolean contains(Node node) {
Preconditions.checkArgument(node != null, "node cannot be null");
netlock.readLock().lock();
try {
Node parent = node.getParent();
while (parent != null && parent != clusterTree) {
parent = parent.getParent();
}
if (parent == clusterTree) {
return true;
}
return containsNode(node);
} finally {
netlock.readLock().unlock();
}
}

private boolean containsNode(Node node) {
Node parent = node.getParent();
while (parent != null && parent != clusterTree) {
parent = parent.getParent();
}
if (parent == clusterTree) {
return true;
}
return false;
}

@@ -914,6 +914,44 @@ public void testSingleNodeRackWithAffinityNode() {
}

@Test
public void testUpdateNode() {
List<NodeSchema> schemas = new ArrayList<>();
schemas.add(ROOT_SCHEMA);
schemas.add(DATACENTER_SCHEMA);
schemas.add(RACK_SCHEMA);
schemas.add(LEAF_SCHEMA);

NodeSchemaManager manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
NetworkTopology newCluster =
new NetworkTopologyImpl(manager);
Node node = createDatanode("1.1.1.1", "/d1/r1");
newCluster.add(node);
assertTrue(newCluster.contains(node));

// update
Node newNode = createDatanode("1.1.1.2", "/d1/r1");
assertFalse(newCluster.contains(newNode));
newCluster.update(node, newNode);
assertFalse(newCluster.contains(node));
assertTrue(newCluster.contains(newNode));

// update a non-existing node
Node nodeExisting = createDatanode("1.1.1.3", "/d1/r1");
Node newNode2 = createDatanode("1.1.1.4", "/d1/r1");
assertFalse(newCluster.contains(nodeExisting));
assertFalse(newCluster.contains(newNode2));

newCluster.update(nodeExisting, newNode2);
assertFalse(newCluster.contains(nodeExisting));
assertTrue(newCluster.contains(newNode2));

// old node is null
Node newNode3 = createDatanode("1.1.1.5", "/d1/r1");
assertFalse(newCluster.contains(newNode3));
newCluster.update(null, newNode3);
assertTrue(newCluster.contains(newNode3));
}

@Test
public void testIsAncestor() {
NodeImpl r1 = new NodeImpl("r1", "/", NODE_COST_DEFAULT);
NodeImpl r12 = new NodeImpl("r12", "/", NODE_COST_DEFAULT);
@@ -125,7 +125,7 @@ private void persistContainerDatanodeDetails() {
File idPath = new File(dataNodeIDPath);
DatanodeDetails datanodeDetails = this.context.getParent()
.getDatanodeDetails();
if (datanodeDetails != null && !idPath.exists()) {

Contributor: What's the motivation for dropping this check?

Author: When the datanode gets restarted in k8s, its IP changes, so the original info in this file is no longer accurate. Dropping the check makes sure we update it with the latest info. And when we are not using k8s, I think it is not harmful to always update this file whenever the node restarts.

if (datanodeDetails != null) {
try {
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
} catch (IOException ex) {
@@ -26,6 +26,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

@@ -58,10 +61,29 @@ public class EventQueue implements EventPublisher, AutoCloseable {

private boolean isRunning = true;

private static final Gson TRACING_SERIALIZER = new GsonBuilder().create();
private static final Gson TRACING_SERIALIZER = new GsonBuilder()
.setExclusionStrategies(new DatanodeDetailsGsonExclusionStrategy())
.create();

private boolean isSilent = false;

// The parent field in the DatanodeDetails class holds a circular
// reference, which would send Gson into infinite recursion. We need to
// exclude this field when serializing a DatanodeDetails object to JSON.
static class DatanodeDetailsGsonExclusionStrategy

Contributor: This change can be merged as a quick PR and not wait on this PR.

implements ExclusionStrategy {
@Override
public boolean shouldSkipField(FieldAttributes f) {
return f.getDeclaringClass() == NodeImpl.class
&& f.getName().equals("parent");
}

@Override
public boolean shouldSkipClass(Class<?> aClass) {
return false;
}
}

/**
* Add new handler to the event queue.
* <p>
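To see why the exclusion strategy is needed, here is a minimal standalone sketch of the same Gson pattern (TreeNode and CircularRefDemo are illustrative names, not from the PR): the back-reference field is skipped, so serialization terminates instead of recursing forever.

import java.util.ArrayList;
import java.util.List;

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public final class CircularRefDemo {

  static class TreeNode {
    final String name;
    TreeNode parent;                    // back-reference creating the cycle
    final List<TreeNode> children = new ArrayList<>();

    TreeNode(String name) {
      this.name = name;
    }

    void addChild(TreeNode child) {
      child.parent = this;
      children.add(child);
    }
  }

  public static void main(String[] args) {
    Gson gson = new GsonBuilder()
        .setExclusionStrategies(new ExclusionStrategy() {
          @Override
          public boolean shouldSkipField(FieldAttributes f) {
            // Same idea as skipping NodeImpl#parent above.
            return f.getDeclaringClass() == TreeNode.class
                && f.getName().equals("parent");
          }

          @Override
          public boolean shouldSkipClass(Class<?> clazz) {
            return false;
          }
        })
        .create();

    TreeNode root = new TreeNode("root");
    root.addChild(new TreeNode("child"));

    // Without the strategy: root -> children -> child -> parent -> root
    // ... until StackOverflowError. With it, "parent" is simply omitted:
    System.out.println(gson.toJson(root));
    // {"name":"root","children":[{"name":"child","children":[]}]}
  }
}
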
@@ -157,6 +157,13 @@ public final class SCMEvents {
public static final TypedEvent<DatanodeDetails> NEW_NODE =
new TypedEvent<>(DatanodeDetails.class, "New_Node");

/**
* This event will be triggered whenever a datanode registers with
* SCM with a different IP address or hostname.
*/
public static final TypedEvent<DatanodeDetails> NODE_ADDRESS_UPDATE =
new TypedEvent<>(DatanodeDetails.class, "Node_Address_Update");

/**
* This event will be triggered whenever a datanode is moved from healthy to
* stale state.
@@ -60,6 +60,7 @@ enum ServiceStatus {
enum Event {
PRE_CHECK_COMPLETED,
NEW_NODE_HANDLER_TRIGGERED,
NODE_ADDRESS_UPDATE_HANDLER_TRIGGERED,
UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED
}

@@ -56,6 +56,7 @@ public NewNodeHandler(PipelineManager pipelineManager,
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
try {
pipelineManager.closeStalePipelines(datanodeDetails);

Contributor: Is closeStalePipelines necessary here? When SCM processes the register command, it should be able to distinguish a new node from an updated node, and this handler should only be responsible for the new-node case.

Author: Yes, this is necessary. I believe, from my testing, that if a datanode is dead for a long time, SCM will remove it from the registration list. When the node comes up with a different IP, it first registers with SCM, and SCM treats it as a new node. But the old pipeline with the old IPs may still be there.

Another way to achieve this is to delete the pipelines when SCM removes the dead nodes. But I am not that familiar with this part of the code; I may need to take a further look.

Contributor: According to your implementation, when the node comes up with a different IP, it will register first with SCM, and the SCM node manager will recognize it through isNodeRegistered as long as its UUID is unchanged. That triggers the address-update event, which closes stale pipelines, updates the node info, and also creates new pipelines. For the new-node case, I think we do not need this close action.

Author: Sorry, I think I misstated my case. You are right: when a datanode is dead for a long time, SCM actually won't remove it from its registration list. So when this node with the same UUID comes up again with a different IP, it will fall into the update-address condition instead of registering as a new node.

However, there is another case. If SCM also restarts, it will lose its entire in-memory node registration map, but it will still have all the old pipelines, since pipelines are read from persistent storage. In this case, if the datanode changes its IP and comes to register with SCM, SCM will treat it as a new node instead of a known node with a different IP. So we still need to close all the stale pipelines that have the old IPs for this datanode.

Please let me know if the above makes sense to you. Thanks!

Contributor: Makes sense, thanks for the explanation.

serviceManager.notifyEventTriggered(Event.NEW_NODE_HANDLER_TRIGGERED);

if (datanodeDetails.getPersistedOpState()
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handles datanode IP or hostname change events.
*/
public class NodeAddressUpdateHandler
implements EventHandler<DatanodeDetails> {
private static final Logger LOG =
LoggerFactory.getLogger(NodeAddressUpdateHandler.class);

private final PipelineManager pipelineManager;
private final NodeDecommissionManager decommissionManager;
private final SCMServiceManager serviceManager;

public NodeAddressUpdateHandler(PipelineManager pipelineManager,
NodeDecommissionManager
decommissionManager,
SCMServiceManager serviceManager) {
this.pipelineManager = pipelineManager;
this.decommissionManager = decommissionManager;
this.serviceManager = serviceManager;
}

@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
try {
LOG.info("Closing stale pipelines for datanode: {}", datanodeDetails);
pipelineManager.closeStalePipelines(datanodeDetails);

Contributor: Is closing pipelines necessary even if using datanode hostname instead of IP? If two or three datanodes of some pipeline are restarted, a new pipeline is created as they register, and the first one or two pipelines are then closed very soon.

2022-06-01 10:01:03 INFO  PipelineStateManagerImpl:101 - Created pipeline Pipeline[ Id: 56d494b4-9faf-47ca-ab77-585281316c58, Nodes: 06b25030-0703-4edf-83bb-26fd94666a0d{ip: 10.42.0.40, host: datanode-2.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}76504e44-bd67-405b-9d90-95cb8e20dfd9{ip: 10.42.0.36, host: datanode-0.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}2f2163ec-bb96-4432-8418-dcfd7b829cbf{ip: 10.42.0.38, host: datanode-1.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}, ReplicationConfig: RATIS/THREE, State:ALLOCATED, leaderId:, CreationTimestamp2022-06-01T10:01:03.639Z[UTC]].
2022-06-01 10:02:15 INFO  SCMNodeManager:433 - Updated Datanode to: 76504e44-bd67-405b-9d90-95cb8e20dfd9{ip: 10.42.0.41, host: datanode-0.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}
2022-06-01 10:02:15 INFO  PipelineManagerImpl:461 - Closed pipeline: PipelineID=56d494b4-9faf-47ca-ab77-585281316c58
2022-06-01 10:02:15 INFO  PipelineStateManagerImpl:101 - Created pipeline Pipeline[ Id: 98579454-a0cd-49a0-b5b4-3c0a2a2376ef, Nodes: 2f2163ec-bb96-4432-8418-dcfd7b829cbf{ip: 10.42.0.38, host: datanode-1.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}76504e44-bd67-405b-9d90-95cb8e20dfd9{ip: 10.42.0.41, host: datanode-0.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}06b25030-0703-4edf-83bb-26fd94666a0d{ip: 10.42.0.40, host: datanode-2.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}, ReplicationConfig: RATIS/THREE, State:ALLOCATED, leaderId:, CreationTimestamp2022-06-01T10:02:15.319Z[UTC]].
2022-06-01 10:02:17 INFO  SCMNodeManager:433 - Updated Datanode to: 2f2163ec-bb96-4432-8418-dcfd7b829cbf{ip: 10.42.0.42, host: datanode-1.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}
2022-06-01 10:02:17 INFO  PipelineManagerImpl:461 - Closed pipeline: PipelineID=98579454-a0cd-49a0-b5b4-3c0a2a2376ef
2022-06-01 10:02:17 INFO  PipelineStateManagerImpl:101 - Created pipeline Pipeline[ Id: 62cfbe84-ea34-49f9-992a-6ab42ba74e0f, Nodes: 06b25030-0703-4edf-83bb-26fd94666a0d{ip: 10.42.0.40, host: datanode-2.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}2f2163ec-bb96-4432-8418-dcfd7b829cbf{ip: 10.42.0.42, host: datanode-1.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}76504e44-bd67-405b-9d90-95cb8e20dfd9{ip: 10.42.0.41, host: datanode-0.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}, ReplicationConfig: RATIS/THREE, State:ALLOCATED, leaderId:, CreationTimestamp2022-06-01T10:02:17.105Z[UTC]].
2022-06-01 10:02:25 INFO  SCMNodeManager:433 - Updated Datanode to: 06b25030-0703-4edf-83bb-26fd94666a0d{ip: 10.42.0.43, host: datanode-2.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}
2022-06-01 10:02:25 INFO  PipelineManagerImpl:461 - Closed pipeline: PipelineID=62cfbe84-ea34-49f9-992a-6ab42ba74e0f
2022-06-01 10:02:25 INFO  PipelineStateManagerImpl:101 - Created pipeline Pipeline[ Id: 86d151b0-0b6f-486e-904f-772d8a0b4131, Nodes: 2f2163ec-bb96-4432-8418-dcfd7b829cbf{ip: 10.42.0.42, host: datanode-1.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}06b25030-0703-4edf-83bb-26fd94666a0d{ip: 10.42.0.43, host: datanode-2.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}76504e44-bd67-405b-9d90-95cb8e20dfd9{ip: 10.42.0.41, host: datanode-0.datanode.default.svc.cluster.local, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}, ReplicationConfig: RATIS/THREE, State:ALLOCATED, leaderId:, CreationTimestamp2022-06-01T10:02:25.141Z[UTC]].

Author (sokui, Jun 1, 2022):

> Is closing pipelines necessary even if using datanode hostname instead of IP?

The problem is that the pipeline uses the IP address; the datanode hostname config only affects the Ratis address. Ideally, I think we should use the hostname in the pipeline too. However, pipeline info is persisted, so if we change the code to support only hostnames, it will not be backward compatible with the pipelines already in the DB. And if we decide to support both hostname and IP address, the logic gets more complex: we would have to figure out which IP corresponds to which hostname so that duplicate pipelines are not created. Using the hostname in pipelines is a big change that needs more thought, so I think it should be another PR if we decide to make it.

> If two or three datanodes of some pipeline are restarted, a new pipeline is created as they register, and the first one or two pipelines are then closed very soon.

Is this a question or a statement?

Contributor: It's a statement based on behavior I saw while checking kubernetes test results where 3 datanodes are restarted; see the log above. Pipeline 98579454-... is created when datanode-0 registers with a new IP. It is then closed when datanode-1 registers, new pipeline 62cfbe84-... is created, etc.

But I guess quick restarts are less realistic in a non-test environment, so it may be OK for now.

Author:

> 2022-06-01 10:01:03 INFO  PipelineStateManagerImpl:101 - Created pipeline Pipeline[ Id: 56d494b4-9faf-47ca-ab77-585281316c58, ... ]

I think the above log is from before the restart. After a restart, the right order is: first, the datanode updates its IP address in SCM; second, SCM closes all pipelines belonging to this datanode's old IP address; third, SCM creates new pipeline(s) for this datanode with the new IP.

Contributor: Yes, pipeline 56d494b4-... is from before the restart. It gets closed when the first DN re-registers. My point is that the new pipeline created after this DN registers gets closed right away when the second DN re-registers.

Author (sokui, Jun 2, 2022): Ah, I see what you mean. This is because the restarts are so quick that SCM thinks all 3 datanodes stayed alive. So when the first DN re-registers and tries to create the new pipeline, SCM thinks the other two DNs are still good and uses their old info (because they have not re-registered yet). The same thing happens when the 2nd DN re-registers, where SCM thinks the 3rd DN is good and uses its old info.

For quick restarts, I do not think there is a good way for SCM to recognize that these DNs are dead/restarted. Therefore, we rely on this PR's logic of waiting for ALLOCATED pipelines to become OPEN; that way, block allocation can still get the correct pipeline eventually.

serviceManager.notifyEventTriggered(SCMService.Event
.NODE_ADDRESS_UPDATE_HANDLER_TRIGGERED);

decommissionManager.continueAdminForNode(datanodeDetails);
} catch (NodeNotFoundException e) {
// Should not happen, as the node has just registered to call this event
// handler.
LOG.error(
"NodeNotFound when updating the node Ip or host name to the " +
"decommissionManager",
e);
}
}
}
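
For context, here is a minimal sketch of how a subscription on the hdds EventQueue behaves (a standalone toy with a String payload and a made-up class name; the real code presumably registers NodeAddressUpdateHandler for SCMEvents.NODE_ADDRESS_UPDATE during SCM startup):

import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;

public final class NodeAddressUpdateDemo {
  public static void main(String[] args) throws Exception {
    EventQueue queue = new EventQueue();

    // Toy stand-in for SCMEvents.NODE_ADDRESS_UPDATE, which carries
    // DatanodeDetails in the real code.
    TypedEvent<String> nodeAddressUpdate =
        new TypedEvent<>(String.class, "Node_Address_Update");

    // Stand-in for NodeAddressUpdateHandler#onMessage: close stale
    // pipelines, notify the service manager, resume decommissioning.
    queue.addHandler(nodeAddressUpdate, (dn, publisher) ->
        System.out.println("closing stale pipelines for " + dn));

    // Fired when a known datanode re-registers with a changed address,
    // as discussed in the thread above.
    queue.fireEvent(nodeAddressUpdate,
        "datanode-0.datanode.default.svc.cluster.local");

    queue.processAll(1000); // drain pending events (timeout in ms)
    queue.close();
  }
}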
@@ -402,6 +402,28 @@ public void updateLastKnownLayoutVersion(DatanodeDetails datanodeDetails,
.updateLastKnownLayoutVersion(layoutInfo);
}

/**
* Update node.
*
* @param datanodeDetails the datanode details
* @param layoutInfo the layoutInfo
* @throws NodeNotFoundException the node not found exception
*/
public void updateNode(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo)
throws NodeNotFoundException {
DatanodeInfo datanodeInfo =
nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo);
LOG.info("updating node {} from {} to {} with status {}",
datanodeDetails.getUuidString(),
datanodeInfo,
datanodeDetails,
newNodeStatus);
nodeStateMap.updateNode(datanodeDetails, newNodeStatus, layoutInfo);
updateLastKnownLayoutVersion(datanodeDetails, layoutInfo);
}

/**
* Returns the current state of the node.
*