@@ -1101,9 +1101,9 @@ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,
* Handle Put Block operation for closed container. Calls BlockManager to process the request.
*
*/
- public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfos, KeyValueContainer kvContainer,
-     BlockData blockData, long blockCommitSequenceId)
-     throws IOException {
+ public void putBlockForClosedContainer(KeyValueContainer kvContainer, BlockData blockData,
+     long blockCommitSequenceId, boolean overwriteBscId)
+     throws IOException {
Preconditions.checkNotNull(kvContainer);
Preconditions.checkNotNull(blockData);
long startTime = Time.monotonicNowNanos();
@@ -1112,11 +1112,12 @@ public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfo
throw new IOException("Container #" + kvContainer.getContainerData().getContainerID() +
" is not in closed state, Container state is " + kvContainer.getContainerState());
}
- blockData.setChunks(chunkInfos);
  // To be set from the Replica's BCSId
- blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+ if (overwriteBscId) {
+   blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+ }

- blockManager.putBlock(kvContainer, blockData, false);
+ blockManager.putBlockForClosedContainer(kvContainer, blockData, overwriteBscId);
ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
final long numBytes = blockDataProto.getSerializedSize();
// Increment write stats for PutBlock after write.
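To make the signature change above concrete, here is a minimal caller sketch in the same style; the helper name and the surrounding variables (handler, container, chunk proto, block ID, replica bcsId) are assumptions for illustration, not code from this PR, and imports are omitted.

// Illustrative only: with the new signature the caller attaches the chunk list
// to the BlockData itself instead of passing it as a separate argument, and
// supplies the replica's bcsId plus the overwrite flag.
void putBlockFromReplica(KeyValueHandler keyValueHandler, KeyValueContainer kvContainer,
    ContainerProtos.ChunkInfo chunkProto, BlockID blockID, long replicaBcsId) throws IOException {
  BlockData blockData = new BlockData(blockID);
  blockData.setChunks(Collections.singletonList(chunkProto));
  // overwriteBscId = true stamps the replica's bcsId onto the block data and,
  // via the BlockManager, onto the container when it is greater than the
  // container's current bcsId.
  keyValueHandler.putBlockForClosedContainer(kvContainer, blockData, replicaBcsId, true);
}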
@@ -98,6 +98,82 @@ public long putBlock(Container container, BlockData data,
data, endOfBlock);
}

@Override
public long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
throws IOException {

Contributor: Why do we need a flag to overwrite the BCSID? Shouldn't it always be an automatic greater-than check?

Contributor: nit: there are lots of warnings due to the raw use of Container. Maybe we change BlockManager to a generic type, something like BlockManager<T extends ContainerData>. It could be a separate PR if we agree. (A sketch of this suggestion appears after this file's diff.)

Member Author: Agreed. We can do that in the main branch as an improvement.

Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
"cannot be negative");

KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();

// We are not locking the key manager since RocksDB serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);

long bcsId = data.getBlockCommitSequenceId();
long containerBCSId = containerData.getBlockCommitSequenceId();

// Check if the block is already present in the DB of the container to determine whether
// the blockCount is already incremented for this block in the DB or not.
long localID = data.getLocalID();
boolean incrBlockCount = false;

// update the blockData as well as BlockCommitSequenceId here
try (BatchOperation batch = db.getStore().getBatchHandler()
.initBatchOperation()) {
// If the block already exists in the DB, blockCount should not be incremented.
if (db.getStore().getBlockDataTable().get(containerData.getBlockKey(localID)) == null) {
// Block does not exist in DB => blockCount needs to be
// incremented when the block is added into DB.
incrBlockCount = true;
}

db.getStore().getBlockDataTable().putWithBatch(batch, containerData.getBlockKey(localID), data);
if (overwriteBcsId && bcsId > containerBCSId) {
db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBcsIdKey(), bcsId);
}

// Set bytes used. This value is updated on every write but only committed
// to the DB on put block. That way, when the datanode is up, per-container
// disk space is computed from committed block lengths only, and on restart
// bytes used is recomputed from the blocks committed to the DB. This keeps
// the current behavior and avoids a DB write during the write chunk
// operation.
// TODO: add unit tests for this behavior.
db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBytesUsedKey(),
containerData.getBytesUsed());

// Set Block Count for a container.
if (incrBlockCount) {
db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBlockCountKey(),
containerData.getBlockCount() + 1);
}

db.getStore().getBatchHandler().commitBatchOperation(batch);
}

if (overwriteBcsId && bcsId > containerBCSId) {
container.updateBlockCommitSequenceId(bcsId);
}

// Increment block count in-memory after the DB update.
if (incrBlockCount) {
containerData.incrBlockCount();
}

if (LOG.isDebugEnabled()) {
LOG.debug("Block {} successfully persisted for closed container {} with bcsId {} chunk count {}",
data.getBlockID(), containerData.getContainerID(), bcsId, data.getChunks().size());
}
return data.getSize();
}
}

public long persistPutBlock(KeyValueContainer container,
BlockData data, boolean endOfBlock)
throws IOException {
@@ -367,6 +443,19 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
}
}

@Override
public boolean blockExists(Container container, BlockID blockID) throws IOException {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
String blockKey = containerData.getBlockKey(blockID.getLocalID());
return db.getStore().getBlockDataTable().isExist(blockKey);
}
}

/**
* Shutdown KeyValueContainerManager.
*/
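A rough sketch of the generics change suggested in the review thread above; the exact bound and method subset shown are assumptions, and this is not part of the PR.

// Illustrative only: parameterizing BlockManager over the container-data type so
// implementations can avoid raw uses of Container, as the reviewer suggests.
public interface BlockManager<T extends ContainerData> {
  long putBlock(Container<T> container, BlockData data, boolean endOfBlock) throws IOException;
  long putBlockForClosedContainer(Container<T> container, BlockData data, boolean overwriteBcsId) throws IOException;
  boolean blockExists(Container<T> container, BlockID blockID) throws IOException;
  // ... remaining methods would take Container<T> in the same way.
}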
@@ -49,6 +49,17 @@ public interface BlockManager {
long putBlock(Container container, BlockData data, boolean endOfBlock)
throws IOException;

/**
* Puts or overwrites a block in a closed container.
*
* @param container - Container to which the block needs to be added.
* @param data - Block Data.
* @param overwriteBcsId - Whether to update the container's blockCommitSequenceId
*                         with the block's bcsId when it is greater.
* @return length of the block.
*/
long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
throws IOException;

/**
* Gets an existing block.
*
@@ -79,6 +90,15 @@ long putBlock(Container container, BlockData data, boolean endOfBlock)
List<BlockData> listBlock(Container container, long startLocalID, int count)
throws IOException;

/**
* Check if a block exists in the container.
*
* @param container - Container in which to check for the block.
* @param blockID - BlockID of the block.
* @return true if the block exists, false otherwise.
*/
boolean blockExists(Container container, BlockID blockID) throws IOException;

/**
* Returns last committed length of the block.
*
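A minimal sketch of how the two new interface methods could be used together; this reconcile-style caller and its names are assumptions for illustration only.

// Illustrative only: re-put a block into a closed container only when the local
// replica does not already have it, letting the container's bcsId advance to the
// peer's value when it is greater.
void importMissingBlock(BlockManager blockManager, Container container, BlockData peerBlock)
    throws IOException {
  if (!blockManager.blockExists(container, peerBlock.getBlockID())) {
    blockManager.putBlockForClosedContainer(container, peerBlock, true);
  }
}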
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -50,6 +51,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.io.TempDir;

@@ -196,6 +198,52 @@ public void testPutAndGetBlock(ContainerTestVersionInfo versionInfo)

}

@ContainerTestVersionInfo.ContainerTest
public void testPutBlockForClosed(ContainerTestVersionInfo versionInfo)
throws Exception {
initTest(versionInfo);
assertEquals(0, keyValueContainer.getContainerData().getBlockCount());
// 1. Put Block with bcsId = 1, Overwrite = true
blockManager.putBlockForClosedContainer(keyValueContainer, blockData1, true);

BlockData fromGetBlockData;
//Check Container's bcsId
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData1.getBlockID());
assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
assertEquals(1, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());

// 2. Put Block with bcsId = 2, Overwrite = false
BlockData blockData2 = createBlockData(1L, 3L, 1, 0, 2048, 2);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, false);

// The block is written, but it cannot be read back: getBlock fails because the
// container's BcsId (1) is less than the block's BcsId (2).
Assertions.assertThrows(StorageContainerException.class, () -> blockManager
.getBlock(keyValueContainer, blockData2.getBlockID()));
assertEquals(2, keyValueContainer.getContainerData().getBlockCount());
// BcsId should still be 1, as the BcsId is not overwritten
assertEquals(1, keyValueContainer.getContainerData().getBlockCommitSequenceId());

// 3. Put Block with bcsId = 2, Overwrite = true
// This should succeed as we are overwriting the BcsId; the container BcsId should be updated to 2.
// The block count should not change.
blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData2.getBlockID());
assertEquals(2, keyValueContainer.getContainerData().getBlockCount());
assertEquals(2, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());

// 4. Put Block with bcsId = 1 < container bcsId, Overwrite = true
// Container bcsId should not change
BlockData blockData3 = createBlockData(1L, 1L, 1, 0, 2048, 1);
blockManager.putBlockForClosedContainer(keyValueContainer, blockData3, true);
fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData3.getBlockID());
assertEquals(3, keyValueContainer.getContainerData().getBlockCount());
assertEquals(2, keyValueContainer.getContainerData().getBlockCommitSequenceId());
assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
}

@ContainerTestVersionInfo.ContainerTest
public void testListBlock(ContainerTestVersionInfo versionInfo)
throws Exception {
@@ -164,8 +164,8 @@ public void testWriteChunkAndPutBlockFailureForNonClosedContainer(
ChunkBuffer.wrap(getData());
Assertions.assertThrows(IOException.class, () -> keyValueHandler.writeChunkForClosedContainer(
getChunkInfo(), getBlockID(), ChunkBuffer.wrap(getData()), keyValueContainer));
- Assertions.assertThrows(IOException.class, () -> keyValueHandler.putBlockForClosedContainer(
-     null, keyValueContainer, new BlockData(getBlockID()), 0L));
+ Assertions.assertThrows(IOException.class, () -> keyValueHandler.putBlockForClosedContainer(keyValueContainer,
+     new BlockData(getBlockID()), 0L, true));
}

@Test
@@ -227,7 +227,8 @@ public void testPutBlockForClosedContainer() throws IOException {
List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
chunkInfoList.add(getChunkInfo().getProtoBufMessage());
BlockData putBlockData = new BlockData(getBlockID());
- keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 1L);
+ putBlockData.setChunks(chunkInfoList);
+ keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 1L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 1L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);

@@ -242,7 +243,8 @@ public void testPutBlockForClosedContainer() throws IOException {
ChunkInfo newChunkInfo = new ChunkInfo(String.format("%d.data.%d", getBlockID()
.getLocalID(), 1L), 0, 20L);
chunkInfoList.add(newChunkInfo.getProtoBufMessage());
- keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
+ putBlockData.setChunks(chunkInfoList);
+ keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
Assertions.assertEquals(containerData.getBlockCount(), 1L);

@@ -253,8 +255,7 @@ public void testPutBlockForClosedContainer() throws IOException {
Assertions.assertTrue(blockDataEquals(putBlockData, getBlockData));
}

- // Put block on bcsId <= containerBcsId should be a no-op
- keyValueHandler.putBlockForClosedContainer(chunkInfoList, kvContainer, putBlockData, 2L);
+ keyValueHandler.putBlockForClosedContainer(kvContainer, putBlockData, 2L, true);
Assertions.assertEquals(containerData.getBlockCommitSequenceId(), 2L);
}
