Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DatanodeConfiguration {
private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;

static final int CONTAINER_DELETE_THREADS_DEFAULT = 2;
static final int BLOCK_DELETE_THREADS_DEFAULT = 5;

/**
* The maximum number of threads used to delete containers on a datanode
Expand All @@ -92,6 +93,37 @@ public class DatanodeConfiguration {
)
private int containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT;

/**
* The maximum number of threads used to handle delete block commands.
* It takes about 200ms to open a RocksDB with HDD media, so basically DN
* can handle 300 individual container delete tx every 60s if RocksDB cache
* missed. With max threads 5, optimistically DN can handle 1500 individual
* container delete tx in 60s with RocksDB cache miss.
*/
@Config(key = "block.delete.threads.max",
type = ConfigType.INT,
defaultValue = "5",
tags = {DATANODE},
description = "The maximum number of threads used to handle delete " +
" blocks on a datanode"
)
private int blockDeleteThreads = BLOCK_DELETE_THREADS_DEFAULT;

/**
* The maximum number of commands in queued list.
* 1440 = 60 * 24, which means if SCM send a delete command every minute,
* if the commands are pined up for more than 1 day, DN will start to discard
* new comming commands.
*/
@Config(key = "block.delete.queue.limit",
type = ConfigType.INT,
defaultValue = "1440",
tags = {DATANODE},
description = "The maximum number of block delete commands queued on "+
" a datanode"
)
private int blockDeleteQueueLimit = 60 * 24;

@Config(key = "block.deleting.service.interval",
defaultValue = "60s",
type = ConfigType.TIME,
Expand Down Expand Up @@ -292,4 +324,20 @@ public Duration getDiskCheckTimeout() {
public void setDiskCheckTimeout(Duration duration) {
this.diskCheckTimeout = duration.toMillis();
}

public int getBlockDeleteThreads() {
return blockDeleteThreads;
}

public void setBlockDeleteThreads(int threads) {
this.blockDeleteThreads = threads;
}

public int getBlockDeleteQueueLimit() {
return blockDeleteQueueLimit;
}

public void setBlockDeleteQueueLimit(int queueLimit) {
this.blockDeleteQueueLimit = queueLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
conf, dnConf.getBlockDeleteThreads(),
dnConf.getBlockDeleteQueueLimit()))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads()))
Expand Down
Loading