diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index 549ba2af929b..32f1730a1581 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -57,6 +57,11 @@ public class ContainerInfo implements Comparator, private Instant stateEnterTime; private String owner; private long containerID; + // Delete Transaction Id is updated when new transaction for a container + // is stored in SCM delete Table. + // TODO: Replication Manager should consider deleteTransactionId so that + // replica with higher deleteTransactionId is preferred over replica with + // lower deleteTransactionId. private long deleteTransactionId; // The sequenceId of a close container cannot change, and all the // container replica should have the same sequenceId. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index 25bf35090f5d..4185670c17e3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -38,14 +38,12 @@ class DatanodeDeletedBlockTransactions { // counts blocks deleted across datanodes. Blocks deleted will be counted // for all the replicas and may not be unique. private int blocksDeleted = 0; - private final Map containerIdToTxnId = new HashMap<>(); DatanodeDeletedBlockTransactions() { } void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) { transactions.computeIfAbsent(dnID, k -> new LinkedList<>()).add(tx); - containerIdToTxnId.put(tx.getContainerID(), tx.getTxID()); blocksDeleted += tx.getLocalIDCount(); if (SCMBlockDeletingService.LOG.isDebugEnabled()) { SCMBlockDeletingService.LOG @@ -57,10 +55,6 @@ Map> getDatanodeTransactionMap() { return transactions; } - Map getContainerIdToTxnIdMap() { - return containerIdToTxnId; - } - int getBlocksDeleted() { return blocksDeleted; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java index e459ce8c38a1..b59c012c46d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java @@ -110,6 +110,7 @@ public DeletedBlockLogImplV2(ConfigurationSource conf, .newBuilder() .setConfiguration(conf) .setDeletedBlocksTable(deletedBlocksTXTable) + .setContainerManager(containerManager) .setRatisServer(ratisServer) .setSCMDBTransactionBuffer(dbTxBuffer) .build(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java index d488972d2764..80064ceab1e4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; @@ -34,6 +36,8 @@ import java.io.IOException; import java.lang.reflect.Proxy; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -52,18 +56,19 @@ public class DeletedBlockLogStateManagerImpl LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class); private Table deletedTable; + private ContainerManagerV2 containerManager; private final DBTransactionBuffer transactionBuffer; private final int maxRetry; private final Set deletingTxIDs; private final Set skippingRetryTxIDs; - public DeletedBlockLogStateManagerImpl( - ConfigurationSource conf, + public DeletedBlockLogStateManagerImpl(ConfigurationSource conf, Table deletedTable, - DBTransactionBuffer txBuffer) { + ContainerManagerV2 containerManager, DBTransactionBuffer txBuffer) { this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); this.deletedTable = deletedTable; + this.containerManager = containerManager; this.transactionBuffer = txBuffer; final boolean isRatisEnabled = SCMHAUtils.isSCMHAEnabled(conf); this.deletingTxIDs = isRatisEnabled ? ConcurrentHashMap.newKeySet() : null; @@ -167,9 +172,14 @@ public void removeFromDB() throws IOException { @Override public void addTransactionsToDB(ArrayList txs) throws IOException { + Map containerIdToTxnIdMap = new HashMap<>(); for (DeletedBlocksTransaction tx : txs) { + long tid = tx.getTxID(); + containerIdToTxnIdMap.compute(ContainerID.valueOf(tx.getContainerID()), + (k, v) -> v != null && v > tid ? v : tid); transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx); } + containerManager.updateDeleteTransactionId(containerIdToTxnIdMap); } @Override @@ -242,6 +252,7 @@ public static class Builder { private SCMRatisServer scmRatisServer; private Table table; private DBTransactionBuffer transactionBuffer; + private ContainerManagerV2 containerManager; public Builder setConfiguration(final ConfigurationSource config) { conf = config; @@ -264,12 +275,18 @@ public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) { return this; } + public Builder setContainerManager(ContainerManagerV2 contManager) { + this.containerManager = contManager; + return this; + } + public DeletedBlockLogStateManager build() { Preconditions.checkNotNull(conf); Preconditions.checkNotNull(table); final DeletedBlockLogStateManager impl = - new DeletedBlockLogStateManagerImpl(conf, table, transactionBuffer); + new DeletedBlockLogStateManagerImpl(conf, table, containerManager, + transactionBuffer); final SCMHAInvocationHandler invocationHandler = new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK, @@ -280,6 +297,5 @@ public DeletedBlockLogStateManager build() { new Class[]{DeletedBlockLogStateManager.class}, invocationHandler); } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java deleted file mode 100644 index 4090f6bb8731..000000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -/** - * Event handler for PedingDeleteStatuList events. - */ -public class PendingDeleteHandler implements - EventHandler { - - private SCMBlockDeletingService scmBlockDeletingService; - - public PendingDeleteHandler( - SCMBlockDeletingService scmBlockDeletingService) { - this.scmBlockDeletingService = scmBlockDeletingService; - } - - @Override - public void onMessage(PendingDeleteStatusList pendingDeleteStatusList, - EventPublisher publisher) { - scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList); - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java deleted file mode 100644 index ee64c488cd3e..000000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -import java.util.ArrayList; -import java.util.List; - -/** - * Pending Deletes in the block space. - */ -public class PendingDeleteStatusList { - - private List pendingDeleteStatuses; - private DatanodeDetails datanodeDetails; - - public PendingDeleteStatusList(DatanodeDetails datanodeDetails) { - this.datanodeDetails = datanodeDetails; - pendingDeleteStatuses = new ArrayList<>(); - } - - public void addPendingDeleteStatus(long dnDeleteTransactionId, - long scmDeleteTransactionId, long containerId) { - pendingDeleteStatuses.add( - new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId, - containerId)); - } - - /** - * Status of pending deletes. - */ - public static class PendingDeleteStatus { - private long dnDeleteTransactionId; - private long scmDeleteTransactionId; - private long containerId; - - public PendingDeleteStatus(long dnDeleteTransactionId, - long scmDeleteTransactionId, long containerId) { - this.dnDeleteTransactionId = dnDeleteTransactionId; - this.scmDeleteTransactionId = scmDeleteTransactionId; - this.containerId = containerId; - } - - public long getDnDeleteTransactionId() { - return dnDeleteTransactionId; - } - - public long getScmDeleteTransactionId() { - return scmDeleteTransactionId; - } - - public long getContainerId() { - return containerId; - } - - } - - public List getPendingDeleteStatuses() { - return pendingDeleteStatuses; - } - - public int getNumPendingDeletes() { - return pendingDeleteStatuses.size(); - } - - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 46ddc4a773bf..6de5d79dc725 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.time.Duration; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -30,7 +29,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.ScmConfig; -import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; @@ -112,21 +110,6 @@ public BackgroundTaskQueue getTasks() { return queue; } - void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) { - DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails(); - for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus : - deletionStatusList.getPendingDeleteStatuses()) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Block deletion txnID lagging in datanode {} for containerID {}." - + " Datanode delete txnID: {}, SCM txnID: {}", - dnDetails.getUuid(), deletionStatus.getContainerId(), - deletionStatus.getDnDeleteTransactionId(), - deletionStatus.getScmDeleteTransactionId()); - } - } - } - private class DeletedBlockTransactionScanner implements BackgroundTask { @Override @@ -155,8 +138,6 @@ public EmptyTaskResult call() throws Exception { try { DatanodeDeletedBlockTransactions transactions = deletedBlockLog.getTransactions(blockDeleteLimitSize); - Map containerIdToMaxTxnId = - transactions.getContainerIdToTxnIdMap(); if (transactions.isEmpty()) { return EmptyTaskResult.newResult(); @@ -186,11 +167,6 @@ public EmptyTaskResult call() throws Exception { } } // TODO: Fix ME!!! - Map transactionMap = new HashMap<>(); - for (Map.Entry tx : containerIdToMaxTxnId.entrySet()) { - transactionMap.put(ContainerID.valueOf(tx.getKey()), tx.getValue()); - } - containerManager.updateDeleteTransactionId(transactionMap); LOG.info("Totally added {} blocks to be deleted for" + " {} datanodes, task elapsed time: {}ms", transactions.getBlocksDeleted(), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index ff37283bdd9b..4630c17450ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -25,8 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.ScmConfig; -import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -134,7 +132,6 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, processContainerReplicas(datanodeDetails, replicas, publisher); processMissingReplicas(datanodeDetails, missingReplicas); - updateDeleteTransaction(datanodeDetails, replicas, publisher); /* * Update the latest set of containers for this datanode in @@ -213,39 +210,4 @@ private void processMissingReplicas(final DatanodeDetails datanodeDetails, } } } - - /** - * Updates the Delete Transaction Id for the given datanode. - * - * @param datanodeDetails DatanodeDetails - * @param replicas List of ContainerReplicaProto - * @param publisher EventPublisher reference - */ - private void updateDeleteTransaction(final DatanodeDetails datanodeDetails, - final List replicas, - final EventPublisher publisher) { - final PendingDeleteStatusList pendingDeleteStatusList = - new PendingDeleteStatusList(datanodeDetails); - for (ContainerReplicaProto replica : replicas) { - try { - final ContainerInfo containerInfo = containerManager.getContainer( - ContainerID.valueOf(replica.getContainerID())); - if (containerInfo.getDeleteTransactionId() > - replica.getDeleteTransactionId()) { - pendingDeleteStatusList.addPendingDeleteStatus( - replica.getDeleteTransactionId(), - containerInfo.getDeleteTransactionId(), - containerInfo.getContainerID()); - } - } catch (ContainerNotFoundException cnfe) { - LOG.warn("Cannot update pending delete transaction for " + - "container #{}. Reason: container missing.", - replica.getContainerID()); - } - } - if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { - publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, - pendingDeleteStatusList); - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index d059e10efa36..2abc6d02bdc6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -416,7 +416,7 @@ public void updateDeleteTransactionId( final ContainerInfo info = containers.getContainerInfo( transaction.getKey()); info.updateDeleteTransactionId(transaction.getValue()); - containerStore.put(info.containerID(), info); + transactionBuffer.addToBuffer(containerStore, info.containerID(), info); } } finally { lock.writeLock().unlock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 46a4bd8b2c59..49f2b43995b6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.events; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -184,14 +183,6 @@ public final class SCMEvents { new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, "Delete_Block_Status"); - /** - * This event will be triggered while processing container reports from DN - * when deleteTransactionID of container in report mismatches with the - * deleteTransactionID on SCM. - */ - public static final Event PENDING_DELETE_STATUS = - new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status"); - public static final TypedEvent SAFE_MODE_STATUS = new TypedEvent<>(SafeModeStatus.class, "Safe mode status"); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 8cd3357c3681..97951aaca5ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -69,7 +69,6 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2; -import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; @@ -364,8 +363,6 @@ private StorageContainerManager(OzoneConfiguration conf, NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler = new NonHealthyToHealthyNodeHandler(conf, serviceManager); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); - PendingDeleteHandler pendingDeleteHandler = - new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); ContainerReportHandler containerReportHandler = new ContainerReportHandler( @@ -407,8 +404,6 @@ private StorageContainerManager(OzoneConfiguration conf, eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE, datanodeStartAdminHandler); eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); - eventQueue - .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, (DeletedBlockLogImplV2) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 924a57f56ec3..04c0309fdf36 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -72,6 +72,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_BLOCK_DELETION_MAX_RETRY; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; /** @@ -84,9 +85,12 @@ public class TestDeletedBlockLog { private OzoneConfiguration conf; private File testDir; private ContainerManagerV2 containerManager; + private Table containerTable; private StorageContainerManager scm; private List dnList; private SCMHADBTransactionBuffer scmHADBTransactionBuffer; + private Map containers = new HashMap<>(); + private Map> replicas = new HashMap<>(); @Before public void setup() throws Exception { @@ -98,6 +102,7 @@ public void setup() throws Exception { conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); scm = TestUtils.getScm(conf); containerManager = Mockito.mock(ContainerManagerV2.class); + containerTable = scm.getScmMetadataStore().getContainerTable(); scmHADBTransactionBuffer = new MockSCMHADBTransactionBuffer(scm.getScmMetadataStore().getStore()); deletedBlockLog = new DeletedBlockLogImplV2(conf, @@ -122,10 +127,44 @@ private void setupContainerManager() throws IOException { DatanodeDetails.newBuilder().setUuid(UUID.randomUUID()) .build()); + when(containerManager.getContainerReplicas(anyObject())) + .thenAnswer(invocationOnMock -> { + ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; + return replicas.get(cid.getId()); + }); + when(containerManager.getContainer(anyObject())) + .thenAnswer(invocationOnMock -> { + ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; + return containerTable.get(cid); + }); + when(containerManager.getContainers()) + .thenReturn(new ArrayList<>(containers.values())); + doAnswer(invocationOnMock -> { + Map map = + (Map) invocationOnMock.getArguments()[0]; + for (Map.Entry e : map.entrySet()) { + ContainerInfo info = containers.get(e.getKey().getId()); + try { + Assert.assertTrue(e.getValue() > info.getDeleteTransactionId()); + } catch (AssertionError err) { + throw new Exception("New TxnId " + e.getValue() + " < " + info + .getDeleteTransactionId()); + } + info.updateDeleteTransactionId(e.getValue()); + scmHADBTransactionBuffer.addToBuffer(containerTable, e.getKey(), info); + } + return null; + }).when(containerManager).updateDeleteTransactionId(anyObject()); + } + + private void updateContainerMetadata(long cid) throws IOException { final ContainerInfo container = - new ContainerInfo.Builder().setContainerID(1) + new ContainerInfo.Builder() + .setContainerID(cid) .setReplicationConfig(new RatisReplicationConfig(THREE)) .setState(HddsProtos.LifeCycleState.CLOSED) + .setOwner("TestDeletedBlockLog") + .setPipelineID(PipelineID.randomId()) .build(); final Set replicaSet = dnList.stream() .map(datanodeDetails -> ContainerReplica.newBuilder() @@ -134,11 +173,9 @@ private void setupContainerManager() throws IOException { .setDatanodeDetails(datanodeDetails) .build()) .collect(Collectors.toSet()); - - when(containerManager.getContainerReplicas(anyObject())) - .thenReturn(replicaSet); - when(containerManager.getContainer(anyObject())) - .thenReturn(container); + containers.put(cid, container); + containerTable.put(ContainerID.valueOf(cid), container); + replicas.put(cid, replicaSet); } @After @@ -149,13 +186,14 @@ public void tearDown() throws Exception { FileUtils.deleteDirectory(testDir); } - private Map> generateData(int dataSize) { + private Map> generateData(int dataSize) throws IOException { Map> blockMap = new HashMap<>(); Random random = new Random(1); int continerIDBase = random.nextInt(100); int localIDBase = random.nextInt(1000); for (int i = 0; i < dataSize; i++) { long containerID = continerIDBase + i; + updateContainerMetadata(containerID); List blocks = new ArrayList<>(); for (int j = 0; j < BLOCKS_PER_TXN; j++) { long localID = localIDBase + j; @@ -166,10 +204,13 @@ private Map> generateData(int dataSize) { return blockMap; } - private void addTransactions(Map> containerBlocksMap) + private void addTransactions(Map> containerBlocksMap, + boolean shouldFlush) throws IOException { deletedBlockLog.addTransactions(containerBlocksMap); - scmHADBTransactionBuffer.flush(); + if (shouldFlush) { + scmHADBTransactionBuffer.flush(); + } } private void incrementCount(List txIDs) throws IOException { @@ -230,12 +271,37 @@ private List getTransactions( return txns.stream().distinct().collect(Collectors.toList()); } + @Test + public void testContainerManagerTransactionId() throws Exception { + // Initially all containers should have deleteTransactionId as 0 + for (ContainerInfo containerInfo : containerManager.getContainers()) { + Assert.assertEquals(0, containerInfo.getDeleteTransactionId()); + } + + // Create 30 TXs + addTransactions(generateData(30), false); + // Since transactions are not yet flushed deleteTransactionId should be + // 0 for all containers + Assert.assertEquals(0, getTransactions(1000).size()); + for (ContainerInfo containerInfo : containerManager.getContainers()) { + Assert.assertEquals(0, containerInfo.getDeleteTransactionId()); + } + + scmHADBTransactionBuffer.flush(); + // After flush there should be 30 transactions in deleteTable + // All containers should have positive deleteTransactionId + Assert.assertEquals(30, getTransactions(1000).size()); + for (ContainerInfo containerInfo : containerManager.getContainers()) { + Assert.assertTrue(containerInfo.getDeleteTransactionId() > 0); + } + } + @Test public void testIncrementCount() throws Exception { int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); // Create 30 TXs in the log. - addTransactions(generateData(30)); + addTransactions(generateData(30), true); // This will return all TXs, total num 30. List blocks = @@ -262,7 +328,7 @@ public void testIncrementCount() throws Exception { @Test public void testCommitTransactions() throws Exception { - addTransactions(generateData(50)); + addTransactions(generateData(50), true); List blocks = getTransactions(20 * BLOCKS_PER_TXN); // Add an invalid txn. @@ -296,7 +362,7 @@ public void testRandomOperateTransactions() throws Exception { for (int i = 0; i < 100; i++) { int state = random.nextInt(4); if (state == 0) { - addTransactions(generateData(10)); + addTransactions(generateData(10), true); added += 10; } else if (state == 1) { blocks = getTransactions(20); @@ -326,7 +392,7 @@ public void testRandomOperateTransactions() throws Exception { @Test public void testPersistence() throws Exception { - addTransactions(generateData(50)); + addTransactions(generateData(50), true); // close db and reopen it again to make sure // transactions are stored persistently. deletedBlockLog.close(); @@ -371,7 +437,7 @@ public void testDeletedBlockTransactions() throws IOException { // Creates {TXNum} TX in the log. Map> deletedBlocks = generateData(txNum); - addTransactions(deletedBlocks); + addTransactions(deletedBlocks, true); for (Map.Entry> entry :deletedBlocks.entrySet()) { count++; containerID = entry.getKey(); @@ -399,7 +465,7 @@ public void testDeletedBlockTransactions() throws IOException { builder.setCount(0); Map> deletedBlocksMap = new HashMap<>(); deletedBlocksMap.put(containerID, new LinkedList<>()); - addTransactions(deletedBlocksMap); + addTransactions(deletedBlocksMap, true); // get should return two transactions for the same container blocks = getTransactions(txNum); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index addd15ca7533..953f396e0929 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -25,10 +25,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; @@ -236,9 +232,6 @@ public void testBlockDeletion() throws Exception { cluster.restartHddsDatanode(0, true); matchContainerTransactionIds(); - // verify PENDING_DELETE_STATUS event is fired - verifyPendingDeleteEvent(); - // Verify transactions committed verifyTransactionsCommitted(); } @@ -371,46 +364,6 @@ private void verifyTransactionsCommitted() throws IOException { } } - private void verifyPendingDeleteEvent() - throws IOException, InterruptedException { - ContainerSet dnContainerSet = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet(); - LogCapturer logCapturer = - LogCapturer.captureLogs(SCMBlockDeletingService.LOG); - // Create dummy container reports with deleteTransactionId set as 0 - ContainerReportsProto containerReport = dnContainerSet.getContainerReport(); - ContainerReportsProto.Builder dummyReportsBuilder = - ContainerReportsProto.newBuilder(); - for (ContainerReplicaProto containerInfo : - containerReport.getReportsList()) { - dummyReportsBuilder.addReports( - ContainerReplicaProto.newBuilder(containerInfo) - .setDeleteTransactionId(0) - .build()); - } - ContainerReportsProto dummyReport = dummyReportsBuilder.build(); - - logCapturer.clearOutput(); - cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContext(). - addIncrementalReport(dummyReport); - cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().triggerHeartbeat(); - // wait for event to be handled by event handler - Thread.sleep(2000); - String output = logCapturer.getOutput(); - for (ContainerReplicaProto containerInfo : dummyReport.getReportsList()) { - long containerId = containerInfo.getContainerID(); - // Event should be triggered only for containers which have deleted blocks - if (containerIdsWithDeletedBlocks.contains(containerId)) { - Assert.assertTrue(output.contains( - "for containerID " + containerId + ". Datanode delete txnID")); - } - } - logCapturer.clearOutput(); - } - private void matchContainerTransactionIds() throws IOException { ContainerSet dnContainerSet = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()