diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index e2c5471be8e4..4c9e9c21d582 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -222,6 +222,15 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED"; + @Config(key = "incremental.chunk.list", + defaultValue = "false", + type = ConfigType.BOOLEAN, + description = "Client PutBlock request can choose incremental chunk " + + "list rather than full chunk list to optimize performance. " + + "Critical to HBase.", + tags = ConfigTag.CLIENT) + private boolean incrementalChunkList = false; + @PostConstruct private void validate() { Preconditions.checkState(streamBufferSize > 0); @@ -399,4 +408,12 @@ public boolean isDatastreamPipelineMode() { public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { this.datastreamPipelineMode = datastreamPipelineMode; } + + public void setIncrementalChunkList(boolean enable) { + this.incrementalChunkList = enable; + } + + public boolean getIncrementalChunkList() { + return this.incrementalChunkList; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 9988fb258ebb..aa449fc76f8a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.storage; import java.io.IOException; import java.io.OutputStream; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -79,6 +81,10 @@ public class BlockOutputStream extends OutputStream { LoggerFactory.getLogger(BlockOutputStream.class); public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: "; + public static final String INCREMENTAL_CHUNK_LIST = "incremental"; + public static final String FULL_CHUNK = "full"; + public static final KeyValue FULL_CHUNK_KV = + KeyValue.newBuilder().setKey(FULL_CHUNK).build(); private AtomicReference blockID; private final AtomicReference previousChunkInfo @@ -121,6 +127,12 @@ public class BlockOutputStream extends OutputStream { private int currentBufferRemaining; //current buffer allocated to write private ChunkBuffer currentBuffer; + // last chunk holds the buffer from the last complete chunk + // so it may be different from currentBuffer + // we need this to calculate checksum + //private ChunkBuffer lastChunkBuffer; + private ByteBuffer lastChunkBuffer; + private long lastChunkOffset; private final Token token; private int replicationIndex; private Pipeline pipeline; @@ -160,6 +172,15 @@ public BlockOutputStream( } this.containerBlockData = BlockData.newBuilder().setBlockID( blkIDBuilder.build()).addMetadata(keyValue); + // tell DataNode I will send incremental chunk list + if (config.getIncrementalChunkList()) { + KeyValue incrkeyValue = + KeyValue.newBuilder() + .setKey(INCREMENTAL_CHUNK_LIST) + .build(); + this.containerBlockData.addMetadata(incrkeyValue); + } + this.lastChunkBuffer = 
ByteBuffer.allocate(config.getStreamBufferSize()); this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; this.token = token; @@ -185,6 +206,9 @@ public BlockOutputStream( config.getBytesPerChecksum()); this.clientMetrics = clientMetrics; this.pipeline = pipeline; + this.lastChunkBuffer = + ByteBuffer.allocate(config.getStreamBufferSize()); + this.lastChunkOffset = 0; } void refreshCurrentBuffer() { @@ -461,6 +485,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close, ContainerCommandResponseProto> flushFuture = null; try { BlockData blockData = containerBlockData.build(); + LOG.debug("sending PutBlock {}", blockData); + + if (config.getIncrementalChunkList()) { + // remove any chunks in the containerBlockData list. + // since they are sent. + containerBlockData.clearChunks(); + } + XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData, close, token); CompletableFuture future = @@ -507,6 +539,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, handleInterruptedException(ex, false); } putFlushFuture(flushPos, flushFuture); + return flushFuture; } @@ -739,7 +772,12 @@ CompletableFuture writeChunkToContainer( setIoException(ce); throw ce; }); - containerBlockData.addChunks(chunkInfo); + if (config.getIncrementalChunkList()) { + updateBlockDataForWriteChunk(chunk); + } else { + containerBlockData.addChunks(chunkInfo); + } + clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen()); return validateFuture; } catch (IOException | ExecutionException e) { @@ -751,6 +789,165 @@ CompletableFuture writeChunkToContainer( return null; } + /** + * Update container block data, which is later sent to DataNodes via PutBlock, + * using the new chunks sent out via WriteChunk. + * + * This method is only used when incremental chunk list is enabled. + * @param chunk + * @throws OzoneChecksumException + */ + private void updateBlockDataForWriteChunk(ChunkBuffer chunk) + throws OzoneChecksumException { + // Update lastChunkBuffer using the new chunk data. + // This is used to calculate checksum for the last partial chunk in + // containerBlockData which will used by PutBlock. + + // the last partial chunk in containerBlockData will be replaced. + // So remove it. + removeLastPartialChunk(); + LOG.debug("lastChunkOffset = {}", lastChunkOffset); + + chunk.rewind(); + LOG.debug("adding chunk pos {} limit {} remaining {}", + chunk.position(), chunk.limit(), chunk.remaining()); + LOG.debug("adding lastChunkBuffer pos {} limit {} remaining {}", + lastChunkBuffer.position(), lastChunkBuffer.limit(), + lastChunkBuffer.remaining()); + + // Append the chunk to the last chunk buffer. + // if the resulting size could exceed limit (4MB), + // drop the full chunk and leave the rest. 
+ if (lastChunkBuffer.position() + chunk.remaining() < + lastChunkBuffer.capacity()) { + LOG.debug("containerBlockData one block"); + appendLastChunkBuffer(chunk, 0, chunk.remaining()); + LOG.debug("after append, lastChunkBuffer={}", lastChunkBuffer); + } else { + LOG.debug("containerBlockData two blocks"); + int remainingBufferSize = + lastChunkBuffer.capacity() - lastChunkBuffer.position(); + appendLastChunkBuffer(chunk, 0, remainingBufferSize); + + // create chunk info for lastChunkBuffer, which is full + ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset); + LOG.debug("lastChunkInfo = {}", lastChunkInfo); + //long lastChunkSize = lastChunkInfo.getLen(); + addToBlockData(lastChunkInfo); + + lastChunkBuffer.clear(); + appendLastChunkBuffer(chunk, remainingBufferSize, + chunk.remaining() - remainingBufferSize); + lastChunkOffset += config.getStreamBufferSize(); + LOG.debug("Updates lastChunkOffset to {}", lastChunkOffset); + } + // create chunk info for lastChunkBuffer, which is partial + if (lastChunkBuffer.remaining() > 0) { + ChunkInfo lastChunkInfo2 = createChunkInfo(lastChunkOffset); + LOG.debug("lastChunkInfo2 = {}", lastChunkInfo2); + long lastChunkSize = lastChunkInfo2.getLen(); + if (lastChunkSize > 0) { + addToBlockData(lastChunkInfo2); + } + + lastChunkBuffer.clear(); + lastChunkBuffer.position((int) lastChunkSize); + } else { + lastChunkBuffer.clear(); + } + } + + private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset, + int length) { + LOG.debug("copying to last chunk buffer offset={} length={}", + offset, length); + int pos = 0; + int uncopied = length; + for (ByteBuffer bb : chunkBuffer.asByteBufferList()) { + if (pos + bb.remaining() > offset) { + int copyStart = offset < pos ? 0 : offset - pos; + int copyLen = Math.min(uncopied, bb.remaining()); + try { + LOG.debug("put into last chunk buffer start = {} len = {}", + copyStart, copyLen); + lastChunkBuffer.put(bb.array(), copyStart, copyLen); + } catch (BufferOverflowException e) { + LOG.error("appending from " + copyStart + " for len=" + copyLen + + ". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() + + " pos=" + lastChunkBuffer.position() + + " limit=" + lastChunkBuffer.limit() + + " capacity=" + lastChunkBuffer.capacity()); + throw e; + } + + uncopied -= copyLen; + } + + pos += bb.remaining(); + if (pos > offset + length) { + return; + } + if (uncopied == 0) { + return; + } + } + } + + private void removeLastPartialChunk() { + if (containerBlockData.getChunksList().isEmpty()) { + return; + } + int lastChunkIndex = containerBlockData.getChunksCount() - 1; + ChunkInfo lastChunkInBlockData = containerBlockData.getChunks( + lastChunkIndex); + if (!isFullChunk(lastChunkInBlockData)) { + containerBlockData.removeChunks(lastChunkIndex); + } + } + + private ChunkInfo createChunkInfo(long lastPartialChunkOffset) + throws OzoneChecksumException { + lastChunkBuffer.flip(); + int revisedChunkSize = lastChunkBuffer.remaining(); + // create the chunk info to be sent in PutBlock. 
+ ChecksumData revisedChecksumData = + checksum.computeChecksum(lastChunkBuffer); + + long chunkID = lastPartialChunkOffset / config.getStreamBufferSize(); + ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder() + .setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID) + .setOffset(lastPartialChunkOffset) + .setLen(revisedChunkSize) + .setChecksumData(revisedChecksumData.getProtoBufMessage()); + // if full chunk + if (revisedChunkSize == config.getStreamBufferSize()) { + revisedChunkInfo.addMetadata(FULL_CHUNK_KV); + } + return revisedChunkInfo.build(); + } + + private boolean isFullChunk(ChunkInfo chunkInfo) { + Preconditions.checkState( + chunkInfo.getLen() <= config.getStreamBufferSize()); + return chunkInfo.getLen() == config.getStreamBufferSize(); + } + + private void addToBlockData(ChunkInfo revisedChunkInfo) { + LOG.debug("containerBlockData chunk: {}", containerBlockData); + if (containerBlockData.getChunksCount() > 0) { + ChunkInfo lastChunk = containerBlockData.getChunks( + containerBlockData.getChunksCount() - 1); + LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo); + if (lastChunk.getOffset() + lastChunk.getLen() != + revisedChunkInfo.getOffset()) { + throw new AssertionError( + "lastChunk.getOffset() + lastChunk.getLen() " + + "!= revisedChunkInfo.getOffset()"); + } + } + containerBlockData.addChunks(revisedChunkInfo); + } + @VisibleForTesting public void setXceiverClient(XceiverClientSpi xceiverClient) { this.xceiverClient = xceiverClient; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java index cc6695dc7d68..d40de2b9d368 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java @@ -41,6 +41,8 @@ public enum ClientVersion implements ComponentVersion { BUCKET_LAYOUT_SUPPORT(3, "This client version has support for Object Store and File " + "System Optimized Bucket Layouts."), + INCREMENTAL_CHUNK_LIST_SUPPORT(4, + "This client version has support for incremental chunk list."), FUTURE_VERSION(-1, "Used internally when the server side is older and an" + " unknown client version has arrived from the client."); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java index cbbbb70278a7..803846cd2b36 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Map; import java.util.TreeMap; + import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java index 2678d04dfe86..d19c9e737db6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java @@ -548,6 +548,7 @@ static long computePendingDeleteBytes(List localIDs, for (long id : localIDs) { try { final String blockKey = containerData.getBlockKey(id); + //// TODO: use store.getBlockByID() instead final BlockData blockData = blockDataTable.get(blockKey); if (blockData != null) { pendingDeleteBytes += blockData.getSize(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 042ed978420a..6ae0ed0a66b7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.HddsServerUtil; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -546,7 +547,11 @@ ContainerCommandResponseProto handlePutBlock( long bcsId = dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); blockData.setBlockCommitSequenceId(bcsId); - blockManager.putBlock(kvContainer, blockData, endOfBlock); + boolean incrementalChunkList = + request.getVersion() >= + ClientVersion.INCREMENTAL_CHUNK_LIST_SUPPORT.toProtoValue(); + blockManager.putBlock(kvContainer, blockData, endOfBlock, + incrementalChunkList); blockDataProto = blockData.getProtoBufMessage(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index f2c126451c62..af8b471fa384 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -39,7 +39,6 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +54,10 @@ public class BlockManagerImpl implements BlockManager { private ConfigurationSource config; private static final String DB_NULL_ERR_MSG = "DB cannot be null here"; - private static final String NO_SUCH_BLOCK_ERR_MSG = + public static final String NO_SUCH_BLOCK_ERR_MSG = "Unable to find the block."; + public static final String INCREMENTAL_CHUNK_LIST = "incremental"; + public static final String FULL_CHUNK = "full"; // Default Read Buffer capacity when Checksum is not present private final long defaultReadBufferCapacity; @@ -87,6 +88,23 @@ public BlockManagerImpl(ConfigurationSource conf) { public long putBlock(Container container, BlockData data) throws IOException { return putBlock(container, data, true); } + + /** + 
* Puts or overwrites a block. + * + * @param container - Container for which block need to be added. + * @param data - BlockData. + * @param endOfBlock - The last putBlock call for this block (when + * all the chunks are written and stream is closed) + * @return length of the block. + * @throws IOException + */ + @Override + public long putBlock(Container container, BlockData data, boolean endOfBlock) + throws IOException { + return putBlock(container, data, endOfBlock, true); + } + /** * Puts or overwrites a block. * @@ -94,21 +112,24 @@ public long putBlock(Container container, BlockData data) throws IOException { * @param data - BlockData. * @param endOfBlock - The last putBlock call for this block (when * all the chunks are written and stream is closed) + * @param incremental - chunk list is incremental. * @return length of the block. * @throws IOException */ @Override public long putBlock(Container container, BlockData data, - boolean endOfBlock) throws IOException { + boolean endOfBlock, boolean incremental) throws IOException { return persistPutBlock( (KeyValueContainer) container, data, config, - endOfBlock); + endOfBlock, + incremental); } public static long persistPutBlock(KeyValueContainer container, - BlockData data, ConfigurationSource config, boolean endOfBlock) + BlockData data, ConfigurationSource config, boolean endOfBlock, + boolean incremental) throws IOException { Preconditions.checkNotNull(data, "BlockData cannot be null for put " + "operation."); @@ -152,6 +173,12 @@ public static long persistPutBlock(KeyValueContainer container, // update the blockData as well as BlockCommitSequenceId here try (BatchOperation batch = db.getStore().getBatchHandler() .initBatchOperation()) { + //Preconditions.checkState(!data.getChunks().isEmpty(), + // "empty chunk list unexpected"); + /*if (data.getChunks().isEmpty()) { + //LOG.error("unexpected empty chunk"); + assert endOfBlock; + }*/ // If the block does not exist in the pendingPutBlockCache of the // container, then check the DB to ascertain if it exists or not. 
@@ -165,8 +192,8 @@ public static long persistPutBlock(KeyValueContainer container, } } - db.getStore().getBlockDataTable().putWithBatch( - batch, containerData.getBlockKey(localID), data); + db.getStore().putBlockByID(batch, incremental, localID, data, + containerData, endOfBlock); if (bcsId != 0) { db.getStore().getMetadataTable().putWithBatch( batch, containerData.getBcsIdKey(), bcsId); @@ -352,14 +379,6 @@ public void shutdown() { private BlockData getBlockByID(DBHandle db, BlockID blockID, KeyValueContainerData containerData) throws IOException { - String blockKey = containerData.getBlockKey(blockID.getLocalID()); - - BlockData blockData = db.getStore().getBlockDataTable().get(blockKey); - if (blockData == null) { - throw new StorageContainerException(NO_SUCH_BLOCK_ERR_MSG + - " BlockID : " + blockID, NO_SUCH_BLOCK); - } - - return blockData; + return db.getStore().getBlockByID(blockID, containerData); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index cab1a3ab7e2e..5b4956b9a5e8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -52,6 +52,19 @@ public interface BlockManager { long putBlock(Container container, BlockData data, boolean incrKeyCount) throws IOException; + /** + * Puts or overwrites a block. + * + * @param container - Container for which block need to be added. + * @param data - Block Data. + * @param incrKeyCount - Whether to increase container key count. + * @param incremental - chunk list is incremental. + * @return length of the Block. + * @throws IOException + */ + long putBlock(Container container, BlockData data, boolean incrKeyCount, + boolean incremental) throws IOException; + /** * Gets an existing block. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java index 60e5a583551e..d3127dde418e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java @@ -434,6 +434,7 @@ private DeleteTransactionStats deleteTransactions( for (Long blkLong : entry.getLocalIDList()) { String blk = containerData.getBlockKey(blkLong); BlockData blkInfo = blockDataTable.get(blk); + //// TODO: deal with the last chunk table LOG.debug("Deleting block {}", blkLong); if (blkInfo == null) { try { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 2c1c3c214d56..b538b7ee6cc5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -73,4 +73,7 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); + + public abstract DBColumnFamilyDefinition + getLastChunkInfoColumnFamily(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 50303bd99ba6..2210f64a836f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -63,6 +63,8 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table blockDataTable; + private Table lastChunkInfoTable; + private Table blockDataTableWithIterator; private Table deletedBlocksTable; @@ -173,6 +175,12 @@ public void start(ConfigurationSource config) deletedBlocksTable = new DatanodeTable<>( dbDef.getDeletedBlocksColumnFamily().getTable(this.store)); checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName()); + + if (dbDef.getLastChunkInfoColumnFamily() != null) { + lastChunkInfoTable = new DatanodeTable<>( + dbDef.getLastChunkInfoColumnFamily().getTable(this.store)); + checkTableStatus(lastChunkInfoTable, lastChunkInfoTable.getName()); + } } } @@ -204,6 +212,11 @@ public Table getBlockDataTable() { return blockDataTable; } + @Override + public Table getLastChunkInfoTable() { + return lastChunkInfoTable; + } + @Override public Table getDeletedBlocksTable() { return deletedBlocksTable; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index a002eef3f72a..d34edb3a48a7 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -102,6 +102,12 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETED_BLOCKS; } + @Override + public DBColumnFamilyDefinition + getLastChunkInfoColumnFamily() { + return null; + } + @Override public List> getColumnFamilies(String name) { return COLUMN_FAMILIES.get(name); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java index 745e1153daca..3d288706b624 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java @@ -92,6 +92,15 @@ public class DatanodeSchemaThreeDBDefinition DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); + public static final DBColumnFamilyDefinition + LAST_CHUNK_INFO = + new DBColumnFamilyDefinition<>( + "last_chunk_info", + String.class, + FixedLengthStringCodec.get(), + BlockData.class, + BlockData.getCodec()); + private static String separator = ""; private static final Map> @@ -99,7 +108,8 @@ public class DatanodeSchemaThreeDBDefinition BLOCK_DATA, METADATA, DELETED_BLOCKS, - DELETE_TRANSACTION); + DELETE_TRANSACTION, + LAST_CHUNK_INFO); public DatanodeSchemaThreeDBDefinition(String dbPath, ConfigurationSource config) { @@ -122,6 +132,7 @@ public DatanodeSchemaThreeDBDefinition(String dbPath, METADATA.setCfOptions(cfOptions); DELETED_BLOCKS.setCfOptions(cfOptions); DELETE_TRANSACTION.setCfOptions(cfOptions); + LAST_CHUNK_INFO.setCfOptions(cfOptions); } @Override @@ -146,6 +157,12 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETED_BLOCKS; } + @Override + public DBColumnFamilyDefinition + getLastChunkInfoColumnFamily() { + return LAST_CHUNK_INFO; + } + public DBColumnFamilyDefinition getDeleteTransactionsColumnFamily() { return DELETE_TRANSACTION; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 50e181147e55..800679b1e5d1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -110,6 +110,12 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETED_BLOCKS; } + @Override + public DBColumnFamilyDefinition + getLastChunkInfoColumnFamily() { + return null; + } + public DBColumnFamilyDefinition getDeleteTransactionsColumnFamily() { return DELETE_TRANSACTION; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index dd2aa5234b92..966ddb78d8ee 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -18,18 +18,26 @@ package org.apache.hadoop.ozone.container.metadata; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.BatchOperationHandler; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import java.io.Closeable; import java.io.IOException; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.NO_SUCH_BLOCK_ERR_MSG; + /** * Interface for interacting with datanode databases. */ @@ -77,6 +85,13 @@ public interface DatanodeStore extends Closeable { */ Table getDeletedBlocksTable(); + /** + * A Table that keeps the metadata of the last chunk of blocks. + * + * @return Table + */ + Table getLastChunkInfoTable(); + /** * Helper to create and write batch transactions. */ @@ -102,4 +117,28 @@ BlockIterator getBlockIterator(long containerID, default void compactionIfNeeded() throws Exception { } + + default BlockData getBlockByID(BlockID blockID, + KeyValueContainerData containerData) throws IOException { + String blockKey = containerData.getBlockKey(blockID.getLocalID()); + + // check block data table + BlockData blockData = getBlockDataTable().get(blockKey); + + if (blockData == null) { + throw new StorageContainerException( + NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK); + } + + return blockData; + } + + default void putBlockByID(BatchOperation batch, boolean incremental, + long localID, BlockData data, KeyValueContainerData containerData, + boolean endOfBlock) + throws IOException { + // old client: override chunk list. 
+ getBlockDataTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java index ee8580defa0c..3f1f5b96024b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.ozone.container.metadata; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec; @@ -30,16 +33,25 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; + +import com.google.common.base.Preconditions; import org.bouncycastle.util.Strings; import org.rocksdb.LiveFileMetaData; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.NO_SUCH_BLOCK_ERR_MSG; import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix; /** @@ -217,4 +229,185 @@ public void compactionIfNeeded() throws Exception { } } } + + @Override + public BlockData getBlockByID(BlockID blockID, + KeyValueContainerData containerData) throws IOException { + String blockKey = containerData.getBlockKey(blockID.getLocalID()); + + // check last chunk table + BlockData lastChunk = getLastChunkInfoTable().get(blockKey); + // check block data table + BlockData blockData = getBlockDataTable().get(blockKey); + + if (blockData == null) { + if (lastChunk == null) { + throw new StorageContainerException( + NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK); + } else { + LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks()); + return lastChunk; + } + } else { + if (lastChunk != null) { + reconcilePartialChunks(lastChunk, blockData); + } else { + LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks()); + } + } + + return blockData; + } + + private static void reconcilePartialChunks( + BlockData lastChunk, BlockData blockData) { + LOG.debug("blockData={}, lastChunk={}", + blockData.getChunks(), lastChunk.getChunks()); + 
Preconditions.checkState(lastChunk.getChunks().size() == 1); + ContainerProtos.ChunkInfo lastChunkInBlockData = + blockData.getChunks().get(blockData.getChunks().size() - 1); + Preconditions.checkState( + lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen() + == lastChunk.getChunks().get(0).getOffset(), + "chunk offset does not match"); + + // append last partial chunk to the block data + List chunkInfos = + new ArrayList<>(blockData.getChunks()); + chunkInfos.add(lastChunk.getChunks().get(0)); + blockData.setChunks(chunkInfos); + + blockData.setBlockCommitSequenceId( + lastChunk.getBlockCommitSequenceId()); + } + + private static boolean isPartialChunkList(BlockData data) { + return data.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST); + } + + private static boolean isFullChunk(ContainerProtos.ChunkInfo chunkInfo) { + for (ContainerProtos.KeyValue kv: chunkInfo.getMetadataList()) { + if (kv.getKey().equals(FULL_CHUNK)) { + return true; + } + } + return false; + } + + // if eob or if the last chunk is full, + private static boolean shouldAppendLastChunk(boolean endOfBlock, + BlockData data) { + if (endOfBlock || data.getChunks().isEmpty()) { + return true; + } + return isFullChunk(data.getChunks().get(data.getChunks().size() - 1)); + } + + public void putBlockByID(BatchOperation batch, boolean incremental, + long localID, BlockData data, KeyValueContainerData containerData, + boolean endOfBlock) throws IOException { + if (!incremental && !isPartialChunkList(data)) { + // Case (1) old client: override chunk list. + getBlockDataTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } else if (shouldAppendLastChunk(endOfBlock, data)) { + moveLastChunkToBlockData(batch, localID, data, containerData); + } else { + // incremental chunk list, + // not end of block, has partial chunks + putBlockWithPartialChunks(batch, localID, data, containerData); + } + } + + private void moveLastChunkToBlockData(BatchOperation batch, long localID, + BlockData data, KeyValueContainerData containerData) throws IOException { + // if eob or if the last chunk is full, + // the 'data' is full so append it to the block table's chunk info + // and then remove from lastChunkInfo + BlockData blockData = getBlockDataTable().get( + containerData.getBlockKey(localID)); + if (blockData == null) { + // Case 2.1 if the block did not have full chunks before, + // the block's chunk is what received from client this time. 
+ blockData = data; + } else { + // case 2.2 the block already has some full chunks + List chunkInfoList = blockData.getChunks(); + blockData.setChunks(new ArrayList<>(chunkInfoList)); + for (ContainerProtos.ChunkInfo chunk : data.getChunks()) { + blockData.addChunk(chunk); + } + blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + + /*int next = 0; + for (ContainerProtos.ChunkInfo info : chunkInfoList) { + assert info.getOffset() == next; + next += info.getLen(); + }*/ + } + // delete the entry from last chunk info table + getLastChunkInfoTable().deleteWithBatch( + batch, containerData.getBlockKey(localID)); + // update block data table + getBlockDataTable().putWithBatch(batch, + containerData.getBlockKey(localID), blockData); + } + + private void putBlockWithPartialChunks(BatchOperation batch, long localID, + BlockData data, KeyValueContainerData containerData) throws IOException { + if (data.getChunks().size() == 1) { + // Case (3.1) replace/update the last chunk info table + getLastChunkInfoTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } else { + int lastChunkIndex = data.getChunks().size() - 1; + // received more than one chunk this time + List lastChunkInfo = + Collections.singletonList( + data.getChunks().get(lastChunkIndex)); + BlockData blockData = getBlockDataTable().get( + containerData.getBlockKey(localID)); + if (blockData == null) { + // Case 3.2: if the block does not exist in the block data table + List chunkInfos = + new ArrayList<>(data.getChunks()); + chunkInfos.remove(lastChunkIndex); + data.setChunks(chunkInfos); + blockData = data; + LOG.debug("block {} does not have full chunks yet. Adding the " + + "chunks to it {}", localID, blockData); + } else { + // Case 3.3: if the block exists in the block data table, + // append chunks till except the last one (supposedly partial) + List chunkInfos = + new ArrayList<>(blockData.getChunks()); + + LOG.debug("blockData.getChunks()={}", chunkInfos); + LOG.debug("data.getChunks()={}", data.getChunks()); + + for (int i = 0; i < lastChunkIndex; i++) { + chunkInfos.add(data.getChunks().get(i)); + } + blockData.setChunks(chunkInfos); + blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + + /*int next = 0; + for (ContainerProtos.ChunkInfo info : chunkInfos) { + if (info.getOffset() != next) { + + LOG.error("blockData.getChunks()={}", chunkInfos); + LOG.error("data.getChunks()={}", data.getChunks()); + } + assert info.getOffset() == next; + next += info.getLen(); + }*/ + } + getBlockDataTable().putWithBatch(batch, + containerData.getBlockKey(localID), blockData); + // update the last partial chunk + data.setChunks(lastChunkInfo); + getLastChunkInfoTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java index 377c7804bb54..9bd39545dcfa 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java @@ -44,13 +44,18 @@ import org.junit.runners.Parameterized; import org.mockito.Mockito; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import static 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; @@ -225,4 +230,173 @@ public void testListBlock() throws Exception { assertNotNull(listBlockData); assertTrue(listBlockData.size() == 10); } + + private BlockData createBlockData(long containerID, long blockNo, + int chunkID, long begining, long offset, long len, long bcsID) + throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), chunkID), offset, len); + chunkList1.add(info1.getProtoBufMessage()); + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + private BlockData createBlockDataWithOneFullChunk(long containerID, + long blockNo, int chunkID, long begining, long offset, long len, + long bcsID) + throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), 1), 0, 4 * 1024 * 1024); + info1.addMetadata(FULL_CHUNK, ""); + + ChunkInfo info2 = new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), chunkID), offset, len); + chunkList1.add(info1.getProtoBufMessage()); + chunkList1.add(info2.getProtoBufMessage()); + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + private BlockData createBlockDataWithThreeFullChunks(long containerID, + long blockNo, long bcsID) throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + long chunkLimit = 4 * 1024 * 1024; + for (int i = 1; i < 4; i++) { + ChunkInfo info1 = new ChunkInfo( + String.format("%d_chunk_%d", blockID1.getLocalID(), i), + chunkLimit * i, chunkLimit); + info1.addMetadata(FULL_CHUNK, ""); + chunkList1.add(info1.getProtoBufMessage()); + } + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + @Test + public void testFlush1() throws Exception { + assumeTrue(isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V3)); + // simulates writing 1024 bytes, hsync, + // write another 1024 bytes, hsync + // write another 1024 bytes, hsync + long containerID = 1; + long blockNo = 2; + // put 1st chunk + blockData1 = createBlockData(containerID, blockNo, 1, 0, 0, 1024, + 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // put 2nd chunk + BlockData blockData2 = createBlockData(containerID, blockNo, 1, 0, 0, 2048, + 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + 
BlockData getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(2048, getBlockData.getSize()); + assertEquals(2, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(1, chunkInfos.size()); + assertEquals(2048, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + + // put 3rd chunk, end-of-block + BlockData blockData3 = createBlockData(containerID, blockNo, 1, 0, 0, 3072, + 3); + blockManager.putBlock(keyValueContainer, blockData3, true); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(3072, getBlockData.getSize()); + assertEquals(3, getBlockData.getBlockCommitSequenceId()); + chunkInfos = getBlockData.getChunks(); + assertEquals(1, chunkInfos.size()); + assertEquals(3072, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + } + + @Test + public void testFlush2() throws Exception { + assumeTrue(isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V3)); + // simulates writing a full chunk + 1024 bytes, hsync, + // write another 1024 bytes, hsync + // write another 1024 bytes, hsync + long containerID = 1; + long blockNo = 2; + long chunkLimit = 4 * 1024 * 1024; + // first hsync (a full chunk + 1024 bytes) + blockData1 = createBlockDataWithOneFullChunk(containerID, + blockNo, 2, chunkLimit, chunkLimit, 1024, 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // second hsync (1024 bytes) + BlockData blockData2 = createBlockData(containerID, blockNo, 2, chunkLimit, + chunkLimit, 2048, 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + // third hsync (1024 bytes) + BlockData blockData3 = createBlockData(containerID, blockNo, 2, chunkLimit, + chunkLimit, 3072, 3); + blockManager.putBlock(keyValueContainer, blockData3, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + // verify that first chunk is full, second chunk is 3072 bytes + BlockData getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(3072 + chunkLimit, getBlockData.getSize()); + assertEquals(3, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(2, chunkInfos.size()); + assertEquals(chunkLimit, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + assertEquals(3072, chunkInfos.get(1).getLen()); + assertEquals(chunkLimit, chunkInfos.get(1).getOffset()); + } + + @Test + public void testFlush3() throws Exception { + assumeTrue(isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V3)); + // simulates writing 1024 bytes, hsync, + // and then write till 4 chunks are full + long containerID = 1; + long blockNo = 2; + long chunkLimit = 4 * 1024 * 1024; + // first hsync (1024 bytes) + blockData1 = createBlockDataWithOneFullChunk(containerID, blockNo, 2, + chunkLimit, chunkLimit, 1024, 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // full flush (4 chunks) + BlockData blockData2 = createBlockDataWithThreeFullChunks( + containerID, blockNo, 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + // verify that the four chunks are full + BlockData getBlockData = 
blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(chunkLimit * 4, getBlockData.getSize()); + assertEquals(2, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(4, chunkInfos.size()); + for (int i = 0; i < 4; i++) { + assertEquals(chunkLimit, chunkInfos.get(i).getLen()); + assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset()); + } + } } diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java index ef2f1fa118e1..5dc164b5deb6 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java @@ -19,9 +19,11 @@ import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.io.IOException; @@ -30,17 +32,25 @@ import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdds.scm.storage.BlockOutputStream.FULL_CHUNK_KV; +import static org.apache.hadoop.hdds.scm.storage.BlockOutputStream.INCREMENTAL_CHUNK_LIST; + /** * State represents persisted data of one specific datanode. */ public class MockDatanodeStorage { + public static final Logger LOG = + LoggerFactory.getLogger(MockDatanodeStorage.class); - private final Map blocks = new HashedMap(); + private final Map blocks = new HashedMap(); private final Map> containerBlocks = new HashedMap(); private final Map fullBlockData = new HashMap<>(); - private final Map chunks = new HashMap<>(); + //private final Map chunks = new HashMap<>(); private final Map data = new HashMap<>(); @@ -50,8 +60,70 @@ public void setStorageFailed(IOException reason) { this.exception = reason; } + private boolean isIncrementalChunkList(BlockData blockData) { + for (ContainerProtos.KeyValue kv : blockData.getMetadataList()) { + if (kv.getKey().equals(INCREMENTAL_CHUNK_LIST)) { + return true; + } + } + return false; + } + + private BlockID toBlockID(DatanodeBlockID datanodeBlockID) { + return new BlockID(datanodeBlockID.getContainerID(), + datanodeBlockID.getLocalID()); + } + public void putBlock(DatanodeBlockID blockID, BlockData blockData) { - blocks.put(blockID, blockData); + if (isIncrementalChunkList(blockData)) { + LOG.debug("incremental chunk list"); + putBlockIncremental(blockID, blockData); + } else { + LOG.debug("full chunk list"); + putBlockFull(blockID, blockData); + } + } + + private boolean isFullChunk(ChunkInfo chunkInfo) { + return (chunkInfo.getMetadataList().contains(FULL_CHUNK_KV)); + } + + public void putBlockIncremental( + DatanodeBlockID blockID, BlockData blockData) { + BlockID id = toBlockID(blockID); + if (blocks.containsKey(id)) { + // block already exists. let's append the chunk list to it. + BlockData existing = blocks.get(id); + if (existing.getChunksCount() == 0) { + // empty chunk list. override it. 
+ putBlockFull(blockID, blockData); + } else { + BlockData.Builder blockDataBuilder = pruneLastPartialChunks(existing); + blockDataBuilder.addAllChunks(blockData.getChunksList()); + blocks.put(id, blockDataBuilder.build()); + } + // TODO: verify the chunk list beginning/offset/len is sane + } else { + // the block does not exist yet, simply add it + putBlockFull(blockID, blockData); + } + } + + private BlockData.Builder pruneLastPartialChunks(BlockData existing) { + BlockData.Builder blockDataBuilder = BlockData.newBuilder(existing); + int lastChunkIndex = existing.getChunksCount() - 1; + // if the last chunk in the existing block is full, append after it. + ChunkInfo chunkInfo = existing.getChunks(lastChunkIndex); + if (!isFullChunk(chunkInfo)) { + // otherwise, remove it and append + blockDataBuilder.removeChunks(lastChunkIndex); + } + return blockDataBuilder; + } + + public void putBlockFull(DatanodeBlockID blockID, BlockData blockData) { + BlockID id = toBlockID(blockID); + blocks.put(id, blockData); List dnBlocks = containerBlocks .getOrDefault(blockID.getContainerID(), new ArrayList<>()); dnBlocks.add(blockID); @@ -59,14 +131,24 @@ public void putBlock(DatanodeBlockID blockID, BlockData blockData) { } public BlockData getBlock(DatanodeBlockID blockID) { - return blocks.get(blockID); + BlockID id = toBlockID(blockID); + //assert blocks.containsKey(blockID); + if (!blocks.containsKey(id)) { + StringBuilder sb = new StringBuilder(); + for (BlockID bid : blocks.keySet()) { + sb.append(bid).append("\n"); + } + throw new AssertionError("blockID " + id + + " not found in blocks. Available block ID: \n" + sb); + } + return blocks.get(id); } public List listBlock(long containerID) { List datanodeBlockIDS = containerBlocks.get(containerID); List listBlocksData = new ArrayList<>(); for (DatanodeBlockID dBlock : datanodeBlockIDS) { - listBlocksData.add(blocks.get(dBlock)); + listBlocksData.add(blocks.get(toBlockID(dBlock))); } return listBlocksData; } @@ -77,31 +159,56 @@ public void writeChunk( if (exception != null) { throw exception; } - data.put(createKey(blockID, chunkInfo), - ByteString.copyFrom(bytes.toByteArray())); - chunks.put(createKey(blockID, chunkInfo), chunkInfo); + String blockKey = createKey(blockID); + ByteString block; + if (data.containsKey(blockKey)) { + block = data.get(blockKey); + assert block.size() == chunkInfo.getOffset(); + data.put(blockKey, block.concat(bytes)); + } else { + assert chunkInfo.getOffset() == 0; + data.put(blockKey, bytes); + } + fullBlockData .put(new BlockID(blockID.getContainerID(), blockID.getLocalID()), - fullBlockData.getOrDefault(blockID, "") + fullBlockData.getOrDefault(toBlockID(blockID), "") .concat(bytes.toStringUtf8())); } public ChunkInfo readChunkInfo( DatanodeBlockID blockID, ChunkInfo chunkInfo) { - return chunks.get(createKey(blockID, chunkInfo)); + BlockData blockData = getBlock(blockID); + for (ChunkInfo info : blockData.getChunksList()) { + if (info.getLen() == chunkInfo.getLen() && + info.getOffset() == chunkInfo.getOffset()) { + return info; + } + } + throw new AssertionError("chunk " + chunkInfo + " not found"); } public ByteString readChunkData( DatanodeBlockID blockID, ChunkInfo chunkInfo) { - return data.get(createKey(blockID, chunkInfo)); - + //return data.get(createKey(blockID, chunkInfo)); + LOG.debug("readChunkData: blockID=" + createKey(blockID) + + " chunkInfo offset=" + chunkInfo.getOffset() + + " chunkInfo len=" + chunkInfo.getLen()); + ByteString str = data.get(createKey(blockID)).substring( + 
(int)chunkInfo.getOffset(), + (int)chunkInfo.getOffset() + (int)chunkInfo.getLen()); + return str; } - private String createKey(DatanodeBlockID blockId, ChunkInfo chunkInfo) { + /*private String createKey(DatanodeBlockID blockId, ChunkInfo chunkInfo) { return blockId.getContainerID() + "_" + blockId.getLocalID() + "_" + chunkInfo.getChunkName() + "_" + chunkInfo.getOffset(); + }*/ + + private String createKey(DatanodeBlockID blockId) { + return blockId.getContainerID() + "_" + blockId.getLocalID(); } public Map getAllBlockData() { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java index 49eee44f6e33..7d34d195e5fb 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java @@ -115,8 +115,9 @@ public XceiverClientReply sendCommandAsync( private ReadChunkResponseProto readChunk(ReadChunkRequestProto readChunk) { return ReadChunkResponseProto.newBuilder() - .setChunkData(datanodeStorage - .readChunkInfo(readChunk.getBlockID(), readChunk.getChunkData())) + //.setChunkData(datanodeStorage + // .readChunkInfo(readChunk.getBlockID(), readChunk.getChunkData())) + .setChunkData(readChunk.getChunkData()) .setData(datanodeStorage .readChunkData(readChunk.getBlockID(), readChunk.getChunkData())) .setBlockID(readChunk.getBlockID()) diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java new file mode 100644 index 000000000000..d0eb2cee83e1 --- /dev/null +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Verify BlockOutputStream with incremental PutBlock feature.
+ * (incremental.chunk.list = true)
+ */
+@RunWith(Parameterized.class)
+public class TestBlockOutputStreamIncrementalPutBlock {
+  private OzoneClient client;
+  private ObjectStore store;
+  private RpcClient rpcClient;
+
+  private String keyName = UUID.randomUUID().toString();
+  private String volumeName = UUID.randomUUID().toString();
+  private String bucketName = UUID.randomUUID().toString();
+  private ConfigurationSource config = new InMemoryConfiguration();
+  private boolean enableIncrementalChunkList;
+
+
+  @Parameterized.Parameters
+  public static Iterable<Boolean> parameters() {
+    return Arrays.asList(true, false);
+  }
+
+  public TestBlockOutputStreamIncrementalPutBlock(
+      Boolean enableIncrementalChunkList) {
+    this.enableIncrementalChunkList = enableIncrementalChunkList;
+  }
+
+  @Before
+  public void init() throws IOException {
+    OzoneClientConfig clientConfig = config.getObject(OzoneClientConfig.class);
+
+    clientConfig.setIncrementalChunkList(this.enableIncrementalChunkList);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+
+    ((InMemoryConfiguration)config).setFromObject(clientConfig);
+
+    rpcClient = new RpcClient(config, null) {
+
+      @Override
+      protected OmTransport createOmTransport(
+          String omServiceId)
+          throws IOException {
+        return new MockOmTransport();
+      }
+
+      @NotNull
+      @Override
+      protected XceiverClientFactory createXceiverClientFactory(
+          List<X509Certificate> x509Certificates)
+          throws IOException {
+        return new MockXceiverClientFactory();
+      }
+    };
+
+    client = new OzoneClient(config, rpcClient);
+    store = client.getObjectStore();
+  }
+
+  @After
+  public void close() throws IOException {
+    client.close();
+  }
+
+  @Test
+  public void writeSmallKey() throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    int size = 1024;
+    String s = RandomStringUtils.randomAlphabetic(1024);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, size,
+        ReplicationConfig.getDefault(config), new HashMap<>())) {
+      for (int i = 0; i < 4097; i++) {
+        out.write(byteBuffer);
+        out.hsync();
+      }
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(size);
+      for (int i = 0; i < 4097; i++) {
+        is.read(readBuffer);
+        assertArrayEquals(readBuffer.array(), byteBuffer.array());
+      }
+    }
+  }
+
+  @Test
+  public void writeLargeKey() throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    int size = 1024 * 1024 + 1;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, size,
+        ReplicationConfig.getDefault(config), new HashMap<>())) {
+      for (int i = 0; i < 4; i++) {
+        out.write(byteBuffer);
+        out.hsync();
+      }
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(size);
+      for (int i = 0; i < 4; i++) {
+        is.read(readBuffer);
+        assertArrayEquals(readBuffer.array(), byteBuffer.array());
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index b313aa80fb56..6f622700bced 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -61,12 +61,8 @@
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
-import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
-import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -74,7 +70,6 @@
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
@@ -116,6 +111,7 @@ public static void init() throws Exception {
     CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
     CONF.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
     CONF.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+    CONF.setBoolean("ozone.client.incremental.chunk.list", true);
     cluster = MiniOzoneCluster.newBuilder(CONF)
         .setNumDatanodes(5)
         .setTotalPipelineNumLimit(10)
@@ -135,9 +131,9 @@ public static void init() throws Exception {
     bucket = TestDataUtil.createVolumeAndBucket(client, layout);
 
     // Enable DEBUG level logging for relevant classes
-    GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+    //GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
+    //GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
+    //GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
   }
 
   @AfterAll
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index f9ed369e5685..468e520c219d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -286,7 +286,8 @@ public void generateData(long index) throws Exception {
       writtenBytes += currentChunkSize;
     }
 
-    BlockManagerImpl.persistPutBlock(container, blockData, config, true);
+    BlockManagerImpl.persistPutBlock(container, blockData, config, true,
+        false);
   }
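
For readers of the tests above: the feature is driven entirely by client configuration. The following is a minimal, hypothetical usage sketch (not part of this patch) showing how an application might turn the incremental chunk list on for a regular RPC client; it assumes only the "ozone.client.incremental.chunk.list" key set in TestHSync and the OzoneClientConfig#setIncrementalChunkList setter exercised by the new unit test, and the class and volume names are illustrative.

// Hypothetical example, not included in the patch.
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;

public final class EnableIncrementalChunkList {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Same switch that TestHSync flips; the unit test sets it through
    // OzoneClientConfig#setIncrementalChunkList(...) instead.
    conf.setBoolean("ozone.client.incremental.chunk.list", true);

    // Keys written through this client take the incremental PutBlock path
    // that TestBlockOutputStreamIncrementalPutBlock verifies.
    try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
      client.getObjectStore().createVolume("vol1");
    }
  }
}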