@@ -54,6 +54,18 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
description = "Buffer (defined by ozone.client.stream.buffer.size) "
+ "will be incremented with this steps. If zero, the full buffer "
+ "will "
+ "be created at once. Setting it to a variable between 0 and "
+ "ozone.client.stream.buffer.size can reduce the memory usage for "
+ "very small keys, but has a performance overhead.",
tags = ConfigTag.CLIENT)
private int bufferIncrement = 0;

@Config(key = "stream.buffer.flush.delay",
defaultValue = "true",
description = "Default true, when call flush() and determine whether "
@@ -118,6 +130,9 @@ private void validate() {
Preconditions.checkState(streamBufferFlushSize > 0);
Preconditions.checkState(streamBufferMaxSize > 0);

Preconditions.checkArgument(bufferIncrement < streamBufferSize,
"Buffer increment should be smaller than the size of the stream "
+ "buffer");
Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0,
"expected max. buffer size (%s) to be a multiple of flush size (%s)",
streamBufferMaxSize, streamBufferFlushSize);
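
A quick, hedged illustration of what the new check rejects (standard Guava Preconditions semantics, hypothetical values): an increment equal to or larger than the stream buffer size now fails fast instead of being silently accepted.

import com.google.common.base.Preconditions;

/** Sketch only: the failure mode introduced by the new checkArgument above. */
public class ValidateIncrementExample {
  public static void main(String[] args) {
    int streamBufferSize = 4 * 1024 * 1024;
    int bufferIncrement = 8 * 1024 * 1024; // misconfigured: larger than the stream buffer
    // Throws IllegalArgumentException with the same message as validate().
    Preconditions.checkArgument(bufferIncrement < streamBufferSize,
        "Buffer increment should be smaller than the size of the stream buffer");
  }
}
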
@@ -209,4 +224,7 @@ public void setChecksumVerify(boolean checksumVerify) {
this.checksumVerify = checksumVerify;
}

public int getBufferIncrement() {
return bufferIncrement;
}
}
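
That completes the OzoneClientConfig side; the second file in the diff wires the value into the client write path. For reviewers who want to try the setting, a hedged usage sketch follows, assuming the class keeps its usual org.apache.hadoop.hdds.scm package and the standard OzoneConfiguration#getObject binding used elsewhere in Ozone.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

/** Sketch only: setting the new key and reading it back through the config object. */
public class ReadIncrementExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("ozone.client.stream.buffer.increment", "1MB"); // full key assumes the ozone.client prefix
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    System.out.println(clientConfig.getBufferIncrement());   // expected: 1048576 (bytes)
  }
}
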
@@ -46,14 +46,13 @@
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;

import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -281,7 +280,7 @@ private void doFlushOrWatchIfNeeded() throws IOException {

private void allocateNewBufferIfNeeded() {
if (currentBufferRemaining == 0) {
currentBuffer = bufferPool.allocateBuffer(config.getBytesPerChecksum());
currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
currentBufferRemaining = currentBuffer.remaining();
}
}
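
allocateNewBufferIfNeeded() now hands the configured increment to the pool instead of the bytes-per-checksum value. How the pool interprets it is outside this hunk; the sketch below shows one plausible shape, assuming a ChunkBuffer.allocate(capacity, increment) style factory where increment = 0 falls back to a single full-size allocation. Treat the names as assumptions, not the actual BufferPool code.

import org.apache.hadoop.ozone.common.ChunkBuffer;

/** Hypothetical sketch, not the real BufferPool: one way allocateBuffer(increment)
 *  could honour the new setting while keeping the old behaviour for 0. */
class BufferPoolSketch {
  private final int bufferSize; // capacity of one stream buffer

  BufferPoolSketch(int bufferSize) {
    this.bufferSize = bufferSize;
  }

  ChunkBuffer allocateBuffer(int increment) {
    // Assumed factory: increment > 0 lets the buffer grow lazily in
    // increment-sized slices; increment == 0 allocates bufferSize at once.
    return ChunkBuffer.allocate(bufferSize, increment);
  }
}
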