diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index c69af90a9142..41e2c48bbbd8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -41,6 +41,8 @@ import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.StandardWriteOption; import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RoutingTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +146,7 @@ public BlockDataStreamOutput( this.xceiverClient = (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline); // Alternatively, stream setup can be delayed till the first chunk write. - this.out = setupStream(); + this.out = setupStream(pipeline); this.token = token; flushPeriod = (int) (config.getStreamBufferFlushSize() / config @@ -166,7 +168,7 @@ public BlockDataStreamOutput( config.getBytesPerChecksum()); } - private DataStreamOutput setupStream() throws IOException { + private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { // Execute a dummy WriteChunk request to get the path of the target file, // but does NOT write any data to it. ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest = @@ -184,7 +186,39 @@ private DataStreamOutput setupStream() throws IOException { ContainerCommandRequestMessage.toMessage(builder.build(), null); return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) - .stream(message.getContent().asReadOnlyByteBuffer()); + .stream(message.getContent().asReadOnlyByteBuffer(), + getRoutingTable(pipeline)); + } + + public RoutingTable getRoutingTable(Pipeline pipeline) { + RaftPeerId primaryId = null; + List raftPeers = new ArrayList<>(); + + for (DatanodeDetails dn : pipeline.getNodes()) { + final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString()); + try { + if (dn == pipeline.getFirstNode()) { + primaryId = raftPeerId; + } + } catch (IOException e) { + LOG.error("Can not get FirstNode from the pipeline: {} with " + + "exception: {}", pipeline.toString(), e.getLocalizedMessage()); + return null; + } + raftPeers.add(raftPeerId); + } + + RoutingTable.Builder builder = RoutingTable.newBuilder(); + RaftPeerId previousId = primaryId; + for (RaftPeerId peerId : raftPeers) { + if (peerId.equals(primaryId)) { + continue; + } + builder.addSuccessor(previousId, peerId); + previousId = peerId; + } + + return builder.build(); } public BlockID getBlockID() {