From 2a2690b7e5bdd2aa8dde03767de050320e4fa494 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 16 Oct 2019 13:03:36 -0700 Subject: [PATCH 01/14] HDDS-1868. Reset, rebased and addressed comments from reviews. --- .../hadoop/hdds/scm/pipeline/Pipeline.java | 25 +++++- hadoop-hdds/common/src/main/proto/hdds.proto | 12 +-- .../report/PipelineReportPublisher.java | 4 +- .../statemachine/DatanodeStateMachine.java | 4 + .../server/ratis/ContainerStateMachine.java | 8 ++ .../server/ratis/XceiverServerRatis.java | 21 +++++ .../StorageContainerDatanodeProtocol.proto | 1 + .../scm/pipeline/PipelineReportHandler.java | 48 +++++++---- .../scm/pipeline/PipelineStateManager.java | 10 +-- .../scm/pipeline/RatisPipelineProvider.java | 20 ++--- .../pipeline/MockRatisPipelineProvider.java | 15 ++++ .../scm/pipeline/TestSCMPipelineManager.java | 85 +++++++++++++++++-- .../hadoop/ozone/MiniOzoneClusterImpl.java | 48 +++++++++-- 13 files changed, 248 insertions(+), 53 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 2828f6ea41ca..dcb4c8caeffc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -50,6 +52,8 @@ public final class Pipeline { private Map nodeStatus; // nodes with ordered distance to client private ThreadLocal> nodesInOrder = new ThreadLocal<>(); + // Current reported Leader for the pipeline + private ByteString leaderId = ByteString.EMPTY; /** * The immutable properties of pipeline object is used in @@ -102,6 +106,17 @@ public PipelineState getPipelineState() { return state; } + public ByteString getLeaderId() { + return leaderId; + } + + /** + * Pipeline object, outside of letting leader id to be set, is immutable. + */ + void setLeaderId(ByteString leaderId) { + this.leaderId = leaderId; + } + /** * Returns the list of nodes which form this pipeline. * @@ -174,7 +189,7 @@ public HddsProtos.Pipeline getProtobufMessage() .setType(type) .setFactor(factor) .setState(PipelineState.getProtobuf(state)) - .setLeaderID("") + .setLeaderID(leaderId) .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); @@ -206,6 +221,7 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) .setState(PipelineState.fromProtobuf(pipeline.getState())) + .setLeaderId(pipeline.getLeaderID()) .setNodes(pipeline.getMembersList().stream() .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) .setNodesInOrder(pipeline.getMemberOrdersList()) @@ -274,6 +290,7 @@ public static class Builder { private Map nodeStatus = null; private List nodeOrder = null; private List nodesInOrder = null; + private ByteString leaderId = ByteString.EMPTY; public Builder() {} @@ -306,6 +323,11 @@ public Builder setState(PipelineState state1) { return this; } + public Builder setLeaderId(ByteString leaderId1) { + this.leaderId = leaderId1; + return this; + } + public Builder setNodes(List nodes) { this.nodeStatus = new LinkedHashMap<>(); nodes.forEach(node -> nodeStatus.put(node, -1L)); @@ -324,6 +346,7 @@ public Pipeline build() { Preconditions.checkNotNull(state); Preconditions.checkNotNull(nodeStatus); Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); + pipeline.setLeaderId(leaderId); if (nodeOrder != null && !nodeOrder.isEmpty()) { // This branch is for build from ProtoBuf diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index d2bb355ff8a4..51b3a4fe39bb 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -67,13 +67,13 @@ enum PipelineState { } message Pipeline { - required string leaderID = 1; - repeated DatanodeDetailsProto members = 2; + repeated DatanodeDetailsProto members = 1; // TODO: remove the state and leaderID from this class - optional PipelineState state = 3 [default = PIPELINE_ALLOCATED]; - optional ReplicationType type = 4 [default = STAND_ALONE]; - optional ReplicationFactor factor = 5 [default = ONE]; - required PipelineID id = 6; + optional PipelineState state = 2 [default = PIPELINE_ALLOCATED]; + optional ReplicationType type = 3 [default = STAND_ALONE]; + optional ReplicationFactor factor = 4 [default = ONE]; + required PipelineID id = 5; + optional bytes leaderID = 6; repeated uint32 memberOrders = 7; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java index e7f4347e9e43..eaf10120575a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java @@ -31,9 +31,9 @@ /** * Publishes Pipeline which will be sent to SCM as part of heartbeat. - * PipelineReport consist of the following information about each containers: + * PipelineReport consist of the following information about each pipeline: * - pipelineID - * + * - leaderID */ public class PipelineReportPublisher extends ReportPublisher { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c9eb7024eaf1..891bbb3e36e3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -486,4 +486,8 @@ public CommandDispatcher getCommandDispatcher() { public ReplicationSupervisor getSupervisor() { return supervisor; } + + public ReportManager getReportManager() { + return reportManager; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 9d6a189833aa..b617169de421 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -35,6 +35,8 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerProxy; @@ -854,4 +856,10 @@ public void close() throws IOException { executor.shutdown(); } } + + @Override + public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, + RaftPeerId raftPeerId) { + ratisServer.handleLeaderChangedNotification(groupMemberId, raftPeerId); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f816bcb252b0..ba0bc40c4073 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -19,11 +19,14 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; @@ -66,8 +69,10 @@ import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Collections; import java.util.Set; @@ -107,6 +112,8 @@ private static long nextCallId() { // TODO: Remove the gids set when Ratis supports an api to query active // pipelines private final Set raftGids = new HashSet<>(); + // pipeline leaders + private Map leaderIdMap = new HashMap<>(); private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, ContainerController containerController, @@ -593,6 +600,7 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) + .setLeaderID(leaderIdMap.getOrDefault(groupId, ByteString.EMPTY)) .build()); } return reports; @@ -681,4 +689,17 @@ void notifyGroupRemove(RaftGroupId gid) { void notifyGroupAdd(RaftGroupId gid) { raftGids.add(gid); } + + void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, + RaftPeerId raftPeerId) { + LOG.info("Leader change notification received for group: {} with new " + + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId); + // Save the reported leader to be sent with the report to SCM + leaderIdMap.put(groupMemberId.getGroupId(), + ByteString.copyFromUtf8(raftPeerId.toString())); + // Publish new reports with leaderID + context.addReport(context.getParent().getContainer().getPipelineReport()); + // Trigger HB immediately + context.getParent().triggerHeartbeat(); + } } diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index a975cd5605fc..369986018168 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -239,6 +239,7 @@ message ContainerAction { message PipelineReport { required PipelineID pipelineID = 1; + optional bytes leaderID = 2; } message PipelineReportsProto { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 793f4e2a5e27..a9ddcaea80f6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -18,25 +18,27 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Objects; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; /** * Handles Pipeline Reports from datanode. @@ -50,6 +52,8 @@ public class PipelineReportHandler implements private final Configuration conf; private final SCMSafeModeManager scmSafeModeManager; private final boolean pipelineAvailabilityCheck; + private Map> + reportedLeadersForPipeline = new HashMap<>(); public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager, PipelineManager pipelineManager, @@ -72,8 +76,8 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); PipelineReportsProto pipelineReport = pipelineReportFromDatanode.getReport(); - Preconditions.checkNotNull(dn, "Pipeline Report is " - + "missing DatanodeDetails."); + Preconditions.checkNotNull(dn, + "Pipeline Report is missing DatanodeDetails."); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Processing pipeline report for dn: {}", dn); } @@ -89,7 +93,6 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, pipelineReportFromDatanode); } - } private void processPipelineReport(PipelineReport report, DatanodeDetails dn) @@ -104,11 +107,22 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } + if (report.hasLeaderID()) { + Map ids = + reportedLeadersForPipeline.computeIfAbsent(pipelineID, + k -> new HashMap<>()); + ids.put(dn.getUuid(), report.getLeaderID()); + pipeline.setLeaderId(report.getLeaderID()); + } + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn); - pipeline.reportDatanode(dn); - if (pipeline.isHealthy()) { - // if all the dns have reported, pipeline can be moved to OPEN state + LOGGER.info("Pipeline {} reported by {} with leaderId {}", + pipeline.getId(), dn, report.getLeaderID().toStringUtf8()); + Map leaderIdPairs = + reportedLeadersForPipeline.get(pipelineID); + if (leaderIdPairs.size() == pipeline.getFactor().getNumber() && + leaderIdPairs.values().stream().distinct().count() == 1) { + // All datanodes reported same leader pipelineManager.openPipeline(pipelineID); } } else { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 76150579f849..8a4fd1f0e948 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -32,6 +32,8 @@ import java.util.List; import java.util.NavigableSet; +import com.google.common.annotations.VisibleForTesting; + /** * Manages the state of pipelines in SCM. All write operations like pipeline * creation, removal and updates should come via SCMPipelineManager. @@ -52,9 +54,7 @@ class PipelineStateManager { void addPipeline(Pipeline pipeline) throws IOException { pipelineStateMap.addPipeline(pipeline); - if (pipeline.getPipelineState() == PipelineState.OPEN) { - LOG.info("Created pipeline " + pipeline); - } + LOG.info("Created pipeline " + pipeline); } void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID) @@ -131,8 +131,8 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException { throw new IOException("Closed pipeline can not be opened"); } if (pipeline.getPipelineState() == PipelineState.ALLOCATED) { - pipeline = pipelineStateMap - .updatePipelineState(pipelineId, PipelineState.OPEN); + pipeline = pipelineStateMap.updatePipelineState( + pipelineId, PipelineState.OPEN); LOG.info("Pipeline {} moved to OPEN state", pipeline.toString()); } return pipeline; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 0324a58f13ab..c5dca2ef4453 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -58,6 +58,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; + /** * Implements Api for creating ratis pipelines. */ @@ -153,30 +155,23 @@ public Pipeline create(ReplicationFactor factor) throws IOException { throw new InsufficientDatanodesException(e); } - Pipeline pipeline = Pipeline.newBuilder() - .setId(PipelineID.randomId()) - .setState(PipelineState.OPEN) - .setType(ReplicationType.RATIS) - .setFactor(factor) - .setNodes(dns) - .build(); + Pipeline pipeline = create(factor, dns); initializePipeline(pipeline); return pipeline; } @Override public Pipeline create(ReplicationFactor factor, - List nodes) { + List nodes) { return Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.OPEN) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(nodes) .build(); } - @Override public void shutdown() { forkJoinPool.shutdownNow(); @@ -253,4 +248,9 @@ private void callRatisRpc(List datanodes, throw MultipleIOException.createIOException(exceptions); } } + + @VisibleForTesting + PipelineStateManager getStateManager() { + return stateManager; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 01c53baf2bfd..342ee5bea7a2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.node.NodeManager; import java.io.IOException; +import java.util.List; /** * Mock Ratis Pipeline Provider for Mock Nodes. @@ -42,4 +45,16 @@ protected void initializePipeline(Pipeline pipeline) throws IOException { public void shutdown() { // Do nothing. } + + @Override + public Pipeline create(HddsProtos.ReplicationFactor factor, + List nodes) { + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.OPEN) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(factor) + .setNodes(nodes) + .build(); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2a486b1224ed..4c18481e69b6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -21,16 +21,25 @@ import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -40,12 +49,7 @@ import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import com.google.protobuf.ByteString; /** * Test cases to verify PipelineManager. @@ -314,4 +318,71 @@ public void testActivateDeactivatePipeline() throws IOException { pipelineManager.close(); } + + @Test + public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { + EventQueue eventQueue = new EventQueue(); + SCMPipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue, null); + PipelineProvider mockRatisProvider = + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + // close manager + pipelineManager.close(); + // new pipeline manager loads the pipelines from the db in ALLOCATED state + pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue, null); + mockRatisProvider = + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + + SCMSafeModeManager scmSafeModeManager = + new SCMSafeModeManager(new OzoneConfiguration(), + new ArrayList<>(), pipelineManager, eventQueue); + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); + + // Report pipelines with leaders + List nodes = pipeline.getNodes(); + Assert.assertEquals(3, nodes.size()); + // Send leader for only first 2 dns + nodes.subList(0, 2).forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, true)); + sendPipelineReport(nodes.get(2), pipeline, pipelineReportHandler, false); + + Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + + nodes.forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, true)); + + Assert.assertEquals(Pipeline.PipelineState.OPEN, + pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); + } + + private void sendPipelineReport(DatanodeDetails dn, + Pipeline pipeline, PipelineReportHandler pipelineReportHandler, + boolean sendLeaderId) { + + PipelineReportsProto.Builder reportProtoBuilder = + PipelineReportsProto.newBuilder(); + PipelineReport.Builder reportBuilder = PipelineReport.newBuilder(); + reportBuilder.setPipelineID(pipeline.getId().getProtobuf()); + if (sendLeaderId) { + reportBuilder.setLeaderID(ByteString.copyFromUtf8("raftPeer-1")); + } + + pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn, + reportProtoBuilder.addPipelineReport( + reportBuilder.build()).build()), new EventQueue()); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 8b2fc9200a68..686b2a9ccb10 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -64,6 +66,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState @@ -95,6 +98,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { // Timeout for the cluster to be ready private int waitForClusterToBeReadyTimeout = 60000; // 1 min + // Timeout for all/any pipelines to be in open state + private int waitForPipelineOpenTimeout = 60000; private CertificateClient caClient; /** @@ -143,10 +148,31 @@ public void waitForClusterToBeReady() throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> { final int healthy = scm.getNodeCount(HEALTHY); - final boolean isReady = healthy == hddsDatanodes.size(); - LOG.info("{}. Got {} of {} DN Heartbeats.", - isReady? "Cluster is ready" : "Waiting for cluster to be ready", - healthy, hddsDatanodes.size()); + boolean isReady = healthy == hddsDatanodes.size(); + boolean printIsReadyMsg = true; + List pipelines = scm.getPipelineManager().getPipelines(); + if (!pipelines.isEmpty()) { + List raftPipelines = pipelines.stream().filter(p -> + p.getType() == HddsProtos.ReplicationType.RATIS).collect( + Collectors.toList()); + if (!raftPipelines.isEmpty()) { + List notOpenPipelines = raftPipelines.stream().filter(p -> + p.getPipelineState() != Pipeline.PipelineState.OPEN && + p.getPipelineState() != Pipeline.PipelineState.CLOSED) + .collect(Collectors.toList()); + if (notOpenPipelines.size() > 0) { + LOG.info("Waiting for {} number of pipelines out of {}, to report " + + "a leader.", notOpenPipelines.size(), raftPipelines.size()); + isReady = false; + printIsReadyMsg = false; + } + } + } + if (printIsReadyMsg) { + LOG.info("{}. Got {} of {} DN Heartbeats.", + isReady ? "Cluster is ready" : "Waiting for cluster to be ready", + healthy, hddsDatanodes.size()); + } return isReady; }, 1000, waitForClusterToBeReadyTimeout); } @@ -260,6 +286,18 @@ public void restartOzoneManager() throws IOException { ozoneManager.restart(); } + private void waitForHddsDatanodesStop() throws TimeoutException, + InterruptedException { + GenericTestUtils.waitFor(() -> { + final int healthy = scm.getNodeCount(HEALTHY); + boolean isReady = healthy == hddsDatanodes.size(); + if (!isReady) { + LOG.info("Waiting on {} datanodes to be marked unhealthy.", healthy); + } + return isReady; + }, 1000, waitForClusterToBeReadyTimeout); + } + @Override public void restartHddsDatanode(int i, boolean waitForDatanode) throws InterruptedException, TimeoutException { @@ -279,7 +317,7 @@ public void restartHddsDatanode(int i, boolean waitForDatanode) hddsDatanodes.remove(i); if (waitForDatanode) { // wait for node to be removed from SCM healthy node list. - waitForClusterToBeReady(); + waitForHddsDatanodesStop(); } String[] args = new String[]{}; HddsDatanodeService service = From 84bcbb2e002db4d400e6a7edf50929629ce8570f Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 16 Oct 2019 13:06:43 -0700 Subject: [PATCH 02/14] HDDS-1868. Remove unused method. --- .../hadoop/hdds/scm/pipeline/RatisPipelineProvider.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index c5dca2ef4453..94443dd7a193 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -58,8 +58,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.google.common.annotations.VisibleForTesting; - /** * Implements Api for creating ratis pipelines. */ @@ -248,9 +246,4 @@ private void callRatisRpc(List datanodes, throw MultipleIOException.createIOException(exceptions); } } - - @VisibleForTesting - PipelineStateManager getStateManager() { - return stateManager; - } } From dfb4fad79dd4d790a8b1ee12d550d71c467650f4 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 16 Oct 2019 17:18:23 -0700 Subject: [PATCH 03/14] HDDS-1868. Fixed unused imports. --- .../common/transport/server/ratis/XceiverServerRatis.java | 1 - .../apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index ba0bc40c4073..92278c96aad4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 8a4fd1f0e948..2410b544581c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -32,8 +32,6 @@ import java.util.List; import java.util.NavigableSet; -import com.google.common.annotations.VisibleForTesting; - /** * Manages the state of pipelines in SCM. All write operations like pipeline * creation, removal and updates should come via SCMPipelineManager. From 553c6b458ab37774a7de3b666b9fec65f6f2e096 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 16 Oct 2019 22:54:13 -0700 Subject: [PATCH 04/14] HDDS-1868. Remove unused method. --- .../container/common/statemachine/DatanodeStateMachine.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 891bbb3e36e3..c9eb7024eaf1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -486,8 +486,4 @@ public CommandDispatcher getCommandDispatcher() { public ReplicationSupervisor getSupervisor() { return supervisor; } - - public ReportManager getReportManager() { - return reportManager; - } } From 5f0ee8c56937842737940e724837da8ceb6d7d3d Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Thu, 17 Oct 2019 11:27:19 -0700 Subject: [PATCH 05/14] HDDS-1868. Changed the leaderIdMap to CHM. --- .../common/transport/server/ratis/XceiverServerRatis.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 92278c96aad4..69807d94be69 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -78,6 +78,7 @@ import java.util.UUID; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -112,7 +113,7 @@ private static long nextCallId() { // pipelines private final Set raftGids = new HashSet<>(); // pipeline leaders - private Map leaderIdMap = new HashMap<>(); + private Map leaderIdMap = new ConcurrentHashMap<>(); private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, ContainerController containerController, From 27f5c0958ecde00c8fcb526a98c62b9894063a8d Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Thu, 17 Oct 2019 22:58:32 -0700 Subject: [PATCH 06/14] HDDS-1868. Checkstyle fix. --- .../common/transport/server/ratis/XceiverServerRatis.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 69807d94be69..ff9b4a40dcce 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -68,7 +68,6 @@ import java.io.File; import java.io.IOException; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; From 89cb6ee99d413c19789d599eeb2bff0869c803e7 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Sun, 20 Oct 2019 18:16:02 -0700 Subject: [PATCH 07/14] HDDS-1868. Addressed review comments. --- .../server/ratis/XceiverServerRatis.java | 2 ++ .../hdds/scm/pipeline/PipelineManager.java | 5 +++ .../scm/pipeline/PipelineReportHandler.java | 35 ++++--------------- .../hdds/scm/pipeline/SCMPipelineManager.java | 34 ++++++++++++++++++ .../hadoop/ozone/MiniOzoneClusterImpl.java | 2 -- 5 files changed, 48 insertions(+), 30 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index ff9b4a40dcce..803e9835a63b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -683,6 +683,8 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { void notifyGroupRemove(RaftGroupId gid) { raftGids.remove(gid); + // Remove any entries for group leader map + leaderIdMap.remove(gid); } void notifyGroupAdd(RaftGroupId gid) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 9ba5f3189f76..66ddc8101fa8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.NavigableSet; +import com.google.protobuf.ByteString; + /** * Interface which exposes the api for pipeline management. */ @@ -68,6 +70,9 @@ NavigableSet getContainersInPipeline(PipelineID pipelineID) int getNumberOfContainers(PipelineID pipelineID) throws IOException; + void processReportedLeaderInfo(PipelineID pipelineID, DatanodeDetails dn, + ByteString leaderID) throws IOException; + void openPipeline(PipelineID pipelineId) throws IOException; void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index a9ddcaea80f6..4973442bdd79 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -19,10 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; -import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -38,7 +35,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; /** * Handles Pipeline Reports from datanode. @@ -46,14 +42,12 @@ public class PipelineReportHandler implements EventHandler { - private static final Logger LOGGER = LoggerFactory - .getLogger(PipelineReportHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger( + PipelineReportHandler.class); private final PipelineManager pipelineManager; private final Configuration conf; private final SCMSafeModeManager scmSafeModeManager; private final boolean pipelineAvailabilityCheck; - private Map> - reportedLeadersForPipeline = new HashMap<>(); public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager, PipelineManager pipelineManager, @@ -66,7 +60,6 @@ public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager, this.pipelineAvailabilityCheck = conf.getBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT); - } @Override @@ -107,27 +100,13 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } - if (report.hasLeaderID()) { - Map ids = - reportedLeadersForPipeline.computeIfAbsent(pipelineID, - k -> new HashMap<>()); - ids.put(dn.getUuid(), report.getLeaderID()); - pipeline.setLeaderId(report.getLeaderID()); - } - - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + if (report.hasLeaderID() && + pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { LOGGER.info("Pipeline {} reported by {} with leaderId {}", pipeline.getId(), dn, report.getLeaderID().toStringUtf8()); - Map leaderIdPairs = - reportedLeadersForPipeline.get(pipelineID); - if (leaderIdPairs.size() == pipeline.getFactor().getNumber() && - leaderIdPairs.values().stream().distinct().count() == 1) { - // All datanodes reported same leader - pipelineManager.openPipeline(pipelineID); - } - } else { - // In OPEN state case just report the datanode - pipeline.reportDatanode(dn); + pipelineManager.processReportedLeaderInfo(pipelineID, dn, + report.getLeaderID()); } + pipeline.reportDatanode(dn); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 0964f6d4db29..e627ea9993ac 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -50,6 +52,8 @@ import java.util.NavigableSet; import java.util.Set; import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -84,6 +88,8 @@ public class SCMPipelineManager implements PipelineManager { // Pipeline Manager MXBean private ObjectName pmInfoBean; private GrpcTlsConfig grpcTlsConfig; + private Map> + reportedLeadersForPipeline = new ConcurrentHashMap<>(); public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig) @@ -288,6 +294,33 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { return stateManager.getNumberOfContainers(pipelineID); } + @Override + public void processReportedLeaderInfo(PipelineID pipelineID, + DatanodeDetails dn, ByteString leaderID) throws IOException { + Pipeline pipeline = stateManager.getPipeline(pipelineID); + if (pipeline == null) { + LOG.error("Pipeline with id {} not found.", pipelineID); + return; + } + + Map ids = + reportedLeadersForPipeline.computeIfAbsent(pipelineID, + k -> new HashMap<>()); + + ids.put(dn.getUuid(), leaderID); + pipeline.setLeaderId(leaderID); + + Map leaderIdPairs = + reportedLeadersForPipeline.get(pipelineID); + if (leaderIdPairs.size() == pipeline.getFactor().getNumber() && + leaderIdPairs.values().stream().distinct().count() == 1) { + // All datanodes reported same leader + LOG.info("Opening pipeline {} with reported leader {}", + pipelineID, leaderID); + openPipeline(pipelineID); + } + } + @Override public void openPipeline(PipelineID pipelineId) throws IOException { lock.writeLock().lock(); @@ -411,6 +444,7 @@ private void destroyPipeline(Pipeline pipeline) throws IOException { RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig); // remove the pipeline from the pipeline manager removePipeline(pipeline.getId()); + reportedLeadersForPipeline.remove(pipeline.getId()); triggerPipelineCreation(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 686b2a9ccb10..8b010b7dde1c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -98,8 +98,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { // Timeout for the cluster to be ready private int waitForClusterToBeReadyTimeout = 60000; // 1 min - // Timeout for all/any pipelines to be in open state - private int waitForPipelineOpenTimeout = 60000; private CertificateClient caClient; /** From ca96b9ceef5349015cfd11115db60569977e1e74 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Sun, 20 Oct 2019 20:32:53 -0700 Subject: [PATCH 08/14] HDDS-1868. Fix for findbugs violation. --- .../apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index e627ea9993ac..5b4bed72fd7f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -298,11 +298,6 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { public void processReportedLeaderInfo(PipelineID pipelineID, DatanodeDetails dn, ByteString leaderID) throws IOException { Pipeline pipeline = stateManager.getPipeline(pipelineID); - if (pipeline == null) { - LOG.error("Pipeline with id {} not found.", pipelineID); - return; - } - Map ids = reportedLeadersForPipeline.computeIfAbsent(pipelineID, k -> new HashMap<>()); From 4d61be185c78045facffd3519714fa8977f67505 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Sun, 20 Oct 2019 21:39:07 -0700 Subject: [PATCH 09/14] HDDS-1868. Addressing latest review comments. --- .../hadoop/hdds/scm/pipeline/Pipeline.java | 7 +++++ .../hdds/scm/pipeline/PipelineManager.java | 5 ---- .../scm/pipeline/PipelineReportHandler.java | 21 ++++++++++--- .../hdds/scm/pipeline/SCMPipelineManager.java | 30 +------------------ 4 files changed, 25 insertions(+), 38 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index dcb4c8caeffc..db8d629c1434 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -32,10 +32,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -54,6 +56,7 @@ public final class Pipeline { private ThreadLocal> nodesInOrder = new ThreadLocal<>(); // Current reported Leader for the pipeline private ByteString leaderId = ByteString.EMPTY; + private final Map reportedLeaders = new HashMap<>(); /** * The immutable properties of pipeline object is used in @@ -110,6 +113,10 @@ public ByteString getLeaderId() { return leaderId; } + public Map getReportedLeaders() { + return reportedLeaders; + } + /** * Pipeline object, outside of letting leader id to be set, is immutable. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 66ddc8101fa8..9ba5f3189f76 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -30,8 +30,6 @@ import java.util.List; import java.util.NavigableSet; -import com.google.protobuf.ByteString; - /** * Interface which exposes the api for pipeline management. */ @@ -70,9 +68,6 @@ NavigableSet getContainersInPipeline(PipelineID pipelineID) int getNumberOfContainers(PipelineID pipelineID) throws IOException; - void processReportedLeaderInfo(PipelineID pipelineID, DatanodeDetails dn, - ByteString leaderID) throws IOException; - void openPipeline(PipelineID pipelineId) throws IOException; void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 4973442bdd79..291c043a20ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hdds.scm.pipeline; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; /** * Handles Pipeline Reports from datanode. @@ -100,12 +103,22 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } - if (report.hasLeaderID() && - pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + if (report.hasLeaderID()) { + pipeline.getReportedLeaders().put(dn.getUuid(), report.getLeaderID()); + pipeline.setLeaderId(report.getLeaderID()); + } + + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { LOGGER.info("Pipeline {} reported by {} with leaderId {}", pipeline.getId(), dn, report.getLeaderID().toStringUtf8()); - pipelineManager.processReportedLeaderInfo(pipelineID, dn, - report.getLeaderID()); + + Map reportedLeaderIds = pipeline.getReportedLeaders(); + if (reportedLeaderIds.size() == pipeline.getFactor().getNumber() && + reportedLeaderIds.values().stream().distinct().count() == 1) { + // All datanodes reported same leader + pipelineManager.openPipeline(pipelineID); + } + } pipeline.reportDatanode(dn); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 5b4bed72fd7f..ac6393d4b126 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -52,8 +51,6 @@ import java.util.NavigableSet; import java.util.Set; import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -88,9 +85,7 @@ public class SCMPipelineManager implements PipelineManager { // Pipeline Manager MXBean private ObjectName pmInfoBean; private GrpcTlsConfig grpcTlsConfig; - private Map> - reportedLeadersForPipeline = new ConcurrentHashMap<>(); - + public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig) throws IOException { @@ -294,28 +289,6 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { return stateManager.getNumberOfContainers(pipelineID); } - @Override - public void processReportedLeaderInfo(PipelineID pipelineID, - DatanodeDetails dn, ByteString leaderID) throws IOException { - Pipeline pipeline = stateManager.getPipeline(pipelineID); - Map ids = - reportedLeadersForPipeline.computeIfAbsent(pipelineID, - k -> new HashMap<>()); - - ids.put(dn.getUuid(), leaderID); - pipeline.setLeaderId(leaderID); - - Map leaderIdPairs = - reportedLeadersForPipeline.get(pipelineID); - if (leaderIdPairs.size() == pipeline.getFactor().getNumber() && - leaderIdPairs.values().stream().distinct().count() == 1) { - // All datanodes reported same leader - LOG.info("Opening pipeline {} with reported leader {}", - pipelineID, leaderID); - openPipeline(pipelineID); - } - } - @Override public void openPipeline(PipelineID pipelineId) throws IOException { lock.writeLock().lock(); @@ -439,7 +412,6 @@ private void destroyPipeline(Pipeline pipeline) throws IOException { RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig); // remove the pipeline from the pipeline manager removePipeline(pipeline.getId()); - reportedLeadersForPipeline.remove(pipeline.getId()); triggerPipelineCreation(); } From f04ec9cd53bbc55336302b0df4b52632c2b921e8 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Tue, 22 Oct 2019 18:20:49 -0700 Subject: [PATCH 10/14] HDDS-1868. Revised patch based on comments from msingh and nanda. --- .../hadoop/hdds/scm/pipeline/Pipeline.java | 48 +++++++++---------- hadoop-hdds/common/src/main/proto/hdds.proto | 2 +- .../server/ratis/XceiverServerRatis.java | 9 ++-- .../StorageContainerDatanodeProtocol.proto | 2 +- .../scm/pipeline/PipelineReportHandler.java | 20 ++------ .../scm/pipeline/TestSCMPipelineManager.java | 19 ++++---- 6 files changed, 44 insertions(+), 56 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index db8d629c1434..65ad8e557583 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -18,27 +18,26 @@ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; +import com.google.common.base.Preconditions; /** * Represents a group of datanodes which store a container. @@ -55,8 +54,7 @@ public final class Pipeline { // nodes with ordered distance to client private ThreadLocal> nodesInOrder = new ThreadLocal<>(); // Current reported Leader for the pipeline - private ByteString leaderId = ByteString.EMPTY; - private final Map reportedLeaders = new HashMap<>(); + private UUID leaderId; /** * The immutable properties of pipeline object is used in @@ -109,18 +107,19 @@ public PipelineState getPipelineState() { return state; } - public ByteString getLeaderId() { + /** + * Return the pipeline leader's UUID. + * + * @return DatanodeDetails.UUID. + */ + public UUID getLeaderId() { return leaderId; } - public Map getReportedLeaders() { - return reportedLeaders; - } - /** * Pipeline object, outside of letting leader id to be set, is immutable. */ - void setLeaderId(ByteString leaderId) { + void setLeaderId(UUID leaderId) { this.leaderId = leaderId; } @@ -196,7 +195,7 @@ public HddsProtos.Pipeline getProtobufMessage() .setType(type) .setFactor(factor) .setState(PipelineState.getProtobuf(state)) - .setLeaderID(leaderId) + .setLeaderID(leaderId != null ? leaderId.toString() : "") .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); @@ -228,7 +227,8 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) .setState(PipelineState.fromProtobuf(pipeline.getState())) - .setLeaderId(pipeline.getLeaderID()) + .setLeaderId(StringUtils.isNotEmpty(pipeline.getLeaderID()) ? + UUID.fromString(pipeline.getLeaderID()) : null) .setNodes(pipeline.getMembersList().stream() .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) .setNodesInOrder(pipeline.getMemberOrdersList()) @@ -297,7 +297,7 @@ public static class Builder { private Map nodeStatus = null; private List nodeOrder = null; private List nodesInOrder = null; - private ByteString leaderId = ByteString.EMPTY; + private UUID leaderId = null; public Builder() {} @@ -330,7 +330,7 @@ public Builder setState(PipelineState state1) { return this; } - public Builder setLeaderId(ByteString leaderId1) { + public Builder setLeaderId(UUID leaderId1) { this.leaderId = leaderId1; return this; } diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 51b3a4fe39bb..39a01dc1faf7 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -73,7 +73,7 @@ message Pipeline { optional ReplicationType type = 3 [default = STAND_ALONE]; optional ReplicationFactor factor = 4 [default = ONE]; required PipelineID id = 5; - optional bytes leaderID = 6; + optional string leaderID = 6; repeated uint32 memberOrders = 7; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 803e9835a63b..7f45b530c4bf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -112,7 +111,7 @@ private static long nextCallId() { // pipelines private final Set raftGids = new HashSet<>(); // pipeline leaders - private Map leaderIdMap = new ConcurrentHashMap<>(); + private Map leaderIdMap = new ConcurrentHashMap<>(); private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, ContainerController containerController, @@ -599,7 +598,8 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) - .setLeaderID(leaderIdMap.getOrDefault(groupId, ByteString.EMPTY)) + .setIsLeader(RatisHelper.toRaftPeerId(datanodeDetails).equals( + leaderIdMap.get(groupId))) .build()); } return reports; @@ -696,8 +696,7 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, LOG.info("Leader change notification received for group: {} with new " + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId); // Save the reported leader to be sent with the report to SCM - leaderIdMap.put(groupMemberId.getGroupId(), - ByteString.copyFromUtf8(raftPeerId.toString())); + leaderIdMap.put(groupMemberId.getGroupId(), raftPeerId); // Publish new reports with leaderID context.addReport(context.getParent().getContainer().getPipelineReport()); // Trigger HB immediately diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 369986018168..45a1db681542 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -239,7 +239,7 @@ message ContainerAction { message PipelineReport { required PipelineID pipelineID = 1; - optional bytes leaderID = 2; + required bool isLeader = 2; } message PipelineReportsProto { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 291c043a20ab..c72232cdf1ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -19,9 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import java.io.IOException; -import java.util.Map; import java.util.Objects; -import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -37,7 +35,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; /** * Handles Pipeline Reports from datanode. @@ -103,22 +100,15 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } - if (report.hasLeaderID()) { - pipeline.getReportedLeaders().put(dn.getUuid(), report.getLeaderID()); - pipeline.setLeaderId(report.getLeaderID()); - } - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - LOGGER.info("Pipeline {} reported by {} with leaderId {}", - pipeline.getId(), dn, report.getLeaderID().toStringUtf8()); + LOGGER.info("Pipeline {} reported by {} isLeader?: {}", + pipeline.getId(), dn, report.getIsLeader()); - Map reportedLeaderIds = pipeline.getReportedLeaders(); - if (reportedLeaderIds.size() == pipeline.getFactor().getNumber() && - reportedLeaderIds.values().stream().distinct().count() == 1) { - // All datanodes reported same leader + if (report.getIsLeader()) { + // Pipeline reported as the leader + pipeline.setLeaderId(dn.getUuid()); pipelineManager.openPipeline(pipelineID); } - } pipeline.reportDatanode(dn); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 4c18481e69b6..0ea69ee4e6e8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -354,16 +354,17 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { // Report pipelines with leaders List nodes = pipeline.getNodes(); Assert.assertEquals(3, nodes.size()); - // Send leader for only first 2 dns - nodes.subList(0, 2).forEach(dn -> - sendPipelineReport(dn, pipeline, pipelineReportHandler, true)); - sendPipelineReport(nodes.get(2), pipeline, pipelineReportHandler, false); + // Send report for all but no leader + nodes.forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); - nodes.forEach(dn -> - sendPipelineReport(dn, pipeline, pipelineReportHandler, true)); + nodes.subList(0, 2).forEach(dn -> + sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); + sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, + pipelineReportHandler, true); Assert.assertEquals(Pipeline.PipelineState.OPEN, pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); @@ -371,15 +372,13 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { private void sendPipelineReport(DatanodeDetails dn, Pipeline pipeline, PipelineReportHandler pipelineReportHandler, - boolean sendLeaderId) { + boolean isLeader) { PipelineReportsProto.Builder reportProtoBuilder = PipelineReportsProto.newBuilder(); PipelineReport.Builder reportBuilder = PipelineReport.newBuilder(); reportBuilder.setPipelineID(pipeline.getId().getProtobuf()); - if (sendLeaderId) { - reportBuilder.setLeaderID(ByteString.copyFromUtf8("raftPeer-1")); - } + reportBuilder.setIsLeader(isLeader); pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn, reportProtoBuilder.addPipelineReport( From f6ecae707127651f730fc2943f254e4812488609 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 23 Oct 2019 21:53:28 -0700 Subject: [PATCH 11/14] HDDS-1868. New changes based on discussion and unit test fixes. --- .../hadoop/hdds/scm/pipeline/Pipeline.java | 4 +- .../server/ratis/XceiverServerRatis.java | 26 ++++--- .../safemode/HealthyPipelineSafeModeRule.java | 75 +++++++++---------- .../hdds/scm/safemode/SafeModeHandler.java | 5 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 4 +- .../scm/pipeline/TestSCMPipelineManager.java | 69 ----------------- .../hadoop/ozone/MiniOzoneClusterImpl.java | 14 +++- .../scm/pipeline/TestSCMPipelineMetrics.java | 3 + 8 files changed, 74 insertions(+), 126 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 65ad8e557583..15a7bea84dea 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -175,13 +175,13 @@ void reportDatanode(DatanodeDetails dn) throws IOException { nodeStatus.put(dn, System.currentTimeMillis()); } - boolean isHealthy() { + public boolean isHealthy() { for (Long reportedTime : nodeStatus.values()) { if (reportedTime < 0) { return false; } } - return true; + return leaderId != null; } public boolean isEmpty() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 7f45b530c4bf..743b1e2446a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -110,8 +110,9 @@ private static long nextCallId() { // TODO: Remove the gids set when Ratis supports an api to query active // pipelines private final Set raftGids = new HashSet<>(); - // pipeline leaders - private Map leaderIdMap = new ConcurrentHashMap<>(); + private final RaftPeerId raftPeerId; + // pipelines for which I am the leader + private Map groupLeaderMap = new ConcurrentHashMap<>(); private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, ContainerController containerController, @@ -141,9 +142,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port, TimeUnit.MILLISECONDS); this.dispatcher = dispatcher; this.containerController = containerController; + this.raftPeerId = RatisHelper.toRaftPeerId(dd); RaftServer.Builder builder = - RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd)) + RaftServer.newBuilder().setServerId(raftPeerId) .setProperties(serverProperties) .setStateMachineRegistry(this::getStateMachine); if (tlsConfig != null) { @@ -598,8 +600,7 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) - .setIsLeader(RatisHelper.toRaftPeerId(datanodeDetails).equals( - leaderIdMap.get(groupId))) + .setIsLeader(groupLeaderMap.get(groupId)) .build()); } return reports; @@ -684,7 +685,7 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { void notifyGroupRemove(RaftGroupId gid) { raftGids.remove(gid); // Remove any entries for group leader map - leaderIdMap.remove(gid); + groupLeaderMap.remove(gid); } void notifyGroupAdd(RaftGroupId gid) { @@ -696,10 +697,13 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, LOG.info("Leader change notification received for group: {} with new " + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId); // Save the reported leader to be sent with the report to SCM - leaderIdMap.put(groupMemberId.getGroupId(), raftPeerId); - // Publish new reports with leaderID - context.addReport(context.getParent().getContainer().getPipelineReport()); - // Trigger HB immediately - context.getParent().triggerHeartbeat(); + boolean leaderForGroup = this.raftPeerId.equals(raftPeerId); + groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); + if (context != null && leaderForGroup) { + // Publish new report from leader + context.addReport(context.getParent().getContainer().getPipelineReport()); + // Trigger HB immediately + context.getParent().triggerHeartbeat(); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 7a00d760fa4d..2f9a66f27b52 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.hdds.scm.safemode; -import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -30,16 +31,13 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; - - -import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * Class defining Safe mode exit criteria for Pipelines. @@ -54,17 +52,17 @@ public class HealthyPipelineSafeModeRule public static final Logger LOG = LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class); private final PipelineManager pipelineManager; - private final int healthyPipelineThresholdCount; + private int healthyPipelineThresholdCount; private int currentHealthyPipelineCount = 0; - private final Set processedDatanodeDetails = - new HashSet<>(); + private final Map processedPipelines = new HashMap<>(); + private final double healthyPipelinesPercent; HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue, PipelineManager pipelineManager, SCMSafeModeManager manager, Configuration configuration) { super(manager, ruleName, eventQueue); this.pipelineManager = pipelineManager; - double healthyPipelinesPercent = + healthyPipelinesPercent = configuration.getDouble(HddsConfigKeys. HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, HddsConfigKeys. @@ -94,6 +92,12 @@ public class HealthyPipelineSafeModeRule healthyPipelineThresholdCount); } + @VisibleForTesting + public void setHealthyPipelineThresholdCount(int actualPipelineCount) { + healthyPipelineThresholdCount = + (int) Math.ceil(healthyPipelinesPercent * actualPipelineCount); + } + @Override protected TypedEvent getEventType() { return SCMEvents.PROCESSED_PIPELINE_REPORT; @@ -116,46 +120,41 @@ protected void process(PipelineReportFromDatanode // processed report event, we should not consider this pipeline report // from datanode again during threshold calculation. Preconditions.checkNotNull(pipelineReportFromDatanode); - DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails(); - if (!processedDatanodeDetails.contains( - pipelineReportFromDatanode.getDatanodeDetails())) { + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + + for (PipelineReport report : pipelineReport.getPipelineReportList()) { + PipelineID pipelineID = PipelineID.getFromProtobuf( + report.getPipelineID()); Pipeline pipeline; - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - - for (PipelineReport report : pipelineReport.getPipelineReportList()) { - PipelineID pipelineID = PipelineID - .getFromProtobuf(report.getPipelineID()); - try { - pipeline = pipelineManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - continue; - } + try { + pipeline = pipelineManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + continue; + } + if (!processedPipelines.containsKey(pipelineID)) { if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && - pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { - // If the pipeline is open state mean, all 3 datanodes are reported - // for this pipeline. + report.getIsLeader()) { + // If the pipeline gets reported with a leader we mark it as healthy currentHealthyPipelineCount++; getSafeModeMetrics().incCurrentHealthyPipelinesCount(); + processedPipelines.put(pipelineID, Boolean.TRUE); } } - if (scmInSafeMode()) { - SCMSafeModeManager.getLogger().info( - "SCM in safe mode. Healthy pipelines reported count is {}, " + - "required healthy pipeline reported count is {}", - currentHealthyPipelineCount, healthyPipelineThresholdCount); - } - - processedDatanodeDetails.add(dnDetails); } - + if (scmInSafeMode()) { + SCMSafeModeManager.getLogger().info( + "SCM in safe mode. Healthy pipelines reported count is {}, " + + "required healthy pipeline reported count is {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount); + } } @Override protected void cleanup() { - processedDatanodeDetails.clear(); + processedPipelines.clear(); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java index b9e53330691f..44d1c941774b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java @@ -102,8 +102,7 @@ public SafeModeHandler(Configuration configuration, */ @Override public void onMessage(SafeModeStatus safeModeStatus, - EventPublisher publisher) { - + EventPublisher publisher) { isInSafeMode.set(safeModeStatus.getSafeModeStatus()); scmClientProtocolServer.setSafeModeStatus(isInSafeMode.get()); scmBlockManager.setSafeModeStatus(isInSafeMode.get()); @@ -129,7 +128,7 @@ private void cleanupPipelines() { List pipelineList = scmPipelineManager.getPipelines(); pipelineList.forEach((pipeline) -> { try { - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + if (!pipeline.isHealthy()) { scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false); } } catch (IOException ex) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index ea5d571d8501..a475f9b5e535 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -353,7 +353,9 @@ public static PipelineReportFromDatanode getPipelineReportFromDatanode( PipelineReportsProto.newBuilder(); for (PipelineID pipelineID : pipelineIDs) { reportBuilder.addPipelineReport( - PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf())); + PipelineReport.newBuilder() + .setPipelineID(pipelineID.getProtobuf()) + .setIsLeader(false)); } return new PipelineReportFromDatanode(dn, reportBuilder.build()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 0ea69ee4e6e8..e65f197f3f52 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; @@ -49,8 +48,6 @@ import org.junit.Before; import org.junit.Test; -import com.google.protobuf.ByteString; - /** * Test cases to verify PipelineManager. */ @@ -150,72 +147,6 @@ public void testRemovePipeline() throws IOException { pipelineManager.close(); } - @Test - public void testPipelineReport() throws IOException { - EventQueue eventQueue = new EventQueue(); - SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, eventQueue, null); - PipelineProvider mockRatisProvider = - new MockRatisPipelineProvider(nodeManager, - pipelineManager.getStateManager(), conf); - pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, - mockRatisProvider); - - SCMSafeModeManager scmSafeModeManager = - new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); - - // create a pipeline in allocated state with no dns yet reported - Pipeline pipeline = pipelineManager - .createPipeline(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE); - Assert - .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - - // get pipeline report from each dn in the pipeline - PipelineReportHandler pipelineReportHandler = - new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline is not healthy until all dns report - Assert.assertFalse( - pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } - - // pipeline is healthy when all dns report - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - // pipeline should now move to open state - Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - - // close the pipeline - pipelineManager.finalizeAndDestroyPipeline(pipeline, false); - - for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline report for destroyed pipeline should be ignored - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); - } - - try { - pipelineManager.getPipeline(pipeline.getId()); - Assert.fail("Pipeline should not have been retrieved"); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("not found")); - } - - // clean up - pipelineManager.close(); - } - @Test public void testPipelineCreationFailedMetric() throws Exception { MockNodeManager nodeManagerMock = new MockNodeManager(true, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 8b010b7dde1c..2813711c2f28 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -290,7 +291,8 @@ private void waitForHddsDatanodesStop() throws TimeoutException, final int healthy = scm.getNodeCount(HEALTHY); boolean isReady = healthy == hddsDatanodes.size(); if (!isReady) { - LOG.info("Waiting on {} datanodes to be marked unhealthy.", healthy); + LOG.info("Waiting on {} datanodes out of {} to be marked unhealthy.", + healthy, hddsDatanodes.size()); } return isReady; }, 1000, waitForClusterToBeReadyTimeout); @@ -548,7 +550,15 @@ StorageContainerManager createSCM() configureSCM(); SCMStorageConfig scmStore = new SCMStorageConfig(conf); initializeScmStorage(scmStore); - return StorageContainerManager.createSCM(conf); + StorageContainerManager scm = StorageContainerManager.createSCM(conf); + HealthyPipelineSafeModeRule rule = + scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule(); + if (rule != null) { + // Set threshold to wait for safe mode exit - this is needed since a + // pipeline is marked open only after leader election. + rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3); + } + return scm; } private void initializeScmStorage(SCMStorageConfig scmStore) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java index 2f1ec66d6948..dcb45ad03f66 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm.pipeline; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; @@ -50,6 +51,8 @@ public class TestSCMPipelineMetrics { @Before public void setup() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, + Boolean.TRUE.toString()); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .build(); From d241b5e19742c63cdf9b8280cc2b1a3f3b9b9766 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Thu, 24 Oct 2019 20:54:25 -0700 Subject: [PATCH 12/14] HDDS-1868. Checkstyle + UT fix. --- .../common/transport/server/ratis/XceiverServerRatis.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 743b1e2446a5..1b24acb2087a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -600,7 +600,7 @@ public List getPipelineReport() { for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) - .setIsLeader(groupLeaderMap.get(groupId)) + .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE)) .build()); } return reports; @@ -693,11 +693,11 @@ void notifyGroupAdd(RaftGroupId gid) { } void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, - RaftPeerId raftPeerId) { + RaftPeerId raftPeerId1) { LOG.info("Leader change notification received for group: {} with new " + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId); // Save the reported leader to be sent with the report to SCM - boolean leaderForGroup = this.raftPeerId.equals(raftPeerId); + boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1); groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); if (context != null && leaderForGroup) { // Publish new report from leader From d7ebe8c22121da04f006c27974844c96d23ee731 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Thu, 24 Oct 2019 21:30:43 -0700 Subject: [PATCH 13/14] HDDS-1868. Unit test fixes. --- .../common/transport/server/ratis/XceiverServerRatis.java | 2 +- .../hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java | 3 ++- .../scm/safemode/TestOneReplicaPipelineSafeModeRule.java | 3 ++- .../hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java | 6 ++++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 1b24acb2087a..1146394fee91 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -695,7 +695,7 @@ void notifyGroupAdd(RaftGroupId gid) { void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId1) { LOG.info("Leader change notification received for group: {} with new " + - "leaderId: {}", groupMemberId.getGroupId(), raftPeerId); + "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1); // Save the reported leader to be sent with the report to SCM boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1); groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index 94c3039d41d4..f6d9b0e7c8f3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -252,7 +252,8 @@ private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) { .newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); // Here no need to fire event from 3 nodes, as already pipeline is in // open state, but doing it. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index ca54d0521135..7a099774e2bb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -188,7 +188,8 @@ private void firePipelineEvent(Pipeline pipeline) { PipelineReportsProto.newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) { eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index 247b38afc7f5..1e608b338168 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -357,7 +357,8 @@ private void firePipelineEvent(Pipeline pipeline) throws Exception { PipelineReportsProto.newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, new PipelineReportFromDatanode(pipeline.getNodes().get(0), reportBuilder.build())); @@ -493,7 +494,8 @@ public void testSafeModePipelineExitRule() throws Exception { PipelineReportsProto.Builder reportBuilder = PipelineReportsProto .newBuilder(); reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); + .setPipelineID(pipeline.getId().getProtobuf()) + .setIsLeader(Boolean.TRUE)); scmSafeModeManager = new SCMSafeModeManager( config, containers, pipelineManager, queue); From 84912ae13964330c7ff10614b6105f83b4b4a8f3 Mon Sep 17 00:00:00 2001 From: Siddharth Wagle Date: Wed, 6 Nov 2019 11:46:18 -0800 Subject: [PATCH 14/14] HDDS-1868. Fix PipelineReportHandler and spurious code. --- .../hdds/scm/pipeline/PipelineReportHandler.java | 12 ++++++++++-- .../hadoop/hdds/scm/pipeline/SCMPipelineManager.java | 3 +-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index c72232cdf1ed..b8cb7b4246c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -100,9 +100,17 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn) return; } + pipeline.reportDatanode(dn); + if (report.getIsLeader()) { + pipeline.setLeaderId(dn.getUuid()); + } + if ((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) + && pipeline.isHealthy()) { + pipelineManager.openPipeline(pipelineID); + } + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - LOGGER.info("Pipeline {} reported by {} isLeader?: {}", - pipeline.getId(), dn, report.getIsLeader()); + if (report.getIsLeader()) { // Pipeline reported as the leader diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index ac6393d4b126..0964f6d4db29 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -85,7 +84,7 @@ public class SCMPipelineManager implements PipelineManager { // Pipeline Manager MXBean private ObjectName pmInfoBean; private GrpcTlsConfig grpcTlsConfig; - + public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig) throws IOException {