diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto index 8818a837379f..c0938e8c9325 100644 --- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto @@ -23,6 +23,7 @@ option java_generate_equals_and_hash = true; enum RequestType { PIPELINE = 1; CONTAINER = 2; + BLOCK = 3; } message Method { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index fb5d5d521c49..8acf985926da 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -101,8 +101,11 @@ public BlockManagerImpl(final ConfigurationSource conf, mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); // SCM block deleting transaction log and deleting service. - deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(), - scm.getScmMetadataStore()); + deletedBlockLog = new DeletedBlockLogImplV2(conf, scm.getContainerManager(), + scm.getScmHAManager().getRatisServer(), + scm.getScmMetadataStore().getDeletedBlocksTXTable(), + scm.getScmHAManager().getDBTransactionBuffer(), + scm.getScmContext()); Duration svcInterval = conf.getObject( ScmConfig.class).getBlockDeletionInterval(); long serviceTimeout = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java index 9a5d74f0af9c..ddcd2d168316 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java @@ -80,17 +80,6 @@ void incrementCount(List txIDs) void commitTransactions(List transactionResults, UUID dnID); - /** - * Creates a block deletion transaction and adds that into the log. - * - * @param containerID - container ID. - * @param blocks - blocks that belong to the same container. - * - * @throws IOException - */ - void addTransaction(long containerID, List blocks) - throws IOException; - /** * Creates block deletion transactions for a set of containers, * add into the log and persist them atomically. An object key diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 8a46b66e15e4..b61f135c94ec 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -260,27 +260,6 @@ private boolean isTransactionFailed(DeleteBlockTransactionResult result) { return false; } - /** - * {@inheritDoc} - * - * @param containerID - container ID. - * @param blocks - blocks that belong to the same container. - * @throws IOException - */ - @Override - public void addTransaction(long containerID, List blocks) - throws IOException { - lock.lock(); - try { - Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID(); - DeletedBlocksTransaction tx = - constructNewTransaction(nextTXID, containerID, blocks); - scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx); - } finally { - lock.unlock(); - } - } - @Override public int getNumOfValidTransactions() throws IOException { lock.lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java new file mode 100644 index 000000000000..179d22811f09 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.Set; +import java.util.Map; +import java.util.LinkedHashSet; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.utils.UniqueId; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; + +import com.google.common.collect.Lists; +import static java.lang.Math.min; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A implement class of {@link DeletedBlockLog}, and it uses + * K/V db to maintain block deletion transactions between scm and datanode. + * This is a very basic implementation, it simply scans the log and + * memorize the position that scanned by last time, and uses this to + * determine where the next scan starts. It has no notion about weight + * of each transaction so as long as transaction is still valid, they get + * equally same chance to be retrieved which only depends on the nature + * order of the transaction ID. + */ +public class DeletedBlockLogImplV2 + implements DeletedBlockLog, EventHandler { + + public static final Logger LOG = + LoggerFactory.getLogger(DeletedBlockLogImpl.class); + + private final int maxRetry; + private final ContainerManagerV2 containerManager; + private final Lock lock; + // Maps txId to set of DNs which are successful in committing the transaction + private Map> transactionToDNsCommitMap; + // The access to DeletedBlocksTXTable is protected by + // DeletedBlockLogStateManager. + private final DeletedBlockLogStateManager deletedBlockLogStateManager; + private final SCMContext scmContext; + + public DeletedBlockLogImplV2(ConfigurationSource conf, + ContainerManagerV2 containerManager, + SCMRatisServer ratisServer, + Table deletedBlocksTXTable, + DBTransactionBuffer dbTxBuffer, + SCMContext scmContext) { + maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, + OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); + this.containerManager = containerManager; + this.lock = new ReentrantLock(); + + // transactionToDNsCommitMap is updated only when + // transaction is added to the log and when it is removed. + + // maps transaction to dns which have committed it. + transactionToDNsCommitMap = new ConcurrentHashMap<>(); + this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl + .newBuilder() + .setConfiguration(conf) + .setDeletedBlocksTable(deletedBlocksTXTable) + .setRatisServer(ratisServer) + .setSCMDBTransactionBuffer(dbTxBuffer) + .build(); + this.scmContext = scmContext; + } + + + @Override + public List getFailedTransactions() + throws IOException { + lock.lock(); + try { + final List failedTXs = Lists.newArrayList(); + try (TableIterator> iter = + deletedBlockLogStateManager.getReadOnlyIterator()) { + while (iter.hasNext()) { + DeletedBlocksTransaction delTX = iter.next().getValue(); + if (delTX.getCount() == -1) { + failedTXs.add(delTX); + } + } + } + return failedTXs; + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + * + * @param txIDs - transaction ID. + * @throws IOException + */ + @Override + public void incrementCount(List txIDs) throws IOException { + lock.lock(); + try { + deletedBlockLogStateManager + .increaseRetryCountOfTransactionInDB(new ArrayList<>(txIDs)); + } finally { + lock.unlock(); + } + } + + + private DeletedBlocksTransaction constructNewTransaction( + long txID, long containerID, List blocks) { + return DeletedBlocksTransaction.newBuilder() + .setTxID(txID) + .setContainerID(containerID) + .addAllLocalID(blocks) + .setCount(0) + .build(); + } + + /** + * {@inheritDoc} + * + * @param transactionResults - transaction IDs. + * @param dnID - Id of Datanode which has acknowledged + * a delete block command. + * @throws IOException + */ + @Override + public void commitTransactions( + List transactionResults, UUID dnID) { + lock.lock(); + try { + ArrayList txIDsToBeDeleted = new ArrayList<>(); + Set dnsWithCommittedTxn; + for (DeleteBlockTransactionResult transactionResult : + transactionResults) { + if (isTransactionFailed(transactionResult)) { + continue; + } + try { + long txID = transactionResult.getTxID(); + // set of dns which have successfully committed transaction txId. + dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); + final ContainerID containerId = ContainerID.valueOf( + transactionResult.getContainerID()); + if (dnsWithCommittedTxn == null) { + // Mostly likely it's a retried delete command response. + if (LOG.isDebugEnabled()) { + LOG.debug( + "Transaction txId={} commit by dnId={} for containerID={}" + + " failed. Corresponding entry not found.", txID, dnID, + containerId); + } + continue; + } + + dnsWithCommittedTxn.add(dnID); + final ContainerInfo container = + containerManager.getContainer(containerId); + final Set replicas = + containerManager.getContainerReplicas(containerId); + // The delete entry can be safely removed from the log if all the + // corresponding nodes commit the txn. It is required to check that + // the nodes returned in the pipeline match the replication factor. + if (min(replicas.size(), dnsWithCommittedTxn.size()) + >= container.getReplicationFactor().getNumber()) { + List containerDns = replicas.stream() + .map(ContainerReplica::getDatanodeDetails) + .map(DatanodeDetails::getUuid) + .collect(Collectors.toList()); + if (dnsWithCommittedTxn.containsAll(containerDns)) { + transactionToDNsCommitMap.remove(txID); + if (LOG.isDebugEnabled()) { + LOG.debug("Purging txId={} from block deletion log", txID); + } + txIDsToBeDeleted.add(txID); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Datanode txId={} containerId={} committed by dnId={}", + txID, containerId, dnID); + } + } catch (IOException e) { + LOG.warn("Could not commit delete block transaction: " + + transactionResult.getTxID(), e); + } + } + try { + deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted); + } catch (IOException e) { + LOG.warn("Could not commit delete block transactions: " + + txIDsToBeDeleted, e); + } + } finally { + lock.unlock(); + } + } + + private boolean isTransactionFailed(DeleteBlockTransactionResult result) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Got block deletion ACK from datanode, TXIDs={}, " + "success={}", + result.getTxID(), result.getSuccess()); + } + if (!result.getSuccess()) { + LOG.warn("Got failed ACK for TXID={}, prepare to resend the " + + "TX in next interval", result.getTxID()); + return true; + } + return false; + } + + @Override + public int getNumOfValidTransactions() throws IOException { + lock.lock(); + try { + final AtomicInteger num = new AtomicInteger(0); + try (TableIterator> iter = + deletedBlockLogStateManager.getReadOnlyIterator()) { + while (iter.hasNext()) { + DeletedBlocksTransaction delTX = iter.next().getValue(); + if (delTX.getCount() > -1) { + num.incrementAndGet(); + } + } + } + return num.get(); + } finally { + lock.unlock(); + } + } + + /** + * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes + * leader. + */ + public void clearTransactionToDNsCommitMap() { + transactionToDNsCommitMap.clear(); + } + + /** + * Called in SCMDBTransactionBuffer#flush when the cached deleting operations + * are flushed. + */ + public void onFlush() { + deletedBlockLogStateManager.onFlush(); + } + + /** + * {@inheritDoc} + * + * @param containerBlocksMap a map of containerBlocks. + * @throws IOException + */ + @Override + public void addTransactions(Map> containerBlocksMap) + throws IOException { + lock.lock(); + try { + ArrayList txsToBeAdded = new ArrayList<>(); + for (Map.Entry< Long, List< Long > > entry : + containerBlocksMap.entrySet()) { + // TODO(runzhiwang): Should use distributed sequence id generator + long nextTXID = UniqueId.next(); + DeletedBlocksTransaction tx = constructNewTransaction(nextTXID, + entry.getKey(), entry.getValue()); + txsToBeAdded.add(tx); + } + + deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + } + + private void getTransaction(DeletedBlocksTransaction tx, + DatanodeDeletedBlockTransactions transactions) { + try { + Set replicas = containerManager + .getContainerReplicas(ContainerID.valueOf(tx.getContainerID())); + for (ContainerReplica replica : replicas) { + UUID dnID = replica.getDatanodeDetails().getUuid(); + Set dnsWithTransactionCommitted = + transactionToDNsCommitMap.get(tx.getTxID()); + if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted + .contains(dnID)) { + // Transaction need not be sent to dns which have + // already committed it + transactions.addTransactionToDN(dnID, tx); + } + } + } catch (IOException e) { + LOG.warn("Got container info error.", e); + } + } + + @Override + public DatanodeDeletedBlockTransactions getTransactions( + int blockDeletionLimit) throws IOException { + lock.lock(); + try { + DatanodeDeletedBlockTransactions transactions = + new DatanodeDeletedBlockTransactions(); + try (TableIterator> iter = + deletedBlockLogStateManager.getReadOnlyIterator()) { + int numBlocksAdded = 0; + ArrayList txIDs = new ArrayList<>(); + while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) { + Table.KeyValue keyValue = iter.next(); + DeletedBlocksTransaction txn = keyValue.getValue(); + final ContainerID id = ContainerID.valueOf(txn.getContainerID()); + try { + if (txn.getCount() > -1 && txn.getCount() <= maxRetry + && !containerManager.getContainer(id).isOpen()) { + numBlocksAdded += txn.getLocalIDCount(); + getTransaction(txn, transactions); + transactionToDNsCommitMap + .putIfAbsent(txn.getTxID(), new LinkedHashSet<>()); + } + } catch (ContainerNotFoundException ex) { + LOG.warn("Container: " + id + " was not found for the transaction: " + + txn); + txIDs.add(txn.getTxID()); + } + } + + deletedBlockLogStateManager.removeTransactionsFromDB(txIDs); + } + return transactions; + } finally { + lock.unlock(); + } + } + + @Override + public void onMessage( + DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) { + if (!scmContext.isLeader()) { + LOG.warn("Skip commit transactions since current SCM is not leader."); + return; + } + + ContainerBlocksDeletionACKProto ackProto = + deleteBlockStatus.getCmdStatus().getBlockDeletionAck(); + commitTransactions(ackProto.getResultsList(), + UUID.fromString(ackProto.getDnId())); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java new file mode 100644 index 000000000000..f152a1167e90 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.metadata.Replicate; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; + +import java.io.IOException; +import java.util.ArrayList; + +public interface DeletedBlockLogStateManager { + @Replicate + void addTransactionsToDB(ArrayList txs) + throws IOException; + + @Replicate + void removeTransactionsFromDB(ArrayList txIDs) + throws IOException; + + @Replicate + void increaseRetryCountOfTransactionInDB(ArrayList txIDs) + throws IOException; + + TableIterator> getReadOnlyIterator(); + + void onFlush(); +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java new file mode 100644 index 000000000000..ab72c620c17a --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; +import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.TypedTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT; + +public class DeletedBlockLogStateManagerImpl + implements DeletedBlockLogStateManager { + + public static final Logger LOG = + LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class); + + private final Table deletedTable; + private final DBTransactionBuffer transactionBuffer; + private final int maxRetry; + private Set deletingTxIDs; + + public DeletedBlockLogStateManagerImpl( + ConfigurationSource conf, + Table deletedTable, + DBTransactionBuffer txBuffer) { + this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, + OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); + this.deletedTable = deletedTable; + this.transactionBuffer = txBuffer; + this.deletingTxIDs = ConcurrentHashMap.newKeySet(); + } + + public TableIterator> getReadOnlyIterator() { + return new TableIterator>() { + + private TableIterator> iter = + deletedTable.iterator(); + private TypedTable.KeyValue nextTx; + + { + findNext(); + } + + private void findNext() { + while (iter.hasNext()) { + TypedTable.KeyValue next = iter + .next(); + long txID; + try { + txID = next.getKey(); + } catch (IOException e) { + throw new IllegalStateException(""); + } + + if (!deletingTxIDs.contains(txID)) { + nextTx = next; + if (LOG.isTraceEnabled()) { + LOG.trace("DeletedBlocksTransaction matching txID:{}", + txID); + } + return; + } + } + nextTx = null; + } + + @Override + public boolean hasNext() { + return nextTx != null; + } + + @Override + public TypedTable.KeyValue next() { + if (nextTx == null) { + throw new NoSuchElementException("DeletedBlocksTransaction " + + "Iterator reached end"); + } + TypedTable.KeyValue returnTx = nextTx; + findNext(); + return returnTx; + } + + @Override + public void close() throws IOException { + iter.close(); + } + + @Override + public void seekToFirst() { + throw new UnsupportedOperationException("seekToFirst"); + } + + @Override + public void seekToLast() { + throw new UnsupportedOperationException("seekToLast"); + } + + @Override + public TypedTable.KeyValue seek( + Long key) throws IOException { + throw new UnsupportedOperationException("seek"); + } + + @Override + public Long key() throws IOException { + throw new UnsupportedOperationException("key"); + } + + @Override + public TypedTable.KeyValue value() { + throw new UnsupportedOperationException("value"); + } + + @Override + public void removeFromDB() throws IOException { + throw new UnsupportedOperationException("read-only"); + } + }; + } + + @Override + public void addTransactionsToDB(ArrayList txs) + throws IOException { + for (DeletedBlocksTransaction tx : txs) { + deletedTable.putWithBatch( + transactionBuffer.getCurrentBatchOperation(), tx.getTxID(), tx); + } + } + + @Override + public void removeTransactionsFromDB(ArrayList txIDs) + throws IOException { + for (Long txID : txIDs) { + deletedTable.deleteWithBatch( + transactionBuffer.getCurrentBatchOperation(), txID); + } + } + + @Override + public void increaseRetryCountOfTransactionInDB( + ArrayList txIDs) throws IOException { + for (Long txID : txIDs) { + DeletedBlocksTransaction block = + deletedTable.get(txID); + if (block == null) { + if (LOG.isDebugEnabled()) { + // This can occur due to race condition between retry and old + // service task where old task removes the transaction and the new + // task is resending + LOG.debug("Deleted TXID {} not found.", txID); + } + continue; + } + DeletedBlocksTransaction.Builder builder = block.toBuilder(); + int currentCount = block.getCount(); + if (currentCount > -1) { + builder.setCount(++currentCount); + } + // if the retry time exceeds the maxRetry value + // then set the retry value to -1, stop retrying, admins can + // analyze those blocks and purge them manually by SCMCli. + if (currentCount > maxRetry) { + builder.setCount(-1); + } + deletedTable.putWithBatch( + transactionBuffer.getCurrentBatchOperation(), txID, builder.build()); + } + } + + + public void onFlush() { + deletingTxIDs.clear(); + LOG.info("Clear cached deletingTxIDs."); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for ContainerStateManager. + */ + public static class Builder { + private ConfigurationSource conf; + private SCMRatisServer scmRatisServer; + private Table table; + private DBTransactionBuffer transactionBuffer; + + public Builder setConfiguration(final ConfigurationSource config) { + conf = config; + return this; + } + + public Builder setRatisServer(final SCMRatisServer ratisServer) { + scmRatisServer = ratisServer; + return this; + } + + public Builder setDeletedBlocksTable( + final Table deletedBlocksTable) { + table = deletedBlocksTable; + return this; + } + + public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) { + this.transactionBuffer = buffer; + return this; + } + + public DeletedBlockLogStateManager build() { + Preconditions.checkNotNull(conf); + Preconditions.checkNotNull(scmRatisServer); + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(table); + + final DeletedBlockLogStateManager impl = + new DeletedBlockLogStateManagerImpl(conf, table, transactionBuffer); + + final SCMHAInvocationHandler invocationHandler = + new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK, + impl, scmRatisServer); + + return (DeletedBlockLogStateManager) Proxy.newProxyInstance( + SCMHAInvocationHandler.class.getClassLoader(), + new Class[]{DeletedBlockLogStateManager.class}, + invocationHandler); + } + + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java index e5af07666da2..451ed1ba766f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java @@ -16,7 +16,11 @@ */ package org.apache.hadoop.hdds.scm.ha; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.ratis.statemachine.SnapshotInfo; @@ -32,17 +36,21 @@ * operation in DB. */ public class SCMDBTransactionBuffer implements DBTransactionBuffer { + private final StorageContainerManager scm; private final SCMMetadataStore metadataStore; private BatchOperation currentBatchOperation; private SCMTransactionInfo latestTrxInfo; private SnapshotInfo latestSnapshot; - public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException { - this.metadataStore = store; + public SCMDBTransactionBuffer(StorageContainerManager scm) + throws IOException { + this.scm = scm; + this.metadataStore = scm.getScmMetadataStore(); // initialize a batch operation during construction time currentBatchOperation = this.metadataStore.getStore().initBatchOperation(); - latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY); + latestTrxInfo = this.metadataStore.getTransactionInfoTable() + .get(TRANSACTION_INFO_KEY); if (latestTrxInfo == null) { // transaction table is empty latestTrxInfo = @@ -98,6 +106,12 @@ public void flush() throws IOException { this.latestSnapshot = latestTrxInfo.toSnapshotInfo(); // reset batch operation currentBatchOperation = metadataStore.getStore().initBatchOperation(); + + DeletedBlockLog deletedBlockLog = scm.getScmBlockManager() + .getDeletedBlockLog(); + Preconditions.checkArgument( + deletedBlockLog instanceof DeletedBlockLogImplV2); + ((DeletedBlockLogImplV2) deletedBlockLog).onFlush(); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index b79538044291..188036366c15 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -48,7 +48,7 @@ public SCMHAManagerImpl(final ConfigurationSource conf, final StorageContainerManager scm) throws IOException { this.conf = conf; this.transactionBuffer = - new SCMDBTransactionBuffer(scm.getScmMetadataStore()); + new SCMDBTransactionBuffer(scm); this.ratisServer = new SCMRatisServerImpl( conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index a04f0d82b0d7..13c56619939f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -26,7 +26,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupMemberId; @@ -156,6 +159,13 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, scm.getScmContext().updateLeaderAndTerm(true, term); scm.getSCMServiceManager().notifyStatusChanged(); + + DeletedBlockLog deletedBlockLog = scm.getScmBlockManager() + .getDeletedBlockLog(); + Preconditions.checkArgument( + deletedBlockLog instanceof DeletedBlockLogImplV2); + ((DeletedBlockLogImplV2) deletedBlockLog) + .clearTransactionToDNsCommitMap(); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 809dda9af057..ce5e5f34f279 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -61,7 +61,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; -import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2; import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; @@ -362,7 +362,7 @@ private StorageContainerManager(OzoneConfiguration conf, eventQueue .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, - (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); + (DeletedBlockLogImplV2) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 37b2d6168edc..ef6d10289732 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer; +import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer; +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -77,13 +81,14 @@ */ public class TestDeletedBlockLog { - private static DeletedBlockLogImpl deletedBlockLog; + private static DeletedBlockLogImplV2 deletedBlockLog; private static final int BLOCKS_PER_TXN = 5; private OzoneConfiguration conf; private File testDir; private ContainerManagerV2 containerManager; private StorageContainerManager scm; private List dnList; + private DBTransactionBuffer dbTransactionBuffer; @Before public void setup() throws Exception { @@ -94,8 +99,12 @@ public void setup() throws Exception { conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); scm = TestUtils.getScm(conf); containerManager = Mockito.mock(ContainerManagerV2.class); - deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager, - scm.getScmMetadataStore()); + dbTransactionBuffer = + new MockDBTransactionBuffer(scm.getScmMetadataStore().getStore()); + deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager, + MockSCMHAManager.getInstance(true).getRatisServer(), + scm.getScmMetadataStore().getDeletedBlocksTXTable(), + dbTransactionBuffer, SCMContext.emptyContext()); dnList = new ArrayList<>(3); setupContainerManager(); } @@ -155,31 +164,48 @@ private Map> generateData(int dataSize) { return blockMap; } + private void addTransactions(Map> containerBlocksMap) + throws IOException { + dbTransactionBuffer.getCurrentBatchOperation(); + deletedBlockLog.addTransactions(containerBlocksMap); + dbTransactionBuffer.flush(); + } + + private void incrementCount(List txIDs) throws IOException { + dbTransactionBuffer.getCurrentBatchOperation(); + deletedBlockLog.incrementCount(txIDs); + dbTransactionBuffer.flush(); + } + private void commitTransactions( List transactionResults, - DatanodeDetails... dns) { + DatanodeDetails... dns) throws IOException { + dbTransactionBuffer.getCurrentBatchOperation(); for (DatanodeDetails dnDetails : dns) { deletedBlockLog .commitTransactions(transactionResults, dnDetails.getUuid()); } + dbTransactionBuffer.flush(); } private void commitTransactions( - List transactionResults) { + List transactionResults) + throws IOException { commitTransactions(transactionResults, dnList.toArray(new DatanodeDetails[3])); } private void commitTransactions( Collection deletedBlocksTransactions, - DatanodeDetails... dns) { + DatanodeDetails... dns) throws IOException { commitTransactions(deletedBlocksTransactions.stream() .map(this::createDeleteBlockTransactionResult) .collect(Collectors.toList()), dns); } private void commitTransactions( - Collection deletedBlocksTransactions) { + Collection deletedBlocksTransactions) + throws IOException { commitTransactions(deletedBlocksTransactions.stream() .map(this::createDeleteBlockTransactionResult) .collect(Collectors.toList())); @@ -210,9 +236,7 @@ public void testIncrementCount() throws Exception { int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20); // Create 30 TXs in the log. - for (Map.Entry> entry : generateData(30).entrySet()){ - deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); - } + addTransactions(generateData(30)); // This will return all TXs, total num 30. List blocks = @@ -221,12 +245,12 @@ public void testIncrementCount() throws Exception { .collect(Collectors.toList()); for (int i = 0; i < maxRetry; i++) { - deletedBlockLog.incrementCount(txIDs); + incrementCount(txIDs); } // Increment another time so it exceed the maxRetry. // On this call, count will be set to -1 which means TX eventually fails. - deletedBlockLog.incrementCount(txIDs); + incrementCount(txIDs); blocks = getTransactions(40 * BLOCKS_PER_TXN); for (DeletedBlocksTransaction block : blocks) { Assert.assertEquals(-1, block.getCount()); @@ -239,9 +263,7 @@ public void testIncrementCount() throws Exception { @Test public void testCommitTransactions() throws Exception { - for (Map.Entry> entry : generateData(50).entrySet()){ - deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); - } + addTransactions(generateData(50)); List blocks = getTransactions(20 * BLOCKS_PER_TXN); // Add an invalid txn. @@ -279,10 +301,7 @@ public void testRandomOperateTransactions() throws Exception { for (int i = 0; i < 100; i++) { int state = random.nextInt(4); if (state == 0) { - for (Map.Entry> entry : - generateData(10).entrySet()){ - deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); - } + addTransactions(generateData(10)); added += 10; } else if (state == 1) { blocks = getTransactions(20); @@ -290,7 +309,7 @@ public void testRandomOperateTransactions() throws Exception { for (DeletedBlocksTransaction block : blocks) { txIDs.add(block.getTxID()); } - deletedBlockLog.incrementCount(txIDs); + incrementCount(txIDs); } else if (state == 2) { commitTransactions(blocks); committed += blocks.size(); @@ -312,14 +331,14 @@ public void testRandomOperateTransactions() throws Exception { @Test public void testPersistence() throws Exception { - for (Map.Entry> entry : generateData(50).entrySet()){ - deletedBlockLog.addTransaction(entry.getKey(), entry.getValue()); - } + addTransactions(generateData(50)); // close db and reopen it again to make sure // transactions are stored persistently. deletedBlockLog.close(); - deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager, - scm.getScmMetadataStore()); + deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager, + MockSCMHAManager.getInstance(true).getRatisServer(), + scm.getScmMetadataStore().getDeletedBlocksTXTable(), + dbTransactionBuffer, SCMContext.emptyContext()); List blocks = getTransactions(BLOCKS_PER_TXN * 10); Assert.assertEquals(10, blocks.size()); @@ -339,10 +358,11 @@ public void testDeletedBlockTransactions() throws IOException { long containerID; // Creates {TXNum} TX in the log. - for (Map.Entry> entry : generateData(txNum).entrySet()) { + Map> deletedBlocks = generateData(txNum); + addTransactions(deletedBlocks); + for (Map.Entry> entry :deletedBlocks.entrySet()) { count++; containerID = entry.getKey(); - deletedBlockLog.addTransaction(containerID, entry.getValue()); if (count % 2 == 0) { mockContainerInfo(containerID, dnId1); @@ -365,7 +385,9 @@ public void testDeletedBlockTransactions() throws IOException { builder.setTxID(11); builder.setContainerID(containerID); builder.setCount(0); - deletedBlockLog.addTransaction(containerID, new LinkedList<>()); + Map> deletedBlocksMap = new HashMap<>(); + deletedBlocksMap.put(containerID, new LinkedList<>()); + addTransactions(deletedBlocksMap); // get should return two transactions for the same container blocks = getTransactions(txNum); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 635fe30a413e..a36c275f5033 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -41,7 +41,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -281,8 +283,9 @@ public void testBlockDeletionTransactions() throws Exception { cluster.getStorageContainerManager()); } - Map> containerBlocks = createDeleteTXLog(delLog, - keyLocations, helper); + Map> containerBlocks = createDeleteTXLog( + cluster.getStorageContainerManager(), + delLog, keyLocations, helper); Set containerIDs = containerBlocks.keySet(); // Verify a few TX gets created in the TX log. @@ -295,6 +298,8 @@ public void testBlockDeletionTransactions() throws Exception { // empty again. GenericTestUtils.waitFor(() -> { try { + cluster.getStorageContainerManager().getScmHAManager() + .getDBTransactionBuffer().flush(); return delLog.getNumOfValidTransactions() == 0; } catch (IOException e) { return false; @@ -306,10 +311,13 @@ public void testBlockDeletionTransactions() throws Exception { // but unknown block IDs. for (Long containerID : containerBlocks.keySet()) { // Add 2 TXs per container. - delLog.addTransaction(containerID, - Collections.singletonList(RandomUtils.nextLong())); - delLog.addTransaction(containerID, - Collections.singletonList(RandomUtils.nextLong())); + Map> deletedBlocks = new HashMap<>(); + List blocks = new ArrayList<>(); + blocks.add(RandomUtils.nextLong()); + blocks.add(RandomUtils.nextLong()); + deletedBlocks.put(containerID, blocks); + addTransactions(cluster.getStorageContainerManager(), delLog, + deletedBlocks); } // Verify a few TX gets created in the TX log. @@ -319,11 +327,13 @@ public void testBlockDeletionTransactions() throws Exception { // eventually these TX will success. GenericTestUtils.waitFor(() -> { try { + cluster.getStorageContainerManager().getScmHAManager() + .getDBTransactionBuffer().flush(); return delLog.getFailedTransactions().size() == 0; } catch (IOException e) { return false; } - }, 1000, 10000); + }, 1000, 20000); } finally { cluster.shutdown(); } @@ -374,7 +384,8 @@ public void testBlockDeletingThrottling() throws Exception { cluster.getStorageContainerManager()); } - createDeleteTXLog(delLog, keyLocations, helper); + createDeleteTXLog(cluster.getStorageContainerManager(), + delLog, keyLocations, helper); // Verify a few TX gets created in the TX log. Assert.assertTrue(delLog.getNumOfValidTransactions() > 0); @@ -400,7 +411,9 @@ public void testBlockDeletingThrottling() throws Exception { } } - private Map> createDeleteTXLog(DeletedBlockLog delLog, + private Map> createDeleteTXLog( + StorageContainerManager scm, + DeletedBlockLog delLog, Map keyLocations, TestStorageContainerManagerHelper helper) throws IOException { // These keys will be written into a bunch of containers, @@ -438,9 +451,7 @@ private Map> createDeleteTXLog(DeletedBlockLog delLog, } }); } - for (Map.Entry> tx : containerBlocks.entrySet()) { - delLog.addTransaction(tx.getKey(), tx.getValue()); - } + addTransactions(scm, delLog, containerBlocks); return containerBlocks; } @@ -667,6 +678,14 @@ public void testCloseContainerCommandOnRestart() throws Exception { } } + private void addTransactions(StorageContainerManager scm, + DeletedBlockLog delLog, + Map> containerBlocksMap) + throws IOException { + delLog.addTransactions(containerBlocksMap); + scm.getScmHAManager().getDBTransactionBuffer().flush(); + } + @SuppressWarnings("visibilitymodifier") static class CloseContainerCommandMatcher extends ArgumentMatcher { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java index 5a44a2c6bd66..bbbd702b2e2c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java @@ -280,9 +280,15 @@ public void testDeleteKeyWithSlowFollower() throws Exception { client.getObjectStore().getVolume(volumeName).getBucket(bucketName). deleteKey("ratis"); GenericTestUtils.waitFor(() -> { - return - dnStateMachine.getCommandDispatcher().getDeleteBlocksCommandHandler() - .getInvocationCount() >= 1; + try { + cluster.getStorageContainerManager().getScmHAManager() + .getDBTransactionBuffer().flush(); + return + dnStateMachine.getCommandDispatcher() + .getDeleteBlocksCommandHandler().getInvocationCount() >= 1; + } catch (IOException e) { + return false; + } }, 500, 100000); Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId); Assert.assertTrue(