diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 52f435dc826d..2b5854ca2086 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.scm.client.ClientTrustManager; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -384,6 +387,12 @@ private XceiverClientReply sendCommandWithRetry( } } + boolean allInService = datanodeList.stream() + .allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE); + if (!allInService) { + datanodeList = sortDatanodeByOperationalState(datanodeList); + } + for (DatanodeDetails dn : datanodeList) { try { if (LOG.isDebugEnabled()) { @@ -447,6 +456,30 @@ private XceiverClientReply sendCommandWithRetry( } } + private static List sortDatanodeByOperationalState( + List datanodeList) { + List sortedDatanodeList = new ArrayList<>(datanodeList); + // Make IN_SERVICE's Datanode precede all other State's Datanodes. + // This is a stable sort that does not change the order of the + // IN_SERVICE's Datanode. + Comparator byOpStateStable = (first, second) -> { + boolean firstInService = first.getPersistedOpState() == + NodeOperationalState.IN_SERVICE; + boolean secondInService = second.getPersistedOpState() == + NodeOperationalState.IN_SERVICE; + + if (firstInService == secondInService) { + return 0; + } else if (firstInService) { + return -1; + } else { + return 1; + } + }; + sortedDatanodeList.sort(byOpStateStable); + return sortedDatanodeList; + } + @Override public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 79c937ceb58b..99095f55b008 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -20,13 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; @@ -174,6 +178,39 @@ public XceiverClientReply sendCommandAsync( assertEquals(1, seenDNs.size()); } + @Test + public void testPrimaryReadFromNormalDatanode() + throws IOException { + final List seenDNs = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Pipeline randomPipeline = MockPipeline.createRatisPipeline(); + int nodeCount = randomPipeline.getNodes().size(); + assertThat(nodeCount).isGreaterThan(1); + randomPipeline.getNodes().forEach( + node -> assertEquals(NodeOperationalState.IN_SERVICE, node.getPersistedOpState())); + + randomPipeline.getNodes().get( + RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE); + randomPipeline.getNodes().get( + RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE); + try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) { + @Override + public XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn) { + seenDNs.add(dn); + return buildValidResponse(); + } + }) { + invokeXceiverClientGetBlock(client); + } catch (IOException e) { + e.printStackTrace(); + } + // Always the IN_SERVICE datanode will be read first + assertEquals(NodeOperationalState.IN_SERVICE, seenDNs.get(0).getPersistedOpState()); + } + } + @Test public void testConnectionReusedAfterGetBlock() throws IOException { // With a new Client, make 100 calls. On each call, ensure that only one