handler,
*/
long queuedEvents();
+ /**
+ * Return the number of events whose processing has been started by a worker thread.
+ */
+ long scheduledEvents();
+
/**
* The human readable name for the event executor.
*
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 3d7f06b88a44..e3a18b74276f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -62,11 +62,6 @@ public class EventQueue implements EventPublisher, AutoCloseable {
private boolean isSilent = false;
- public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
- EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
- this.addHandler(event, handler, generateHandlerName(handler));
- }
-
/**
* Add new handler to the event queue.
*
@@ -76,21 +71,29 @@ public > void addHandler(
* @param event Triggering event.
* @param handler Handler of event (will be called from a separated
* thread)
- * @param handlerName The name of handler (should be unique together with
- * the event name)
* @param <PAYLOAD> The type of the event payload.
* @param <EVENT_TYPE> The type of the event identifier.
*/
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
- EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
+ EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
+ Preconditions.checkNotNull(handler, "Handler should not be null.");
validateEvent(event);
- Preconditions.checkNotNull(handler, "Handler name should not be null.");
- String executorName =
- StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
- + handlerName;
+ String executorName = getExecutorName(event, handler);
this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
}
+ /**
+ * Return the executor name for the given event and handler.
+ * @param event the triggering event
+ * @param eventHandler the handler of the event
+ * @return executor name derived from the event name and the handler name
+ */
+ public static <PAYLOAD> String getExecutorName(Event<PAYLOAD> event,
+ EventHandler<PAYLOAD> eventHandler) {
+ return StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+ + generateHandlerName(eventHandler);
+ }
+
private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
Preconditions
.checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
@@ -99,7 +102,8 @@ private > void validateEvent(EVENT_TYPE event) {
}
- private String generateHandlerName(EventHandler<?> handler) {
+ private static String generateHandlerName(
+ EventHandler<?> handler) {
if (!handler.getClass().isAnonymousClass()) {
return handler.getClass().getSimpleName();
} else {
@@ -111,7 +115,7 @@ private String generateHandlerName(EventHandler handler) {
* Add event handler with custom executor.
*
* @param event Triggering event.
- * @param executor The executor imlementation to deliver events from a
+ * @param executor The executor implementation to deliver events from a
* separated threads. Please keep in your mind that
* registering metrics is the responsibility of the
* caller.
@@ -128,6 +132,11 @@ public > void addHandler(
return;
}
validateEvent(event);
+ String executorName = getExecutorName(event, handler);
+ Preconditions.checkState(executorName.equals(executor.getName()),
+ "Event Executor name is not matching the specified format. " +
+ "It should be " + executorName + " but it is " +
+ executor.getName());
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
@@ -268,4 +277,11 @@ public void close() {
public void setSilent(boolean silent) {
isSilent = silent;
}
+
+
+ @VisibleForTesting
+ public Map<EventExecutor, List<EventHandler>> getExecutorAndHandler(
+ Event event) {
+ return executors.get(event);
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolExecutor.java
new file mode 100644
index 000000000000..2015c55a93f9
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_PREFIX;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT;
+
+/**
+ * Fixed thread pool EventExecutor which delivers events to the handler on a
+ * pool of worker threads, so multiple events may be processed concurrently.
+ *
+ * @param <P> the payload type of events
+ */
+@Metrics(context = "EventQueue")
+public class FixedThreadPoolExecutor<P> implements EventExecutor<P> {
+
+ private static final String EVENT_QUEUE = "EventQueue";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FixedThreadPoolExecutor.class);
+
+ private final String name;
+
+ private final ExecutorService executor;
+
+ @Metric
+ private MutableCounterLong queued;
+
+ @Metric
+ private MutableCounterLong done;
+
+ @Metric
+ private MutableCounterLong failed;
+
+ @Metric
+ private MutableCounterLong scheduled;
+
+ /**
+ * Create FixedThreadPoolExecutor.
+ *
+ * @param eventName
+ * @param name Unique name used in monitoring and metrics.
+ */
+ public FixedThreadPoolExecutor(String eventName, String name) {
+ this.name = name;
+ DefaultMetricsSystem.instance()
+ .register(EVENT_QUEUE + name, "Event Executor metrics ", this);
+
+
+ OzoneConfiguration configuration = new OzoneConfiguration();
+ int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
+ StringUtils.camelize(eventName) + ".thread.pool.size",
+ OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+
+ executor = Executors.newFixedThreadPool(threadPoolSize, runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName(EVENT_QUEUE + "-" + name);
+ return thread;
+ });
+ }
+
+ @Override
+ public void onMessage(EventHandler<P> handler, P message, EventPublisher
+ publisher) {
+ queued.incr();
+ executor.execute(() -> {
+ scheduled.incr();
+ try {
+ handler.onMessage(message, publisher);
+ done.incr();
+ } catch (Exception ex) {
+ LOG.error("Error on execution message {}", message, ex);
+ failed.incr();
+ }
+ });
+ }
+
+ @Override
+ public long failedEvents() {
+ return failed.value();
+ }
+
+ @Override
+ public long successfulEvents() {
+ return done.value();
+ }
+
+ @Override
+ public long queuedEvents() {
+ return queued.value();
+ }
+
+ @Override
+ public long scheduledEvents() {
+ return scheduled.value();
+ }
+
+ @Override
+ public void close() {
+ executor.shutdown();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index aa289319d7a7..09be9cfd5fdd 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -54,6 +54,9 @@ public class SingleThreadExecutor
implements EventExecutor
{
@Metric
private MutableCounterLong failed;
+ @Metric
+ private MutableCounterLong scheduled;
+
/**
* Create SingleThreadExecutor.
*
@@ -77,6 +80,7 @@ public void onMessage(EventHandler
handler, P message, EventPublisher
publisher) {
queued.incr();
executor.execute(() -> {
+ scheduled.incr();
try {
handler.onMessage(message, publisher);
done.incr();
@@ -102,6 +106,11 @@ public long queuedEvents() {
return queued.value();
}
+ @Override
+ public long scheduledEvents() {
+ return scheduled.value();
+ }
+
@Override
public void close() {
executor.shutdown();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 0c1200f6d141..f2a7b387acb1 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Testing the basic functionality of the event queue.
*/
@@ -34,13 +36,10 @@ public class TestEventQueue {
private static final Event<Long> EVENT2 =
new TypedEvent<>(Long.class, "SCM_EVENT2");
- private static final Event<Long> EVENT3 =
- new TypedEvent<>(Long.class, "SCM_EVENT3");
- private static final Event<Long> EVENT4 =
- new TypedEvent<>(Long.class, "SCM_EVENT4");
-
private EventQueue queue;
+ private AtomicLong eventTotal = new AtomicLong();
+
@Before
public void startEventQueue() {
DefaultMetricsSystem.initialize(getClass().getSimpleName());
@@ -66,6 +65,59 @@ public void simpleEvent() {
}
+ @Test
+ public void simpleEventWithFixedThreadPoolExecutor() {
+
+ TestHandler testHandler = new TestHandler();
+
+ queue.addHandler(EVENT1, new FixedThreadPoolExecutor<>(EVENT1.getName(),
+ EventQueue.getExecutorName(EVENT1, testHandler)), testHandler);
+
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+ queue.fireEvent(EVENT1, 11L);
+
+ EventExecutor eventExecutor =
+ queue.getExecutorAndHandler(EVENT1).keySet().iterator().next();
+
+ // As it is a fixed thread pool executor with 10 threads, all 10 fired
+ // events should be counted as queued.
+ Assert.assertEquals(10, eventExecutor.queuedEvents());
+
+ // Handlers are slow (they sleep), so not all 10 events may have been scheduled yet.
+ Assert.assertTrue(eventExecutor.scheduledEvents() > 1 &&
+ eventExecutor.scheduledEvents() <= 10);
+
+ queue.processAll(60000);
+ Assert.assertEquals(110, eventTotal.intValue());
+
+ Assert.assertEquals(10, eventExecutor.successfulEvents());
+ eventTotal.set(0);
+
+ }
+
+ /**
+ * Event handler used in tests.
+ */
+ public class TestHandler implements EventHandler {
+ @Override
+ public void onMessage(Object payload, EventPublisher publisher) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ eventTotal.getAndAdd((long) payload);
+ }
+ }
+
@Test
public void multipleSubscriber() {
final long[] result = new long[2];
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 8cd3357c3681..01862ecc3774 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolExecutor;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -342,6 +343,35 @@ private StorageContainerManager(OzoneConfiguration conf,
securityProtocolServer = null;
}
+ scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
+ .OZONE_ADMINISTRATORS);
+ String scmShortUsername =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+
+ if (!scmAdminUsernames.contains(scmShortUsername)) {
+ scmAdminUsernames.add(scmShortUsername);
+ }
+
+ datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
+ eventQueue);
+ blockProtocolServer = new SCMBlockProtocolServer(conf, this);
+ clientProtocolServer = new SCMClientProtocolServer(conf, this);
+
+ initializeEventHandlers();
+
+ containerBalancer = new ContainerBalancer(scmNodeManager,
+ containerManager, replicationManager, configuration, scmContext);
+ LOG.info(containerBalancer.toString());
+
+ // Emit initial safe mode status, as now handlers are registered.
+ scmSafeModeManager.emitSafeModeStatus();
+
+ registerMXBean();
+ registerMetricsSource(this);
+ }
+
+
+ private void initializeEventHandlers() {
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(
pipelineManager, containerManager, scmContext);
@@ -349,52 +379,43 @@ private StorageContainerManager(OzoneConfiguration conf,
new NodeReportHandler(scmNodeManager);
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(
- scmSafeModeManager, pipelineManager, scmContext, conf);
+ scmSafeModeManager, pipelineManager, scmContext, configuration);
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
- scmDecommissionManager, conf, serviceManager);
+ scmDecommissionManager, configuration, serviceManager);
StaleNodeHandler staleNodeHandler =
- new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
+ new StaleNodeHandler(scmNodeManager, pipelineManager, configuration);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
pipelineManager, containerManager);
StartDatanodeAdminHandler datanodeStartAdminHandler =
new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
- new NonHealthyToHealthyNodeHandler(conf, serviceManager);
+ new NonHealthyToHealthyNodeHandler(configuration, serviceManager);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
PendingDeleteHandler pendingDeleteHandler =
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
- new ContainerReportHandler(
- scmNodeManager, containerManager, scmContext, conf);
+ new ContainerReportHandler(scmNodeManager, containerManager,
+ scmContext, configuration);
+
IncrementalContainerReportHandler incrementalContainerReportHandler =
new IncrementalContainerReportHandler(
scmNodeManager, containerManager, scmContext);
PipelineActionHandler pipelineActionHandler =
- new PipelineActionHandler(pipelineManager, scmContext, conf);
+ new PipelineActionHandler(pipelineManager, scmContext, configuration);
CRLStatusReportHandler crlStatusReportHandler =
- new CRLStatusReportHandler(certificateStore, conf);
+ new CRLStatusReportHandler(certificateStore, configuration);
- scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
- .OZONE_ADMINISTRATORS);
- String scmShortUsername =
- UserGroupInformation.getCurrentUser().getShortUserName();
-
- if (!scmAdminUsernames.contains(scmShortUsername)) {
- scmAdminUsernames.add(scmShortUsername);
- }
-
- datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
- eventQueue);
- blockProtocolServer = new SCMBlockProtocolServer(conf, this);
- clientProtocolServer = new SCMClientProtocolServer(conf, this);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
- eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
+ eventQueue.addHandler(SCMEvents.CONTAINER_REPORT,
+ new FixedThreadPoolExecutor<>(SCMEvents.CONTAINER_REPORT.getName(),
+ EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
+ containerReportHandler)), containerReportHandler);
eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
incrementalContainerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
@@ -415,15 +436,6 @@ private StorageContainerManager(OzoneConfiguration conf,
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
- containerBalancer = new ContainerBalancer(scmNodeManager,
- containerManager, replicationManager, configuration, scmContext);
- LOG.info(containerBalancer.toString());
-
- // Emit initial safe mode status, as now handlers are registered.
- scmSafeModeManager.emitSafeModeStatus();
-
- registerMXBean();
- registerMetricsSource(this);
}
private void initializeCertificateClient() {