diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index d793d93e8e4c..091d2a83dd33 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -89,6 +89,13 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private long streamWindowSize = 64 * 1024 * 1024; + @Config(key = "datastream.pipeline.mode", + defaultValue = "true", + description = "Streaming write support both pipeline mode(datanode1->" + + "datanode2->datanode3) and star mode(datanode1->datanode2, " + + "datanode1->datanode3). By default we use pipeline mode.", + tags = ConfigTag.CLIENT) + private boolean datastreamPipelineMode = true; @Config(key = "stream.buffer.increment", defaultValue = "0B", @@ -313,4 +320,12 @@ public ChecksumCombineMode getChecksumCombineMode() { ChecksumCombineMode.COMPOSITE_CRC.name()); } } + + public boolean isDatastreamPipelineMode() { + return datastreamPipelineMode; + } + + public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { + this.datastreamPipelineMode = datastreamPipelineMode; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 8b3e32cf41a4..3df5eb0e1203 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -137,6 +137,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private XceiverClientMetrics metrics; // buffers for which putBlock is yet to be executed private List buffersForPutBlock; + private boolean isDatastreamPipelineMode; /** * Creates a new BlockDataStreamOutput. * @@ -154,6 +155,7 @@ public BlockDataStreamOutput( ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; + this.isDatastreamPipelineMode = config.isDatastreamPipelineMode(); this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -203,9 +205,14 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(builder.build(), null); - return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) - .stream(message.getContent().asReadOnlyByteBuffer(), - getRoutingTable(pipeline)); + if (isDatastreamPipelineMode) { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer(), + getRoutingTable(pipeline)); + } else { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer()); + } } public RoutingTable getRoutingTable(Pipeline pipeline) {