@@ -222,6 +222,15 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = false;

@PostConstruct
private void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -399,4 +408,12 @@ public boolean isDatastreamPipelineMode() {
public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}

public void setIncrementalChunkList(boolean enable) {
this.incrementalChunkList = enable;
}

public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}
}
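
// A minimal usage sketch, assuming OzoneClientConfig carries the standard
// "ozone.client" config prefix, so the full key is
// "ozone.client.incremental.chunk.list":
//
//   OzoneConfiguration conf = new OzoneConfiguration();
//   conf.setBoolean("ozone.client.incremental.chunk.list", true);
//   // or programmatically, via the setter added above:
//   OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
//   clientConfig.setIncrementalChunkList(true);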
@@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -79,6 +81,10 @@ public class BlockOutputStream extends OutputStream {
LoggerFactory.getLogger(BlockOutputStream.class);
public static final String EXCEPTION_MSG =
"Unexpected Storage Container Exception: ";
public static final String INCREMENTAL_CHUNK_LIST = "incremental";
public static final String FULL_CHUNK = "full";
public static final KeyValue FULL_CHUNK_KV =
KeyValue.newBuilder().setKey(FULL_CHUNK).build();

private AtomicReference<BlockID> blockID;
private final AtomicReference<ChunkInfo> previousChunkInfo
@@ -121,6 +127,12 @@ public class BlockOutputStream extends OutputStream {
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;
  // lastChunkBuffer holds the data of the last partial chunk, which may
  // differ from currentBuffer. It is kept so the checksum of the last
  // partial chunk can be recomputed for each incremental PutBlock.
  private ByteBuffer lastChunkBuffer;
private long lastChunkOffset;
private final Token<? extends TokenIdentifier> token;
private int replicationIndex;
private Pipeline pipeline;
@@ -160,6 +172,15 @@ public BlockOutputStream(
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
    // Tell the DataNode that this client will send incremental chunk lists.
    if (config.getIncrementalChunkList()) {
      KeyValue incrKeyValue = KeyValue.newBuilder()
          .setKey(INCREMENTAL_CHUNK_LIST)
          .build();
      this.containerBlockData.addMetadata(incrKeyValue);
}
this.lastChunkBuffer = ByteBuffer.allocate(config.getStreamBufferSize());
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.token = token;
@@ -185,6 +206,9 @@ public BlockOutputStream(
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.lastChunkBuffer =
ByteBuffer.allocate(config.getStreamBufferSize());
this.lastChunkOffset = 0;
}

void refreshCurrentBuffer() {
@@ -461,6 +485,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
ContainerCommandResponseProto> flushFuture = null;
try {
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);

if (config.getIncrementalChunkList()) {
// remove any chunks in the containerBlockData list.
// since they are sent.
containerBlockData.clearChunks();
}

XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, token);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
@@ -507,6 +539,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
handleInterruptedException(ex, false);
}
putFlushFuture(flushPos, flushFuture);

return flushFuture;
}

@@ -739,7 +772,12 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
setIoException(ce);
throw ce;
});
containerBlockData.addChunks(chunkInfo);
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}

clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
return validateFuture;
} catch (IOException | ExecutionException e) {
@@ -751,6 +789,165 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
return null;
}

  /**
   * Update the container block data, which is later sent to DataNodes via
   * PutBlock, using the new chunk just sent out via WriteChunk.
   *
   * This method is only used when incremental chunk list is enabled.
   * @param chunk the chunk buffer just written via WriteChunk
   * @throws OzoneChecksumException if computing the checksum of the last
   *         partial chunk fails
   */
private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
throws OzoneChecksumException {
    // Update lastChunkBuffer using the new chunk data. It is used to
    // calculate the checksum for the last partial chunk in
    // containerBlockData, which will be used by PutBlock.

    // The last partial chunk in containerBlockData will be replaced,
    // so remove it first.
    removeLastPartialChunk();
LOG.debug("lastChunkOffset = {}", lastChunkOffset);

chunk.rewind();
LOG.debug("adding chunk pos {} limit {} remaining {}",
chunk.position(), chunk.limit(), chunk.remaining());
LOG.debug("adding lastChunkBuffer pos {} limit {} remaining {}",
lastChunkBuffer.position(), lastChunkBuffer.limit(),
lastChunkBuffer.remaining());

    // Append the chunk to the last chunk buffer. If the result would
    // reach the buffer capacity (the chunk size, 4 MB by default), emit
    // the now-full chunk into the block data and carry over the rest.
    if (lastChunkBuffer.position() + chunk.remaining() <
        lastChunkBuffer.capacity()) {
      LOG.debug("chunk fits into lastChunkBuffer");
      appendLastChunkBuffer(chunk, 0, chunk.remaining());
      LOG.debug("after append, lastChunkBuffer={}", lastChunkBuffer);
    } else {
      LOG.debug("chunk spans the lastChunkBuffer boundary");
int remainingBufferSize =
lastChunkBuffer.capacity() - lastChunkBuffer.position();
appendLastChunkBuffer(chunk, 0, remainingBufferSize);

      // create chunk info for lastChunkBuffer, which is now full
      ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
      LOG.debug("lastChunkInfo = {}", lastChunkInfo);
      addToBlockData(lastChunkInfo);

lastChunkBuffer.clear();
appendLastChunkBuffer(chunk, remainingBufferSize,
chunk.remaining() - remainingBufferSize);
lastChunkOffset += config.getStreamBufferSize();
LOG.debug("Updates lastChunkOffset to {}", lastChunkOffset);
}
    // create chunk info for the remaining partial data in lastChunkBuffer
if (lastChunkBuffer.remaining() > 0) {
ChunkInfo lastChunkInfo2 = createChunkInfo(lastChunkOffset);
LOG.debug("lastChunkInfo2 = {}", lastChunkInfo2);
long lastChunkSize = lastChunkInfo2.getLen();
if (lastChunkSize > 0) {
addToBlockData(lastChunkInfo2);
}

      // createChunkInfo flipped the buffer for reading; restore write mode,
      // keeping the partial chunk data in place.
      lastChunkBuffer.clear();
      lastChunkBuffer.position((int) lastChunkSize);
} else {
lastChunkBuffer.clear();
}
}
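
  // A sketch of the split arithmetic above, assuming the stream buffer
  // (and thus chunk) size is 4 MB; all values illustrative:
  //
  //   int streamBufferSize = 4 * 1024 * 1024;
  //   int buffered = 3 * 1024 * 1024;  // already in lastChunkBuffer
  //   int incoming = 2 * 1024 * 1024;  // remaining() of the new chunk
  //   // 3 MB + 2 MB >= 4 MB, so the first 1 MB of the incoming chunk
  //   // completes a full chunk, which is added to the block data:
  //   int fill = streamBufferSize - buffered;  // 1 MB completes the chunk
  //   lastChunkOffset += streamBufferSize;     // next chunk starts at 4 MB
  //   buffered = incoming - fill;              // 1 MB partial remains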

private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
int length) {
LOG.debug("copying to last chunk buffer offset={} length={}",
offset, length);
int pos = 0;
int uncopied = length;
for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
if (pos + bb.remaining() > offset) {
        int copyStart = offset < pos ? 0 : offset - pos;
        // never copy more than this buffer holds past copyStart
        int copyLen = Math.min(uncopied, bb.remaining() - copyStart);
        try {
          LOG.debug("put into last chunk buffer start = {} len = {}",
              copyStart, copyLen);
          // honor the backing array offset and the buffer's position
          lastChunkBuffer.put(bb.array(),
              bb.arrayOffset() + bb.position() + copyStart, copyLen);
} catch (BufferOverflowException e) {
LOG.error("appending from " + copyStart + " for len=" + copyLen +
". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() +
" pos=" + lastChunkBuffer.position() +
" limit=" + lastChunkBuffer.limit() +
" capacity=" + lastChunkBuffer.capacity());
throw e;
}

uncopied -= copyLen;
}

pos += bb.remaining();
if (pos > offset + length) {
return;
}
if (uncopied == 0) {
return;
}
}
}
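
  // Example of the copy walk above (illustrative): for a ChunkBuffer backed
  // by two 1 MB ByteBuffers, appendLastChunkBuffer(chunk, 1536 * 1024,
  // 512 * 1024) skips the whole first buffer plus the first 0.5 MB of the
  // second, then copies the remaining 0.5 MB into lastChunkBuffer.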

private void removeLastPartialChunk() {
if (containerBlockData.getChunksList().isEmpty()) {
return;
}
int lastChunkIndex = containerBlockData.getChunksCount() - 1;
ChunkInfo lastChunkInBlockData = containerBlockData.getChunks(
lastChunkIndex);
if (!isFullChunk(lastChunkInBlockData)) {
containerBlockData.removeChunks(lastChunkIndex);
}
}

private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
throws OzoneChecksumException {
    // Flip to read mode to compute the checksum over the buffered data;
    // the caller restores write mode afterwards.
    lastChunkBuffer.flip();
    int revisedChunkSize = lastChunkBuffer.remaining();
// create the chunk info to be sent in PutBlock.
ChecksumData revisedChecksumData =
checksum.computeChecksum(lastChunkBuffer);

long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID)
.setOffset(lastPartialChunkOffset)
.setLen(revisedChunkSize)
.setChecksumData(revisedChecksumData.getProtoBufMessage());
// if full chunk
if (revisedChunkSize == config.getStreamBufferSize()) {
revisedChunkInfo.addMetadata(FULL_CHUNK_KV);
}
return revisedChunkInfo.build();
}
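
  // Worked instance of the naming arithmetic above (illustrative values):
  // with streamBufferSize = 4 MB, localID = 42 and
  // lastPartialChunkOffset = 8 MB, chunkID = 8 MB / 4 MB = 2 and the chunk
  // is named "42_chunk_2", matching the naming scheme used for chunks
  // written via WriteChunk.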

private boolean isFullChunk(ChunkInfo chunkInfo) {
Preconditions.checkState(
chunkInfo.getLen() <= config.getStreamBufferSize());
return chunkInfo.getLen() == config.getStreamBufferSize();
}

private void addToBlockData(ChunkInfo revisedChunkInfo) {
LOG.debug("containerBlockData chunk: {}", containerBlockData);
if (containerBlockData.getChunksCount() > 0) {
ChunkInfo lastChunk = containerBlockData.getChunks(
containerBlockData.getChunksCount() - 1);
LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo);
if (lastChunk.getOffset() + lastChunk.getLen() !=
revisedChunkInfo.getOffset()) {
throw new AssertionError(
"lastChunk.getOffset() + lastChunk.getLen() " +
"!= revisedChunkInfo.getOffset()");
}
}
containerBlockData.addChunks(revisedChunkInfo);
}

@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
@@ -41,6 +41,8 @@ public enum ClientVersion implements ComponentVersion {
BUCKET_LAYOUT_SUPPORT(3,
"This client version has support for Object Store and File " +
"System Optimized Bucket Layouts."),
INCREMENTAL_CHUNK_LIST_SUPPORT(4,
"This client version has support for incremental chunk list."),

FUTURE_VERSION(-1, "Used internally when the server side is older and an"
+ " unknown client version has arrived from the client.");
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -548,6 +548,7 @@ static long computePendingDeleteBytes(List<Long> localIDs,
for (long id : localIDs) {
try {
final String blockKey = containerData.getBlockKey(id);
        // TODO: use store.getBlockByID() instead
final BlockData blockData = blockDataTable.get(blockKey);
if (blockData != null) {
pendingDeleteBytes += blockData.getSize();
@@ -54,6 +54,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -546,7 +547,11 @@ ContainerCommandResponseProto handlePutBlock(
long bcsId =
dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
blockData.setBlockCommitSequenceId(bcsId);
blockManager.putBlock(kvContainer, blockData, endOfBlock);
boolean incrementalChunkList =
request.getVersion() >=
ClientVersion.INCREMENTAL_CHUNK_LIST_SUPPORT.toProtoValue();
blockManager.putBlock(kvContainer, blockData, endOfBlock,
incrementalChunkList);

blockDataProto = blockData.getProtoBufMessage();
