diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index e90250efe91c..c4d3428a8018 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -730,12 +730,15 @@ private void initTermOfLeaderSCM() { * Always record the latest term that has seen. */ private void updateTermOfLeaderSCM(SCMCommand command) { + updateTermOfLeaderSCM(command.getTerm()); + } + + public void updateTermOfLeaderSCM(final long newTerm) { if (!termOfLeaderSCM.isPresent()) { return; } final long currentTerm = termOfLeaderSCM.getAsLong(); - final long newTerm = command.getTerm(); if (currentTerm < newTerm) { setTermOfLeaderSCM(newTerm); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index a694ba00becd..3a1bd8ffb31d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -307,6 +307,9 @@ private void processResponse(SCMHeartbeatResponseProto response, Preconditions.checkState(response.getDatanodeUUID() .equalsIgnoreCase(datanodeDetails.getUuid()), "Unexpected datanode ID in the response."); + if (response.hasTerm()) { + context.updateTermOfLeaderSCM(response.getTerm()); + } // Verify the response is indeed for this datanode. 
for (SCMCommandProto commandResponseProto : response.getCommandsList()) { switch (commandResponseProto.getCommandType()) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index eeb373671f5e..a9a933d4c980 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.UUID; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -107,6 +108,7 @@ public void handlesReconstructContainerCommand() throws Exception { @Test public void testheartbeatWithoutReports() throws Exception { + final long termInSCM = 42; StorageContainerDatanodeProtocolClientSideTranslatorPB scm = Mockito.mock( StorageContainerDatanodeProtocolClientSideTranslatorPB.class); @@ -118,9 +120,15 @@ public void testheartbeatWithoutReports() throws Exception { .setDatanodeUUID( ((SCMHeartbeatRequestProto)invocation.getArgument(0)) .getDatanodeDetails().getUuid()) + .setTerm(termInSCM) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm); + OzoneConfiguration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + context.setTermOfLeaderSCM(1); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assertions.assertTrue(heartbeat.hasDatanodeDetails()); @@ -128,6 +136,9 @@ public void 
testheartbeatWithoutReports() throws Exception { Assertions.assertFalse(heartbeat.hasContainerReport()); Assertions.assertTrue(heartbeat.getCommandStatusReportsCount() == 0); Assertions.assertFalse(heartbeat.hasContainerActions()); + OptionalLong termInDatanode = context.getTermOfLeaderSCM(); + Assertions.assertTrue(termInDatanode.isPresent()); + Assertions.assertEquals(termInSCM, termInDatanode.getAsLong()); } @Test @@ -314,22 +325,6 @@ public void testheartbeatWithAllReports() throws Exception { } } - /** - * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy. - * - * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB - * - * @return HeartbeatEndpointTask - */ - private HeartbeatEndpointTask getHeartbeatEndpointTask( - StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { - OzoneConfiguration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - return getHeartbeatEndpointTask(conf, context, proxy); - - } - /** * Creates HeartbeatEndpointTask with the given conf, context and * StorageContainerManager client side proxy. 
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 7f8476d99500..39d6a2931e93 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -152,6 +152,9 @@ message CommandQueueReportProto { message SCMHeartbeatResponseProto { required string datanodeUUID = 1; repeated SCMCommandProto commands = 2; + + // Leader SCM's term, as in SCMCommandProto; set only if responder is leader + optional int64 term = 3; } message SCMNodeAddressList { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 467adbe23a4d..bc00f4d142be 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -48,6 +49,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode; @@ -80,6 +82,7 @@ import
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -104,6 +107,7 @@ import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +134,12 @@ public class SCMDatanodeProtocolServer implements private final EventPublisher eventPublisher; private ProtocolMessageMetrics protocolMessageMetrics; + private final SCMContext scmContext; + public SCMDatanodeProtocolServer(final OzoneConfiguration conf, OzoneStorageContainerManager scm, - EventPublisher eventPublisher) + EventPublisher eventPublisher, + SCMContext scmContext) throws IOException { // This constructor has broken down to smaller methods so that Recon's @@ -142,6 +149,7 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf, this.scm = scm; this.eventPublisher = eventPublisher; + this.scmContext = scmContext; heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher( scm.getScmNodeManager(), eventPublisher); @@ -274,14 +282,19 @@ public SCMHeartbeatResponseProto sendHeartbeat( for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { cmdResponses.add(getCommandResponse(cmd, scm)); } + final OptionalLong term = getTermIfLeader(); boolean auditSuccess = true; Map auditMap = Maps.newHashMap(); auditMap.put("datanodeUUID", heartbeat.getDatanodeDetails().getUuid()); auditMap.put("command", flatten(cmdResponses.toString())); + term.ifPresent(t -> auditMap.put("term", String.valueOf(t))); try { - return 
SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid()) - .addAllCommands(cmdResponses).build(); + SCMHeartbeatResponseProto.Builder builder = + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid()) + .addAllCommands(cmdResponses); + term.ifPresent(builder::setTerm); + return builder.build(); } catch (Exception ex) { auditSuccess = false; AUDIT.logWriteFailure( @@ -297,6 +310,17 @@ public SCMHeartbeatResponseProto sendHeartbeat( } } + private OptionalLong getTermIfLeader() { + if (scmContext != null && scmContext.isLeader()) { + try { + return OptionalLong.of(scmContext.getTermOfLeader()); + } catch (NotLeaderException e) { + // not leader: return empty below; only leader distributes its term + } + } + return OptionalLong.empty(); + } + /** * Returns a SCMCommandRepose from the SCM Command. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index ad443c7266fd..9f32eb3731e1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -388,7 +388,7 @@ private StorageContainerManager(OzoneConfiguration conf, scmAdmins = new OzoneAdmins(scmAdminUsernames, scmAdminGroups); datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this, - eventQueue); + eventQueue, scmContext); blockProtocolServer = new SCMBlockProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java index 2f14806f9ba7..019e09790cf1 100644 ---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java @@ -46,7 +46,7 @@ public ReconDatanodeProtocolServer(OzoneConfiguration conf, OzoneStorageContainerManager scm, EventPublisher eventPublisher) throws IOException { - super(conf, scm, eventPublisher); + super(conf, scm, eventPublisher, null); } @Override