diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 1977b11531b1..e40434f508e6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -364,6 +364,7 @@ public void shutdown() { private BlockData getBlockByID(DBHandle db, BlockID blockID, KeyValueContainerData containerData) throws IOException { - return db.getStore().getBlockByID(blockID, containerData); + String blockKey = containerData.getBlockKey(blockID.getLocalID()); + return db.getStore().getBlockByID(blockID, blockKey); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index fec2a3f7d220..b2c62dfcbd17 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -77,6 +77,7 @@ public ConfigurationSource getConfig() { public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return null; } + public abstract DBColumnFamilyDefinition getLastChunkInfoColumnFamily(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index 4abfb60c4f4b..35f8bf832227 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -130,8 +130,7 @@ default void compactionIfNeeded() throws Exception { } default BlockData getBlockByID(BlockID blockID, - KeyValueContainerData containerData) throws IOException { - String blockKey = containerData.getBlockKey(blockID.getLocalID()); + String blockKey) throws IOException { // check block data table BlockData blockData = getBlockDataTable().get(blockKey); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java index 7f37c9ae51ca..25479a7a9c14 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java @@ -53,7 +53,7 @@ * - All keys have containerID as prefix. * - The table 3 has String as key instead of Long since we want to use prefix. 
*/ -public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore +public class DatanodeStoreSchemaThreeImpl extends DatanodeStoreWithIncrementalChunkList implements DeleteTransactionStore { public static final String DUMP_FILE_SUFFIX = ".data"; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java index f09d30e45a77..c9ea52b47c7d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java @@ -31,7 +31,7 @@ * 2. A metadata table. * 3. A Delete Transaction Table. */ -public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore +public class DatanodeStoreSchemaTwoImpl extends DatanodeStoreWithIncrementalChunkList implements DeleteTransactionStore { private final Table diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java new file mode 100644 index 000000000000..51e453350083 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.metadata; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST; + +/** + * Constructs a datanode store in accordance with schema version 2, which uses + * three column families/tables: + * 1. A block data table. + * 2. 
A metadata table. + * 3. A Delete Transaction Table. + */ +public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore { + /** + * Constructs the metadata store and starts the DB services. + * + * @param config - Ozone Configuration. + * @throws IOException - on Failure. + */ + public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config, + AbstractDatanodeDBDefinition dbDef, boolean openReadOnly) throws IOException { + super(config, dbDef, openReadOnly); + } + + + @Override + public BlockData getBlockByID(BlockID blockID, + String blockKey) throws IOException { + BlockData lastChunk = null; + // check block data table + BlockData blockData = getBlockDataTable().get(blockKey); + if (blockData == null || isPartialChunkList(blockData)) { + // check last chunk table + lastChunk = getLastChunkInfoTable().get(blockKey); + } + + if (blockData == null) { + if (lastChunk == null) { + throw new StorageContainerException( + NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks()); + } + return lastChunk; + } + } else { + if (lastChunk != null) { + reconcilePartialChunks(lastChunk, blockData); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks()); + } + } + } + + return blockData; + } + + private void reconcilePartialChunks( + BlockData lastChunk, BlockData blockData) { + LOG.debug("blockData={}, lastChunk={}", + blockData.getChunks(), lastChunk.getChunks()); + Preconditions.checkState(lastChunk.getChunks().size() == 1); + ContainerProtos.ChunkInfo lastChunkInBlockData = + blockData.getChunks().get(blockData.getChunks().size() - 1); + Preconditions.checkState( + lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen() + == lastChunk.getChunks().get(0).getOffset(), + "chunk offset does not match"); + + // append last partial chunk to the block data + List chunkInfos = + new ArrayList<>(blockData.getChunks()); + chunkInfos.add(lastChunk.getChunks().get(0)); + blockData.setChunks(chunkInfos); + + blockData.setBlockCommitSequenceId( + lastChunk.getBlockCommitSequenceId()); + } + + private static boolean isPartialChunkList(BlockData data) { + return data.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST); + } + + private static boolean isFullChunk(ContainerProtos.ChunkInfo chunkInfo) { + for (ContainerProtos.KeyValue kv: chunkInfo.getMetadataList()) { + if (kv.getKey().equals(FULL_CHUNK)) { + return true; + } + } + return false; + } + + // if eob or if the last chunk is full, + private static boolean shouldAppendLastChunk(boolean endOfBlock, + BlockData data) { + if (endOfBlock || data.getChunks().isEmpty()) { + return true; + } + return isFullChunk(data.getChunks().get(data.getChunks().size() - 1)); + } + + public void putBlockByID(BatchOperation batch, boolean incremental, + long localID, BlockData data, KeyValueContainerData containerData, + boolean endOfBlock) throws IOException { + if (!incremental && !isPartialChunkList(data)) { + // Case (1) old client: override chunk list. 
+ getBlockDataTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } else if (shouldAppendLastChunk(endOfBlock, data)) { + moveLastChunkToBlockData(batch, localID, data, containerData); + } else { + // incremental chunk list, + // not end of block, has partial chunks + putBlockWithPartialChunks(batch, localID, data, containerData); + } + } + + private void moveLastChunkToBlockData(BatchOperation batch, long localID, + BlockData data, KeyValueContainerData containerData) throws IOException { + // if eob or if the last chunk is full, + // the 'data' is full so append it to the block table's chunk info + // and then remove from lastChunkInfo + BlockData blockData = getBlockDataTable().get( + containerData.getBlockKey(localID)); + if (blockData == null) { + // Case 2.1 if the block did not have full chunks before, + // the block's chunk is what received from client this time. + blockData = data; + } else { + // case 2.2 the block already has some full chunks + List chunkInfoList = blockData.getChunks(); + blockData.setChunks(new ArrayList<>(chunkInfoList)); + for (ContainerProtos.ChunkInfo chunk : data.getChunks()) { + blockData.addChunk(chunk); + } + blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + } + // delete the entry from last chunk info table + getLastChunkInfoTable().deleteWithBatch( + batch, containerData.getBlockKey(localID)); + // update block data table + getBlockDataTable().putWithBatch(batch, + containerData.getBlockKey(localID), blockData); + } + + private void putBlockWithPartialChunks(BatchOperation batch, long localID, + BlockData data, KeyValueContainerData containerData) throws IOException { + if (data.getChunks().size() == 1) { + // Case (3.1) replace/update the last chunk info table + getLastChunkInfoTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } else { + int lastChunkIndex = data.getChunks().size() - 1; + // received more than one chunk this time + List lastChunkInfo = + Collections.singletonList( + data.getChunks().get(lastChunkIndex)); + BlockData blockData = getBlockDataTable().get( + containerData.getBlockKey(localID)); + if (blockData == null) { + // Case 3.2: if the block does not exist in the block data table + List chunkInfos = + new ArrayList<>(data.getChunks()); + chunkInfos.remove(lastChunkIndex); + data.setChunks(chunkInfos); + blockData = data; + LOG.debug("block {} does not have full chunks yet. 
Adding the " + + "chunks to it {}", localID, blockData); + } else { + // Case 3.3: if the block exists in the block data table, + // append chunks till except the last one (supposedly partial) + List chunkInfos = + new ArrayList<>(blockData.getChunks()); + + LOG.debug("blockData.getChunks()={}", chunkInfos); + LOG.debug("data.getChunks()={}", data.getChunks()); + + for (int i = 0; i < lastChunkIndex; i++) { + chunkInfos.add(data.getChunks().get(i)); + } + blockData.setChunks(chunkInfos); + blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); + } + getBlockDataTable().putWithBatch(batch, + containerData.getBlockKey(localID), blockData); + // update the last partial chunk + data.setChunks(lastChunkInfo); + getLastChunkInfoTable().putWithBatch( + batch, containerData.getBlockKey(localID), data); + } + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java index 57ea7234e95d..38a01e46900d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java @@ -36,8 +36,10 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -45,6 +47,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK; +import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; @@ -222,4 +227,181 @@ public void testListBlock(ContainerTestVersionInfo versionInfo) assertNotNull(listBlockData); assertEquals(10, listBlockData.size()); } + + private BlockData createBlockData(long containerID, long blockNo, + int chunkID, long offset, long len, long bcsID) + throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), chunkID), offset, len); + chunkList1.add(info1.getProtoBufMessage()); + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + private BlockData createBlockDataWithOneFullChunk(long containerID, + long blockNo, int chunkID, long offset, long len, long bcsID) + throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), 1), 0, 4 * 1024 * 1024); + info1.addMetadata(FULL_CHUNK, ""); + + ChunkInfo info2 
= new ChunkInfo(String.format("%d_chunk_%d", blockID1 + .getLocalID(), chunkID), offset, len); + chunkList1.add(info1.getProtoBufMessage()); + chunkList1.add(info2.getProtoBufMessage()); + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + private BlockData createBlockDataWithThreeFullChunks(long containerID, + long blockNo, long bcsID) throws IOException { + blockID1 = new BlockID(containerID, blockNo); + blockData = new BlockData(blockID1); + List chunkList1 = new ArrayList<>(); + long chunkLimit = 4 * 1024 * 1024; + for (int i = 1; i < 4; i++) { + ChunkInfo info1 = new ChunkInfo( + String.format("%d_chunk_%d", blockID1.getLocalID(), i), + chunkLimit * i, chunkLimit); + info1.addMetadata(FULL_CHUNK, ""); + chunkList1.add(info1.getProtoBufMessage()); + } + blockData.setChunks(chunkList1); + blockData.setBlockCommitSequenceId(bcsID); + blockData.addMetadata(INCREMENTAL_CHUNK_LIST, ""); + + return blockData; + } + + @ContainerTestVersionInfo.ContainerTest + public void testFlush1(ContainerTestVersionInfo versionInfo) + throws Exception { + initTest(versionInfo); + Assumptions.assumeFalse( + isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1)); + // simulates writing 1024 bytes, hsync, + // write another 1024 bytes, hsync + // write another 1024 bytes, hsync + long containerID = 1; + long blockNo = 2; + // put 1st chunk + blockData1 = createBlockData(containerID, blockNo, 1, 0, 1024, + 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // put 2nd chunk + BlockData blockData2 = createBlockData(containerID, blockNo, 1, 0, 2048, + 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + BlockData getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(2048, getBlockData.getSize()); + assertEquals(2, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(1, chunkInfos.size()); + assertEquals(2048, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + + // put 3rd chunk, end-of-block + BlockData blockData3 = createBlockData(containerID, blockNo, 1, 0, 3072, + 3); + blockManager.putBlock(keyValueContainer, blockData3, true); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(3072, getBlockData.getSize()); + assertEquals(3, getBlockData.getBlockCommitSequenceId()); + chunkInfos = getBlockData.getChunks(); + assertEquals(1, chunkInfos.size()); + assertEquals(3072, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + } + + @ContainerTestVersionInfo.ContainerTest + public void testFlush2(ContainerTestVersionInfo versionInfo) + throws Exception { + initTest(versionInfo); + Assumptions.assumeFalse( + isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1)); + // simulates writing a full chunk + 1024 bytes, hsync, + // write another 1024 bytes, hsync + // write another 1024 bytes, hsync + long containerID = 1; + long blockNo = 2; + long chunkLimit = 4 * 1024 * 1024; + // first hsync (a full chunk + 1024 bytes) + blockData1 = createBlockDataWithOneFullChunk(containerID, + blockNo, 2, chunkLimit, 1024, 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // second hsync (1024 bytes) + 
BlockData blockData2 = createBlockData(containerID, blockNo, 2, + chunkLimit, 2048, 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + // third hsync (1024 bytes) + BlockData blockData3 = createBlockData(containerID, blockNo, 2, + chunkLimit, 3072, 3); + blockManager.putBlock(keyValueContainer, blockData3, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + // verify that first chunk is full, second chunk is 3072 bytes + BlockData getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(3072 + chunkLimit, getBlockData.getSize()); + assertEquals(3, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(2, chunkInfos.size()); + assertEquals(chunkLimit, chunkInfos.get(0).getLen()); + assertEquals(0, chunkInfos.get(0).getOffset()); + assertEquals(3072, chunkInfos.get(1).getLen()); + assertEquals(chunkLimit, chunkInfos.get(1).getOffset()); + } + + @ContainerTestVersionInfo.ContainerTest + public void testFlush3(ContainerTestVersionInfo versionInfo) + throws Exception { + initTest(versionInfo); + Assumptions.assumeFalse( + isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1)); + // simulates writing 1024 bytes, hsync, + // and then write till 4 chunks are full + long containerID = 1; + long blockNo = 2; + long chunkLimit = 4 * 1024 * 1024; + // first hsync (1024 bytes) + blockData1 = createBlockDataWithOneFullChunk(containerID, blockNo, 2, + chunkLimit, 1024, 1); + blockManager.putBlock(keyValueContainer, blockData1, false); + // full flush (4 chunks) + BlockData blockData2 = createBlockDataWithThreeFullChunks( + containerID, blockNo, 2); + blockManager.putBlock(keyValueContainer, blockData2, false); + assertEquals(1, keyValueContainer.getContainerData().getBlockCount()); + + // verify that the four chunks are full + BlockData getBlockData = blockManager.getBlock(keyValueContainer, + new BlockID(containerID, blockNo)); + assertEquals(chunkLimit * 4, getBlockData.getSize()); + assertEquals(2, getBlockData.getBlockCommitSequenceId()); + List chunkInfos = getBlockData.getChunks(); + assertEquals(4, chunkInfos.size()); + for (int i = 0; i < 4; i++) { + assertEquals(chunkLimit, chunkInfos.get(i).getLen()); + assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset()); + } + } } diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java index ef2f1fa118e1..393e8cdb3112 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java @@ -19,6 +19,7 @@ import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; @@ -30,18 +31,25 @@ import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * State represents persisted data of one specific datanode. 
*/ public class MockDatanodeStorage { - - private final Map blocks = new HashedMap(); + public static final Logger LOG = + LoggerFactory.getLogger(MockDatanodeStorage.class); + public static final String INCREMENTAL_CHUNK_LIST = "incremental"; + public static final String FULL_CHUNK = "full"; + public static final ContainerProtos.KeyValue FULL_CHUNK_KV = + ContainerProtos.KeyValue.newBuilder().setKey(FULL_CHUNK).build(); + + private final Map blocks = new HashedMap(); private final Map> containerBlocks = new HashedMap(); private final Map fullBlockData = new HashMap<>(); - private final Map chunks = new HashMap<>(); - private final Map data = new HashMap<>(); private IOException exception = null; @@ -50,8 +58,70 @@ public void setStorageFailed(IOException reason) { this.exception = reason; } + private boolean isIncrementalChunkList(BlockData blockData) { + for (ContainerProtos.KeyValue kv : blockData.getMetadataList()) { + if (kv.getKey().equals(INCREMENTAL_CHUNK_LIST)) { + return true; + } + } + return false; + } + + private BlockID toBlockID(DatanodeBlockID datanodeBlockID) { + return new BlockID(datanodeBlockID.getContainerID(), + datanodeBlockID.getLocalID()); + } + public void putBlock(DatanodeBlockID blockID, BlockData blockData) { - blocks.put(blockID, blockData); + if (isIncrementalChunkList(blockData)) { + LOG.debug("incremental chunk list"); + putBlockIncremental(blockID, blockData); + } else { + LOG.debug("full chunk list"); + putBlockFull(blockID, blockData); + } + } + + private boolean isFullChunk(ChunkInfo chunkInfo) { + return (chunkInfo.getMetadataList().contains(FULL_CHUNK_KV)); + } + + public void putBlockIncremental( + DatanodeBlockID blockID, BlockData blockData) { + BlockID id = toBlockID(blockID); + if (blocks.containsKey(id)) { + // block already exists. let's append the chunk list to it. + BlockData existing = blocks.get(id); + if (existing.getChunksCount() == 0) { + // empty chunk list. override it. + putBlockFull(blockID, blockData); + } else { + BlockData.Builder blockDataBuilder = pruneLastPartialChunks(existing); + blockDataBuilder.addAllChunks(blockData.getChunksList()); + blocks.put(id, blockDataBuilder.build()); + } + // TODO: verify the chunk list beginning/offset/len is sane + } else { + // the block does not exist yet, simply add it + putBlockFull(blockID, blockData); + } + } + + private BlockData.Builder pruneLastPartialChunks(BlockData existing) { + BlockData.Builder blockDataBuilder = BlockData.newBuilder(existing); + int lastChunkIndex = existing.getChunksCount() - 1; + // if the last chunk in the existing block is full, append after it. 
+ ChunkInfo chunkInfo = existing.getChunks(lastChunkIndex); + if (!isFullChunk(chunkInfo)) { + // otherwise, remove it and append + blockDataBuilder.removeChunks(lastChunkIndex); + } + return blockDataBuilder; + } + + public void putBlockFull(DatanodeBlockID blockID, BlockData blockData) { + BlockID id = toBlockID(blockID); + blocks.put(id, blockData); List dnBlocks = containerBlocks .getOrDefault(blockID.getContainerID(), new ArrayList<>()); dnBlocks.add(blockID); @@ -59,14 +129,24 @@ public void putBlock(DatanodeBlockID blockID, BlockData blockData) { } public BlockData getBlock(DatanodeBlockID blockID) { - return blocks.get(blockID); + BlockID id = toBlockID(blockID); + //assert blocks.containsKey(blockID); + if (!blocks.containsKey(id)) { + StringBuilder sb = new StringBuilder(); + for (BlockID bid : blocks.keySet()) { + sb.append(bid).append("\n"); + } + throw new AssertionError("blockID " + id + + " not found in blocks. Available block ID: \n" + sb); + } + return blocks.get(id); } public List listBlock(long containerID) { List datanodeBlockIDS = containerBlocks.get(containerID); List listBlocksData = new ArrayList<>(); for (DatanodeBlockID dBlock : datanodeBlockIDS) { - listBlocksData.add(blocks.get(dBlock)); + listBlocksData.add(blocks.get(toBlockID(dBlock))); } return listBlocksData; } @@ -77,31 +157,39 @@ public void writeChunk( if (exception != null) { throw exception; } - data.put(createKey(blockID, chunkInfo), - ByteString.copyFrom(bytes.toByteArray())); - chunks.put(createKey(blockID, chunkInfo), chunkInfo); + String blockKey = createKey(blockID); + ByteString block; + if (data.containsKey(blockKey)) { + block = data.get(blockKey); + assert block.size() == chunkInfo.getOffset(); + data.put(blockKey, block.concat(bytes)); + } else { + assert chunkInfo.getOffset() == 0; + data.put(blockKey, bytes); + } + fullBlockData .put(new BlockID(blockID.getContainerID(), blockID.getLocalID()), - fullBlockData.getOrDefault(blockID, "") + fullBlockData.getOrDefault(toBlockID(blockID), "") .concat(bytes.toStringUtf8())); } - public ChunkInfo readChunkInfo( - DatanodeBlockID blockID, - ChunkInfo chunkInfo) { - return chunks.get(createKey(blockID, chunkInfo)); - } - public ByteString readChunkData( DatanodeBlockID blockID, ChunkInfo chunkInfo) { - return data.get(createKey(blockID, chunkInfo)); - + if (LOG.isDebugEnabled()) { + LOG.debug( + "readChunkData: blockID={}, offset={}, len={}", + createKey(blockID), chunkInfo.getOffset(), chunkInfo.getLen()); + } + ByteString str = data.get(createKey(blockID)).substring( + (int)chunkInfo.getOffset(), + (int)chunkInfo.getOffset() + (int)chunkInfo.getLen()); + return str; } - private String createKey(DatanodeBlockID blockId, ChunkInfo chunkInfo) { - return blockId.getContainerID() + "_" + blockId.getLocalID() + "_" - + chunkInfo.getChunkName() + "_" + chunkInfo.getOffset(); + private String createKey(DatanodeBlockID blockId) { + return blockId.getContainerID() + "_" + blockId.getLocalID(); } public Map getAllBlockData() { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java index 59eb49e5557c..7e5de329d129 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java @@ -110,8 +110,7 @@ public XceiverClientReply sendCommandAsync( private ReadChunkResponseProto 
readChunk(ReadChunkRequestProto readChunk) { return ReadChunkResponseProto.newBuilder() - .setChunkData(datanodeStorage - .readChunkInfo(readChunk.getBlockID(), readChunk.getChunkData())) + .setChunkData(readChunk.getChunkData()) .setData(datanodeStorage .readChunkData(readChunk.getBlockID(), readChunk.getChunkData())) .setBlockID(readChunk.getBlockID())
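
Note on the new read/write path (illustrative summary, not part of the patch itself): DatanodeStoreWithIncrementalChunkList keeps a block's full chunks in the block data table and the trailing partial (hsync'ed) chunk in the separate last-chunk-info column family; getBlockByID stitches the two back together whenever the stored entry carries the INCREMENTAL_CHUNK_LIST metadata flag. A minimal, self-contained sketch of that read-path reconciliation, using placeholder types and in-memory maps instead of the real RocksDB tables, could look like this:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only, not the patched Ozone code: models the read-path
 * reconciliation in DatanodeStoreWithIncrementalChunkList#getBlockByID.
 * Full chunks live in the "block data" table, the trailing partial (hsync'ed)
 * chunk lives in the "last chunk info" table; types and storage are simplified.
 */
public class IncrementalChunkListReadSketch {
  record Chunk(long offset, long len) { }
  record Block(List<Chunk> chunks, long bcsId, boolean incremental) { }

  // Placeholder "tables": blockKey -> block entry.
  private final Map<String, Block> blockDataTable = new HashMap<>();
  private final Map<String, Block> lastChunkInfoTable = new HashMap<>();

  Block getBlock(String blockKey) {
    Block blockData = blockDataTable.get(blockKey);
    Block lastChunk = null;
    if (blockData == null || blockData.incremental()) {
      // Only incremental-chunk-list blocks may have a trailing partial chunk.
      lastChunk = lastChunkInfoTable.get(blockKey);
    }
    if (blockData == null) {
      if (lastChunk == null) {
        throw new IllegalStateException("No such block: " + blockKey);
      }
      return lastChunk;            // only a partial chunk has been flushed so far
    }
    if (lastChunk == null) {
      return blockData;            // no trailing partial chunk to merge
    }
    // The partial chunk must begin exactly where the last full chunk ends.
    Chunk tail = blockData.chunks().get(blockData.chunks().size() - 1);
    Chunk partial = lastChunk.chunks().get(0);
    if (tail.offset() + tail.len() != partial.offset()) {
      throw new IllegalStateException("chunk offset does not match");
    }
    List<Chunk> merged = new ArrayList<>(blockData.chunks());
    merged.add(partial);
    // The merged view carries the BCS id of the most recent (partial) write.
    return new Block(merged, lastChunk.bcsId(), blockData.incremental());
  }
}

For example, after a 4 MB full chunk followed by a 1 KB hsync (as in testFlush2), the block data table holds the 4 MB chunk, the last-chunk-info table holds the 1 KB chunk at offset 4 MB, and the read returns both chunks with the newer block commit sequence id. On the write path, putBlockByID dispatches on three cases taken directly from the patch: (1) a non-incremental put from an old client simply overwrites the block data table entry; (2) at end-of-block, or when the incoming last chunk is full, the accumulated chunks are appended to the block data table entry and the last-chunk-info entry is deleted; (3) otherwise the chunks before the last one (if any) are merged into the block data table entry and the trailing partial chunk replaces the last-chunk-info entry.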