From bb6ffe523de87085f546946e5d33d8658f7e141e Mon Sep 17 00:00:00 2001 From: Glen Geng Date: Wed, 30 Dec 2020 10:54:22 +0800 Subject: [PATCH] HDDS-4632: Add term into SetNodeOperationalStateCommand. --- .../common/states/endpoint/HeartbeatEndpointTask.java | 4 ++++ .../src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto | 2 +- .../java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index f656d84c41f7..32dd3e3bddd6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -352,6 +352,10 @@ private void processResponse(SCMHeartbeatResponseProto response, SetNodeOperationalStateCommand setNodeOperationalStateCommand = SetNodeOperationalStateCommand.getFromProtobuf( commandResponseProto.getSetNodeOperationalStateCommandProto()); + if (commandResponseProto.hasTerm()) { + setNodeOperationalStateCommand.setTerm( + commandResponseProto.getTerm()); + } if (LOG.isDebugEnabled()) { LOG.debug("Received SCM set operational state command. State: {} " + "Expiry: {}", setNodeOperationalStateCommand.getOpState(), diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 8a1db33988d9..c7a22934509d 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -304,13 +304,13 @@ message SCMCommandProto { optional ReplicateContainerCommandProto replicateContainerCommandProto = 6; optional CreatePipelineCommandProto createPipelineCommandProto = 7; optional ClosePipelineCommandProto closePipelineCommandProto = 8; + optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9; // Under HA mode, holds term of underlying RaftServer iff current // SCM is a leader, otherwise, holds term 0. // Notes that, the first elected leader is from term 1, term 0, // as the initial value of currentTerm, is never used under HA mode. optional uint64 term = 15; - optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index a7c391757f48..da51bea7cfdf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -436,7 +436,7 @@ private void updateDatanodeOpState(DatanodeDetails reportedDn) reportedDn.getPersistedOpStateExpiryEpochSec(), scmStatus.getOperationalState(), scmStatus.getOpStateExpiryEpochSeconds()); - commandQueue.addCommand(reportedDn.getUuid(), + addDatanodeCommand(reportedDn.getUuid(), new SetNodeOperationalStateCommand( Time.monotonicNow(), scmStatus.getOperationalState(), scmStatus.getOpStateExpiryEpochSeconds()));