OzoneClientConfig.java
@@ -223,6 +223,15 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = false;

@PostConstruct
private void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -404,4 +413,12 @@ public boolean isDatastreamPipelineMode() {
public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}

public void setIncrementalChunkList(boolean enable) {
this.incrementalChunkList = enable;
}

public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}
}
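A minimal usage sketch for the new flag, assuming the standard OzoneConfiguration object-binding API; the snippet is illustrative and not part of this PR:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;

OzoneConfiguration conf = new OzoneConfiguration();
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setIncrementalChunkList(true); // opt in to incremental PutBlock
conf.setFromObject(clientConfig);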
BlockOutputStream.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -80,6 +82,12 @@ public class BlockOutputStream extends OutputStream {
LoggerFactory.getLogger(BlockOutputStream.class);
public static final String EXCEPTION_MSG =
"Unexpected Storage Container Exception: ";
public static final String INCREMENTAL_CHUNK_LIST = "incremental";
public static final KeyValue INCREMENTAL_CHUNK_LIST_KV =
KeyValue.newBuilder().setKey(INCREMENTAL_CHUNK_LIST).build();
public static final String FULL_CHUNK = "full";
public static final KeyValue FULL_CHUNK_KV =
KeyValue.newBuilder().setKey(FULL_CHUNK).build();
Contributor: Can you put INCREMENTAL_CHUNK_LIST and FULL_CHUNK in some common place? I remember there is a definition in BlockManagerImpl too.

@ChenSammi (Contributor), Jan 26, 2024: This FULL_CHUNK_KV metadata is not correctly put on chunks with full content, which causes the datanode putBlockByID to process the blockData incorrectly.

private AtomicReference<BlockID> blockID;
private final AtomicReference<ChunkInfo> previousChunkInfo
@@ -123,6 +131,10 @@ public class BlockOutputStream extends OutputStream {
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;
// lastChunkBuffer holds the data written after the last complete chunk,
// which may differ from currentBuffer. We need it to compute the checksum
// of the last partial chunk.
private ByteBuffer lastChunkBuffer;
private long lastChunkOffset;
private final Token<? extends TokenIdentifier> token;
private final String tokenString;
private int replicationIndex;
@@ -164,6 +176,10 @@ public BlockOutputStream(
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
// tell the DataNode that this client will send incremental chunk lists
if (config.getIncrementalChunkList()) {
this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
}
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.token = token;
@@ -192,6 +208,9 @@ public BlockOutputStream(
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
this.lastChunkBuffer =
ByteBuffer.allocate(config.getStreamBufferSize());
this.lastChunkOffset = 0;
}

void refreshCurrentBuffer() {
@@ -468,6 +487,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
ContainerCommandResponseProto> flushFuture = null;
try {
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);

if (config.getIncrementalChunkList()) {
// remove any chunks from the containerBlockData list,
// since they have already been sent.
containerBlockData.clearChunks();
}

XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, tokenString);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
@@ -746,7 +773,12 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
setIoException(ce);
throw ce;
});
containerBlockData.addChunks(chunkInfo);
if (config.getIncrementalChunkList()) {
updateBlockDataForWriteChunk(chunk);
} else {
containerBlockData.addChunks(chunkInfo);
}

clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
return validateFuture;
} catch (IOException | ExecutionException e) {
@@ -758,6 +790,156 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
return null;
}

/**
* Update container block data, which is later sent to DataNodes via PutBlock,
* using the new chunks sent out via WriteChunk.
*
* This method is only used when the incremental chunk list feature is enabled.
* @param chunk the chunk buffer to be sent out by WriteChunk.
* @throws OzoneChecksumException if computing the checksum of the last
* partial chunk fails.
*/
private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
throws OzoneChecksumException {
// Update lastChunkBuffer using the new chunk data. lastChunkBuffer is
// used to calculate the checksum for the last partial chunk in
// containerBlockData, which will be used by PutBlock.

// The last partial chunk in containerBlockData will be replaced,
// so remove it first.
removeLastPartialChunk();
chunk.rewind();
LOG.debug("Adding chunk pos {} limit {} remaining {}." +
"lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = {}",
chunk.position(), chunk.limit(), chunk.remaining(),
lastChunkBuffer.position(), lastChunkBuffer.limit(),
lastChunkBuffer.remaining(), lastChunkOffset);

// Append the chunk to the last chunk buffer. If the resulting size
// would exceed the buffer capacity (the stream buffer size, 4 MB by
// default), fill the buffer, record the completed full chunk in the
// block data, then carry the remainder over into the emptied buffer.
if (lastChunkBuffer.position() + chunk.remaining() <=
lastChunkBuffer.capacity()) {
appendLastChunkBuffer(chunk, 0, chunk.remaining());
} else {
int remainingBufferSize =
lastChunkBuffer.capacity() - lastChunkBuffer.position();
appendLastChunkBuffer(chunk, 0, remainingBufferSize);
updateBlockDataWithLastChunkBuffer();
appendLastChunkBuffer(chunk, remainingBufferSize,
chunk.remaining() - remainingBufferSize);
}
LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}",
lastChunkBuffer, lastChunkOffset);

updateBlockDataWithLastChunkBuffer();
}

private void updateBlockDataWithLastChunkBuffer()
throws OzoneChecksumException {
// create chunk info for lastChunkBuffer
ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
LOG.debug("lastChunkInfo = {}", lastChunkInfo);
long lastChunkSize = lastChunkInfo.getLen();
addToBlockData(lastChunkInfo);

lastChunkBuffer.clear();
if (lastChunkSize == config.getStreamBufferSize()) {
lastChunkOffset += config.getStreamBufferSize();
} else {
lastChunkBuffer.position((int) lastChunkSize);
}
}

private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
int length) {
LOG.debug("copying to last chunk buffer offset={} length={}",
offset, length);
int pos = 0;
int uncopied = length;
for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
if (pos + bb.remaining() > offset) {
int copyStart = offset < pos ? 0 : offset - pos;
int copyLen = Math.min(uncopied, bb.remaining());
try {
LOG.debug("put into last chunk buffer start = {} len = {}",
copyStart, copyLen);
lastChunkBuffer.put(bb.array(), copyStart, copyLen);
} catch (BufferOverflowException e) {
LOG.error("appending from " + copyStart + " for len=" + copyLen +
". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() +
" pos=" + lastChunkBuffer.position() +
" limit=" + lastChunkBuffer.limit() +
" capacity=" + lastChunkBuffer.capacity());
throw e;
}

uncopied -= copyLen;
}

pos += bb.remaining();
if (pos > offset + length) {
return;
}
if (uncopied == 0) {
return;
}
}
}

private void removeLastPartialChunk() {
// remove the last chunk if it's partial.
if (containerBlockData.getChunksList().isEmpty()) {
return;
}
int lastChunkIndex = containerBlockData.getChunksCount() - 1;
ChunkInfo lastChunkInBlockData = containerBlockData.getChunks(
lastChunkIndex);
if (!isFullChunk(lastChunkInBlockData)) {
containerBlockData.removeChunks(lastChunkIndex);
}
}

private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
throws OzoneChecksumException {
lastChunkBuffer.flip();
int revisedChunkSize = lastChunkBuffer.remaining();
// create the chunk info to be sent in PutBlock.
ChecksumData revisedChecksumData =
checksum.computeChecksum(lastChunkBuffer);

long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID)
.setOffset(lastPartialChunkOffset)
.setLen(revisedChunkSize)
.setChecksumData(revisedChecksumData.getProtoBufMessage());
// if full, mark the chunk with the FULL_CHUNK metadata key
if (revisedChunkSize == config.getStreamBufferSize()) {
revisedChunkInfo.addMetadata(FULL_CHUNK_KV);
}
return revisedChunkInfo.build();
}

private boolean isFullChunk(ChunkInfo chunkInfo) {
Preconditions.checkState(
chunkInfo.getLen() <= config.getStreamBufferSize());
return chunkInfo.getLen() == config.getStreamBufferSize();
}

private void addToBlockData(ChunkInfo revisedChunkInfo) {
LOG.debug("containerBlockData chunk: {}", containerBlockData);
if (containerBlockData.getChunksCount() > 0) {
ChunkInfo lastChunk = containerBlockData.getChunks(
containerBlockData.getChunksCount() - 1);
LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo);
Preconditions.checkState(lastChunk.getOffset() + lastChunk.getLen() ==
revisedChunkInfo.getOffset(),
"lastChunk.getOffset() + lastChunk.getLen() " +
"!= revisedChunkInfo.getOffset()");
}
containerBlockData.addChunks(revisedChunkInfo);
}

@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
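For intuition, a small worked sketch of the offset and chunk-ID arithmetic that createChunkInfo relies on, assuming the default 4 MiB stream buffer (the numbers are illustrative, not taken from this PR):

// A 6 MiB write with a 4 MiB stream buffer.
long streamBufferSize = 4L * 1024 * 1024;
long bytesWritten = 6L * 1024 * 1024;
long fullChunks = bytesWritten / streamBufferSize;    // 1 full chunk
long lastChunkOffset = fullChunks * streamBufferSize; // 4 MiB
long chunkId = lastChunkOffset / streamBufferSize;    // name ends in "_chunk_1"
long lastChunkLen = bytesWritten - lastChunkOffset;   // 2 MiB partial chunk
// Each PutBlock re-sends only this trailing partial chunk; once it fills
// up to streamBufferSize it is tagged with FULL_CHUNK_KV and a new
// partial chunk starts at the next offset.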
MockOmTransport.java
@@ -56,6 +56,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
@@ -67,6 +69,8 @@
* OM transport for testing with in-memory state.
*/
public class MockOmTransport implements OmTransport {
private static final Logger LOG =
LoggerFactory.getLogger(MockOmTransport.class);

private final MockBlockAllocator blockAllocator;
//volumename -> volumeinfo
@@ -185,11 +189,44 @@ private GetKeyInfoResponse getKeyInfo(GetKeyInfoRequest request) {
.build();
}

private boolean isHSync(CommitKeyRequest commitKeyRequest) {
return commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
}

private boolean isRecovery(CommitKeyRequest commitKeyRequest) {
return commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery();
}

private String toOperationString(CommitKeyRequest commitKeyRequest) {
boolean hsync = isHSync(commitKeyRequest);
boolean recovery = isRecovery(commitKeyRequest);
if (hsync) {
return "hsync";
}
if (recovery) {
return "recover";
}
return "commit";
}


private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) {
final KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
final KeyInfo openKey =
openKeys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName())
-          .remove(keyArgs.getKeyName());
+          .get(keyArgs.getKeyName());
LOG.debug("{} open key vol: {} bucket: {} key: {}",
toOperationString(commitKeyRequest),
keyArgs.getVolumeName(),
keyArgs.getBucketName(),
keyArgs.getKeyName());
boolean hsync = isHSync(commitKeyRequest);
if (!hsync) {
KeyInfo deleteKey = openKeys.get(keyArgs.getVolumeName())
.get(keyArgs.getBucketName())
.remove(keyArgs.getKeyName());
assert deleteKey != null;
}
final KeyInfo.Builder committedKeyInfoWithLocations =
KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
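The mock now mirrors the OM hsync contract: a commit with hsync=true persists the key's current state but leaves it in openKeys for further writes, while a plain commit (or lease recovery) removes it. A minimal sketch of building such a request; the literal names are illustrative, while the proto fields come from OzoneManagerProtocolProtos:

import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;

KeyArgs keyArgs = KeyArgs.newBuilder()
    .setVolumeName("vol1")   // illustrative names
    .setBucketName("bucket1")
    .setKeyName("key1")
    .build();
CommitKeyRequest hsyncCommit = CommitKeyRequest.newBuilder()
    .setKeyArgs(keyArgs)
    .setHsync(true)          // keep the key open after committing
    .build();
// With setHsync(false) (or unset), commitKey removes the key from openKeys.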