diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 80f86a677a7c..44af34cb919c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -254,6 +255,7 @@ public long getStreamBufferFlushSize() { return streamBufferFlushSize; } + @VisibleForTesting public void setStreamBufferFlushSize(long streamBufferFlushSize) { this.streamBufferFlushSize = streamBufferFlushSize; } @@ -262,6 +264,7 @@ public int getStreamBufferSize() { return streamBufferSize; } + @VisibleForTesting public void setStreamBufferSize(int streamBufferSize) { this.streamBufferSize = streamBufferSize; } @@ -270,6 +273,7 @@ public boolean isStreamBufferFlushDelay() { return streamBufferFlushDelay; } + @VisibleForTesting public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) { this.streamBufferFlushDelay = streamBufferFlushDelay; } @@ -278,6 +282,7 @@ public long getStreamBufferMaxSize() { return streamBufferMaxSize; } + @VisibleForTesting public void setStreamBufferMaxSize(long streamBufferMaxSize) { this.streamBufferMaxSize = streamBufferMaxSize; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java new file mode 100644 index 000000000000..4772cb90fb37 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +/** + * This class encapsulates the arguments that are + * required for Ozone client StreamBuffer. + */ +public class StreamBufferArgs { + + private int streamBufferSize; + private long streamBufferFlushSize; + private long streamBufferMaxSize; + private boolean streamBufferFlushDelay; + + protected StreamBufferArgs(Builder builder) { + this.streamBufferSize = builder.bufferSize; + this.streamBufferFlushSize = builder.bufferFlushSize; + this.streamBufferMaxSize = builder.bufferMaxSize; + this.streamBufferFlushDelay = builder.streamBufferFlushDelay; + } + + public int getStreamBufferSize() { + return streamBufferSize; + } + + public long getStreamBufferFlushSize() { + return streamBufferFlushSize; + } + + public long getStreamBufferMaxSize() { + return streamBufferMaxSize; + } + + public boolean isStreamBufferFlushDelay() { + return streamBufferFlushDelay; + } + + public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) { + this.streamBufferFlushDelay = streamBufferFlushDelay; + } + + protected void setStreamBufferSize(int streamBufferSize) { + this.streamBufferSize = streamBufferSize; + } + + protected void setStreamBufferFlushSize(long streamBufferFlushSize) { + this.streamBufferFlushSize = streamBufferFlushSize; + } + + protected void setStreamBufferMaxSize(long streamBufferMaxSize) { + this.streamBufferMaxSize = streamBufferMaxSize; + } + + /** + * Builder class for StreamBufferArgs. + */ + public static class Builder { + private int bufferSize; + private long bufferFlushSize; + private long bufferMaxSize; + private boolean streamBufferFlushDelay; + + public Builder setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public Builder setBufferFlushSize(long bufferFlushSize) { + this.bufferFlushSize = bufferFlushSize; + return this; + } + + public Builder setBufferMaxSize(long bufferMaxSize) { + this.bufferMaxSize = bufferMaxSize; + return this; + } + + public Builder setStreamBufferFlushDelay(boolean streamBufferFlushDelay) { + this.streamBufferFlushDelay = streamBufferFlushDelay; + return this; + } + + public StreamBufferArgs build() { + return new StreamBufferArgs(this); + } + + public static Builder getNewBuilder() { + return new Builder(); + } + } + + public static StreamBufferArgs getDefaultStreamBufferArgs( + ReplicationConfig replicationConfig, OzoneClientConfig clientConfig) { + int bufferSize; + long flushSize; + long bufferMaxSize; + boolean streamBufferFlushDelay = clientConfig.isStreamBufferFlushDelay(); + if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) { + bufferSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize(); + flushSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize(); + bufferMaxSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize(); + } else { + bufferSize = clientConfig.getStreamBufferSize(); + flushSize = clientConfig.getStreamBufferFlushSize(); + bufferMaxSize = clientConfig.getStreamBufferMaxSize(); + } + + return Builder.getNewBuilder() + .setBufferSize(bufferSize) + .setBufferFlushSize(flushSize) + .setBufferMaxSize(bufferMaxSize) + .setStreamBufferFlushDelay(streamBufferFlushDelay) + .build(); + } +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 9988fb258ebb..b97165084f6e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -88,6 +89,7 @@ public class BlockOutputStream extends OutputStream { private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private OzoneClientConfig config; + private StreamBufferArgs streamBufferArgs; private int chunkIndex; private final AtomicLong chunkOffset = new AtomicLong(); @@ -134,6 +136,7 @@ public class BlockOutputStream extends OutputStream { * @param pipeline pipeline where block will be written * @param bufferPool pool of buffers */ + @SuppressWarnings("checkstyle:ParameterNumber") public BlockOutputStream( BlockID blockID, XceiverClientFactory xceiverClientManager, @@ -141,7 +144,7 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -166,12 +169,12 @@ public BlockOutputStream( //number of buffers used before doing a flush refreshCurrentBuffer(); - flushPeriod = (int) (config.getStreamBufferFlushSize() / config + flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs .getStreamBufferSize()); Preconditions .checkArgument( - (long) flushPeriod * config.getStreamBufferSize() == config + (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs .getStreamBufferFlushSize()); // A single thread executor handle the responses of async requests @@ -185,6 +188,7 @@ public BlockOutputStream( config.getBytesPerChecksum()); this.clientMetrics = clientMetrics; this.pipeline = pipeline; + this.streamBufferArgs = streamBufferArgs; } void refreshCurrentBuffer() { @@ -321,7 +325,7 @@ private void updateFlushLength() { } private boolean isBufferPoolFull() { - return bufferPool.computeBufferData() == config.getStreamBufferMaxSize(); + return bufferPool.computeBufferData() == streamBufferArgs.getStreamBufferMaxSize(); } /** @@ -339,7 +343,7 @@ public void writeOnRetry(long len) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Retrying write length {} for blockID {}", len, blockID); } - Preconditions.checkArgument(len <= config.getStreamBufferMaxSize()); + Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize()); int count = 0; while (len > 0) { ChunkBuffer buffer = bufferPool.getBuffer(count); @@ -355,13 +359,13 @@ public void writeOnRetry(long len) throws IOException { // the buffer. We should just validate // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to // call for handling full buffer/flush buffer condition. - if (writtenDataLength % config.getStreamBufferFlushSize() == 0) { + if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) { // reset the position to zero as now we will be reading the // next buffer in the list updateFlushLength(); executePutBlock(false, false); } - if (writtenDataLength == config.getStreamBufferMaxSize()) { + if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) { handleFullBuffer(); } } @@ -518,9 +522,9 @@ void putFlushFuture(long flushPos, public void flush() throws IOException { if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0 - && (!config.isStreamBufferFlushDelay() || + && (!streamBufferArgs.isStreamBufferFlushDelay() || writtenDataLength - totalDataFlushedLength - >= config.getStreamBufferSize())) { + >= streamBufferArgs.getStreamBufferSize())) { handleFlush(false); } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 90cf4743f8dd..1d7fdc1df60b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -66,6 +67,7 @@ public class ECBlockOutputStream extends BlockOutputStream { * @param pipeline pipeline where block will be written * @param bufferPool pool of buffers */ + @SuppressWarnings("checkstyle:ParameterNumber") public ECBlockOutputStream( BlockID blockID, XceiverClientFactory xceiverClientManager, @@ -73,10 +75,10 @@ public ECBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index ede70574967a..ee708bf0de15 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -71,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream * @param blockID block ID * @param bufferPool pool of buffers */ + @SuppressWarnings("checkstyle:ParameterNumber") public RatisBlockOutputStream( BlockID blockID, XceiverClientFactory xceiverClientManager, @@ -78,10 +80,10 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, clientMetrics); + bufferPool, config, token, clientMetrics, streamBufferArgs); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index a2a0832bdb4f..29c0798df77d 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -103,6 +104,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) config.setStreamBufferFlushSize(16 * 1024 * 1024); config.setChecksumType(ChecksumType.NONE); config.setBytesPerChecksum(256 * 1024); + StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config); return new RatisBlockOutputStream( new BlockID(1L, 1L), @@ -111,7 +114,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) bufferPool, config, null, - ContainerClientMetrics.acquire()); + ContainerClientMetrics.acquire(), streamBufferArgs); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 80348dbe45ef..24e76821f9c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -223,13 +224,15 @@ private ECBlockOutputStream getECBlockOutputStream( BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails, ECReplicationConfig repConfig, int replicaIndex, OzoneClientConfig configuration) throws IOException { + StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, configuration); return new ECBlockOutputStream( blockLocationInfo.getBlockID(), containerOperationClient.getXceiverClientManager(), containerOperationClient.singleNodePipeline(datanodeDetails, repConfig, replicaIndex), BufferPool.empty(), configuration, - blockLocationInfo.getToken(), clientMetrics); + blockLocationInfo.getToken(), clientMetrics, streamBufferArgs); } @VisibleForTesting diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 2501803fc070..9bdec27f534f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; @@ -61,6 +62,7 @@ public class BlockOutputStreamEntry extends OutputStream { private BufferPool bufferPool; private ContainerClientMetrics clientMetrics; + private StreamBufferArgs streamBufferArgs; @SuppressWarnings({"parameternumber", "squid:S00107"}) BlockOutputStreamEntry( @@ -71,7 +73,7 @@ public class BlockOutputStreamEntry extends OutputStream { BufferPool bufferPool, Token token, OzoneClientConfig config, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) { this.config = config; this.outputStream = null; @@ -84,6 +86,7 @@ public class BlockOutputStreamEntry extends OutputStream { this.currentPosition = 0; this.bufferPool = bufferPool; this.clientMetrics = clientMetrics; + this.streamBufferArgs = streamBufferArgs; } /** @@ -105,13 +108,17 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); } ContainerClientMetrics getClientMetrics() { return clientMetrics; } + StreamBufferArgs getStreamBufferArgs() { + return streamBufferArgs; + } + @Override public void write(int b) throws IOException { checkStream(); @@ -353,6 +360,7 @@ public static class Builder { private Token token; private OzoneClientConfig config; private ContainerClientMetrics clientMetrics; + private StreamBufferArgs streamBufferArgs; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -398,6 +406,10 @@ public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { this.clientMetrics = clientMetrics; return this; } + public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { + this.streamBufferArgs = streamBufferArgs; + return this; + } public BlockOutputStreamEntry build() { return new BlockOutputStreamEntry(blockID, @@ -406,7 +418,7 @@ public BlockOutputStreamEntry build() { pipeline, length, bufferPool, - token, config, clientMetrics); + token, config, clientMetrics, streamBufferArgs); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 573e4a8dd3ca..d0f3b5728a8b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -27,10 +27,10 @@ import java.util.Map; import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -84,6 +84,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private final long openID; private final ExcludeList excludeList; private final ContainerClientMetrics clientMetrics; + private final StreamBufferArgs streamBufferArgs; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockOutputStreamEntryPool( @@ -94,7 +95,7 @@ public BlockOutputStreamEntryPool( boolean isMultipart, OmKeyInfo info, boolean unsafeByteBufferConversion, XceiverClientFactory xceiverClientFactory, long openID, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs ) { this.config = config; this.xceiverClientFactory = xceiverClientFactory; @@ -111,12 +112,13 @@ public BlockOutputStreamEntryPool( this.excludeList = createExcludeList(); this.bufferPool = - new BufferPool(config.getStreamBufferSize(), - (int) (config.getStreamBufferMaxSize() / config + new BufferPool(streamBufferArgs.getStreamBufferSize(), + (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs .getStreamBufferSize()), ByteStringConversion .createByteBufferConversion(unsafeByteBufferConversion)); this.clientMetrics = clientMetrics; + this.streamBufferArgs = streamBufferArgs; } ExcludeList createExcludeList() { @@ -124,17 +126,14 @@ ExcludeList createExcludeList() { Clock.system(ZoneOffset.UTC)); } - BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics) { + BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics, + OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) { streamEntries = new ArrayList<>(); omClient = null; keyArgs = null; xceiverClientFactory = null; - config = - new OzoneConfiguration().getObject(OzoneClientConfig.class); - config.setStreamBufferSize(0); - config.setStreamBufferMaxSize(0); - config.setStreamBufferFlushSize(0); - config.setStreamBufferFlushDelay(false); + config = clientConfig; + streamBufferArgs.setStreamBufferFlushDelay(false); requestID = null; int chunkSize = 0; bufferPool = new BufferPool(chunkSize, 1); @@ -143,6 +142,7 @@ ExcludeList createExcludeList() { openID = -1; excludeList = createExcludeList(); this.clientMetrics = clientMetrics; + this.streamBufferArgs = null; } /** @@ -189,6 +189,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setBufferPool(bufferPool) .setToken(subKeyInfo.getToken()) .setClientMetrics(clientMetrics) + .setStreamBufferArgs(streamBufferArgs) .build(); } @@ -255,6 +256,10 @@ ContainerClientMetrics getClientMetrics() { return clientMetrics; } + StreamBufferArgs getStreamBufferArgs() { + return streamBufferArgs; + } + /** * Discards the subsequent pre allocated blocks and removes the streamEntries * from the streamEntries list for the container which is closed. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 26d11f3d642b..07d0f46069ca 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; @@ -78,9 +79,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry { ECBlockOutputStreamEntry(BlockID blockID, String key, XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length, BufferPool bufferPool, Token token, - OzoneClientConfig config, ContainerClientMetrics clientMetrics) { + OzoneClientConfig config, ContainerClientMetrics clientMetrics, + StreamBufferArgs streamBufferArgs) { super(blockID, key, xceiverClientManager, pipeline, length, bufferPool, - token, config, clientMetrics); + token, config, clientMetrics, streamBufferArgs); assertInstanceOf( pipeline.getReplicationConfig(), ECReplicationConfig.class); this.replicationConfig = @@ -99,7 +101,7 @@ void checkStream() throws IOException { streams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), - getBufferPool(), getConf(), getToken(), getClientMetrics()); + getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs()); } blockOutputStreams = streams; } @@ -441,6 +443,7 @@ public static class Builder { private Token token; private OzoneClientConfig config; private ContainerClientMetrics clientMetrics; + private StreamBufferArgs streamBufferArgs; public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -492,6 +495,12 @@ public ECBlockOutputStreamEntry.Builder setClientMetrics( return this; } + public ECBlockOutputStreamEntry.Builder setStreamBufferArgs( + StreamBufferArgs args) { + this.streamBufferArgs = args; + return this; + } + public ECBlockOutputStreamEntry build() { return new ECBlockOutputStreamEntry(blockID, key, @@ -499,7 +508,7 @@ public ECBlockOutputStreamEntry build() { pipeline, length, bufferPool, - token, config, clientMetrics); + token, config, clientMetrics, streamBufferArgs); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index acc70d0dda61..e551605d842d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -56,10 +57,10 @@ public ECBlockOutputStreamEntryPool(OzoneClientConfig config, boolean unsafeByteBufferConversion, XceiverClientFactory xceiverClientFactory, long openID, - ContainerClientMetrics clientMetrics) { + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) { super(config, omClient, requestId, replicationConfig, uploadID, partNumber, isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory, - openID, clientMetrics); + openID, clientMetrics, streamBufferArgs); assert replicationConfig instanceof ECReplicationConfig; } @@ -82,6 +83,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setBufferPool(getBufferPool()) .setToken(subKeyInfo.getToken()) .setClientMetrics(getClientMetrics()) + .setStreamBufferArgs(getStreamBufferArgs()) .build(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 15ebccda2886..b5c36474ff9e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -128,14 +128,12 @@ public long getFlushCheckpoint() { } private ECKeyOutputStream(Builder builder) { - super(builder.getReplicationConfig(), builder.getClientMetrics()); + super(builder.getReplicationConfig(), builder.getClientMetrics(), + builder.getClientConfig(), builder.getStreamBufferArgs()); this.config = builder.getClientConfig(); this.bufferPool = builder.getByteBufferPool(); // For EC, cell/chunk size and buffer size can be same for now. ecChunkSize = builder.getReplicationConfig().getEcChunkSize(); - this.config.setStreamBufferMaxSize(ecChunkSize); - this.config.setStreamBufferFlushSize(ecChunkSize); - this.config.setStreamBufferSize(ecChunkSize); this.numDataBlks = builder.getReplicationConfig().getData(); this.numParityBlks = builder.getReplicationConfig().getParity(); ecChunkBufferCache = new ECChunkBuffers( @@ -151,7 +149,7 @@ private ECKeyOutputStream(Builder builder) { builder.isMultipartKey(), info, builder.isUnsafeByteBufferConversionEnabled(), builder.getXceiverManager(), builder.getOpenHandler().getId(), - builder.getClientMetrics()); + builder.getClientMetrics(), builder.getStreamBufferArgs()); this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 4e0c4c91faed..8b128e9cd945 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -94,6 +95,7 @@ enum StreamAction { private final BlockOutputStreamEntryPool blockOutputStreamEntryPool; private long clientID; + private StreamBufferArgs streamBufferArgs; /** * Indicates if an atomic write is required. When set to true, @@ -104,8 +106,10 @@ enum StreamAction { private boolean atomicKeyCreation; public KeyOutputStream(ReplicationConfig replicationConfig, - ContainerClientMetrics clientMetrics) { + ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig, + StreamBufferArgs streamBufferArgs) { this.replication = replicationConfig; + this.config = clientConfig; closed = false; this.retryPolicyMap = HddsClientUtils.getExceptionList() .stream() @@ -113,7 +117,8 @@ public KeyOutputStream(ReplicationConfig replicationConfig, e -> RetryPolicies.TRY_ONCE_THEN_FAIL)); retryCount = 0; offset = 0; - blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(clientMetrics); + blockOutputStreamEntryPool = + new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs); } @VisibleForTesting @@ -151,7 +156,7 @@ public KeyOutputStream( String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion, ContainerClientMetrics clientMetrics, - boolean atomicKeyCreation + boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs ) { this.config = config; this.replication = replicationConfig; @@ -165,7 +170,7 @@ public KeyOutputStream( unsafeByteBufferConversion, xceiverClientManager, handler.getId(), - clientMetrics); + clientMetrics, streamBufferArgs); this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval()); this.retryCount = 0; @@ -173,6 +178,7 @@ public KeyOutputStream( this.writeOffset = 0; this.clientID = handler.getId(); this.atomicKeyCreation = atomicKeyCreation; + this.streamBufferArgs = streamBufferArgs; } /** @@ -279,7 +285,7 @@ private int writeToOutputStream(BlockOutputStreamEntry current, // to or less than the max length of the buffer allocated. // The len specified here is the combined sum of the data length of // the buffers - Preconditions.checkState(!retry || len <= config + Preconditions.checkState(!retry || len <= streamBufferArgs .getStreamBufferMaxSize()); int dataWritten = (int) (current.getWrittenDataLength() - currentPos); writeLen = retry ? (int) len : dataWritten; @@ -330,7 +336,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount); } Preconditions.checkArgument( - bufferedDataLen <= config.getStreamBufferMaxSize()); + bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize()); Preconditions.checkArgument( offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); @@ -608,6 +614,7 @@ public static class Builder { private ReplicationConfig replicationConfig; private ContainerClientMetrics clientMetrics; private boolean atomicKeyCreation = false; + private StreamBufferArgs streamBufferArgs; public String getMultipartUploadID() { return multipartUploadID; @@ -676,6 +683,15 @@ public Builder setConfig(OzoneClientConfig config) { return this; } + public StreamBufferArgs getStreamBufferArgs() { + return streamBufferArgs; + } + + public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { + this.streamBufferArgs = streamBufferArgs; + return this; + } + public boolean isUnsafeByteBufferConversionEnabled() { return unsafeByteBufferConversion; } @@ -725,7 +741,8 @@ public KeyOutputStream build() { isMultipartKey, unsafeByteBufferConversion, clientMetrics, - atomicKeyCreation); + atomicKeyCreation, + streamBufferArgs); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 5302c88520d1..850ae0d19376 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.ClientTrustManager; @@ -207,6 +208,7 @@ public class RpcClient implements ClientProtocol { private final boolean topologyAwareReadEnabled; private final boolean checkKeyNameEnabled; private final OzoneClientConfig clientConfig; + private final ReplicationConfigValidator replicationConfigValidator; private final Cache keyProviderCache; private final boolean getLatestVersionLocation; private final ByteBufferPool byteBufferPool; @@ -230,6 +232,8 @@ public RpcClient(ConfigurationSource conf, String omServiceId) this.ugi = UserGroupInformation.getCurrentUser(); // Get default acl rights for user and group. OzoneAclConfig aclConfig = this.conf.getObject(OzoneAclConfig.class); + replicationConfigValidator = + this.conf.getObject(ReplicationConfigValidator.class); this.userRights = aclConfig.getUserDefaultRights(); this.groupRights = aclConfig.getGroupDefaultRights(); @@ -1343,9 +1347,7 @@ public OzoneOutputStream createKey( } if (replicationConfig != null) { - ReplicationConfigValidator validator = - this.conf.getObject(ReplicationConfigValidator.class); - validator.validate(replicationConfig); + replicationConfigValidator.validate(replicationConfig); } OmKeyArgs.Builder builder = new OmKeyArgs.Builder() @@ -1854,7 +1856,7 @@ public OzoneDataStreamOutput createMultipartStreamKey( .setMultipartUploadID(uploadID) .setIsMultipartKey(true) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(conf.getObject(OzoneClientConfig.class)) + .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) .build(); keyOutputStream @@ -2269,7 +2271,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) .setOmClient(ozoneManagerClient) .setReplicationConfig(replicationConfig) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(conf.getObject(OzoneClientConfig.class)) + .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) .build(); keyOutputStream @@ -2279,6 +2281,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) openKey, keyOutputStream, null); return new OzoneDataStreamOutput(out != null ? out : keyOutputStream); } + private OzoneOutputStream createOutputStream(OpenKeySession openKey) throws IOException { KeyOutputStream keyOutputStream = createKeyOutputStream(openKey) @@ -2336,6 +2339,8 @@ private KeyOutputStream.Builder createKeyOutputStream( ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig(); + StreamBufferArgs streamBufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs( + replicationConfig, clientConfig); if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) { builder = new ECKeyOutputStream.Builder() @@ -2351,9 +2356,10 @@ private KeyOutputStream.Builder createKeyOutputStream( .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) - .setConfig(conf.getObject(OzoneClientConfig.class)) + .setConfig(clientConfig) .setAtomicKeyCreation(isS3GRequest.get()) - .setClientMetrics(clientMetrics); + .setClientMetrics(clientMetrics) + .setStreamBufferArgs(streamBufferArgs); } @Override diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java index 00a7ba557490..983516002909 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; @@ -74,8 +76,13 @@ public synchronized void close() throws IOException { @Override public KeyOutputStream getKeyOutputStream() { - return new KeyOutputStream( - ReplicationConfig.getDefault(new OzoneConfiguration()), null) { + OzoneConfiguration conf = new OzoneConfiguration(); + ReplicationConfig replicationConfig = + ReplicationConfig.getDefault(conf); + OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class); + StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig); + return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) { @Override public synchronized OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java index b214b0968a25..c94048e00d85 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java @@ -326,8 +326,8 @@ private void applyOperation(long counter) throws Exception { case CREATE_KEY: keyName = getPath(counter); getMetrics().timer(operation.name()).time(() -> { - try (OutputStream stream = bucket.createKey(keyName, - dataSize.toBytes())) { + try (OutputStream stream = bucket.createStreamKey(keyName, + dataSize.toBytes(), replicationConfig, new HashMap<>())) { contentGenerator.write(stream); } return null;