Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
* @throws IOException
*/
public BlockManagerImpl(final ConfigurationSource conf,
final StorageContainerManager scm) {
final StorageContainerManager scm)
throws IOException {
Objects.requireNonNull(scm, "SCM cannot be null");
this.pipelineManager = scm.getPipelineManager();
this.containerManager = scm.getContainerManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.block;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.Set;
Expand All @@ -26,6 +27,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -70,6 +72,8 @@ public class DeletedBlockLogImpl

public static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
private static final DeletedBlocksTransaction.Builder DUMMY_TXN_BUILDER =
DeletedBlocksTransaction.newBuilder().setContainerID(1).setCount(1);

private final int maxRetry;
private final ContainerManager containerManager;
Expand All @@ -78,9 +82,15 @@ public class DeletedBlockLogImpl
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;

private final AtomicLong largestTxnId;
// largest transactionId is stored at largestTxnIdHolderKey
private final long largestTxnIdHolderKey = 0L;


public DeletedBlockLogImpl(ConfigurationSource conf,
ContainerManager containerManager,
SCMMetadataStore scmMetadataStore) {
SCMMetadataStore scmMetadataStore)
throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
this.containerManager = containerManager;
Expand All @@ -92,8 +102,46 @@ public DeletedBlockLogImpl(ConfigurationSource conf,

// maps transaction to dns which have committed it.
transactionToDNsCommitMap = new ConcurrentHashMap<>();

this.largestTxnId = new AtomicLong(this.getLargestRecordedTXID());
}

public Long getNextDeleteBlockTXID() {
return this.largestTxnId.incrementAndGet();
}

public Long getCurrentTXID() {
return this.largestTxnId.get();
}

/**
* Returns the largest recorded TXID from the DB.
*
* @return Long
* @throws IOException
*/
private long getLargestRecordedTXID() throws IOException {
DeletedBlocksTransaction txn =
scmMetadataStore.getDeletedBlocksTXTable().get(largestTxnIdHolderKey);
long txnId = txn != null ? txn.getTxID() : 0L;
if (txn == null) {
// HDDS-4477 adds largestTxnIdHolderKey to table for storing largest
// transactionId. In case the key does not exist, fetch largest
// transactionId from existing transactions and update
// largestTxnIdHolderKey with same.
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> txIter =
getIterator()) {
txIter.seekToLast();
txnId = txIter.key() != null ? txIter.key() : 0L;
if (txnId > 0) {
scmMetadataStore.getDeletedBlocksTXTable().put(largestTxnIdHolderKey,
DUMMY_TXN_BUILDER.setTxID(txnId).build());
}
}
}
return txnId;
}

@Override
public List<DeletedBlocksTransaction> getFailedTransactions()
Expand All @@ -103,7 +151,7 @@ public List<DeletedBlocksTransaction> getFailedTransactions()
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
getIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
Expand Down Expand Up @@ -270,15 +318,8 @@ private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
@Override
public void addTransaction(long containerID, List<Long> blocks)
throws IOException {
lock.lock();
try {
Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
DeletedBlocksTransaction tx =
constructNewTransaction(nextTXID, containerID, blocks);
scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
} finally {
lock.unlock();
}
Map<Long, List<Long>> map = Collections.singletonMap(containerID, blocks);
addTransactions(map);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about remove this addTransaction(), since it is only used in unit test ?

Copy link
Contributor

@runzhiwang runzhiwang Jan 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GlenGeng I have done this in my DeleteBlock pr.

}

@Override
Expand All @@ -288,7 +329,7 @@ public int getNumOfValidTransactions() throws IOException {
final AtomicInteger num = new AtomicInteger(0);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
getIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() > -1) {
Expand All @@ -312,19 +353,20 @@ public int getNumOfValidTransactions() throws IOException {
public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException {
lock.lock();
try {
try(BatchOperation batch =
scmMetadataStore.getStore().initBatchOperation()) {
for (Map.Entry< Long, List< Long > > entry :
containerBlocksMap.entrySet()) {
long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
entry.getKey(), entry.getValue());
scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
nextTXID, tx);
}
scmMetadataStore.getStore().commitBatchOperation(batch);
try (BatchOperation batch = scmMetadataStore.getStore()
.initBatchOperation()) {
for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
long nextTXID = getNextDeleteBlockTXID();
scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch, nextTXID,
constructNewTransaction(nextTXID, entry.getKey(),
entry.getValue()));
}
// Add a dummy transaction to store the largestTransactionId at
// largestTxnIdHolderKey
scmMetadataStore.getDeletedBlocksTXTable()
.putWithBatch(batch, largestTxnIdHolderKey,
DUMMY_TXN_BUILDER.setTxID(getCurrentTXID()).build());
scmMetadataStore.getStore().commitBatchOperation(batch);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -364,7 +406,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
new DatanodeDeletedBlockTransactions();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
getIterator()) {
int numBlocksAdded = 0;
List<DeletedBlocksTransaction> txnsToBePurged =
new ArrayList<>();
Expand Down Expand Up @@ -406,6 +448,16 @@ public void purgeTransactions(List<DeletedBlocksTransaction> txnsToBePurged)
}
}

TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> getIterator()
throws IOException {
TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
iter = scmMetadataStore.getDeletedBlocksTXTable().iterator();
iter.seek(largestTxnIdHolderKey + 1);
return iter;
}

@Override
public void onMessage(DeleteBlockStatus deleteBlockStatus,
EventPublisher publisher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,6 @@ public interface SCMMetadataStore {
*/
Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();

/**
* Returns the current TXID for the deleted blocks.
*
* @return Long
*/
Long getCurrentTXID();

/**
* Returns the next TXID for the Deleted Blocks.
*
* @return Long.
*/
Long getNextDeleteBlockTXID();

/**
* A table that maintains all the valid certificates issued by the SCM CA.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
Expand All @@ -33,7 +32,6 @@
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;

import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
Expand Down Expand Up @@ -64,7 +62,6 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
private final OzoneConfiguration configuration;
private final AtomicLong txID;

/**
* Constructs the metadata store and starts the DB Services.
Expand All @@ -76,7 +73,6 @@ public SCMMetadataStoreImpl(OzoneConfiguration config)
throws IOException {
this.configuration = config;
start(this.configuration);
this.txID = new AtomicLong(this.getLargestRecordedTXID());
}

@Override
Expand Down Expand Up @@ -124,10 +120,6 @@ public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
return deletedBlocksTable;
}

@Override
public Long getNextDeleteBlockTXID() {
return this.txID.incrementAndGet();
}

@Override
public Table<BigInteger, X509Certificate> getValidCertsTable() {
Expand Down Expand Up @@ -167,28 +159,6 @@ public Table<ContainerID, ContainerInfo> getContainerTable() {
return containerTable;
}

@Override
public Long getCurrentTXID() {
return this.txID.get();
}

/**
* Returns the largest recorded TXID from the DB.
*
* @return Long
* @throws IOException
*/
private Long getLargestRecordedTXID() throws IOException {
try (TableIterator<Long, ? extends KeyValue<Long, DeletedBlocksTransaction>>
txIter = deletedBlocksTable.iterator()) {
txIter.seekToLast();
Long txid = txIter.key();
if (txid != null) {
return txid;
}
}
return 0L;
}


private void checkTableStatus(Table table, String name) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void testRandomOperateTransactions() throws Exception {
// verify the number of added and committed.
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scm.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) {
deletedBlockLog.getIterator()) {
AtomicInteger count = new AtomicInteger();
iter.forEachRemaining((keyValue) -> count.incrementAndGet());
Assert.assertEquals(added, count.get() + committed);
Expand Down Expand Up @@ -328,6 +328,15 @@ public void testPersistence() throws Exception {
blocks = getTransactions(BLOCKS_PER_TXN * 40);
Assert.assertEquals(40, blocks.size());
commitTransactions(blocks);

// close db and reopen it again to make sure
// currentTxnID = 50
deletedBlockLog.close();
deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
scm.getScmMetadataStore());
blocks = getTransactions(BLOCKS_PER_TXN * 40);
Assert.assertEquals(0, blocks.size());
Assert.assertEquals((long)deletedBlockLog.getCurrentTXID(), 50L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
return null;
}

@Override
public Long getCurrentTXID() {
return null;
}

@Override
public Long getNextDeleteBlockTXID() {
return null;
}

@Override
public Table<BigInteger, X509Certificate> getValidCertsTable() {
return null;
Expand Down