-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-2448 Delete container command should used a thread pool #142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,11 @@ public class DatanodeConfiguration { | |
| * simultaneously. | ||
| */ | ||
| private int replicationMaxStreams = 10; | ||
| /** | ||
| * The maximum number of threads used to delete containers on a datanode | ||
| * simultaneously. | ||
| */ | ||
| private int deleteContainerThreads = 2; | ||
|
|
||
| @Config(key = "replication.streams.limit", | ||
| type = ConfigType.INT, | ||
|
|
@@ -58,4 +63,25 @@ public int getReplicationMaxStreams() { | |
| return replicationMaxStreams; | ||
| } | ||
|
|
||
| @Config(key = "delete.container.threads", | ||
| type = ConfigType.INT, | ||
| defaultValue = "2", | ||
| tags = {DATANODE}, | ||
| description = "The maximum number of threads used to delete containers " + | ||
| "on a datanode" | ||
| ) | ||
| public void setDeleteContainerThreads(int val) { | ||
| if (val < 1) { | ||
| LOG.warn("hdds.datanode.delete.container.threads must be greater than" + | ||
| "zero and was set to {}. Defaulting to {}", | ||
| val, deleteContainerThreads); | ||
|
||
| } else { | ||
| this.deleteContainerThreads = val; | ||
| } | ||
| } | ||
|
|
||
| public int getDeleteContainerThreads() { | ||
| return deleteContainerThreads; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; | ||
|
|
||
| import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
| import org.apache.hadoop.hdds.protocol.proto | ||
| .StorageContainerDatanodeProtocolProtos.SCMCommandProto; | ||
| import org.apache.hadoop.ozone.container.common.statemachine | ||
|
|
@@ -31,6 +32,11 @@ | |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
| * Handler to process the DeleteContainerCommand from SCM. | ||
|
|
@@ -40,28 +46,39 @@ public class DeleteContainerCommandHandler implements CommandHandler { | |
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(DeleteContainerCommandHandler.class); | ||
|
|
||
| private int invocationCount; | ||
| private long totalTime; | ||
| private final AtomicInteger invocationCount = new AtomicInteger(0); | ||
| private final AtomicLong totalTime = new AtomicLong(0); | ||
| private final ThreadPoolExecutor executor; | ||
|
||
|
|
||
| public DeleteContainerCommandHandler(int threadPoolSize) { | ||
| this.executor = new ThreadPoolExecutor( | ||
| 0, threadPoolSize, 60, TimeUnit.SECONDS, | ||
| new LinkedBlockingQueue<>(), | ||
| new ThreadFactoryBuilder().setDaemon(true) | ||
| .setNameFormat("DeleteContainerThread-%d") | ||
| .build()); | ||
| } | ||
|
|
||
| @Override | ||
| public void handle(final SCMCommand command, | ||
| final OzoneContainer ozoneContainer, | ||
| final StateContext context, | ||
| final SCMConnectionManager connectionManager) { | ||
| final long startTime = Time.monotonicNow(); | ||
| invocationCount++; | ||
| try { | ||
| final DeleteContainerCommand deleteContainerCommand = | ||
| (DeleteContainerCommand) command; | ||
| final ContainerController controller = ozoneContainer.getController(); | ||
| controller.deleteContainer(deleteContainerCommand.getContainerID(), | ||
| deleteContainerCommand.isForce()); | ||
| } catch (IOException e) { | ||
| LOG.error("Exception occurred while deleting the container.", e); | ||
| } finally { | ||
| totalTime += Time.monotonicNow() - startTime; | ||
| } | ||
|
|
||
| final DeleteContainerCommand deleteContainerCommand = | ||
| (DeleteContainerCommand) command; | ||
| final ContainerController controller = ozoneContainer.getController(); | ||
| executor.execute(() -> { | ||
| final long startTime = Time.monotonicNow(); | ||
| invocationCount.incrementAndGet(); | ||
| try { | ||
| controller.deleteContainer(deleteContainerCommand.getContainerID(), | ||
| deleteContainerCommand.isForce()); | ||
| } catch (IOException e) { | ||
| LOG.error("Exception occurred while deleting the container.", e); | ||
| } finally { | ||
| totalTime.getAndAdd(Time.monotonicNow() - startTime); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -71,11 +88,26 @@ public SCMCommandProto.Type getCommandType() { | |
|
|
||
| @Override | ||
| public int getInvocationCount() { | ||
| return this.invocationCount; | ||
| return this.invocationCount.get(); | ||
| } | ||
|
|
||
| @Override | ||
| public long getAverageRunTime() { | ||
| return invocationCount == 0 ? 0 : totalTime / invocationCount; | ||
| return invocationCount.get() == 0 ? | ||
| 0 : totalTime.get() / invocationCount.get(); | ||
|
||
| } | ||
|
|
||
| @Override | ||
| public void stop() { | ||
| try { | ||
| executor.shutdown(); | ||
| if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { | ||
| executor.shutdownNow(); | ||
| } | ||
| } catch (InterruptedException ie) { | ||
| // Ignore, we don't really care about the failure. | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think
container.delete.threads.maxwould better reflect both the hierarchical nature of config item naming, and the fact that it's an upper limit, not a fixed thread count.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. It makes more sense to have "container.delete". I have changed this and also the related method names so they match up.