@@ -1099,11 +1099,17 @@ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID,

   /**
    * Handle Put Block operation for closed container. Calls BlockManager to process the request.
+   *
+   * This is primarily used by the container reconciliation process to persist the block data for a closed container.
    * @param kvContainer - Container for which block data needs to be persisted.
    * @param blockData - Block Data to be persisted (BlockData should have the chunks).
+   * @param blockCommitSequenceId - Block Commit Sequence ID for the block.
+   * @param overwriteBscId - Whether to overwrite the bcsId in the block data and container. In case of chunk failure
+   *                         during reconciliation, we do not want to overwrite the bcsId, as this block/container
+   *                         is incomplete in its current state.
    */
-  public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfos, KeyValueContainer kvContainer,
-      BlockData blockData, long blockCommitSequenceId)
-      throws IOException {
+  public void putBlockForClosedContainer(KeyValueContainer kvContainer, BlockData blockData,
+      long blockCommitSequenceId, boolean overwriteBscId)
+      throws IOException {
     Preconditions.checkNotNull(kvContainer);
     Preconditions.checkNotNull(blockData);
     long startTime = Time.monotonicNowNanos();
@@ -1112,11 +1118,12 @@ public void putBlockForClosedContainer(List<ContainerProtos.ChunkInfo> chunkInfo
       throw new IOException("Container #" + kvContainer.getContainerData().getContainerID() +
           " is not in closed state, Container state is " + kvContainer.getContainerState());
     }
-    blockData.setChunks(chunkInfos);
     // To be set from the Replica's BCSId
-    blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+    if (overwriteBscId) {
+      blockData.setBlockCommitSequenceId(blockCommitSequenceId);
+    }

-    blockManager.putBlock(kvContainer, blockData, false);
+    blockManager.putBlockForClosedContainer(kvContainer, blockData, overwriteBscId);
     ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
     final long numBytes = blockDataProto.getSerializedSize();
     // Increment write stats for PutBlock after write.
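For orientation, here is a minimal sketch of how a reconciliation caller might drive this handler API. The call order (all chunks first, block metadata last) and the bcsId gating follow this diff; the readChunkFromPeer helper and the trailing parameters of writeChunkForClosedContainer are assumptions for illustration, not code from this PR.

```java
// Hedged sketch of a reconciliation caller; helper names are hypothetical.
void replayBlockFromPeer(KeyValueHandler handler, KeyValueContainer kvContainer,
    ContainerProtos.BlockData peerBlock, long peerBcsId) throws IOException {
  boolean allChunksWritten = true;
  for (ContainerProtos.ChunkInfo chunk : peerBlock.getChunksList()) {
    try {
      // readChunkFromPeer is an assumed helper that fetches the chunk payload
      // from the healthy replica.
      handler.writeChunkForClosedContainer(ChunkInfo.getFromProtoBuf(chunk),
          BlockID.getFromProtobuf(peerBlock.getBlockID()),
          readChunkFromPeer(peerBlock, chunk), kvContainer);
    } catch (IOException e) {
      // A failed chunk leaves this block incomplete, so the subsequent
      // putBlock must not advance the container's bcsId.
      allChunksWritten = false;
    }
  }
  // Persist block metadata only after the chunk writes; overwrite the bcsId
  // only when every chunk landed successfully.
  handler.putBlockForClosedContainer(kvContainer,
      BlockData.getFromProtoBuf(peerBlock), peerBcsId, allChunksWritten);
}
```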
@@ -98,6 +98,80 @@ public long putBlock(Container container, BlockData data,
         data, endOfBlock);
   }

+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
> Contributor: Why do we need a flag to overwrite the BCSID? Shouldn't it always be an automatic greater-than check?

> Contributor: nit: there are lots of warnings due to the raw use of Container. Maybe we change BlockManager to a type parameter, something like BlockManager<T extends ContainerData>. It could be a separate PR if we agree.

> Member (Author): Agreed. We can do that in the main branch as an improvement.
+      throws IOException {
+    Preconditions.checkNotNull(data, "BlockData cannot be null for put operation.");
+    Preconditions.checkState(data.getContainerID() >= 0, "Container Id cannot be negative");
+
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+
+    // We are not locking the key manager since RocksDB serializes all actions
+    // against a single DB. We rely on DB level locking to avoid conflicts.
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.
+      Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+
+      long blockBcsID = data.getBlockCommitSequenceId();
+      long containerBcsID = containerData.getBlockCommitSequenceId();
+
+      // Check whether the block is already present in the container's DB, to determine
+      // whether the blockCount has already been incremented for this block.
+      long localID = data.getLocalID();
+      boolean incrBlockCount = false;
+
+      // Update the blockData as well as the blockCommitSequenceId here.
+      try (BatchOperation batch = db.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        // If the block already exists in the DB, blockCount should not be incremented.
+        if (db.getStore().getBlockDataTable().get(containerData.getBlockKey(localID)) == null) {
+          incrBlockCount = true;
+        }
+
+        db.getStore().getBlockDataTable().putWithBatch(batch, containerData.getBlockKey(localID), data);
+        if (overwriteBcsId && blockBcsID > containerBcsID) {
+          db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBcsIdKey(), blockBcsID);
+        }
+
+        // Set bytes used. This value is updated on every write but only
+        // committed on every put block. That way, while the datanode is up,
+        // only the committed block length is used to compute the disk space
+        // consumed by a container, and on restart only the blocks committed
+        // to the DB are used to compute the bytes used. This keeps the
+        // current behavior and avoids a DB write during the write chunk operation.
+        db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBytesUsedKey(),
+            containerData.getBytesUsed());
+
+        // Set the block count for the container.
+        if (incrBlockCount) {
+          db.getStore().getMetadataTable().putWithBatch(batch, containerData.getBlockCountKey(),
+              containerData.getBlockCount() + 1);
+        }
+
+        db.getStore().getBatchHandler().commitBatchOperation(batch);
+      }
+
+      if (overwriteBcsId && blockBcsID > containerBcsID) {
+        container.updateBlockCommitSequenceId(blockBcsID);
+      }
+
+      // Increment block count in-memory after the DB update.
+      if (incrBlockCount) {
+        containerData.incrBlockCount();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block {} successfully persisted for closed container {} with bcsId {} chunk size {}",
+            data.getBlockID(), containerData.getContainerID(), blockBcsID, data.getChunks().size());
+      }
+      return data.getSize();
+    }
+  }

public long persistPutBlock(KeyValueContainer container,
BlockData data, boolean endOfBlock)
throws IOException {
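On the reviewer's question above about an automatic greater-than check: as implemented, the greater-than comparison is always applied; the flag only gates whether the container's bcsId may be touched at all, so a partially reconciled block can never advance it. A paraphrase of that rule, restating the two guarded updates above rather than adding behavior:

```java
// True only when the container-level bcsId should move forward: the caller
// vouches for a complete block AND the block's bcsId is strictly newer, so
// the container's bcsId can never move backwards.
static boolean shouldAdvanceContainerBcsId(boolean overwriteBcsId,
    long blockBcsId, long containerBcsId) {
  return overwriteBcsId && blockBcsId > containerBcsId;
}
```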
@@ -17,6 +17,7 @@

 package org.apache.hadoop.ozone.container.keyvalue.impl;

+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
@@ -149,7 +150,7 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
.getContainerData();

final File chunkFile = getChunkFile(container, blockID);
-    long len = info.getLen();
+    long chunkLength = info.getLen();
long offset = info.getOffset();

HddsVolume volume = containerData.getVolume();
@@ -174,10 +175,27 @@
ChunkUtils.validateChunkSize(channel, info, chunkFile.getName());
}

-    ChunkUtils
-        .writeData(channel, chunkFile.getName(), data, offset, len, volume);
+    long fileLengthBeforeWrite;
+    try {
+      fileLengthBeforeWrite = channel.size();
+    } catch (IOException e) {
+      throw new StorageContainerException("Encountered an error while getting the file size for "
+          + chunkFile.getName(), CHUNK_FILE_INCONSISTENCY);
+    }
+
+    ChunkUtils.writeData(channel, chunkFile.getName(), data, offset, chunkLength, volume);
+
+    // When overwriting, update the bytes used if the new length is greater than the old length.
+    // This is to ensure that the bytes used is updated correctly when overwriting a smaller chunk
+    // with a larger chunk at the end of the block.
+    if (overwrite) {
+      long fileLengthAfterWrite = offset + chunkLength;
+      if (fileLengthAfterWrite > fileLengthBeforeWrite) {
+        containerData.incrBytesUsed(fileLengthAfterWrite - fileLengthBeforeWrite);
+      }
+    }

-    containerData.updateWriteStats(len, overwrite);
+    containerData.updateWriteStats(chunkLength, overwrite);
}

@Override
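A small worked example of the overwrite accounting above, with made-up sizes:

```java
// A closed 3 MB block file whose last chunk (1 MB at offset 2 MB) is being
// rewritten with a larger 2 MB chunk during reconciliation.
long fileLengthBeforeWrite = 3L * 1024 * 1024;  // channel.size() before writeData
long offset = 2L * 1024 * 1024;                 // rewritten chunk starts at 2 MB
long chunkLength = 2L * 1024 * 1024;            // new chunk is 1 MB larger

long fileLengthAfterWrite = offset + chunkLength;          // 4 MB
long delta = fileLengthAfterWrite - fileLengthBeforeWrite; // 1 MB
// incrBytesUsed(delta) records exactly the net growth of the block file.
// Rewriting a chunk of equal or smaller length changes nothing, because then
// fileLengthAfterWrite <= fileLengthBeforeWrite and the branch is skipped.
```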
@@ -18,10 +18,13 @@
package org.apache.hadoop.ozone.container.keyvalue.interfaces;

import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

/**
* BlockManager is for performing key related operations on the container.
@@ -49,6 +52,18 @@ public interface BlockManager {
long putBlock(Container container, BlockData data, boolean endOfBlock)
throws IOException;

+  /**
+   * Persists the block data for a closed container. The block data should have all the chunks and the bcsId.
+   * Overwrites the block if it already exists. The container's used bytes should be updated by the caller with
+   * {@link ChunkManager#writeChunk(Container, BlockID, ChunkInfo, ByteBuffer, DispatcherContext)}.
+   *
+   * @param container - Container for which block data needs to be persisted.
+   * @param data - Block Data to be persisted (BlockData should have the chunks).
+   * @param overwriteBcsId - Whether to overwrite the bcsId of the container.
+   */
+  long putBlockForClosedContainer(Container container, BlockData data, boolean overwriteBcsId)
+      throws IOException;
+
/**
* Gets an existing block.
*
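Because this contract splits responsibilities, callers pair the two calls; metadata-only replay leaves the container's used bytes untouched, which the new test below asserts (bytes used stays 0 without chunk writes). A minimal sketch, assuming chunkManager/blockManager handles and a hypothetical chunkBuffer helper for the payload:

```java
// writeChunk does the bytes-used accounting; putBlockForClosedContainer only
// persists the block data, block count, and (optionally) the bcsId.
for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
  chunkManager.writeChunk(container, data.getBlockID(),
      ChunkInfo.getFromProtoBuf(chunk), chunkBuffer(chunk), dispatcherContext);
}
blockManager.putBlockForClosedContainer(container, data, true);
```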
@@ -17,11 +17,13 @@

package org.apache.hadoop.ozone.container.keyvalue.impl;

+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -36,10 +38,12 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -81,10 +85,10 @@ private void initTest(ContainerTestVersionInfo versionInfo)
this.schemaVersion = versionInfo.getSchemaVersion();
this.config = new OzoneConfiguration();
ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
-    initilaze();
+    initialize();
}

-  private void initilaze() throws Exception {
+  private void initialize() throws Exception {
UUID datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.toString())
.conf(config)
@@ -196,6 +200,73 @@ public void testPutAndGetBlock(ContainerTestVersionInfo versionInfo)

}

+  @ContainerTestVersionInfo.ContainerTest
+  public void testPutBlockForClosed(ContainerTestVersionInfo versionInfo)
+      throws Exception {
+    initTest(versionInfo);
+    KeyValueContainerData containerData = keyValueContainer.getContainerData();
+    assertEquals(0, containerData.getBlockCount());
+    keyValueContainer.close();
+    assertEquals(CLOSED, keyValueContainer.getContainerState());
+
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // 1. Put Block with bcsId = 1, Overwrite BCS ID = true
+      blockData1 = createBlockData(1L, 2L, 1, 0, 2048, 1);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData1, true);
+
+      BlockData fromGetBlockData;
+      // Check the container's bcsId
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData.getBlockID());
+      assertEquals(1, containerData.getBlockCount());
+      assertEquals(1, containerData.getBlockCommitSequenceId());
+      assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+
+      // 2. Put Block with bcsId = 2, Overwrite BCS ID = false
+      BlockData blockData2 = createBlockData(1L, 3L, 1, 0, 2048, 2);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, false);
+
+      // The block should be written, but we won't be able to read it, as reads
+      // fail when the block's bcsId exceeds the container's bcsId.
+      assertThrows(StorageContainerException.class, () -> blockManager
+          .getBlock(keyValueContainer, blockData2.getBlockID()));
+      fromGetBlockData = db.getStore().getBlockDataTable().get(containerData.getBlockKey(blockData2.getLocalID()));
+      assertEquals(2, containerData.getBlockCount());
+      // BcsId should still be 1, as the bcsId is not overwritten
+      assertEquals(1, containerData.getBlockCommitSequenceId());
+      assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(1, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // 3. Put Block with bcsId = 2, Overwrite BCS ID = true
+      // This should succeed as we are overwriting the bcsId. The container bcsId should be updated to 2;
+      // the block count should not change.
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData2, true);
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData2.getBlockID());
+      assertEquals(2, containerData.getBlockCount());
+      assertEquals(2, containerData.getBlockCommitSequenceId());
+      assertEquals(2, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // 4. Put Block with bcsId = 1 < container bcsId, Overwrite BCS ID = true
+      // We are writing a block with a lower bcsId than the container's bcsId; the container bcsId
+      // should not change.
+      BlockData blockData3 = createBlockData(1L, 1L, 1, 0, 2048, 1);
+      blockManager.putBlockForClosedContainer(keyValueContainer, blockData3, true);
+      fromGetBlockData = blockManager.getBlock(keyValueContainer, blockData3.getBlockID());
+      assertEquals(3, containerData.getBlockCount());
+      assertEquals(2, containerData.getBlockCommitSequenceId());
+      assertEquals(1, fromGetBlockData.getBlockCommitSequenceId());
+      assertEquals(3, db.getStore().getMetadataTable().get(containerData.getBlockCountKey()));
+      assertEquals(2, db.getStore().getMetadataTable().get(containerData.getBcsIdKey()));
+
+      // writeChunk updates the in-memory state of the used bytes; putBlock persists the in-memory
+      // state to the DB. Since we only do putBlock without writeChunk, the used bytes should be 0.
+      assertEquals(0, db.getStore().getMetadataTable().get(containerData.getBytesUsedKey()));
+    }
+  }

@ContainerTestVersionInfo.ContainerTest
public void testListBlock(ContainerTestVersionInfo versionInfo)
throws Exception {