@@ -54,14 +54,6 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.max.buffer.size",
defaultValue = "4MB",
type = ConfigType.SIZE,
description = "The maximum size of the ByteBuffer "
+ "(used via ratis streaming)",
tags = ConfigTag.CLIENT)
private int dataStreamMaxBufferSize = 4 * 1024 * 1024;

@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
@@ -77,6 +69,15 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;

@Config(key = "datastream.window.size",
defaultValue = "64MB",
type = ConfigType.SIZE,
description = "Maximum size of BufferList(used for retry) size per " +
"BlockDataStreamOutput instance",
tags = ConfigTag.CLIENT)
private long streamWindowSize = 64 * 1024 * 1024;


@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -191,14 +192,6 @@ public void setStreamBufferSize(int streamBufferSize) {
this.streamBufferSize = streamBufferSize;
}

public int getDataStreamMaxBufferSize() {
return dataStreamMaxBufferSize;
}

public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) {
this.dataStreamMaxBufferSize = dataStreamMaxBufferSize;
}

public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}
@@ -223,6 +216,14 @@ public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}

public long getStreamWindowSize() {
return streamWindowSize;
}

public void setStreamWindowSize(long streamWindowSize) {
this.streamWindowSize = streamWindowSize;
}

public int getMaxRetryCount() {
return maxRetryCount;
}
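A usage note, not part of this patch: the new window size can also be overridden programmatically through OzoneClientConfig. A minimal sketch, assuming OzoneClientConfig keeps its usual "ozone.client" key prefix (so the full property would be ozone.client.datastream.window.size):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.OzoneClientConfig;

    OzoneConfiguration conf = new OzoneConfiguration();
    // Load the client settings, enlarge the retry window, and write it back.
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamWindowSize(128L * 1024 * 1024);  // e.g. 128MB instead of the 64MB default
    conf.setFromObject(clientConfig);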
@@ -122,7 +122,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
this.ozoneConfiguration = configuration;
}

private void updateCommitInfosMap(
public void updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
@@ -52,7 +52,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -116,9 +118,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final StreamCommitWatcher commitWatcher;
private final AtomicReference<CompletableFuture<
ContainerCommandResponseProto>> putBlockFuture
= new AtomicReference<>(CompletableFuture.completedFuture(null));

private Queue<CompletableFuture<ContainerCommandResponseProto>>
putBlockFutures = new LinkedList<>();

private final List<DatanodeDetails> failedServers;
private final Checksum checksum;
@@ -307,14 +309,33 @@ private void allocateNewBufferIfNeeded() {
}

private void doFlushIfNeeded() throws IOException {
Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
.getDataStreamMaxBufferSize());
long boundary = config.getDataStreamBufferFlushSize() / config
.getDataStreamMaxBufferSize();
.getDataStreamMinPacketSize();
// streamWindow is the maximum number of buffers that
// are allowed to exist in the bufferList. If the buffers in
// the list exceed this limit, the client waits until it gets
// one putBlockResponse (for the first index). This is similar to
// the bufferFull condition in the async write path.
long streamWindow = config.getStreamWindowSize() / config
.getDataStreamMinPacketSize();
if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
updateFlushLength();
executePutBlock(false, false);
}
if (bufferList.size() == streamWindow) {
try {
checkOpen();
if (!putBlockFutures.isEmpty()) {
putBlockFutures.remove().get();
}
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, true);
}
watchForCommit(true);
}
}

private void updateFlushLength() {
@@ -453,8 +474,7 @@ private void executePutBlock(boolean close,
setIoException(ce);
throw ce;
});
putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture,
(previous, current) -> current));
putBlockFutures.add(flushFuture);
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
@@ -496,7 +516,7 @@ private void handleFlush(boolean close)
// data since latest flush - we need to send the "EOF" flag
executePutBlock(true, true);
}
putBlockFuture.get().get();
CompletableFuture.allOf(putBlockFutures.toArray(EMPTY_FUTURE_ARRAY)).get();
watchForCommit(false);
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
@@ -638,6 +658,8 @@ private void writeChunkToContainer(ByteBuffer buf)
CompletionException ce = new CompletionException(msg, e);
setIoException(ce);
throw ce;
} else if (r.isSuccess()) {
xceiverClient.updateCommitInfosMap(r.getCommitInfos());
}
}, responseExecutor);

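For orientation only (a back-of-the-envelope sketch using the default sizes from OzoneClientConfig above, not code in this patch): doFlushIfNeeded now derives both thresholds from dataStreamMinPacketSize, so with the defaults a putBlock is issued every 16 buffers and the writer blocks for a putBlock response once 64 buffers are outstanding.

    // Buffer-count arithmetic behind doFlushIfNeeded, with the default values shown above.
    long dataStreamBufferFlushSize = 16L * 1024 * 1024;  // 16MB flush size
    long dataStreamMinPacketSize = 1024L * 1024;         // 1MB minimum packet size
    long streamWindowSize = 64L * 1024 * 1024;           // 64MB window (new in this patch)

    long boundary = dataStreamBufferFlushSize / dataStreamMinPacketSize;  // 16 -> flush every 16 buffers
    long streamWindow = streamWindowSize / dataStreamMinPacketSize;       // 64 -> wait once bufferList hits 64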
@@ -322,7 +322,7 @@ abstract class Builder {
protected OptionalInt streamBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferFlushSize = Optional.empty();
protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
protected Optional<Long> datastreamWindowSize= Optional.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
protected Optional<Long> blockSize = Optional.empty();
@@ -558,11 +558,6 @@ public Builder setStreamBufferMaxSize(long size) {
return this;
}

public Builder setDataStreamBufferMaxSize(int size) {
dataStreamMaxBufferSize = OptionalInt.of(size);
return this;
}

public Builder setDataStreamBufferFlushize(long size) {
dataStreamBufferFlushSize = Optional.of(size);
return this;
@@ -573,6 +568,11 @@ public Builder setDataStreamMinPacketSize(int size) {
return this;
}

public Builder setDataStreamStreamWindowSize(long size) {
datastreamWindowSize = Optional.of(size);
return this;
}

/**
* Sets the block size for stream buffer.
*
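For context, a condensed sketch of how the new builder hook is wired up, mirroring the updated test further down; it assumes the usual MiniOzoneCluster.newBuilder(conf) entry point, with conf, maxFlushSize and chunkSize defined as in that test:

    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
        .setDataStreamBufferFlushize(maxFlushSize)
        .setDataStreamMinPacketSize(chunkSize)
        .setDataStreamStreamWindowSize(5 * chunkSize)
        .setStreamBufferSizeUnit(StorageUnit.BYTES)
        .build();
    cluster.waitForClusterToBeReady();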
@@ -645,12 +645,12 @@ protected void initializeConfiguration() throws IOException {
if (!dataStreamBufferFlushSize.isPresent()) {
dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get());
}
if (!dataStreamMaxBufferSize.isPresent()) {
dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
}
if (!dataStreamMinPacketSize.isPresent()) {
dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4);
}
if (!datastreamWindowSize.isPresent()) {
datastreamWindowSize = Optional.of((long) 8 * chunkSize.get());
}
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -669,12 +669,11 @@ protected void initializeConfiguration() throws IOException {
streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
clientConfig.setDataStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get())));
clientConfig.setDataStreamMaxBufferSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMaxBufferSize.getAsInt())));
clientConfig.setDataStreamMinPacketSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMinPacketSize.getAsInt())));
clientConfig.setStreamWindowSize(Math.round(
streamBufferSizeUnit.get().toBytes(datastreamWindowSize.get())));
conf.setFromObject(clientConfig);

conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -106,9 +106,9 @@ public static void init() throws Exception {
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setDataStreamBufferFlushize(maxFlushSize)
.setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.setDataStreamMinPacketSize(2*chunkSize/5)
.setDataStreamMinPacketSize(chunkSize)
.setDataStreamStreamWindowSize(5*chunkSize)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -195,7 +195,7 @@ private void testWriteWithFailure(int dataLength) throws Exception {

@Test
public void testPutBlockAtBoundary() throws Exception {
int dataLength = 200;
int dataLength = 500;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -213,8 +213,8 @@ public void testPutBlockAtBoundary() throws Exception {
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
key.close();
// Since data length is 200 , first putBlock will be at 160(flush boundary)
// and the other at 200
// Since data length is 500, the first putBlock will be at 400 (flush boundary)
// and the other at 500
Assert.assertTrue(
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
== putBlockCount + 2);
@@ -242,10 +242,10 @@ public void testMinPacketSize() throws Exception {
long writeChunkCount =
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
// minPacketSize= 40, so first write of 20 wont trigger a writeChunk
// minPacketSize = 100, so the first write of 50 won't trigger a writeChunk
Assert.assertEquals(writeChunkCount,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
key.write(ByteBuffer.wrap(data));