@@ -54,6 +54,21 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.max.buffer.size",
defaultValue = "4MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMaxBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
description = "The boundary at which putBlock is executed",
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;

@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -168,6 +183,14 @@ public void setStreamBufferSize(int streamBufferSize) {
this.streamBufferSize = streamBufferSize;
}

public int getDataStreamMaxBufferSize() {
return dataStreamMaxBufferSize;
}

public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) {
this.dataStreamMaxBufferSize = dataStreamMaxBufferSize;
}

public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}
@@ -227,4 +250,12 @@ public void setChecksumVerify(boolean checksumVerify) {
public int getBufferIncrement() {
return bufferIncrement;
}

public long getDataStreamBufferFlushSize() {
return dataStreamBufferFlushSize;
}

public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
}
}
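For context, a minimal sketch (not part of the patch) of how the two new keys could be set and read through OzoneConfiguration. It assumes the OzoneClientConfig group uses the usual "ozone.client" key prefix, which is not visible in this hunk:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

public class DataStreamConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // SIZE-typed values accept unit suffixes such as "8MB" / "32MB".
    conf.set("ozone.client.datastream.max.buffer.size", "8MB");
    conf.set("ozone.client.datastream.buffer.flush.size", "32MB");

    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    System.out.println(clientConfig.getDataStreamMaxBufferSize());   // in bytes
    System.out.println(clientConfig.getDataStreamBufferFlushSize()); // in bytes
  }
}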
@@ -257,13 +257,30 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
if (len == 0) {
return;
}
int curLen = len;
// limit on the number of bytes a single ByteBuffer (StreamBuffer) can hold
int maxBufferLen = config.getDataStreamMaxBufferSize();
while (curLen > 0) {
int writeLen = Math.min(curLen, maxBufferLen);
final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
off += writeLen;
bufferList.add(buf);
writeChunkToContainer(buf.duplicate());
curLen -= writeLen;
writtenDataLength += writeLen;
doFlushIfNeeded();
}
}

final StreamBuffer buf = new StreamBuffer(b, off, len);
bufferList.add(buf);

writeChunkToContainer(buf.duplicate());

writtenDataLength += len;
private void doFlushIfNeeded() throws IOException {
Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
.getDataStreamMaxBufferSize());
long boundary = config.getDataStreamBufferFlushSize() / config
.getDataStreamMaxBufferSize();
if (bufferList.size() % boundary == 0) {
updateFlushLength();
executePutBlock(false, false);
Member:
Do we need to add updateFlushLength(), waitOnFlushFutures() and watchForCommit(false) here, just as we did in handleFlush? Currently we only releaseBuffers after watchForCommit.

Contributor Author:
Thanks @captainzmc for the comment. The plan here is to execute a putBlock at 32MB and not block the client right away by waiting for the flush futures and calling watchForCommit. We will introduce another boundary, say at 64MB, to check whether at least 32MB is flushed and committed to all datanodes, i.e. watchForCommit() and waitForFlushFutures() will occur at 64MB. (I will add this in a different patch after this PR.)

Member:
Thanks for explaining, makes sense.

}
}

private void updateFlushLength() {
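To make the flush boundary concrete, here is a standalone sketch (hypothetical class, not part of the patch) of the arithmetic doFlushIfNeeded() relies on: with the default 16MB flush size and 4MB max buffer size, every fourth StreamBuffer appended to bufferList triggers a putBlock.

public class FlushBoundaryDemo {
  public static void main(String[] args) {
    // Defaults from OzoneClientConfig above.
    final long flushSize = 16L * 1024 * 1024;   // datastream.buffer.flush.size
    final int maxBufferSize = 4 * 1024 * 1024;  // datastream.max.buffer.size
    final long boundary = flushSize / maxBufferSize; // 4 buffers per putBlock

    // Simulate a 40 MB write split into 4 MB StreamBuffers.
    int buffersInList = 0;
    for (long written = maxBufferSize; written <= 40L * 1024 * 1024;
        written += maxBufferSize) {
      buffersInList++;
      if (buffersInList % boundary == 0) {
        System.out.println("putBlock after " + buffersInList + " buffers ("
            + written + " bytes written)");
      }
    }
  }
}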
@@ -320,6 +320,8 @@ abstract class Builder {
protected Optional<Integer> chunkSize = Optional.empty();
protected OptionalInt streamBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferFlushSize = Optional.empty();
protected Optional<Long> dataStreamBufferFlushSize = Optional.empty();
protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected Optional<Long> blockSize = Optional.empty();
protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
@@ -553,6 +555,16 @@ public Builder setStreamBufferMaxSize(long size) {
return this;
}

public Builder setDataStreamBufferMaxSize(int size) {
dataStreamMaxBufferSize = OptionalInt.of(size);
return this;
}

public Builder setDataStreamBufferFlushSize(long size) {
dataStreamBufferFlushSize = Optional.of(size);
return this;
}

/**
* Sets the block size for stream buffer.
*
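For reference, a hedged sketch of how the two new setters slot into a test-cluster build, mirroring the init() call in the test file further down; MiniOzoneCluster.newBuilder(conf) and the byte sizes here are assumptions for illustration only:

// Illustrative sizes, following the usual small values used in stream tests.
OzoneConfiguration conf = new OzoneConfiguration();
int chunkSize = 100;
long flushSize = 2L * chunkSize;
long maxFlushSize = 2L * flushSize;
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
    .setChunkSize(chunkSize)
    .setStreamBufferFlushSize(flushSize)
    .setStreamBufferMaxSize(maxFlushSize)
    .setDataStreamBufferFlushSize(maxFlushSize) // putBlock flush boundary
    .setDataStreamBufferMaxSize(chunkSize)      // per-StreamBuffer cap
    .setStreamBufferSizeUnit(StorageUnit.BYTES)
    .build();
cluster.waitForClusterToBeReady();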
@@ -658,6 +658,12 @@ protected void initializeConfiguration() throws IOException {
if (!streamBufferMaxSize.isPresent()) {
streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
}
if (!dataStreamBufferFlushSize.isPresent()) {
dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get());
}
if (!dataStreamMaxBufferSize.isPresent()) {
dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
}
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -674,6 +680,11 @@
streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get())));
clientConfig.setStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
clientConfig.setDataStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get())));
clientConfig.setDataStreamMaxBufferSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMaxBufferSize.getAsInt())));
conf.setFromObject(clientConfig);

conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
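The fallbacks above tie both datastream values to the chunk size: the per-buffer cap defaults to one chunk and the flush boundary to four chunks. A tiny sketch of that relationship, assuming the common 4MB chunk size (which makes the derived values line up with the 4MB/16MB defaults declared in OzoneClientConfig):

public class DataStreamDefaultsDemo {
  public static void main(String[] args) {
    int chunkSize = 4 * 1024 * 1024;                 // assumed chunk size
    int dataStreamMaxBufferSize = chunkSize;         // one chunk per StreamBuffer
    long dataStreamBufferFlushSize = 4L * chunkSize; // putBlock every 4 chunks
    System.out.println(dataStreamBufferFlushSize / dataStreamMaxBufferSize
        + " StreamBuffers per putBlock");            // prints 4
  }
}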
@@ -20,7 +20,10 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -101,6 +104,8 @@ public static void init() throws Exception {
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setDataStreamBufferFlushSize(maxFlushSize)
.setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
@@ -186,6 +191,35 @@ private void testWriteWithFailure(int dataLength) throws Exception {
validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
}

@Test
public void testPutBlockAtBoundary() throws Exception {
int dataLength = 500;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
keyName, ReplicationType.RATIS, 0);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
Assert.assertTrue(
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
key.close();
// Since the data length is 500, the first putBlock is executed at 400
// (the flush boundary) and the second at 500 when the key is closed.
Assert.assertTrue(
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
== putBlockCount + 2);
validateData(keyName, data);
}
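A quick arithmetic check of the expected counts in this test, assuming the class-level constants give chunkSize = 100 and maxFlushSize = 400 bytes (they are defined outside the hunks shown here):

public class PutBlockCountCheck {
  public static void main(String[] args) {
    int dataLength = 500;
    int maxBufferSize = 100;  // setDataStreamBufferMaxSize(chunkSize)
    long flushSize = 400;     // setDataStreamBufferFlushSize(maxFlushSize)

    long boundary = flushSize / maxBufferSize;      // 4 buffers per putBlock
    int buffers = dataLength / maxBufferSize;       // 5 StreamBuffers of 100 bytes
    long putBlocksDuringWrite = buffers / boundary; // 1, at offset 400
    long putBlocksOnClose = 1;                      // final putBlock from close()
    System.out.println(putBlocksDuringWrite + putBlocksOnClose); // prints 2
  }
}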


private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper.createStreamKey(