Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -144,7 +146,7 @@ public BlockDataStreamOutput(
this.xceiverClient =
(XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
// Alternatively, stream setup can be delayed till the first chunk write.
this.out = setupStream();
this.out = setupStream(pipeline);
this.token = token;

flushPeriod = (int) (config.getStreamBufferFlushSize() / config
Expand All @@ -166,7 +168,7 @@ public BlockDataStreamOutput(
config.getBytesPerChecksum());
}

private DataStreamOutput setupStream() throws IOException {
private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
// Execute a dummy WriteChunk request to get the path of the target file,
// but does NOT write any data to it.
ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
Expand All @@ -184,7 +186,39 @@ private DataStreamOutput setupStream() throws IOException {
ContainerCommandRequestMessage.toMessage(builder.build(), null);

return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
.stream(message.getContent().asReadOnlyByteBuffer());
.stream(message.getContent().asReadOnlyByteBuffer(),
getRoutingTable(pipeline));
}

public RoutingTable getRoutingTable(Pipeline pipeline) {
RaftPeerId primaryId = null;
List<RaftPeerId> raftPeers = new ArrayList<>();

for (DatanodeDetails dn : pipeline.getNodes()) {
final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
try {
if (dn == pipeline.getFirstNode()) {
primaryId = raftPeerId;
}
} catch (IOException e) {
LOG.error("Can not get FirstNode from the pipeline: {} with " +
"exception: {}", pipeline.toString(), e.getLocalizedMessage());
return null;
}
raftPeers.add(raftPeerId);
}

RoutingTable.Builder builder = RoutingTable.newBuilder();
RaftPeerId previousId = primaryId;
for (RaftPeerId peerId : raftPeers) {
if (peerId.equals(primaryId)) {
continue;
}
builder.addSuccessor(previousId, peerId);
previousId = peerId;
}

return builder.build();
}

public BlockID getBlockID() {
Expand Down