Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
private Instant stateEnterTime;
private String owner;
private long containerID;
// Delete Transaction Id is updated when new transaction for a container
// is stored in SCM delete Table.
// TODO: Replication Manager should consider deleteTransactionId so that
// replica with higher deleteTransactionId is preferred over replica with
// lower deleteTransactionId.
private long deleteTransactionId;
// The sequenceId of a close container cannot change, and all the
// container replica should have the same sequenceId.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ class DatanodeDeletedBlockTransactions {
// counts blocks deleted across datanodes. Blocks deleted will be counted
// for all the replicas and may not be unique.
private int blocksDeleted = 0;
private final Map<Long, Long> containerIdToTxnId = new HashMap<>();

DatanodeDeletedBlockTransactions() {
}

void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
transactions.computeIfAbsent(dnID, k -> new LinkedList<>()).add(tx);
containerIdToTxnId.put(tx.getContainerID(), tx.getTxID());
blocksDeleted += tx.getLocalIDCount();
if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
SCMBlockDeletingService.LOG
Expand All @@ -57,10 +55,6 @@ Map<UUID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
return transactions;
}

Map<Long, Long> getContainerIdToTxnIdMap() {
return containerIdToTxnId;
}

int getBlocksDeleted() {
return blocksDeleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public DeletedBlockLogImplV2(ConfigurationSource conf,
.newBuilder()
.setConfiguration(conf)
.setDeletedBlocksTable(deletedBlocksTXTable)
.setContainerManager(containerManager)
.setRatisServer(ratisServer)
.setSCMDBTransactionBuffer(dbTxBuffer)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
Expand All @@ -34,6 +36,8 @@
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -52,18 +56,19 @@ public class DeletedBlockLogStateManagerImpl
LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class);

private Table<Long, DeletedBlocksTransaction> deletedTable;
private ContainerManagerV2 containerManager;
private final DBTransactionBuffer transactionBuffer;
private final int maxRetry;
private final Set<Long> deletingTxIDs;
private final Set<Long> skippingRetryTxIDs;

public DeletedBlockLogStateManagerImpl(
ConfigurationSource conf,
public DeletedBlockLogStateManagerImpl(ConfigurationSource conf,
Table<Long, DeletedBlocksTransaction> deletedTable,
DBTransactionBuffer txBuffer) {
ContainerManagerV2 containerManager, DBTransactionBuffer txBuffer) {
this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
this.deletedTable = deletedTable;
this.containerManager = containerManager;
this.transactionBuffer = txBuffer;
final boolean isRatisEnabled = SCMHAUtils.isSCMHAEnabled(conf);
this.deletingTxIDs = isRatisEnabled ? ConcurrentHashMap.newKeySet() : null;
Expand Down Expand Up @@ -167,9 +172,14 @@ public void removeFromDB() throws IOException {
@Override
public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
throws IOException {
Map<ContainerID, Long> containerIdToTxnIdMap = new HashMap<>();
for (DeletedBlocksTransaction tx : txs) {
long tid = tx.getTxID();
containerIdToTxnIdMap.compute(ContainerID.valueOf(tx.getContainerID()),
(k, v) -> v != null && v > tid ? v : tid);
transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx);
}
containerManager.updateDeleteTransactionId(containerIdToTxnIdMap);
}

@Override
Expand Down Expand Up @@ -242,6 +252,7 @@ public static class Builder {
private SCMRatisServer scmRatisServer;
private Table<Long, DeletedBlocksTransaction> table;
private DBTransactionBuffer transactionBuffer;
private ContainerManagerV2 containerManager;

public Builder setConfiguration(final ConfigurationSource config) {
conf = config;
Expand All @@ -264,12 +275,18 @@ public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
return this;
}

public Builder setContainerManager(ContainerManagerV2 contManager) {
this.containerManager = contManager;
return this;
}

public DeletedBlockLogStateManager build() {
Preconditions.checkNotNull(conf);
Preconditions.checkNotNull(table);

final DeletedBlockLogStateManager impl =
new DeletedBlockLogStateManagerImpl(conf, table, transactionBuffer);
new DeletedBlockLogStateManagerImpl(conf, table, containerManager,
transactionBuffer);

final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK,
Expand All @@ -280,6 +297,5 @@ public DeletedBlockLogStateManager build() {
new Class<?>[]{DeletedBlockLogStateManager.class},
invocationHandler);
}

}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -30,7 +29,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand Down Expand Up @@ -112,21 +110,6 @@ public BackgroundTaskQueue getTasks() {
return queue;
}

void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
deletionStatusList.getPendingDeleteStatuses()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Block deletion txnID lagging in datanode {} for containerID {}."
+ " Datanode delete txnID: {}, SCM txnID: {}",
dnDetails.getUuid(), deletionStatus.getContainerId(),
deletionStatus.getDnDeleteTransactionId(),
deletionStatus.getScmDeleteTransactionId());
}
}
}

private class DeletedBlockTransactionScanner implements BackgroundTask {

@Override
Expand Down Expand Up @@ -155,8 +138,6 @@ public EmptyTaskResult call() throws Exception {
try {
DatanodeDeletedBlockTransactions transactions =
deletedBlockLog.getTransactions(blockDeleteLimitSize);
Map<Long, Long> containerIdToMaxTxnId =
transactions.getContainerIdToTxnIdMap();

if (transactions.isEmpty()) {
return EmptyTaskResult.newResult();
Expand Down Expand Up @@ -186,11 +167,6 @@ public EmptyTaskResult call() throws Exception {
}
}
// TODO: Fix ME!!!
Map<ContainerID, Long> transactionMap = new HashMap<>();
for (Map.Entry<Long, Long> tx : containerIdToMaxTxnId.entrySet()) {
transactionMap.put(ContainerID.valueOf(tx.getKey()), tx.getValue());
}
containerManager.updateDeleteTransactionId(transactionMap);
LOG.info("Totally added {} blocks to be deleted for"
+ " {} datanodes, task elapsed time: {}ms",
transactions.getBlocksDeleted(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
Expand Down Expand Up @@ -134,7 +132,6 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,

processContainerReplicas(datanodeDetails, replicas, publisher);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);

/*
* Update the latest set of containers for this datanode in
Expand Down Expand Up @@ -213,39 +210,4 @@ private void processMissingReplicas(final DatanodeDetails datanodeDetails,
}
}
}

/**
* Updates the Delete Transaction Id for the given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param replicas List of ContainerReplicaProto
* @param publisher EventPublisher reference
*/
private void updateDeleteTransaction(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas,
final EventPublisher publisher) {
final PendingDeleteStatusList pendingDeleteStatusList =
new PendingDeleteStatusList(datanodeDetails);
for (ContainerReplicaProto replica : replicas) {
try {
final ContainerInfo containerInfo = containerManager.getContainer(
ContainerID.valueOf(replica.getContainerID()));
if (containerInfo.getDeleteTransactionId() >
replica.getDeleteTransactionId()) {
pendingDeleteStatusList.addPendingDeleteStatus(
replica.getDeleteTransactionId(),
containerInfo.getDeleteTransactionId(),
containerInfo.getContainerID());
}
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Cannot update pending delete transaction for " +
"container #{}. Reason: container missing.",
replica.getContainerID());
}
}
if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
pendingDeleteStatusList);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public void updateDeleteTransactionId(
final ContainerInfo info = containers.getContainerInfo(
transaction.getKey());
info.updateDeleteTransactionId(transaction.getValue());
containerStore.put(info.containerID(), info);
transactionBuffer.addToBuffer(containerStore, info.containerID(), info);
}
} finally {
lock.writeLock().unlock();
Expand Down
Loading