Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ public class DatanodeConfiguration {
* The maximum number of replication commands a single datanode can execute
* simultaneously.
*/
private int replicationMaxStreams = 10;
private final int replicationMaxStreamsDefault = 10;
private int replicationMaxStreams = replicationMaxStreamsDefault;
/**
* The maximum number of threads used to delete containers on a datanode
* simultaneously.
*/
private final int containerDeleteThreadsDefault = 2;
private int containerDeleteThreads = containerDeleteThreadsDefault;

@Config(key = "replication.streams.limit",
type = ConfigType.INT,
Expand All @@ -48,7 +55,8 @@ public void setReplicationMaxStreams(int val) {
if (val < 1) {
LOG.warn("hdds.datanode.replication.streams.limit must be greater than" +
"zero and was set to {}. Defaulting to {}",
val, replicationMaxStreams);
val, replicationMaxStreamsDefault);
replicationMaxStreams = replicationMaxStreamsDefault;
} else {
this.replicationMaxStreams = val;
}
Expand All @@ -58,4 +66,26 @@ public int getReplicationMaxStreams() {
return replicationMaxStreams;
}

@Config(key = "container.delete.threads.max",
type = ConfigType.INT,
defaultValue = "2",
tags = {DATANODE},
description = "The maximum number of threads used to delete containers " +
"on a datanode"
)
public void setContainerDeleteThreads(int val) {
if (val < 1) {
LOG.warn("hdds.datanode.container.delete.threads.max must be greater " +
"than zero and was set to {}. Defaulting to {}",
val, containerDeleteThreadsDefault);
containerDeleteThreads = containerDeleteThreadsDefault;
} else {
this.containerDeleteThreads = val;
}
}

public int getContainerDeleteThreads() {
return containerDeleteThreads;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.addHandler(new DeleteContainerCommandHandler())
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads()))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
Expand Down Expand Up @@ -278,6 +279,10 @@ public void close() throws IOException {
if (jvmPauseMonitor != null) {
jvmPauseMonitor.stop();
}

if (commandDispatcher != null) {
commandDispatcher.stop();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public void handle(SCMCommand command) {
}
}

public void stop() {
for (CommandHandler c : handlerMap.values()) {
c.stop();
}
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@ default void updateCommandStatus(StateContext context, SCMCommand command,
command.getId());
}
}

/**
* Override for any command with an internal threadpool, and stop the
* executor when this method is invoked.
*/
default void stop() {
// Default implementation does nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine
Expand All @@ -31,6 +32,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Handler to process the DeleteContainerCommand from SCM.
Expand All @@ -40,28 +47,39 @@ public class DeleteContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(DeleteContainerCommandHandler.class);

private int invocationCount;
private long totalTime;
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ExecutorService executor;

public DeleteContainerCommandHandler(int threadPoolSize) {
this.executor = new ThreadPoolExecutor(
0, threadPoolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DeleteContainerThread-%d")
.build());
}

@Override
public void handle(final SCMCommand command,
final OzoneContainer ozoneContainer,
final StateContext context,
final SCMConnectionManager connectionManager) {
final long startTime = Time.monotonicNow();
invocationCount++;
try {
final DeleteContainerCommand deleteContainerCommand =
(DeleteContainerCommand) command;
final ContainerController controller = ozoneContainer.getController();
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
} catch (IOException e) {
LOG.error("Exception occurred while deleting the container.", e);
} finally {
totalTime += Time.monotonicNow() - startTime;
}

final DeleteContainerCommand deleteContainerCommand =
(DeleteContainerCommand) command;
final ContainerController controller = ozoneContainer.getController();
executor.execute(() -> {
final long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
try {
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
} catch (IOException e) {
LOG.error("Exception occurred while deleting the container.", e);
} finally {
totalTime.getAndAdd(Time.monotonicNow() - startTime);
}
});
}

@Override
Expand All @@ -71,11 +89,27 @@ public SCMCommandProto.Type getCommandType() {

@Override
public int getInvocationCount() {
return this.invocationCount;
return this.invocationCount.get();
}

@Override
public long getAverageRunTime() {
return invocationCount == 0 ? 0 : totalTime / invocationCount;
final int invocations = invocationCount.get();
return invocations == 0 ?
0 : totalTime.get() / invocations;
}

@Override
public void stop() {
try {
executor.shutdown();
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
// Ignore, we don't really care about the failure.
Thread.currentThread().interrupt();
}
}

}