@@ -215,8 +215,6 @@ private void deleteKeyValueContainerBlocks(
 
     for (Long blkLong : delTX.getLocalIDList()) {
       String blk = blkLong.toString();
-      BatchOperation batch = containerDB.getStore()
-          .getBatchHandler().initBatchOperation();
       BlockData blkInfo = blockDataTable.get(blk);
       if (blkInfo != null) {
         String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
@@ -230,13 +228,15 @@ private void deleteKeyValueContainerBlocks(
           }
           continue;
         }
-        // Found the block in container db,
-        // use an atomic update to change its state to deleting.
-        blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
-        blockDataTable.deleteWithBatch(batch, blk);
-        try {
+
+        try (BatchOperation batch = containerDB.getStore()
+            .getBatchHandler().initBatchOperation()) {
+          // Found the block in container db,
+          // use an atomic update to change its state to deleting.
+          blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
+          blockDataTable.deleteWithBatch(batch, blk);
           containerDB.getStore().getBatchHandler()
-              .commitBatchOperation(batch);
+              .commitBatchOperation(batch);
           newDeletionBlocks++;
           if (LOG.isDebugEnabled()) {
             LOG.debug("Transited Block {} to DELETING state in container {}",
@@ -259,32 +259,33 @@ private void deleteKeyValueContainerBlocks(
 
       if (newDeletionBlocks > 0) {
         // Finally commit the DB counters.
-        BatchOperation batchOperation = containerDB.getStore().getBatchHandler()
-            .initBatchOperation();
-        Table<String, Long> metadataTable = containerDB.getStore()
-            .getMetadataTable();
-
-        // In memory is updated only when existing delete transactionID is
-        // greater.
-        if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
-          // Update in DB pending delete key count and delete transaction ID.
-          metadataTable.putWithBatch(batchOperation,
-              OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
-        }
-
-        long pendingDeleteBlocks = containerData.getNumPendingDeletionBlocks() +
-            newDeletionBlocks;
-        metadataTable.putWithBatch(batchOperation,
-            OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
-
-        containerDB.getStore().getBatchHandler()
-            .commitBatchOperation(batchOperation);
-
-        // update pending deletion blocks count and delete transaction ID in
-        // in-memory container status
-        containerData.updateDeleteTransactionId(delTX.getTxID());
-
-        containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+        try (BatchOperation batchOperation =
+            containerDB.getStore().getBatchHandler().initBatchOperation()) {
+          Table<String, Long> metadataTable = containerDB.getStore()
+              .getMetadataTable();
+
+          // In memory is updated only when existing delete transactionID is
+          // greater.
+          if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+            // Update in DB pending delete key count and delete transaction ID.
+            metadataTable.putWithBatch(batchOperation,
+                OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
+          }
+
+          long pendingDeleteBlocks =
+              containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
+          metadataTable.putWithBatch(batchOperation,
+              OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
+
+          containerDB.getStore().getBatchHandler()
+              .commitBatchOperation(batchOperation);
+
+          // update pending deletion blocks count and delete transaction ID in
+          // in-memory container status
+          containerData.updateDeleteTransactionId(delTX.getTxID());
+
+          containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+        }
       }
     }
   }
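
Every hunk in this change applies the same fix: a BatchOperation obtained from initBatchOperation() is now scoped by try-with-resources, so it is closed on every exit path instead of leaking when putWithBatch, deleteWithBatch, or the commit throws. A minimal sketch of the pattern, assuming only that a batch implements AutoCloseable and stages writes until an explicit commit; Store, Batch, and the method names below are hypothetical stand-ins, not the Ozone API:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store whose batches hold releasable resources.
class Store {
  // A batch that must be closed; close() releases the (simulated) handle
  // and discards anything that was never committed.
  final class Batch implements AutoCloseable {
    private final Map<String, String> pending = new HashMap<>();
    private boolean closed;

    void put(String key, String value) {
      if (closed) {
        throw new IllegalStateException("batch already closed");
      }
      pending.put(key, value);
    }

    @Override
    public void close() {
      closed = true;
      pending.clear();
    }
  }

  private final Map<String, String> table = new HashMap<>();

  Batch initBatchOperation() {
    return new Batch();
  }

  void commitBatchOperation(Batch batch) {
    table.putAll(batch.pending); // apply all staged writes at once
  }

  String get(String key) {
    return table.get(key);
  }
}

public class BatchScopeDemo {
  public static void main(String[] args) {
    Store store = new Store();
    // try-with-resources guarantees close() runs even if a put or the
    // commit throws -- the property every hunk in this PR establishes.
    try (Store.Batch batch = store.initBatchOperation()) {
      batch.put("block#1", "DELETING");
      batch.put("block#2", "DELETING");
      store.commitBatchOperation(batch);
    }
    System.out.println(store.get("block#1")); // prints DELETING
  }
}

Running the demo prints DELETING; remove the commit line and the staged writes are simply discarded when the batch closes, which is the intended behavior for the error paths above.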
@@ -124,31 +124,32 @@ public long putBlock(Container container, BlockData data,
       return data.getSize();
     }
     // update the blockData as well as BlockCommitSequenceId here
-    BatchOperation batch = db.getStore().getBatchHandler()
-        .initBatchOperation();
-    db.getStore().getBlockDataTable().putWithBatch(
-        batch, Long.toString(data.getLocalID()), data);
-    db.getStore().getMetadataTable().putWithBatch(
-        batch, OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID, bcsId);
-
-    // Set Bytes used, this bytes used will be updated for every write and
-    // only get committed for every put block. In this way, when datanode
-    // is up, for computation of disk space by container only committed
-    // block length is used, And also on restart the blocks committed to DB
-    // is only used to compute the bytes used. This is done to keep the
-    // current behavior and avoid DB write during write chunk operation.
-    db.getStore().getMetadataTable().putWithBatch(
-        batch, OzoneConsts.CONTAINER_BYTES_USED,
-        container.getContainerData().getBytesUsed());
-
-    // Set Block Count for a container.
-    if (incrKeyCount) {
-      db.getStore().getMetadataTable().putWithBatch(
-          batch, OzoneConsts.BLOCK_COUNT,
-          container.getContainerData().getKeyCount() + 1);
-    }
-
-    db.getStore().getBatchHandler().commitBatchOperation(batch);
+    try (BatchOperation batch = db.getStore().getBatchHandler()
+        .initBatchOperation()) {
+      db.getStore().getBlockDataTable().putWithBatch(
+          batch, Long.toString(data.getLocalID()), data);
+      db.getStore().getMetadataTable().putWithBatch(
+          batch, OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID, bcsId);
+
+      // Set Bytes used, this bytes used will be updated for every write and
+      // only get committed for every put block. In this way, when datanode
+      // is up, for computation of disk space by container only committed
+      // block length is used, And also on restart the blocks committed to DB
+      // is only used to compute the bytes used. This is done to keep the
+      // current behavior and avoid DB write during write chunk operation.
+      db.getStore().getMetadataTable().putWithBatch(
+          batch, OzoneConsts.CONTAINER_BYTES_USED,
+          container.getContainerData().getBytesUsed());
+
+      // Set Block Count for a container.
+      if (incrKeyCount) {
+        db.getStore().getMetadataTable().putWithBatch(
+            batch, OzoneConsts.BLOCK_COUNT,
+            container.getContainerData().getKeyCount() + 1);
+      }
+
+      db.getStore().getBatchHandler().commitBatchOperation(batch);
+    }
 
     container.updateBlockCommitSequenceId(bcsId);
     // Increment block count finally here for in-memory.
@@ -258,17 +259,18 @@ public void deleteBlock(Container container, BlockID blockID) throws
       getBlockByID(db, blockID);
 
       // Update DB to delete block and set block count and bytes used.
-      BatchOperation batch = db.getStore().getBatchHandler()
-          .initBatchOperation();
-      String localID = Long.toString(blockID.getLocalID());
-      db.getStore().getBlockDataTable().deleteWithBatch(batch, localID);
-      // Update DB to delete block and set block count.
-      // No need to set bytes used here, as bytes used is taken care during
-      // delete chunk.
-      long blockCount = container.getContainerData().getKeyCount() - 1;
-      db.getStore().getMetadataTable()
-          .putWithBatch(batch, OzoneConsts.BLOCK_COUNT, blockCount);
-      db.getStore().getBatchHandler().commitBatchOperation(batch);
+      try (BatchOperation batch = db.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        String localID = Long.toString(blockID.getLocalID());
+        db.getStore().getBlockDataTable().deleteWithBatch(batch, localID);
+        // Update DB to delete block and set block count.
+        // No need to set bytes used here, as bytes used is taken care during
+        // delete chunk.
+        long blockCount = container.getContainerData().getKeyCount() - 1;
+        db.getStore().getMetadataTable()
+            .putWithBatch(batch, OzoneConsts.BLOCK_COUNT, blockCount);
+        db.getStore().getBatchHandler().commitBatchOperation(batch);
+      }
 
       // Decrement block count here
       container.getContainerData().decrKeyCount();
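
In putBlock and deleteBlock above, commitBatchOperation is the last statement inside the try, so the batch is closed only after a successful commit, and an exception before the commit discards the staged writes rather than half-applying them. A sketch of that failure path, under the assumption that closing an uncommitted batch drops its contents (as closing a RocksDB WriteBatch without writing it does); all names here are illustrative:

import java.util.HashMap;
import java.util.Map;

public class BatchFailureDemo {
  // Minimal batch: writes are staged until commit; close() discards them.
  static final class Batch implements AutoCloseable {
    final Map<String, Long> staged = new HashMap<>();

    @Override
    public void close() {
      staged.clear(); // an uncommitted batch leaves no trace
    }
  }

  static final Map<String, Long> metadataTable = new HashMap<>();

  static void commit(Batch batch) {
    metadataTable.putAll(batch.staged);
  }

  public static void main(String[] args) {
    boolean simulateFailure = args.length == 0;
    try (Batch batch = new Batch()) {
      batch.staged.put("#BLOCKCOUNT", 41L);
      if (simulateFailure) {
        // failure between staging and commit, e.g. a full disk
        throw new IllegalStateException("simulated failure");
      }
      commit(batch); // skipped on the failure path
    } catch (IllegalStateException e) {
      // the batch was closed by try-with-resources; nothing was applied
      System.out.println("table untouched: " + metadataTable.isEmpty());
    }
  }
}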
@@ -295,29 +295,32 @@ public BackgroundTaskResult call() throws Exception {
 
       // Once files are deleted... replace deleting entries with deleted
       // entries
-      BatchOperation batch = meta.getStore().getBatchHandler()
-          .initBatchOperation();
-      Table<String, ChunkInfoList> deletedBlocksTable =
-          meta.getStore().getDeletedBlocksTable();
-      for (String entry: succeedBlocks) {
-        List<ContainerProtos.ChunkInfo> chunkList =
-            blockDataTable.get(entry).getChunks();
-        String blockId = entry.substring(
-            OzoneConsts.DELETING_KEY_PREFIX.length());
-
-        deletedBlocksTable.putWithBatch(
-            batch, blockId,
-            new ChunkInfoList(chunkList));
-        blockDataTable.deleteWithBatch(batch, entry);
-      }
+      try (BatchOperation batch = meta.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        Table<String, ChunkInfoList> deletedBlocksTable =
+            meta.getStore().getDeletedBlocksTable();
+        for (String entry : succeedBlocks) {
+          List<ContainerProtos.ChunkInfo> chunkList =
+              blockDataTable.get(entry).getChunks();
+          String blockId = entry.substring(
+              OzoneConsts.DELETING_KEY_PREFIX.length());
+
+          deletedBlocksTable.putWithBatch(
+              batch, blockId,
+              new ChunkInfoList(chunkList));
+          blockDataTable.deleteWithBatch(batch, entry);
+        }
 
-      int deleteBlockCount = succeedBlocks.size();
-      containerData.updateAndCommitDBCounters(meta, batch, deleteBlockCount);
+        int deleteBlockCount = succeedBlocks.size();
+        containerData.updateAndCommitDBCounters(meta, batch,
+            deleteBlockCount);
 
-      // update count of pending deletion blocks and block count in in-memory
-      // container status.
-      containerData.decrPendingDeletionBlocks(deleteBlockCount);
-      containerData.decrKeyCount(deleteBlockCount);
+        // update count of pending deletion blocks and block count in
+        // in-memory container status.
+        containerData.decrPendingDeletionBlocks(deleteBlockCount);
+        containerData.decrKeyCount(deleteBlockCount);
+      }
 
       if (!succeedBlocks.isEmpty()) {
         LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
@@ -302,15 +302,18 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException {
     lock.lock();
     try {
-      BatchOperation batch = scmMetadataStore.getStore().initBatchOperation();
-      for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
-        long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
-        DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
-            entry.getKey(), entry.getValue());
-        scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
-            nextTXID, tx);
+      try (BatchOperation batch =
+          scmMetadataStore.getStore().initBatchOperation()) {
+        for (Map.Entry<Long, List<Long>> entry :
+            containerBlocksMap.entrySet()) {
+          long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
+          DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
+              entry.getKey(), entry.getValue());
+          scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
+              nextTXID, tx);
+        }
+        scmMetadataStore.getStore().commitBatchOperation(batch);
       }
-      scmMetadataStore.getStore().commitBatchOperation(batch);
     } finally {
       lock.unlock();
     }
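
addTransactions stages one putWithBatch per transaction and commits once, under the instance lock, so either every deleted-block transaction becomes visible or none does. A self-contained sketch of that shape, with plain maps standing in for scmMetadataStore and its transaction table (hypothetical names, not the SCM API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class AddTransactionsSketch {
  // Plain maps standing in for the SCM metadata store and its TX table.
  private final Map<Long, List<Long>> txTable = new HashMap<>();
  private final ReentrantLock lock = new ReentrantLock();
  private long nextTxId = 0;

  static final class Batch implements AutoCloseable {
    final Map<Long, List<Long>> staged = new HashMap<>();

    @Override
    public void close() {
      staged.clear();
    }
  }

  // Mirrors the shape of addTransactions() above: stage an entry per
  // transaction, then a single all-or-nothing commit, with the batch
  // closed on every path out of the try.
  void addTransactions(Map<Long, List<Long>> containerBlocksMap) {
    lock.lock();
    try {
      try (Batch batch = new Batch()) {
        for (Map.Entry<Long, List<Long>> entry
            : containerBlocksMap.entrySet()) {
          long txId = nextTxId++;
          batch.staged.put(txId, new ArrayList<>(entry.getValue()));
        }
        txTable.putAll(batch.staged); // commitBatchOperation equivalent
      }
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    AddTransactionsSketch log = new AddTransactionsSketch();
    Map<Long, List<Long>> blocks = new HashMap<>();
    blocks.put(7L, Arrays.asList(101L, 102L)); // containerID -> block IDs
    log.addTransactions(blocks);
    System.out.println(log.txTable); // {0=[101, 102]}
  }
}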
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
@@ -51,6 +52,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER;
+
 /**
  * ContainerManager class contains the mapping from a name to a pipeline
  * mapping. This is used by SCM when allocating new locations and when
@@ -322,7 +325,7 @@ public void deleteContainer(ContainerID containerID) throws IOException {
       throw new SCMException(
           "Failed to delete container " + containerID + ", reason : " +
               "container doesn't exist.",
-          SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+          FAILED_TO_FIND_CONTAINER);
     } finally {
       lock.unlock();
     }
@@ -378,7 +381,7 @@ private HddsProtos.LifeCycleState updateContainerState(
           "Failed to update container state"
               + containerID
               + ", reason : container doesn't exist.",
-          SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+          FAILED_TO_FIND_CONTAINER);
     } finally {
       lock.unlock();
     }
@@ -397,33 +400,28 @@ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
     if (deleteTransactionMap == null) {
       return;
     }
-    org.apache.hadoop.hdds.utils.db.BatchOperation batchOperation =
-        batchHandler.initBatchOperation();
     lock.lock();
     try {
-      for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
+      try (BatchOperation batchOperation = batchHandler.initBatchOperation()) {
+        for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
           long containerID = entry.getKey();
 
           ContainerID containerIdObject = new ContainerID(containerID);
           ContainerInfo containerInfo =
               containerStore.get(containerIdObject);
           ContainerInfo containerInfoInMem = containerStateManager
               .getContainer(containerIdObject);
           if (containerInfo == null || containerInfoInMem == null) {
-            throw new SCMException(
-                "Failed to increment number of deleted blocks for container "
-                    + containerID + ", reason : " + "container doesn't exist.",
-                SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+            throw new SCMException("Failed to increment number of deleted " +
                "blocks for container " + containerID + ", reason : " +
                "container doesn't exist.", FAILED_TO_FIND_CONTAINER);
           }
           containerInfo.updateDeleteTransactionId(entry.getValue());
           containerInfo.setNumberOfKeys(containerInfoInMem.getNumberOfKeys());
           containerInfo.setUsedBytes(containerInfoInMem.getUsedBytes());
-          containerStore
-              .putWithBatch(batchOperation, containerIdObject, containerInfo);
+          containerStore.putWithBatch(batchOperation, containerIdObject,
+              containerInfo);
+        }
+        batchHandler.commitBatchOperation(batchOperation);
       }
-      batchHandler.commitBatchOperation(batchOperation);
-      containerStateManager
-          .updateDeleteTransactionId(deleteTransactionMap);
+      containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
     } finally {
       lock.unlock();
     }
@@ -3601,15 +3601,16 @@ private void addS3GVolumeToDB() throws IOException {
 
 
     // Commit to DB.
-    BatchOperation batchOperation =
-        metadataManager.getStore().initBatchOperation();
-
-    metadataManager.getVolumeTable().putWithBatch(batchOperation, dbVolumeKey,
-        omVolumeArgs);
+    try (BatchOperation batchOperation =
+        metadataManager.getStore().initBatchOperation()) {
+      metadataManager.getVolumeTable().putWithBatch(batchOperation,
+          dbVolumeKey, omVolumeArgs);
 
-    metadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
-        userVolumeInfo);
+      metadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
+          userVolumeInfo);
 
-    metadataManager.getStore().commitBatchOperation(batchOperation);
+      metadataManager.getStore().commitBatchOperation(batchOperation);
+    }
 
     // Add to cache.
     metadataManager.getVolumeTable().addCacheEntry(
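
Note the ordering the OM hunk preserves: the volume is committed to the DB inside the batch scope, and only afterwards added to the in-memory cache, so a cache hit always has a durable backing row. A sketch of that commit-then-cache ordering, again with hypothetical map-based stand-ins rather than the Ozone Table API:

import java.util.HashMap;
import java.util.Map;

public class CommitThenCacheSketch {
  static final class Batch implements AutoCloseable {
    final Map<String, String> staged = new HashMap<>();

    @Override
    public void close() {
      staged.clear();
    }
  }

  private final Map<String, String> volumeTable = new HashMap<>(); // the "DB"
  private final Map<String, String> volumeCache = new HashMap<>(); // in-memory

  void addVolume(String dbVolumeKey, String volumeArgs) {
    // Commit to the DB inside the batch scope...
    try (Batch batch = new Batch()) {
      batch.staged.put(dbVolumeKey, volumeArgs);
      volumeTable.putAll(batch.staged); // commitBatchOperation equivalent
    }
    // ...then publish to the cache, so a cached entry always has a
    // durable backing row.
    volumeCache.put(dbVolumeKey, volumeArgs);
  }

  public static void main(String[] args) {
    CommitThenCacheSketch om = new CommitThenCacheSketch();
    om.addVolume("/s3v", "owner=ozone");
    System.out.println(om.volumeTable.equals(om.volumeCache)); // true
  }
}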
@@ -137,15 +137,16 @@ public void testValidateAndUpdateCache() throws Exception {
         .setStatus(Status.OK)
         .build();
 
-    BatchOperation batchOperation =
-        omMetadataManager.getStore().initBatchOperation();
+    try (BatchOperation batchOperation =
+        omMetadataManager.getStore().initBatchOperation()) {
 
-    OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
-        omResponse, deletedKeyNames);
-    omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
+      OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+          omResponse, deletedKeyNames);
+      omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
 
-    // Do manual commit and see whether addToBatch is successful or not.
-    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+      // Do manual commit and see whether addToBatch is successful or not.
+      omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    }
 
     // The keys should not exist in the DeletedKeys table
     for (String deletedKey : deletedKeyNames) {