@@ -69,6 +69,14 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "datastream.min.packet.size",
defaultValue = "1MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
Contributor: Let's set this size to 1 MB == bytesPerChecksum by default.

Contributor Author: Set to 1 MB.

Member @captainzmc (Dec 15, 2021): In performance tests, I found that 512 KB performs better than 1 MB. Can we change the default value to 512 KB?

+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -207,6 +215,14 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}

public int getDataStreamMinPacketSize() {
return dataStreamMinPacketSize;
}

public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}
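A quick sketch of how a client could pick up the new setting. This is a hypothetical usage example, assuming OzoneClientConfig is a @ConfigGroup with the usual "ozone.client" prefix (the prefix is not shown in this diff), so the full key would be ozone.client.datastream.min.packet.size:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public class MinPacketSizeConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Assumed full key; the "ozone.client" prefix is an inference, not from this diff.
    conf.set("ozone.client.datastream.min.packet.size", "512KB");
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    int minPacketSize = clientConfig.getDataStreamMinPacketSize(); // 512 * 1024
    System.out.println(minPacketSize);
  }
}
```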
@@ -29,6 +29,8 @@
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -125,7 +127,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
private final long syncSize = 0; // TODO: disk sync is disabled for now
private long syncPosition = 0;

private StreamBuffer currentBuffer;
private XceiverClientMetrics metrics;
Contributor: Do we really need to track currentBufferRemaining? It can always be deduced from streamBuffer.

Contributor Author: Done.

/**
* Creates a new BlockDataStreamOutput.
*
@@ -172,6 +175,7 @@ public BlockDataStreamOutput(
ioException = new AtomicReference<>(null);
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
metrics = XceiverClientManager.getXceiverClientMetrics();
}

private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
@@ -257,27 +261,47 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
if (len == 0) {
return;
}
-    int curLen = len;
-    // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
-    int maxBufferLen = config.getDataStreamMaxBufferSize();
-    while (curLen > 0) {
-      int writeLen = Math.min(curLen, maxBufferLen);
+    while (len > 0) {
+      allocateNewBufferIfNeeded();
+      int writeLen = Math.min(len, currentBuffer.length());
       final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
+      currentBuffer.put(buf);
+      writeChunkIfNeeded();
       off += writeLen;
-      bufferList.add(buf);
-      writeChunkToContainer(buf.duplicate());
-      curLen -= writeLen;
       writtenDataLength += writeLen;
+      len -= writeLen;
doFlushIfNeeded();
}
}

private void writeChunkIfNeeded() throws IOException {
  // length() is the buffer's remaining space; zero means it is full.
  if (currentBuffer.length() == 0) {
    writeChunk(currentBuffer);
    currentBuffer = null;
  }
}

private void writeChunk(StreamBuffer sb) throws IOException {
  bufferList.add(sb);
  // Duplicate so flushing does not disturb sb's own position/limit;
  // the data written so far occupies [0, sb.position()).
  ByteBuffer dup = sb.duplicate();
  dup.position(0);
  dup.limit(sb.position());
  writeChunkToContainer(dup);
}

private void allocateNewBufferIfNeeded() {
  if (currentBuffer == null) {
    currentBuffer =
        StreamBuffer.allocate(config.getDataStreamMinPacketSize());
  }
}
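For intuition, here is a minimal, self-contained model of the buffering scheme these helpers implement: incoming bytes accumulate in a fixed-size buffer, and a packet is emitted only when the buffer fills. This is plain java.nio with illustrative names (PacketizerSketch is not Ozone API):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of min-packet-size buffering; not Ozone code. */
class PacketizerSketch {
  private final int minPacketSize;
  private ByteBuffer current;
  private final List<ByteBuffer> packets = new ArrayList<>();

  PacketizerSketch(int minPacketSize) {
    this.minPacketSize = minPacketSize;
  }

  void write(ByteBuffer src) {
    while (src.hasRemaining()) {
      if (current == null) {                      // allocateNewBufferIfNeeded()
        current = ByteBuffer.allocate(minPacketSize);
      }
      int writeLen = Math.min(src.remaining(), current.remaining());
      ByteBuffer slice = src.duplicate();         // copy writeLen bytes without
      slice.limit(slice.position() + writeLen);   // disturbing src's limit
      current.put(slice);
      src.position(src.position() + writeLen);
      if (!current.hasRemaining()) {              // writeChunkIfNeeded()
        current.flip();                           // expose [0, position) for sending
        packets.add(current);
        current = null;
      }
    }
  }

  List<ByteBuffer> packets() {
    return packets;
  }
}
```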

private void doFlushIfNeeded() throws IOException {
Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
.getDataStreamMaxBufferSize());
long boundary = config.getDataStreamBufferFlushSize() / config
.getDataStreamMaxBufferSize();
-    if (bufferList.size() % boundary == 0) {
+    if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
updateFlushLength();
executePutBlock(false, false);
}
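A worked example for doFlushIfNeeded: with the 16 MB dataStreamBufferFlushSize default above and, say, a 4 MB dataStreamMaxBufferSize (the actual default is not shown in this diff), boundary = 16 / 4 = 4, so executePutBlock fires after every fourth buffered chunk. The added !bufferList.isEmpty() guard matters because 0 % boundary == 0, which would otherwise trigger a flush before any chunk exists.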
@@ -308,11 +332,10 @@ public void writeOnRetry(long len) throws IOException {
int count = 0;
while (len > 0) {
final StreamBuffer buf = bufferList.get(count);
-      final long writeLen = Math.min(buf.length(), len);
+      final long writeLen = Math.min(buf.position(), len);
       final ByteBuffer duplicated = buf.duplicate();
-      if (writeLen != buf.length()) {
-        duplicated.limit(Math.toIntExact(len));
-      }
+      duplicated.position(0);
Contributor: Will this position always be set to 0? The starting offset should be deduced from the last acked length in case of retry.

Contributor Author: We are writing a new chunk here every time, picking each entry from the bufferList, hence the position will be 0. Same logic as this.

+      duplicated.limit(buf.position());
writeChunkToContainer(duplicated);
len -= writeLen;
count++;
@@ -449,6 +472,11 @@ private void handleFlush(boolean close)
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit it to the current position, so that the next write
// happens in a new buffer.

if (currentBuffer != null) {
  writeChunk(currentBuffer);
  currentBuffer = null;
}
updateFlushLength();
executePutBlock(close, false);
} else if (close) {
@@ -584,6 +612,7 @@ private void writeChunkToContainer(ByteBuffer buf)
.setLen(effectiveChunkSize)
.setChecksumData(checksumData.getProtoBufMessage())
.build();
metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);

if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
@@ -27,7 +27,7 @@ public class StreamBuffer {
private final ByteBuffer buffer;

public StreamBuffer(ByteBuffer buffer) {
-    this.buffer = buffer.asReadOnlyBuffer();
+    this.buffer = buffer;
}

public StreamBuffer(ByteBuffer buffer, int offset, int length) {
@@ -43,4 +43,17 @@ public int length() {
return buffer.limit() - buffer.position();
}

public int position() {
return buffer.position();
}


public void put(StreamBuffer sb) {
  buffer.put(sb.buffer);
}

public static StreamBuffer allocate(int size) {
Contributor: It's better to add position() and other calls in here, instead of getting an instance of the underlying buffer directly.

Contributor Author: Done.

return new StreamBuffer(ByteBuffer.allocate(size));
}

}
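The constructor change above is load-bearing: the new put(StreamBuffer) writes into the wrapped buffer, which a read-only view would reject. A minimal demonstration in plain java.nio (illustrative class name):

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyDemo {
  public static void main(String[] args) {
    ByteBuffer backing = ByteBuffer.allocate(8);
    ByteBuffer readOnly = backing.asReadOnlyBuffer();
    try {
      readOnly.put((byte) 1);  // what the old asReadOnlyBuffer() wrapper would hit
    } catch (ReadOnlyBufferException e) {
      System.out.println("read-only view rejects put()");
    }
    backing.put((byte) 1);     // the writable buffer accepts it
  }
}
```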
@@ -178,7 +178,7 @@ private long releaseBuffers(List<Long> indexes) {
Preconditions.checkState(commitIndexMap.containsKey(index));
final List<StreamBuffer> buffers = commitIndexMap.remove(index);
final long length =
-        buffers.stream().mapToLong(StreamBuffer::length).sum();
+        buffers.stream().mapToLong(StreamBuffer::position).sum();
totalAckDataLength += length;
// clear the future object from the future Map
final CompletableFuture<ContainerCommandResponseProto> remove =
@@ -309,7 +309,7 @@ boolean isEmpty() {
long computeBufferData() {
    long totalDataLen = 0;
    for (StreamBuffer b : bufferList) {
-      totalDataLen += b.length();
+      totalDataLen += b.position();
}
return totalDataLen;
}
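This change and the one in releaseBuffers switch from length() to position() for the same reason: once a StreamBuffer has been filled, position() is the number of bytes it actually carries, while length() (limit minus position) is the space left, which drops to zero as the buffer fills. A quick java.nio illustration (illustrative class name):

```java
import java.nio.ByteBuffer;

public class PositionVsRemaining {
  public static void main(String[] args) {
    ByteBuffer b = ByteBuffer.allocate(4);
    b.put((byte) 1).put((byte) 2);
    System.out.println(b.position());  // 2: bytes written, what acking must count
    System.out.println(b.remaining()); // 2: free space, StreamBuffer.length()'s analogue
  }
}
```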
@@ -323,6 +323,7 @@ abstract class Builder {
protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
protected Optional<Long> blockSize = Optional.empty();
protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
protected boolean includeRecon = false;
@@ -565,6 +566,11 @@ public Builder setDataStreamBufferFlushize(long size) {
return this;
}

public Builder setDataStreamMinPacketSize(int size) {
dataStreamMinPacketSize = OptionalInt.of(size);
return this;
}

/**
* Sets the block size for stream buffer.
*
@@ -664,6 +664,9 @@ protected void initializeConfiguration() throws IOException {
if (!dataStreamMaxBufferSize.isPresent()) {
dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
}
if (!dataStreamMinPacketSize.isPresent()) {
  dataStreamMinPacketSize = OptionalInt.of(chunkSize.get() / 4);
}
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -685,6 +688,9 @@ protected void initializeConfiguration() throws IOException {
clientConfig.setDataStreamMaxBufferSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMaxBufferSize.getAsInt())));
clientConfig.setDataStreamMinPacketSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMinPacketSize.getAsInt())));
conf.setFromObject(clientConfig);

conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -107,6 +107,7 @@ public static void init() throws Exception {
.setDataStreamBufferFlushize(maxFlushSize)
.setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.setDataStreamMinPacketSize(2 * chunkSize / 5)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -193,7 +194,7 @@ private void testWriteWithFailure(int dataLength) throws Exception {

@Test
public void testPutBlockAtBoundary() throws Exception {
-    int dataLength = 500;
+    int dataLength = 200;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -211,8 +212,8 @@ public void testPutBlockAtBoundary() throws Exception {
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
key.close();
-    // Since data length is 500 , first putBlock will be at 400(flush boundary)
-    // and the other at 500
+    // Since data length is 200, the first putBlock will be at 160 (the flush
+    // boundary) and the other at 200.
Assert.assertTrue(
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
== putBlockCount + 2);
@@ -230,4 +231,29 @@ private void validateData(String keyName, byte[] data) throws Exception {
.validateData(keyName, data, objectStore, volumeName, bucketName);
}


@Test
public void testMinPacketSize() throws Exception {
String keyName = getKeyName();
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
long writeChunkCount =
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
// minPacketSize = 40, so the first write of 20 bytes won't trigger a writeChunk
Assert.assertEquals(writeChunkCount,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
key.write(ByteBuffer.wrap(data));
Assert.assertEquals(writeChunkCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
// Now close the stream; it will update the key length.
key.close();
String dataString = new String(data, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
}
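The arithmetic behind the assertions, assuming the chunkSize = 100 implied by the inline comment above: setDataStreamMinPacketSize(2 * chunkSize / 5) gives a 40-byte packet buffer and each write is chunkSize / 5 = 20 bytes, so the first write only half-fills the buffer while the second fills it exactly, producing the single WriteChunk counted above.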

}
@@ -119,6 +119,7 @@ public void setup() throws Exception {
conf.setQuietMode(false);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
.setDataStreamMinPacketSize(1024)
.build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
@@ -191,6 +191,7 @@ static void startCluster(OzoneConfiguration conf) throws Exception {
.setTotalPipelineNumLimit(10)
.setScmId(scmId)
.setClusterId(clusterId)
.setDataStreamMinPacketSize(1024)
.build();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);