Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -384,6 +387,12 @@ private XceiverClientReply sendCommandWithRetry(
}
}

boolean allInService = datanodeList.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change seems sensible to me. I tried to think if there was a nicer way to do this, and thought that the following was easier to follow:

    Iterator<DatanodeDetails> iter = datanodeList.iterator();
    List<DatanodeDetails> notInService = null;
    while(iter.hasNext()) {
      DatanodeDetails dn = iter.next();
      if (dn.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE) {
        iter.remove();
        if (notInService == null) {
          notInService = new ArrayList<>();
        }
        notInService.add(dn);
      }
    }
    if (notInService != null) {
      datanodeList.addAll(notInService);
    }

It could be shorter, but I tried to avoid allocating a list for notInService unless it was really needed, as it won't be needed most of the time.

Only change this if you feel it is better - I am not insisting it should be changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion. That implementation needs to remove and re-add elements, so it may not bring much benefit compared with the sort.

.allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
if (!allInService) {
datanodeList = sortDatanodeByOperationalState(datanodeList);
}

for (DatanodeDetails dn : datanodeList) {
try {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -447,6 +456,30 @@ private XceiverClientReply sendCommandWithRetry(
}
}

/**
 * Returns a copy of the given datanodes ordered so that every
 * {@code IN_SERVICE} node comes before every node in any other
 * operational state. The input list itself is not modified.
 *
 * @param datanodeList datanodes to order
 * @return a new list holding the same datanodes, IN_SERVICE ones first
 */
private static List<DatanodeDetails> sortDatanodeByOperationalState(
    List<DatanodeDetails> datanodeList) {
  // Boolean keys order false before true, so keying on "not IN_SERVICE"
  // puts the IN_SERVICE nodes at the front. Nodes with equal keys compare
  // as equal, and List.sort is stable, so the relative order inside each
  // group is preserved.
  Comparator<DatanodeDetails> inServiceFirst = Comparator.comparing(
      (DatanodeDetails dn) ->
          dn.getPersistedOpState() != NodeOperationalState.IN_SERVICE);

  List<DatanodeDetails> ordered = new ArrayList<>(datanodeList);
  ordered.sort(inServiceFirst);
  return ordered;
}

@Override
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
Expand Down Expand Up @@ -174,6 +178,39 @@ public XceiverClientReply sendCommandAsync(
assertEquals(1, seenDNs.size());
}

/**
 * Verifies that a read is sent to an IN_SERVICE datanode first, even when
 * other nodes of the pipeline have been moved to IN_MAINTENANCE.
 *
 * <p>Repeated 100 times with freshly created pipelines and randomly chosen
 * maintenance nodes.
 *
 * @throws IOException if creating the client or issuing the block read fails
 */
@Test
public void testPrimaryReadFromNormalDatanode()
    throws IOException {
  for (int i = 0; i < 100; i++) {
    // Collected per iteration: a list shared across iterations would make
    // seenDNs.get(0) always refer to the very first iteration's datanode,
    // leaving the remaining 99 iterations unchecked.
    final List<DatanodeDetails> seenDNs = new ArrayList<>();

    Pipeline randomPipeline = MockPipeline.createRatisPipeline();
    int nodeCount = randomPipeline.getNodes().size();
    assertThat(nodeCount).isGreaterThan(1);
    randomPipeline.getNodes().forEach(
        node -> assertEquals(NodeOperationalState.IN_SERVICE, node.getPersistedOpState()));

    // Up to two nodes (the two random picks may coincide) leave IN_SERVICE.
    // NOTE(review): this presumes the mock pipeline has at least three
    // nodes so that one IN_SERVICE node always remains - confirm.
    randomPipeline.getNodes().get(
        RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
    randomPipeline.getNodes().get(
        RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);

    // Failures propagate to the test runner instead of being printed and
    // ignored, so a broken call cannot masquerade as a passing iteration.
    try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) {
      @Override
      public XceiverClientReply sendCommandAsync(
          ContainerProtos.ContainerCommandRequestProto request,
          DatanodeDetails dn) {
        seenDNs.add(dn);
        return buildValidResponse();
      }
    }) {
      invokeXceiverClientGetBlock(client);
    }

    // Always the IN_SERVICE datanode will be read first
    assertThat(seenDNs).isNotEmpty();
    assertEquals(NodeOperationalState.IN_SERVICE, seenDNs.get(0).getPersistedOpState());
  }
}

@Test
public void testConnectionReusedAfterGetBlock() throws IOException {
// With a new Client, make 100 calls. On each call, ensure that only one
Expand Down