Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;

// Client-tagged switch for piggybacking PutBlock on WriteChunk requests.
// Both the annotation's defaultValue and the field initializer leave it
// disabled.
@Config(key = "stream.putblock.piggybacking",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;

@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
Expand Down Expand Up @@ -421,6 +429,14 @@ public String getFsDefaultBucketLayout() {
return fsDefaultBucketLayout;
}

/**
 * Sets the flag that allows PutBlock to be piggybacked in WriteChunk
 * requests.
 *
 * @param enablePutblockPiggybacking the new value of the flag
 */
public void setEnablePutblockPiggybacking(boolean enablePutblockPiggybacking) {
this.enablePutblockPiggybacking = enablePutblockPiggybacking;
}

/**
 * Returns the flag that allows PutBlock to be piggybacked in WriteChunk
 * requests.
 *
 * @return the current value of the enablePutblockPiggybacking field
 */
public boolean getEnablePutblockPiggybacking() {
return enablePutblockPiggybacking;
}

/**
 * @return the current value of the datastreamPipelineMode field
 */
public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import static org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -140,6 +142,7 @@ public class BlockOutputStream extends OutputStream {
private int replicationIndex;
private Pipeline pipeline;
private final ContainerClientMetrics clientMetrics;
private boolean allowPutBlockPiggybacking;

/**
* Creates a new BlockOutputStream.
Expand Down Expand Up @@ -211,6 +214,20 @@ public BlockOutputStream(
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
this.allowPutBlockPiggybacking = config.getEnablePutblockPiggybacking() &&
allDataNodesSupportPiggybacking();
}

/**
 * Checks whether PutBlock piggybacking may be used on the current pipeline.
 *
 * @return {@code true} only if every DataNode in the pipeline reports a
 *         version that is not older than COMBINED_PUTBLOCK_WRITECHUNK_RPC
 */
private boolean allDataNodesSupportPiggybacking() {
  // A single node below this version disqualifies the whole pipeline.
  final int minimumVersion = COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue();
  return pipeline.getNodes().stream()
      .allMatch(node -> node.getCurrentVersion() >= minimumVersion);
}

void refreshCurrentBuffer() {
Expand Down Expand Up @@ -499,22 +516,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null && !force) {
BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.get().getContainerBlockID()
.equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block
blockID.set(responseBlockID);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size" + bufferPool
.getSize() + " currentBufferIndex " + bufferPool
.getCurrentBufferIndex());
}
// for standalone protocol, logIndex will always be 0.
updateCommitInfo(asyncReply, byteBufferList);
handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
Expand Down Expand Up @@ -551,7 +554,7 @@ public void flush() throws IOException {
}
}

private void writeChunk(ChunkBuffer buffer)
private void writeChunkCommon(ChunkBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
// be added to the bufferList. Once putBlock gets executed, this list will
Expand All @@ -562,7 +565,18 @@ private void writeChunk(ChunkBuffer buffer)
bufferList = new ArrayList<>();
}
bufferList.add(buffer);
writeChunkToContainer(buffer.duplicate(0, buffer.position()));
}

/**
 * Passes the buffer to writeChunkCommon and then sends its contents, from
 * index 0 up to the current position, through writeChunkToContainer with
 * the piggybacking flag set to false.
 *
 * @param buffer the chunk buffer to write
 * @throws IOException propagated from the calls it makes
 */
private void writeChunk(ChunkBuffer buffer) throws IOException {
  writeChunkCommon(buffer);
  // Only the filled part of the buffer is sent.
  final ChunkBuffer filledPart = buffer.duplicate(0, buffer.position());
  writeChunkToContainer(filledPart, false);
}

/**
 * Passes the buffer to writeChunkCommon and then sends its contents, from
 * index 0 up to the current position, through writeChunkToContainer with
 * the piggybacking flag set to true.
 *
 * @param buffer the chunk buffer to write
 * @throws IOException propagated from the calls it makes
 */
private void writeChunkAndPutBlock(ChunkBuffer buffer) throws IOException {
  writeChunkCommon(buffer);
  // Only the filled part of the buffer is sent.
  final ChunkBuffer filledPart = buffer.duplicate(0, buffer.position());
  writeChunkToContainer(filledPart, true);
}

/**
Expand Down Expand Up @@ -594,14 +608,23 @@ private void handleFlushInternal(boolean close)
if (totalDataFlushedLength < writtenDataLength) {
refreshCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
if (currentBuffer.hasRemaining()) {
writeChunk(currentBuffer);
}

// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
updateFlushLength();
executePutBlock(close, false);
if (currentBuffer.hasRemaining()) {
if (allowPutBlockPiggybacking) {
updateFlushLength();
writeChunkAndPutBlock(currentBuffer);
} else {
writeChunk(currentBuffer);
updateFlushLength();
executePutBlock(close, false);
}
} else {
updateFlushLength();
executePutBlock(close, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a PutBlock required here for a small chunk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the typical case (no hflush), where a PutBlock is sent to update metadata after four 1-MB chunks have been sent via WriteChunk requests.

}
} else if (close) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
Expand Down Expand Up @@ -713,7 +736,7 @@ public boolean isClosed() {
* @return
*/
CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
ChunkBuffer chunk) throws IOException {
ChunkBuffer chunk, boolean putBlockPiggybacking) throws IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
Expand All @@ -726,6 +749,8 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
.setChecksumData(checksumData.getProtoBufMessage())
.build();

long flushPos = totalDataFlushedLength;

if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
chunkInfo.getChunkName(), effectiveChunkSize, offset);
Expand All @@ -743,42 +768,93 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
+ ", previous = " + previous);
}

final List<ChunkBuffer> byteBufferList;
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
validateFuture = null;
try {
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, tokenString, replicationIndex);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
respFuture = asyncReply.getResponse();
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
validateFuture = respFuture.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
respFuture.completeExceptionally(sce);
}
return e;
}, responseExecutor).exceptionally(e -> {
String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
" into block " + blockID;
LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
CompletionException ce = new CompletionException(msg, e);
setIoException(ce);
throw ce;
});
BlockData blockData = null;

if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}
if (putBlockPiggybacking) {
Preconditions.checkNotNull(bufferList);
byteBufferList = bufferList;
bufferList = null;
Preconditions.checkNotNull(byteBufferList);

blockData = containerBlockData.build();
LOG.debug("piggyback chunk list {}", blockData);

if (config.getIncrementalChunkList()) {
// remove any chunks in the containerBlockData list.
// since they are sent.
containerBlockData.clearChunks();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to remove the last chunk here, which is updated in updateBlockDataForWriteChunk when IncrementalChunkList is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No --
lastChunk is used to calculate the checksum in PutBlock (PutBlock contains metadata),
whereas containerBlockData is sent by WriteChunk (WriteChunk contains the written data).

}
} else {
byteBufferList = null;
}

XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, tokenString, replicationIndex, blockData);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
respFuture = asyncReply.getResponse();
validateFuture = respFuture.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
respFuture.completeExceptionally(sce);
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null && putBlockPiggybacking) {
handleSuccessfulPutBlock(e.getWriteChunk().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
" into block " + blockID;
LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
CompletionException ce = new CompletionException(msg, e);
setIoException(ce);
throw ce;
});
clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
return validateFuture;

} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
return null;
if (putBlockPiggybacking) {
putFlushFuture(flushPos, validateFuture);
}
return validateFuture;
}

/**
 * Applies the result of a successful PutBlock to the stream state.
 *
 * <p>Verifies that the block ID in the response has the same container
 * block ID as the one being written, stores the response block ID as the
 * current block ID, and then hands the reply and the buffers to
 * updateCommitInfo.
 *
 * @param e committed-block-length response carrying the block ID
 * @param asyncReply reply of the request; supplies the log index
 * @param flushPos flush length, used here only for the debug message
 * @param byteBufferList buffers passed on to updateCommitInfo
 */
private void handleSuccessfulPutBlock(
    ContainerProtos.GetCommittedBlockLengthResponseProto e,
    XceiverClientReply asyncReply, long flushPos,
    List<ChunkBuffer> byteBufferList) {
  final BlockID committedBlockID = BlockID.getFromProtobuf(e.getBlockID());
  final boolean sameContainerBlock = blockID.get().getContainerBlockID()
      .equals(committedBlockID.getContainerBlockID());
  Preconditions.checkState(sameContainerBlock);

  // updates the bcsId of the block
  blockID.set(committedBlockID);

  if (LOG.isDebugEnabled()) {
    final String details = "Adding index " + asyncReply.getLogIndex()
        + " flushLength " + flushPos
        + " numBuffers " + byteBufferList.size()
        + " blockID " + blockID
        + " bufferPool size" + bufferPool.getSize()
        + " currentBufferIndex " + bufferPool.getCurrentBufferIndex();
    LOG.debug(details);
  }

  // for standalone protocol, logIndex will always be 0.
  updateCommitInfo(asyncReply, byteBufferList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ public ECBlockOutputStream(
@Override
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
writeChunkToContainer(
ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false);
updateWrittenDataLength(len);
}

public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
ByteBuffer buff) throws IOException {
return writeChunkToContainer(ChunkBuffer.wrap(buff));
return writeChunkToContainer(ChunkBuffer.wrap(buff), false);
}

public CompletableFuture<ContainerProtos.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public enum DatanodeVersion implements ComponentVersion {
DEFAULT_VERSION(0, "Initial version"),

SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."),
COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
"a PutBlock request"),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
Expand Down Expand Up @@ -210,6 +211,28 @@ public static ContainerCommandResponseProto getPutFileResponseSuccess(
.build();
}

/**
 * Builds the success response for the WriteChunk RPC.
 *
 * <p>When {@code blockData} is non-null, the response additionally carries
 * a committed block length built from the block data's size and block ID.
 *
 * @param msg - ContainerCommandRequestProto
 * @param blockData - block data to report in the response, or null to
 *                  leave the committed block length unset
 * @return - ContainerCommandResponseProto
 */
public static ContainerCommandResponseProto getWriteChunkResponseSuccess(
    ContainerCommandRequestProto msg, BlockData blockData) {
  final WriteChunkResponseProto.Builder response =
      WriteChunkResponseProto.newBuilder();

  if (blockData != null) {
    response.setCommittedBlockLength(getCommittedBlockLengthResponseBuilder(
        blockData.getSize(), blockData.getBlockID()));
  }

  return getSuccessResponseBuilder(msg)
      .setCmdType(Type.WriteChunk)
      .setWriteChunk(response)
      .build();
}

/**
* Gets a response to the read small file call.
* @param request - Msg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,10 @@ static long getLen(ReadChunkResponseProto response) {
*/
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data, String tokenString, int replicationIndex)
ByteString data, String tokenString,
int replicationIndex, BlockData blockData)
throws IOException, ExecutionException, InterruptedException {

WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
.setBlockID(DatanodeBlockID.newBuilder()
Expand All @@ -406,6 +408,12 @@ public static XceiverClientReply writeChunkAsync(
.build())
.setChunkData(chunk)
.setData(data);
if (blockData != null) {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
.setBlockData(blockData);
writeChunkRequest.setBlock(createBlockRequest);
}
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,10 @@ public TransactionContext startTransaction(RaftClientRequest request)
if (!blockAlreadyFinalized) {
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
.setBlockID(write.getBlockID())
.setChunkData(write.getChunkData())
WriteChunkRequestProto.newBuilder(write)
// skipping the data field as it is
// already set in statemachine data proto
.clearData()
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
Expand Down
Loading