diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index cd5929f8a59f..e67ceb2607b6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -78,6 +78,7 @@ public class DatanodeConfiguration { private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; static final int CONTAINER_DELETE_THREADS_DEFAULT = 2; + static final int BLOCK_DELETE_THREADS_DEFAULT = 5; /** * The maximum number of threads used to delete containers on a datanode @@ -92,6 +93,37 @@ public class DatanodeConfiguration { ) private int containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT; + /** + * The maximum number of threads used to handle delete block commands. + * It takes about 200ms to open a RocksDB with HDD media, so basically DN + * can handle 300 individual container delete tx every 60s if RocksDB cache + * missed. With max threads 5, optimistically DN can handle 1500 individual + * container delete tx in 60s with RocksDB cache miss. + */ + @Config(key = "block.delete.threads.max", + type = ConfigType.INT, + defaultValue = "5", + tags = {DATANODE}, + description = "The maximum number of threads used to handle delete " + + " blocks on a datanode" + ) + private int blockDeleteThreads = BLOCK_DELETE_THREADS_DEFAULT; + + /** + * The maximum number of commands in the queue. + * 1440 = 60 * 24, which means if SCM sends a delete command every minute, + * if the commands are piled up for more than 1 day, DN will start to discard + * newly arriving commands. 
+ */ + @Config(key = "block.delete.queue.limit", + type = ConfigType.INT, + defaultValue = "1440", + tags = {DATANODE}, + description = "The maximum number of block delete commands queued on "+ + " a datanode" + ) + private int blockDeleteQueueLimit = 60 * 24; + @Config(key = "block.deleting.service.interval", defaultValue = "60s", type = ConfigType.TIME, @@ -292,4 +324,20 @@ public Duration getDiskCheckTimeout() { public void setDiskCheckTimeout(Duration duration) { this.diskCheckTimeout = duration.toMillis(); } + + public int getBlockDeleteThreads() { + return blockDeleteThreads; + } + + public void setBlockDeleteThreads(int threads) { + this.blockDeleteThreads = threads; + } + + public int getBlockDeleteQueueLimit() { + return blockDeleteQueueLimit; + } + + public void setBlockDeleteQueueLimit(int queueLimit) { + this.blockDeleteQueueLimit = queueLimit; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 56ab9a63ba71..f4a20c596c0b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -172,7 +172,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), - conf)) + conf, dnConf.getBlockDeleteThreads(), + dnConf.getBlockDeleteQueueLimit())) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) .addHandler(new DeleteContainerCommandHandler( dnConf.getContainerDeleteThreads())) diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 2a19728175d0..2823100b743e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -16,8 +16,9 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -49,6 +50,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; @@ -56,7 +58,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import 
static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -76,34 +84,177 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final ConfigurationSource conf; private int invocationCount; private long totalTime; - private boolean cmdExecuted; + private final ExecutorService executor; + private final LinkedBlockingQueue deleteCommandQueues; + private final Daemon handlerThread; public DeleteBlocksCommandHandler(ContainerSet cset, - ConfigurationSource conf) { + ConfigurationSource conf, int threadPoolSize, int queueLimit) { this.containerSet = cset; this.conf = conf; + this.executor = new ThreadPoolExecutor( + 0, threadPoolSize, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DeleteBlocksCommandHandlerThread-%d") + .build()); + this.deleteCommandQueues = new LinkedBlockingQueue<>(queueLimit); + handlerThread = new Daemon(new DeleteCmdWorker()); + handlerThread.start(); } @Override public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { - cmdExecuted = false; - long startTime = Time.monotonicNow(); - ContainerBlocksDeletionACKProto blockDeletionACK = null; + if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + SCMCommandProto.Type.deleteBlocksCommand, command.getType()); + return; + } + try { - if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { - LOG.warn("Skipping handling command, expected command " - + "type {} but found {}", - SCMCommandProto.Type.deleteBlocksCommand, command.getType()); - return; + DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command, + container, context, connectionManager); + deleteCommandQueues.add(cmd); + } catch (IllegalStateException e) { + LOG.warn("Command is discarded because of the command queue is full"); + return; + } + } 
+ + /** + * A delete command info. + */ + public static final class DeleteCmdInfo { + private DeleteBlocksCommand cmd; + private StateContext context; + private OzoneContainer container; + private SCMConnectionManager connectionManager; + + public DeleteCmdInfo(DeleteBlocksCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + this.cmd = command; + this.context = context; + this.container = container; + this.connectionManager = connectionManager; + } + public DeleteBlocksCommand getCmd() { + return this.cmd; + } + public StateContext getContext() { + return this.context; + } + public OzoneContainer getContainer() { + return this.container; + } + public SCMConnectionManager getConnectionManager() { + return this.connectionManager; + } + } + + /** + * Process delete commands. + */ + public final class DeleteCmdWorker implements Runnable { + + @Override + public void run() { + while (true) { + while (!deleteCommandQueues.isEmpty()) { + DeleteCmdInfo cmd = deleteCommandQueues.poll(); + try { + processCmd(cmd); + } catch (Throwable e) { + LOG.error("taskProcess failed.", e); + } + } + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - LOG.debug("Processing block deletion command."); + } + } + /** + * Process one delete transaction. 
+ */ + public final class ProcessTransactionTask implements Runnable { + private DeletedBlocksTransaction tx; + private ContainerBlocksDeletionACKProto.Builder result; + + public ProcessTransactionTask(DeletedBlocksTransaction transaction, + ContainerBlocksDeletionACKProto.Builder resultBuilder) { + this.result = resultBuilder; + this.tx = transaction; + } + + @Override + public void run() { + DeleteBlockTransactionResult.Builder txResultBuilder = + DeleteBlockTransactionResult.newBuilder(); + txResultBuilder.setTxID(tx.getTxID()); + long containerId = tx.getContainerID(); + int newDeletionBlocks = 0; + try { + Container cont = containerSet.getContainer(containerId); + if (cont == null) { + throw new StorageContainerException("Unable to find the " + + "container " + containerId, CONTAINER_NOT_FOUND); + } + + ContainerType containerType = cont.getContainerType(); + switch (containerType) { + case KeyValueContainer: + KeyValueContainerData containerData = (KeyValueContainerData) + cont.getContainerData(); + cont.writeLock(); + try { + if (containerData.getSchemaVersion().equals(SCHEMA_V1)) { + markBlocksForDeletionSchemaV1(containerData, tx); + } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) { + markBlocksForDeletionSchemaV2(containerData, tx, + newDeletionBlocks, tx.getTxID()); + } else { + throw new UnsupportedOperationException( + "Only schema version 1 and schema version 2 are " + + "supported."); + } + } finally { + cont.writeUnlock(); + } + txResultBuilder.setContainerID(containerId) + .setSuccess(true); + break; + default: + LOG.error( + "Delete Blocks Command Handler is not implemented for " + + "containerType {}", containerType); + } + } catch (IOException e) { + LOG.warn("Failed to delete blocks for container={}, TXID={}", + tx.getContainerID(), tx.getTxID(), e); + txResultBuilder.setContainerID(containerId) + .setSuccess(false); + } + result.addResults(txResultBuilder.build()); + } + } + + private void processCmd(DeleteCmdInfo cmd) { + 
LOG.debug("Processing block deletion command."); + ContainerBlocksDeletionACKProto blockDeletionACK = null; + long startTime = Time.monotonicNow(); + boolean cmdExecuted = false; + try { // move blocks to deleting state. // this is a metadata update, the actual deletion happens in another // recycling thread. - DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; - List containerBlocks = cmd.blocksTobeDeleted(); + List containerBlocks = + cmd.getCmd().blocksTobeDeleted(); DeletedContainerBlocksSummary summary = DeletedContainerBlocksSummary.getFrom(containerBlocks); @@ -115,56 +266,25 @@ public void handle(SCMCommand command, OzoneContainer container, ContainerBlocksDeletionACKProto.Builder resultBuilder = ContainerBlocksDeletionACKProto.newBuilder(); - containerBlocks.forEach(entry -> { - DeleteBlockTransactionResult.Builder txResultBuilder = - DeleteBlockTransactionResult.newBuilder(); - txResultBuilder.setTxID(entry.getTxID()); - long containerId = entry.getContainerID(); - int newDeletionBlocks = 0; + List futures = new ArrayList<>(); + for (int i = 0; i < containerBlocks.size(); i++) { + DeletedBlocksTransaction tx = containerBlocks.get(i); + Future future = executor.submit( + new ProcessTransactionTask(tx, resultBuilder)); + futures.add(future); + } + + // Wait for tasks to finish + futures.forEach(f -> { try { - Container cont = containerSet.getContainer(containerId); - if (cont == null) { - throw new StorageContainerException("Unable to find the container " - + containerId, CONTAINER_NOT_FOUND); - } - ContainerProtos.ContainerType containerType = cont.getContainerType(); - switch (containerType) { - case KeyValueContainer: - KeyValueContainerData containerData = (KeyValueContainerData) - cont.getContainerData(); - cont.writeLock(); - try { - if (containerData.getSchemaVersion().equals(SCHEMA_V1)) { - markBlocksForDeletionSchemaV1(containerData, entry); - } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) { - 
markBlocksForDeletionSchemaV2(containerData, entry, - newDeletionBlocks, entry.getTxID()); - } else { - throw new UnsupportedOperationException( - "Only schema version 1 and schema version 2 are " - + "supported."); - } - } finally { - cont.writeUnlock(); - } - txResultBuilder.setContainerID(containerId) - .setSuccess(true); - break; - default: - LOG.error( - "Delete Blocks Command Handler is not implemented for " + - "containerType {}", containerType); - } - } catch (IOException e) { - LOG.warn("Failed to delete blocks for container={}, TXID={}", - entry.getContainerID(), entry.getTxID(), e); - txResultBuilder.setContainerID(containerId) - .setSuccess(false); + f.get(); + } catch (Exception e) { + LOG.error("task failed.", e); } - resultBuilder.addResults(txResultBuilder.build()) - .setDnId(context.getParent().getDatanodeDetails() - .getUuid().toString()); }); + + resultBuilder.setDnId(cmd.getContext().getParent().getDatanodeDetails() + .getUuid().toString()); blockDeletionACK = resultBuilder.build(); // Send ACK back to SCM as long as meta updated @@ -182,11 +302,12 @@ public void handle(SCMCommand command, OzoneContainer container, } finally { final ContainerBlocksDeletionACKProto deleteAck = blockDeletionACK; + final boolean status = cmdExecuted; Consumer statusUpdater = (cmdStatus) -> { - cmdStatus.setStatus(cmdExecuted); - ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck); + cmdStatus.setStatus(status); + ((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(deleteAck); }; - updateCommandStatus(context, command, statusUpdater, LOG); + updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG); long endTime = Time.monotonicNow(); totalTime += endTime - startTime; invocationCount++; @@ -354,4 +475,29 @@ public long getAverageRunTime() { } return 0; } + + @Override + public void stop() { + if (executor != null) { + try { + executor.shutdown(); + if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { + 
executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // Ignore, we don't really care about the failure. + Thread.currentThread().interrupt(); + } + } + + if (handlerThread != null) { + try { + handlerThread.interrupt(); + handlerThread.join(3000); + } catch (InterruptedException ie) { + // Ignore, we don't really care about the failure. + Thread.currentThread().interrupt(); + } + } + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 85643d8b8255..2cf28359e89d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -535,8 +535,9 @@ public void testHeartbeatTaskRpcTimeOut() throws Exception { heartbeatTaskHelper(invalidAddress, 1000); long end = Time.monotonicNow(); scmServerImpl.setRpcResponseDelay(0); + // 6s is introduced by DeleteBlocksCommandHandler#stop Assert.assertThat(end - start, - lessThanOrEqualTo(rpcTimeout + tolerance)); + lessThanOrEqualTo(rpcTimeout + tolerance + 6000)); } private StateContext getContext(DatanodeDetails datanodeDetails) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java index f0cdc4991762..80fd44ccdc3b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java @@ -134,7 +134,6 @@ public void testContainerReplication() throws Exception { cluster.shutdownHddsDatanode(keyLocation.getPipeline().getFirstNode()); - waitForReplicaCount(containerID, 2, 
cluster); waitForReplicaCount(containerID, 3, cluster); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index d8e80b43d038..bcccce9300dd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -83,6 +83,7 @@ import static org.apache.hadoop.hdds .HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import static org.apache.hadoop.ozone .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; @@ -139,11 +140,12 @@ public void init() throws Exception { 3, TimeUnit.SECONDS); conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); - conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); + conf.setInt(OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 100); conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); conf.setQuietMode(false); conf.setTimeDuration("hdds.scm.replication.event.timeout", 100, TimeUnit.MILLISECONDS); + conf.setInt("hdds.datanode.block.delete.threads.max", 5); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .setHbInterval(200) @@ -440,4 +442,69 @@ private void verifyBlocksDeleted( }, omKeyLocationInfoGroups); } } + + @Test + public void testBlockDeleteCommandParallelProcess() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = 
UUID.randomUUID().toString(); + + String value = RandomStringUtils.random(64 * 1024); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + int keyCount = 10; + List keys = new ArrayList<>(); + for (int j = 0; j < keyCount; j++) { + String keyName = UUID.randomUUID().toString(); + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes(UTF_8).length, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + out.write(value.getBytes(UTF_8)); + out.close(); + keys.add(keyName); + } + + // close the containers which hold the blocks for the key + OzoneTestUtils.closeAllContainers(scm.getEventQueue(), scm); + Thread.sleep(2000); + + for (int j = 0; j < keyCount; j++) { + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setKeyName(keys.get(j)).setDataSize(0) + .setReplicationConfig( + new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE)) + .setRefreshPipeline(true) + .build(); + om.deleteKey(keyArgs); + } + + // Wait for block delete command sent from OM + GenericTestUtils.waitFor(() -> { + try { + if (scm.getScmBlockManager().getDeletedBlockLog() + .getNumOfValidTransactions() > 0) { + return true; + } + } catch (IOException e) { + } + return false; + }, 100, 5000); + + long start = System.currentTimeMillis(); + // Wait for all blocks been deleted. + GenericTestUtils.waitFor(() -> { + try { + if (scm.getScmBlockManager().getDeletedBlockLog() + .getNumOfValidTransactions() == 0) { + return true; + } + } catch (IOException e) { + } + return false; + }, 100, 30000); + long end = System.currentTimeMillis(); + System.out.println("Block deletion costs " + (end - start) + "ms"); + } }