diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 64cc804f000..f222daea440 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -253,28 +253,30 @@ private void deleteKeyValueContainerBlocks(
       }
     }
 
-    // Finally commit the DB counters.
-    BatchOperation batchOperation = new BatchOperation();
+    if (newDeletionBlocks > 0) {
+      // Finally commit the DB counters.
+      BatchOperation batchOperation = new BatchOperation();
 
-    // In memory is updated only when existing delete transactionID is
-    // greater.
-    if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
-      // Update in DB pending delete key count and delete transaction ID.
-      batchOperation.put(DB_CONTAINER_DELETE_TRANSACTION_KEY,
-          Longs.toByteArray(delTX.getTxID()));
-    }
+      // In memory is updated only when existing delete transactionID is
+      // greater.
+      if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+        // Update in DB pending delete key count and delete transaction ID.
+        batchOperation.put(DB_CONTAINER_DELETE_TRANSACTION_KEY,
+            Longs.toByteArray(delTX.getTxID()));
+      }
 
-    batchOperation.put(DB_PENDING_DELETE_BLOCK_COUNT_KEY, Longs.toByteArray(
-        containerData.getNumPendingDeletionBlocks() + newDeletionBlocks));
+      batchOperation.put(DB_PENDING_DELETE_BLOCK_COUNT_KEY, Longs.toByteArray(
+          containerData.getNumPendingDeletionBlocks() + newDeletionBlocks));
 
-    containerDB.getStore().writeBatch(batchOperation);
+      containerDB.getStore().writeBatch(batchOperation);
 
-    // update pending deletion blocks count and delete transaction ID in
-    // in-memory container status
-    containerData.updateDeleteTransactionId(delTX.getTxID());
+      // update pending deletion blocks count and delete transaction ID in
+      // in-memory container status
+      containerData.updateDeleteTransactionId(delTX.getTxID());
 
-    containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+      containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 53797b08b24..e0de6ff90f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -420,14 +420,16 @@ ContainerCommandResponseProto handlePutBlock(
       BlockData blockData = BlockData.getFromProtoBuf(data);
       Preconditions.checkNotNull(blockData);
 
+      boolean incrKeyCount = false;
       if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
         chunkManager.finishWriteChunks(kvContainer, blockData);
+        incrKeyCount = true;
       }
 
       long bcsId = dispatcherContext == null ?
          0 : dispatcherContext.getLogIndex();
       blockData.setBlockCommitSequenceId(bcsId);
 
-      blockManager.putBlock(kvContainer, blockData);
+      blockManager.putBlock(kvContainer, blockData, incrKeyCount);
 
       blockDataProto = blockData.getProtoBufMessage();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 9544e29c1ec..51fa1c9614f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -83,6 +83,20 @@ public BlockManagerImpl(ConfigurationSource conf) {
    * @throws IOException
    */
   public long putBlock(Container container, BlockData data) throws IOException {
+    return putBlock(container, data, true);
+  }
+  /**
+   * Puts or overwrites a block.
+   *
+   * @param container - Container for which block needs to be added.
+   * @param data - BlockData.
+   * @param incrKeyCount - for FilePerBlockStrategy, increase key count only
+   *     when the whole block file is written.
+   * @return length of the block.
+   * @throws IOException
+   */
+  public long putBlock(Container container, BlockData data,
+      boolean incrKeyCount) throws IOException {
     Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
         "operation.");
     Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
@@ -129,14 +143,18 @@ public long putBlock(Container container, BlockData data) throws IOException {
         Longs.toByteArray(container.getContainerData().getBytesUsed()));
 
     // Set Block Count for a container.
-    batch.put(DB_BLOCK_COUNT_KEY,
-        Longs.toByteArray(container.getContainerData().getKeyCount() + 1));
+    if (incrKeyCount) {
+      batch.put(DB_BLOCK_COUNT_KEY,
+          Longs.toByteArray(container.getContainerData().getKeyCount() + 1));
+    }
 
     db.getStore().writeBatch(batch);
     container.updateBlockCommitSequenceId(bcsId);
 
     // Increment block count finally here for in-memory.
-    container.getContainerData().incrKeyCount();
+    if (incrKeyCount) {
+      container.getContainerData().incrKeyCount();
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug(
           "Block " + data.getBlockID() + " successfully committed with bcsId "
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 6812b0d8ff8..72b104025b5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -40,6 +40,18 @@ public interface BlockManager {
    */
   long putBlock(Container container, BlockData data) throws IOException;
 
+  /**
+   * Puts or overwrites a block.
+   *
+   * @param container - Container for which block needs to be added.
+   * @param data - Block Data.
+   * @param incrKeyCount - Whether to increase container key count.
+   * @return length of the Block.
+   * @throws IOException
+   */
+  long putBlock(Container container, BlockData data, boolean incrKeyCount)
+      throws IOException;
+
   /**
    * Gets an existing block.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index bf0317431b0..acc516a8bc3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -196,8 +196,9 @@ public void commitTransactions(
         final ContainerID containerId = ContainerID.valueof(
             transactionResult.getContainerID());
         if (dnsWithCommittedTxn == null) {
-          LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
-              + "failed. Corresponding entry not found.", txID, dnID,
+          // Most likely it's a retried delete command response.
+          LOG.debug("Transaction txId={} commit by dnId={} for containerID={}"
+              + " failed. Corresponding entry not found.", txID, dnID,
               containerId);
           return;
         }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 0980369c337..855bcfeaa60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -109,8 +109,8 @@ public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
     DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
     for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
         deletionStatusList.getPendingDeleteStatuses()) {
-      LOG.info(
-          "Block deletion txnID mismatch in datanode {} for containerID {}."
+      LOG.debug(
+          "Block deletion txnID lagging in datanode {} for containerID {}."
+ " Datanode delete txnID: {}, SCM txnID: {}", dnDetails.getUuid(), deletionStatus.getContainerId(), deletionStatus.getDnDeleteTransactionId(), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index 44b3364c580..29f0083c0b6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -103,6 +103,7 @@ private void updateContainerStats(final ContainerID containerId, containerInfo.updateSequenceId( replicaProto.getBlockCommitSequenceId()); } + if (containerInfo.getUsedBytes() < replicaProto.getUsed()) { containerInfo.setUsedBytes(replicaProto.getUsed()); } @@ -228,6 +229,8 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails, .setDatanodeDetails(datanodeDetails) .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId())) .setSequenceId(replicaProto.getBlockCommitSequenceId()) + .setKeyCount(replicaProto.getKeyCount()) + .setBytesUsed(replicaProto.getUsed()) .build(); if (replica.getState().equals(State.DELETED)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java index 8bfcb848ecb..5118cc87fb1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@ -39,15 +39,19 @@ public final class ContainerReplica implements Comparable { final private UUID placeOfBirth; private Long sequenceId; + final private long keyCount; + final private long bytesUsed; private ContainerReplica(final ContainerID containerID, final ContainerReplicaProto.State state, final DatanodeDetails datanode, - final UUID originNodeId) { + final UUID originNodeId, long keyNum, long dataSize) { this.containerID = containerID; this.state = state; this.datanodeDetails = datanode; this.placeOfBirth = originNodeId; + this.keyCount = keyNum; + this.bytesUsed = dataSize; } private void setSequenceId(Long seqId) { @@ -90,6 +94,24 @@ public Long getSequenceId() { return sequenceId; } + /** + * Returns the key count of of this replica. + * + * @return Key count + */ + public long getKeyCount() { + return keyCount; + } + + /** + * Returns the data size of this replica. + * + * @return Data size + */ + public long getBytesUsed() { + return bytesUsed; + } + @Override public int hashCode() { return new HashCodeBuilder(61, 71) @@ -141,6 +163,8 @@ public String toString() { ", datanodeDetails=" + datanodeDetails + ", placeOfBirth=" + placeOfBirth + ", sequenceId=" + sequenceId + + ", keyCount=" + keyCount + + ", bytesUsed=" + bytesUsed + '}'; } @@ -154,6 +178,8 @@ public static class ContainerReplicaBuilder { private DatanodeDetails datanode; private UUID placeOfBirth; private Long sequenceId; + private long bytesUsed; + private long keyCount; /** * Set Container Id. 
@@ -207,6 +233,16 @@ public ContainerReplicaBuilder setSequenceId(long seqId) {
       return this;
     }
 
+    public ContainerReplicaBuilder setKeyCount(long count) {
+      keyCount = count;
+      return this;
+    }
+
+    public ContainerReplicaBuilder setBytesUsed(long used) {
+      bytesUsed = used;
+      return this;
+    }
+
     /**
      * Constructs new ContainerReplicaBuilder.
      *
@@ -221,11 +257,10 @@ public ContainerReplica build() {
           "DatanodeDetails can't be null");
       ContainerReplica replica = new ContainerReplica(
           containerID, state, datanode,
-          Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()));
+          Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()),
+          keyCount, bytesUsed);
       Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
       return replica;
     }
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 7a250687c7e..9914f8950e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -312,6 +312,13 @@ private void processContainer(ContainerID id) {
           action -> replicas.stream()
              .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
 
+      /*
+       * If the container is in CLOSED state, check and update its key count
+       * and bytes used statistics if needed.
+       */
+      if (state == LifeCycleState.CLOSED) {
+        checkAndUpdateContainerInfo(container, replicas);
+      }
 
       /*
       * We don't have to take any action if the container is healthy.
@@ -766,6 +773,32 @@ private void handleUnstableContainer(final ContainerInfo container,
 
   }
 
+  /**
+   * Check and update container key count and used bytes based on its
+   * replicas' data.
+   */
+  private void checkAndUpdateContainerInfo(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    // check container key count and bytes used
+    long maxUsedBytes = 0;
+    long maxKeyCount = 0;
+    ContainerReplica[] rps = replicas.toArray(new ContainerReplica[0]);
+    for (int i = 0; i < rps.length; i++) {
+      maxUsedBytes = Math.max(maxUsedBytes, rps[i].getBytesUsed());
+      maxKeyCount = Math.max(maxKeyCount, rps[i].getKeyCount());
+    }
+    if (maxKeyCount < container.getNumberOfKeys()) {
+      LOG.debug("Container {} key count changed from {} to {}",
+          container.containerID(), container.getNumberOfKeys(), maxKeyCount);
+      container.setNumberOfKeys(maxKeyCount);
+    }
+    if (maxUsedBytes < container.getUsedBytes()) {
+      LOG.debug("Container {} used bytes changed from {} to {}",
+          container.containerID(), container.getUsedBytes(), maxUsedBytes);
+      container.setUsedBytes(maxUsedBytes);
+    }
+  }
+
   /**
    * Sends close container command for the given container to the given
    * datanode.
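
Review note (not part of the patch): the new checkAndUpdateContainerInfo above is a max-reduction over the replica set that is only ever applied downward — SCM shrinks a CLOSED container's statistics once every replica reports figures at or below the stored values, so a single lagging replica (one that has not applied all deletes yet) cannot trigger a premature update. A minimal self-contained Java sketch of that reduction, using hypothetical stand-in types rather than the patch's actual classes:

import java.util.Arrays;
import java.util.List;

final class ReplicaStatsSketch {

  /** Hypothetical stand-in for ContainerReplica's new accessors. */
  static final class Replica {
    private final long keyCount;
    private final long bytesUsed;

    Replica(long keyCount, long bytesUsed) {
      this.keyCount = keyCount;
      this.bytesUsed = bytesUsed;
    }

    long getKeyCount() {
      return keyCount;
    }

    long getBytesUsed() {
      return bytesUsed;
    }
  }

  // Largest key count any replica reports; 0 for an empty set, matching the
  // patch's initial value for the running maximum.
  static long maxKeyCount(List<Replica> replicas) {
    return replicas.stream().mapToLong(Replica::getKeyCount).max().orElse(0L);
  }

  // Largest used-bytes figure any replica reports.
  static long maxBytesUsed(List<Replica> replicas) {
    return replicas.stream().mapToLong(Replica::getBytesUsed).max().orElse(0L);
  }

  public static void main(String[] args) {
    // Two replicas have applied all deletes; one still lags behind.
    List<Replica> replicas = Arrays.asList(
        new Replica(0, 0), new Replica(0, 0), new Replica(1, 1000000));
    // The maximum stays at the lagging replica's values, so a stored count
    // of 1 key does not shrink yet; it drops only once all replicas catch up.
    System.out.println(maxKeyCount(replicas) + " " + maxBytesUsed(replicas));
  }
}
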
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 34177f08864..f7ad7973f84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -389,13 +389,17 @@ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       ContainerID containerIdObject = new ContainerID(containerID);
       ContainerInfo containerInfo = containerStore.get(containerIdObject);
-      if (containerInfo == null) {
+      ContainerInfo containerInfoInMem = containerStateManager
+          .getContainer(containerIdObject);
+      if (containerInfo == null || containerInfoInMem == null) {
         throw new SCMException(
             "Failed to increment number of deleted blocks for container "
                 + containerID + ", reason : " + "container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
       containerInfo.updateDeleteTransactionId(entry.getValue());
+      containerInfo.setNumberOfKeys(containerInfoInMem.getNumberOfKeys());
+      containerInfo.setUsedBytes(containerInfoInMem.getUsedBytes());
       containerStore
           .putWithBatch(batchOperation, containerIdObject, containerInfo);
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
index 2023e0e4cef..dd543ed7841 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
@@ -23,7 +23,9 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -75,6 +77,20 @@ public static void closeContainers(
     }, omKeyLocationInfoGroups);
   }
 
+  /**
+   * Close all containers.
+   *
+   * @param eventPublisher event publisher.
+   * @param scm StorageContainerManager instance.
+   */
+  public static void closeAllContainers(EventPublisher eventPublisher,
+      StorageContainerManager scm) {
+    for (ContainerID containerID :
+        scm.getContainerManager().getContainerIDs()) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+    }
+  }
+
   /**
    * Performs the provided consumer on containers which contain the blocks
    * listed in omKeyLocationInfoGroups.
@@ -85,7 +103,7 @@ public static void closeContainers(
    */
   public static void performOperationOnKeyContainers(
       CheckedConsumer<BlockID, Exception> consumer,
-      List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups) throws Exception {
+      List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups) throws Exception{
 
     for (OmKeyLocationInfoGroup omKeyLocationInfoGroup :
         omKeyLocationInfoGroups) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 97a27c1be6e..e451c07f56c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -29,8 +29,10 @@
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -112,6 +114,10 @@ public static void init() throws Exception {
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
+        false);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
@@ -180,10 +186,8 @@ public void testBlockDeletion() throws Exception {
     }
 
     // close the containers which hold the blocks for the key
-    OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm);
-
-    waitForDatanodeCommandRetry();
-
+    OzoneTestUtils.closeAllContainers(scm.getEventQueue(), scm);
+    Thread.sleep(2000);
     // make sure the containers are closed on the dn
     omKeyLocationInfoGroupList.forEach((group) -> {
       List<OmKeyLocationInfo> locationInfo = group.getLocationList();
       locationInfo.forEach(
           (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
               .getContainer().getContainerSet()
               .getContainer(info.getContainerID()).getContainerData()
               .setState(ContainerProtos.ContainerDataProto.State.CLOSED));
     });
+
+    waitForDatanodeCommandRetry();
+
     waitForDatanodeBlockDeletionStart();
 
     // The blocks should be deleted in the DN.
     verifyBlocksDeleted(omKeyLocationInfoGroupList);
@@ -214,6 +221,67 @@ public void testBlockDeletion() throws Exception {
     verifyTransactionsCommitted();
   }
 
+  @Test
+  public void testContainerStatistics() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = RandomStringUtils.random(1000000);
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
+        ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
+    out.write(value.getBytes());
+    out.close();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
+        .setRefreshPipeline(true)
+        .build();
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
+        om.lookupKey(keyArgs).getKeyLocationVersions();
+    Thread.sleep(5000);
+    List<ContainerInfo> containerInfos =
+        scm.getContainerManager().getContainers();
+    final int valueSize = value.getBytes().length;
+    final int keyCount = 1;
+    containerInfos.stream().forEach(container -> {
+      Assert.assertEquals(valueSize, container.getUsedBytes());
+      Assert.assertEquals(keyCount, container.getNumberOfKeys());
+    });
+
+    OzoneTestUtils.closeAllContainers(scm.getEventQueue(), scm);
+    // Wait for container to close
+    Thread.sleep(2000);
+    // make sure the containers are closed on the dn
+    omKeyLocationInfoGroupList.forEach((group) -> {
+      List<OmKeyLocationInfo> locationInfo = group.getLocationList();
+      locationInfo.forEach(
+          (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getContainer(info.getContainerID()).getContainerData()
+              .setState(ContainerProtos.ContainerDataProto.State.CLOSED));
+    });
+
+    om.deleteKey(keyArgs);
+    // Wait for blocks to be deleted
+    Thread.sleep(5000);
+    scm.getReplicationManager().processContainersNow();
+    // Wait for container statistics change
+    Thread.sleep(1000);
+    containerInfos = scm.getContainerManager().getContainers();
+    containerInfos.stream().forEach(container -> {
+      Assert.assertEquals(0, container.getUsedBytes());
+      Assert.assertEquals(0, container.getNumberOfKeys());
+    });
+  }
+
   private void waitForDatanodeBlockDeletionStart()
       throws TimeoutException, InterruptedException {
     LogCapturer logCapturer =
@@ -275,7 +343,7 @@ private void verifyPendingDeleteEvent()
     cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().triggerHeartbeat();
     // wait for event to be handled by event handler
-    Thread.sleep(1000);
+    Thread.sleep(2000);
     String output = logCapturer.getOutput();
     for (ContainerReplicaProto containerInfo : dummyReport.getReportsList()) {
       long containerId = containerInfo.getContainerID();
@@ -283,9 +351,6 @@ private void verifyPendingDeleteEvent()
       if (containerIdsWithDeletedBlocks.contains(containerId)) {
         Assert.assertTrue(output.contains(
             "for containerID " + containerId + ". Datanode delete txnID"));
-      } else {
-        Assert.assertTrue(!output.contains(
-            "for containerID " + containerId + ". Datanode delete txnID"));
Datanode delete txnID")); } } logCapturer.clearOutput(); @@ -304,9 +369,6 @@ private void matchContainerTransactionIds() throws IOException { scm.getContainerInfo(containerId).getDeleteTransactionId() > 0); maxTransactionId = max(maxTransactionId, scm.getContainerInfo(containerId).getDeleteTransactionId()); - } else { - Assert.assertEquals( - scm.getContainerInfo(containerId).getDeleteTransactionId(), 0); } Assert.assertEquals(((KeyValueContainerData)dnContainerSet .getContainer(containerId).getContainerData()) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 3f7429ab7dd..9a5b6a78ff6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -86,7 +86,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private final boolean isTracingEnabled; // Map which contains index and term for the ratis transactions which are - // stateMachine entries which are recived through applyTransaction. + // stateMachine entries which are received through applyTransaction. private ConcurrentMap applyTransactionMap = new ConcurrentSkipListMap<>();