Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;

import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -40,33 +41,41 @@ public class RandomContainerDeletionChoosingPolicy
LoggerFactory.getLogger(RandomContainerDeletionChoosingPolicy.class);

@Override
public List<ContainerData> chooseContainerForBlockDeletion(int count,
Map<Long, ContainerData> candidateContainers)
public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
int blockCount, Map<Long, ContainerData> candidateContainers)
throws StorageContainerException {
Preconditions.checkNotNull(candidateContainers,
"Internal assertion: candidate containers cannot be null");

int currentCount = 0;
List<ContainerData> result = new LinkedList<>();
List<ContainerBlockInfo> result = new ArrayList<>();
ContainerData[] values = new ContainerData[candidateContainers.size()];
// to get a shuffle list
ContainerData[] shuffled = candidateContainers.values().toArray(values);
ArrayUtils.shuffle(shuffled);

// Here we are returning containers based on totalBlocks which is basically
// number of blocks to be deleted in an interval. We are also considering
// the boundary case where the blocks of the last container exceeds the
// number of blocks to be deleted in an interval, there we return that
// container but with container we also return an integer so that total
// blocks don't exceed the number of blocks to be deleted in an interval.

for (ContainerData entry : shuffled) {
if (currentCount < count) {
result.add(entry);
currentCount++;
if (((KeyValueContainerData) entry).getNumPendingDeletionBlocks() > 0) {
long numBlocksToDelete = Math.min(blockCount,
((KeyValueContainerData) entry).getNumPendingDeletionBlocks());
blockCount -= numBlocksToDelete;
result.add(new ContainerBlockInfo(entry, numBlocksToDelete));
if (LOG.isDebugEnabled()) {
LOG.debug("Select container {} for block deletion, "
+ "pending deletion blocks num: {}.",
entry.getContainerID(),
+ "pending deletion blocks num: {}.", entry.getContainerID(),
((KeyValueContainerData) entry).getNumPendingDeletionBlocks());
}
} else {
break;
if (blockCount == 0) {
break;
}
}
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;

/**
* TopN Ordered choosing policy that choosing containers based on pending
Expand All @@ -49,43 +51,48 @@ public class TopNOrderedContainerDeletionChoosingPolicy
c1.getNumPendingDeletionBlocks());

@Override
public List<ContainerData> chooseContainerForBlockDeletion(int count,
Map<Long, ContainerData> candidateContainers)
public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
int totalBlocks, Map<Long, ContainerData> candidateContainers)
throws StorageContainerException {

Preconditions.checkNotNull(candidateContainers,
"Internal assertion: candidate containers cannot be null");

List<ContainerData> result = new LinkedList<>();
List<ContainerBlockInfo> result = new ArrayList<>();
List<KeyValueContainerData> orderedList = new LinkedList<>();
for (ContainerData entry : candidateContainers.values()) {
orderedList.add((KeyValueContainerData)entry);
}
Collections.sort(orderedList, KEY_VALUE_CONTAINER_DATA_COMPARATOR);

// get top N list ordered by pending deletion blocks' number
int currentCount = 0;
// Here we are returning containers based on totalBlocks which is basically
// number of blocks to be deleted in an interval. We are also considering
// the boundary case where the blocks of the last container exceeds the
// number of blocks to be deleted in an interval, there we return that
// container but with container we also return an integer so that total
// blocks don't exceed the number of blocks to be deleted in an interval.

for (KeyValueContainerData entry : orderedList) {
if (currentCount < count) {
if (entry.getNumPendingDeletionBlocks() > 0) {
result.add(entry);
currentCount++;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Select container {} for block deletion, "
+ "pending deletion blocks num: {}.",
entry.getContainerID(),
entry.getNumPendingDeletionBlocks());
}
} else {
LOG.debug("Stop looking for next container, there is no"
+ " pending deletion block contained in remaining containers.");
if (entry.getNumPendingDeletionBlocks() > 0) {
long numBlocksToDelete =
Math.min(totalBlocks, entry.getNumPendingDeletionBlocks());
totalBlocks -= numBlocksToDelete;
result.add(new ContainerBlockInfo(entry, numBlocksToDelete));
if (LOG.isDebugEnabled()) {
LOG.debug("Select container {} for block deletion, "
+ "pending deletion blocks num: {}.", entry.getContainerID(),
entry.getNumPendingDeletionBlocks());
}
if (totalBlocks == 0) {
break;
}
} else {
LOG.debug("Stop looking for next container, there is no"
+ " pending deletion block contained in remaining containers.");
break;
}
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;

import java.util.List;
import java.util.Map;
Expand All @@ -40,8 +41,8 @@ public interface ContainerDeletionChoosingPolicy {
* @return container data list
* @throws StorageContainerException
*/
List<ContainerData> chooseContainerForBlockDeletion(int count,
Map<Long, ContainerData> candidateContainers)
List<ContainerBlockInfo> chooseContainerForBlockDeletion(
int count, Map<Long, ContainerData> candidateContainers)
throws StorageContainerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ public void setBlockDeletionInterval(Duration duration) {
this.blockDeletionInterval = duration.toMillis();
}

@Config(key = "block.deleting.limit.per.interval",
defaultValue = "5000",
type = ConfigType.INT,
tags = { ConfigTag.SCM, ConfigTag.DELETION },
description =
"Number of blocks to be deleted in an interval."
)
private int blockLimitPerInterval = 5000;

public int getBlockDeletionLimit() {
return blockLimitPerInterval;
}

public void setBlockDeletionLimit(int limit) {
this.blockLimitPerInterval = limit;
}

@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,10 @@
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

import com.google.common.collect.Lists;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;

Expand All @@ -86,12 +83,7 @@ public class BlockDeletingService extends BackgroundService {
private ContainerDeletionChoosingPolicy containerDeletionPolicy;
private final ConfigurationSource conf;

// Throttle number of blocks to delete per task,
// set to 1 for testing
private final int blockLimitPerTask;

// Throttle the number of containers to process concurrently at a time,
private final int containerLimitPerInterval;
private final int blockLimitPerInterval;

// Task priority is useful when a to-delete block has weight.
private final static int TASK_PRIORITY_DEFAULT = 1;
Expand All @@ -113,37 +105,56 @@ public BlockDeletingService(OzoneContainer ozoneContainer,
throw new RuntimeException(e);
}
this.conf = conf;
this.blockLimitPerTask =
conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
this.containerLimitPerInterval =
conf.getInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
}

public static class ContainerBlockInfo {
private final ContainerData containerData;
private final Long numBlocksToDelete;

public ContainerBlockInfo(ContainerData containerData, Long blocks) {
this.containerData = containerData;
this.numBlocksToDelete = blocks;
}

public ContainerData getContainerData() {
return containerData;
}

public Long getBlocks() {
return numBlocksToDelete;
}

}


@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
List<ContainerData> containers = Lists.newArrayList();
List<ContainerBlockInfo> containers = Lists.newArrayList();
try {
// We at most list a number of containers a time,
// in case there are too many containers and start too many workers.
// We must ensure there is no empty container in this result.
// The chosen result depends on what container deletion policy is
// configured.
containers = chooseContainerForBlockDeletion(containerLimitPerInterval,
containerDeletionPolicy);
if (containers.size() > 0) {
LOG.info("Plan to choose {} containers for block deletion, "
+ "actually returns {} valid containers.",
containerLimitPerInterval, containers.size());
containers = chooseContainerForBlockDeletion(blockLimitPerInterval,
containerDeletionPolicy);

BlockDeletingTask containerBlockInfos = null;
long totalBlocks = 0;
for (ContainerBlockInfo containerBlockInfo : containers) {
containerBlockInfos =
new BlockDeletingTask(containerBlockInfo.containerData,
TASK_PRIORITY_DEFAULT, containerBlockInfo.numBlocksToDelete);
queue.add(containerBlockInfos);
totalBlocks += containerBlockInfo.numBlocksToDelete;
}

for(ContainerData container : containers) {
BlockDeletingTask containerTask =
new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
queue.add(containerTask);
if (containers.size() > 0) {
LOG.info("Plan to choose {} blocks for block deletion, "
+ "actually deleting {} blocks.", blockLimitPerInterval,
totalBlocks);
}
} catch (StorageContainerException e) {
LOG.warn("Failed to initiate block deleting tasks, "
Expand All @@ -158,16 +169,16 @@ public BackgroundTaskQueue getTasks() {
return queue;
}

public List<ContainerData> chooseContainerForBlockDeletion(int count,
ContainerDeletionChoosingPolicy deletionPolicy)
public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
int blockLimit, ContainerDeletionChoosingPolicy deletionPolicy)
throws StorageContainerException {
Map<Long, ContainerData> containerDataMap =
ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
.filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
deletionPolicy)).collect(Collectors
.toMap(Map.Entry::getKey, e -> e.getValue().getContainerData()));
return deletionPolicy
.chooseContainerForBlockDeletion(count, containerDataMap);
.chooseContainerForBlockDeletion(blockLimit, containerDataMap);
}

private boolean isDeletionAllowed(ContainerData containerData,
Expand Down Expand Up @@ -246,10 +257,13 @@ private class BlockDeletingTask implements BackgroundTask {

private final int priority;
private final KeyValueContainerData containerData;
private final long blocksToDelete;

BlockDeletingTask(ContainerData containerName, int priority) {
BlockDeletingTask(ContainerData containerName, int priority,
long blocksToDelete) {
this.priority = priority;
this.containerData = (KeyValueContainerData) containerName;
this.blocksToDelete = blocksToDelete;
}

@Override
Expand Down Expand Up @@ -300,8 +314,8 @@ public ContainerBackgroundTaskResult deleteViaSchema1(
// # of blocks to delete is throttled
KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter();
List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask,
filter);
blockDataTable
.getSequentialRangeKVs(null, (int) blocksToDelete, filter);
if (toDeleteBlocks.isEmpty()) {
LOG.debug("No under deletion block found in container : {}",
containerData.getContainerID());
Expand Down Expand Up @@ -374,16 +388,16 @@ public ContainerBackgroundTaskResult deleteViaSchema2(
deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable();
List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
int totalBlocks = 0;
int numBlocks = 0;
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) {
while (iter.hasNext() && (numBlocks < blocksToDelete)) {
DeletedBlocksTransaction delTx = iter.next().getValue();
totalBlocks += delTx.getLocalIDList().size();
numBlocks += delTx.getLocalIDList().size();
delBlocks.add(delTx);
}
}

if (delBlocks.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No transaction found in container : {}",
Expand All @@ -398,7 +412,8 @@ public ContainerBackgroundTaskResult deleteViaSchema2(
Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
.getHandler(container.getContainerType()));

deleteTransactions(delBlocks, handler, blockDataTable, container);
totalBlocks =
deleteTransactions(delBlocks, handler, blockDataTable, container);

// Once blocks are deleted... remove the blockID from blockDataTable
// and also remove the transactions from txnTable.
Expand Down Expand Up @@ -433,23 +448,27 @@ public ContainerBackgroundTaskResult deleteViaSchema2(
}
}

private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
private int deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
Handler handler, Table<String, BlockData> blockDataTable,
Container container) throws IOException {
Container container)
throws IOException {
int blocksDeleted = 0;
for (DeletedBlocksTransaction entry : delBlocks) {
for (Long blkLong : entry.getLocalIDList()) {
String blk = blkLong.toString();
BlockData blkInfo = blockDataTable.get(blk);
LOG.debug("Deleting block {}", blk);
try {
handler.deleteBlock(container, blkInfo);
blocksDeleted++;
} catch (InvalidProtocolBufferException e) {
LOG.error("Failed to parse block info for block {}", blk, e);
} catch (IOException e) {
LOG.error("Failed to delete files for block {}", blk, e);
}
}
}
return blocksDeleted;
}

@Override
Expand Down
Loading