diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java new file mode 100644 index 000000000000..096e1b94423e --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Metrics for Event Executors. + */ +public interface EventExecutorMetrics { + /** + * get name. + */ + String getName(); + + /** + * stop metrics. + */ + void stop(); + + /** + * Increment the number of the failed events. + */ + void incrFailedEvents(long delta); + + + /** + * Increment the number of the processed events. + */ + void incrSuccessfulEvents(long delta); + + /** + * Increment the number of the not-yet processed events. + */ + void incrQueuedEvents(long delta); + + /** + * Increment the number of events scheduled to be processed. + */ + void incrScheduledEvents(long delta); + + /** + * Increment the number of dropped events to be processed. + */ + void incrDroppedEvents(long delta); + + /** + * Increment the number of events having long wait in queue + * crossing threshold. + */ + void incrLongWaitInQueueEvents(long delta); + + /** + * Increment the number of events having long execution crossing threshold. + */ + void incrLongTimeExecutionEvents(long delta); + + /** + * Return the number of the failed events. + */ + long failedEvents(); + + + /** + * Return the number of the processed events. + */ + long successfulEvents(); + + /** + * Return the number of the not-yet processed events. + */ + long queuedEvents(); + + /** + * Return the number of events scheduled to be processed. + */ + long scheduledEvents(); + + /** + * Return the number of dropped events to be processed. + */ + long droppedEvents(); + + /** + * Return the number of events having long wait in queue crossing threshold. + */ + long longWaitInQueueEvents(); + + /** + * Return the number of events having long execution crossing threshold. + */ + long longTimeExecutionEvents(); +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java index 4949195cddb1..b4cdfac16dda 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java @@ -18,10 +18,6 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,12 +41,8 @@ * * @param

the payload type of events */ -@Metrics(context = "EventQueue") public class FixedThreadPoolWithAffinityExecutor implements EventExecutor

{ - - private static final String EVENT_QUEUE = "EventQueue"; - private static final Logger LOG = LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class); @@ -65,28 +57,8 @@ public class FixedThreadPoolWithAffinityExecutor private final List> workQueues; private final List executors; - - // MutableCounterLong is thread safe. - @Metric - private MutableCounterLong queued; - - @Metric - private MutableCounterLong done; - - @Metric - private MutableCounterLong failed; - - @Metric - private MutableCounterLong scheduled; - - @Metric - private MutableCounterLong dropped; - @Metric - private MutableCounterLong longWaitInQueue; - - @Metric - private MutableCounterLong longTimeExecution; + private final EventExecutorMetrics metrics; private final AtomicBoolean isRunning = new AtomicBoolean(true); private long queueWaitThreshold @@ -98,20 +70,20 @@ public class FixedThreadPoolWithAffinityExecutor * Create FixedThreadPoolExecutor with affinity. * Based on the payload's hash code, the payload will be scheduled to the * same thread. - * - * @param name Unique name used in monitoring and metrics. */ public FixedThreadPoolWithAffinityExecutor( - String name, EventHandler

eventHandler, + EventHandler

eventHandler, List> workQueues, EventPublisher eventPublisher, Class

clazz, List executors, - Map executorMap) { - this.name = name; + Map executorMap, + EventExecutorMetrics metrics) { this.eventHandler = eventHandler; this.workQueues = workQueues; this.eventPublisher = eventPublisher; this.executors = executors; this.executorMap = executorMap; + this.metrics = metrics; + this.name = metrics.getName(); executorMap.put(clazz.getName(), this); // Add runnable which will wait for task over another queue @@ -125,11 +97,6 @@ public FixedThreadPoolWithAffinityExecutor( } ++i; } - - DefaultMetricsSystem.instance() - .register(EVENT_QUEUE + name, - "Event Executor metrics ", - this); } public void setQueueWaitThreshold(long queueWaitThreshold) { @@ -164,7 +131,7 @@ public static List initializeExecutorPool( @Override public void onMessage(EventHandler

handler, P message, EventPublisher publisher) { - queued.incr(); + metrics.incrQueuedEvents(1L); // For messages that need to be routed to the same thread need to // implement hashCode to match the messages. This should be safe for // other messages that implement the native hash. @@ -172,42 +139,42 @@ public void onMessage(EventHandler

handler, P message, EventPublisher BlockingQueue queue = workQueues.get(index); queue.add((Q) message); if (queue instanceof IQueueMetrics) { - dropped.incr(((IQueueMetrics) queue).getAndResetDropCount( + metrics.incrDroppedEvents(((IQueueMetrics) queue).getAndResetDropCount( message.getClass().getSimpleName())); } } @Override public long failedEvents() { - return failed.value(); + return metrics.failedEvents(); } @Override public long successfulEvents() { - return done.value(); + return metrics.successfulEvents(); } @Override public long queuedEvents() { - return queued.value(); + return metrics.queuedEvents(); } @Override public long scheduledEvents() { - return scheduled.value(); + return metrics.scheduledEvents(); } @Override public long droppedEvents() { - return dropped.value(); + return metrics.droppedEvents(); } public long longWaitInQueueEvents() { - return longWaitInQueue.value(); + return metrics.longWaitInQueueEvents(); } public long longTimeExecutionEvents() { - return longTimeExecution.value(); + return metrics.longTimeExecutionEvents(); } @Override @@ -217,7 +184,6 @@ public void close() { executor.shutdown(); } executorMap.clear(); - DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name); } @Override @@ -267,26 +233,26 @@ public void run() { long curTime = Time.monotonicNow(); if (createTime != 0 && ((curTime - createTime) > executor.queueWaitThreshold)) { - executor.longWaitInQueue.incr(); + executor.metrics.incrLongWaitInQueueEvents(1L); LOG.warn("Event remained in queue for long time {} millisec, {}", (curTime - createTime), eventId); } - executor.scheduled.incr(); + executor.metrics.incrScheduledEvents(1L); try { executor.eventHandler.onMessage(report, executor.eventPublisher); - executor.done.incr(); + executor.metrics.incrSuccessfulEvents(1L); curTime = Time.monotonicNow(); if (createTime != 0 && (curTime - createTime) > executor.execWaitThreshold) { - executor.longTimeExecution.incr(); + executor.metrics.incrLongTimeExecutionEvents(1L); LOG.warn("Event taken long execution time {} millisec, {}", (curTime - createTime), eventId); } } catch (Exception ex) { LOG.error("Error on execution message {}", report, ex); - executor.failed.incr(); + executor.metrics.incrFailedEvents(1L); } if (Thread.currentThread().isInterrupted()) { LOG.warn("Interrupt of execution of Reports"); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 767941e52920..edc1e3628c80 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -81,12 +81,13 @@ public void simpleEventWithFixedThreadPoolExecutor() queues.add(eventQueue); Map reportExecutorMap = new ConcurrentHashMap<>(); + String name = EventQueue.getExecutorName(EVENT1, testHandler); + EventExecutorMetrics metrics = new ReportMetrics(name); queue.addHandler(EVENT1, new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(EVENT1, testHandler), testHandler, queues, queue, Long.class, FixedThreadPoolWithAffinityExecutor.initializeExecutorPool( - queues), reportExecutorMap), testHandler); + queues), reportExecutorMap, metrics), testHandler); queue.fireEvent(EVENT1, 11L); queue.fireEvent(EVENT1, 11L); @@ -151,4 +152,98 @@ public void multipleSubscriber() { Assertions.assertEquals(23, result[1]); } + + static class ReportMetrics implements EventExecutorMetrics { + private String name; + private long queued; + private long done; + private long failed; + private long scheduled; + private long dropped; + private long longWaitInQueue; + private long longTimeExecution; + + ReportMetrics(String name) { + this.name = name; + } + + @Override + public void stop() { + } + + @Override + public String getName() { + return name; + } + + @Override + public void incrFailedEvents(long delta) { + failed += delta; + } + + @Override + public void incrSuccessfulEvents(long delta) { + done += delta; + } + + @Override + public void incrQueuedEvents(long delta) { + queued += delta; + } + + @Override + public void incrScheduledEvents(long delta) { + scheduled += delta; + } + + @Override + public void incrDroppedEvents(long delta) { + dropped += delta; + } + + @Override + public void incrLongWaitInQueueEvents(long delta) { + longWaitInQueue += delta; + } + + @Override + public void incrLongTimeExecutionEvents(long delta) { + longTimeExecution += delta; + } + + @Override + public long failedEvents() { + return failed; + } + + @Override + public long successfulEvents() { + return done; + } + + @Override + public long queuedEvents() { + return queued; + } + + @Override + public long scheduledEvents() { + return scheduled; + } + + @Override + public long droppedEvents() { + return dropped; + } + + @Override + public long longWaitInQueueEvents() { + return longWaitInQueue; + } + + @Override + public long longTimeExecutionEvents() { + return longTimeExecution; + } + } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/ContainerReportMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/ContainerReportMetrics.java new file mode 100644 index 000000000000..7cfd1dad4b70 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/ContainerReportMetrics.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.metrics; + +import org.apache.hadoop.hdds.server.events.EventExecutorMetrics; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * Metrics for Container Report Event Executors. + */ +@Metrics(context = "EventQueue") +public class ContainerReportMetrics implements EventExecutorMetrics { + private static final String EVENT_QUEUE = "EventQueue"; + private String name; + + @Metric + private MutableCounterLong queued; + + @Metric + private MutableCounterLong done; + + @Metric + private MutableCounterLong failed; + + @Metric + private MutableCounterLong scheduled; + + @Metric + private MutableCounterLong dropped; + + @Metric + private MutableCounterLong longWaitInQueue; + + @Metric + private MutableCounterLong longTimeExecution; + + public ContainerReportMetrics(String name) { + this.name = name; + DefaultMetricsSystem.instance().register(EVENT_QUEUE + this.name, + "Event Executor metrics ", this); + } + + @Override + public void stop() { + DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name); + } + + @Override + public String getName() { + return name; + } + + @Override + public void incrFailedEvents(long delta) { + failed.incr(delta); + } + + @Override + public void incrSuccessfulEvents(long delta) { + done.incr(delta); + } + + @Override + public void incrQueuedEvents(long delta) { + queued.incr(delta); + } + + @Override + public void incrScheduledEvents(long delta) { + scheduled.incr(delta); + } + + @Override + public void incrDroppedEvents(long delta) { + dropped.incr(delta); + } + + @Override + public void incrLongWaitInQueueEvents(long delta) { + longWaitInQueue.incr(delta); + } + + @Override + public void incrLongTimeExecutionEvents(long delta) { + longTimeExecution.incr(delta); + } + + @Override + public long failedEvents() { + return failed.value(); + } + + @Override + public long successfulEvents() { + return done.value(); + } + + @Override + public long queuedEvents() { + return queued.value(); + } + + @Override + public long scheduledEvents() { + return scheduled.value(); + } + + @Override + public long droppedEvents() { + return dropped.value(); + } + + @Override + public long longWaitInQueueEvents() { + return longWaitInQueue.value(); + } + + @Override + public long longTimeExecutionEvents() { + return longTimeExecution.value(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/IncrementalContainerReportMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/IncrementalContainerReportMetrics.java new file mode 100644 index 000000000000..44627c777d26 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/IncrementalContainerReportMetrics.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.metrics; + +import org.apache.hadoop.metrics2.annotation.Metrics; + +/** + * Metrics for Container Report Event Executors. + */ +@Metrics(context = "EventQueue") +public class IncrementalContainerReportMetrics extends ContainerReportMetrics { + public IncrementalContainerReportMetrics(String name) { + super(name); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 733a418444fd..94462c52ad42 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; +import org.apache.hadoop.hdds.scm.container.metrics.ContainerReportMetrics; +import org.apache.hadoop.hdds.scm.container.metrics.IncrementalContainerReportMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler; import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; @@ -80,6 +82,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.hdds.server.OzoneAdmins; import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.server.events.EventExecutorMetrics; import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor; import org.apache.hadoop.hdds.server.http.RatisDropwizardExports; import org.apache.hadoop.hdds.utils.HAUtils; @@ -498,25 +501,26 @@ private void initializeEventHandlers() { = FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues); Map reportExecutorMap = new ConcurrentHashMap<>(); + EventExecutorMetrics containerReportMetrics = new ContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, + containerReportHandler)); FixedThreadPoolWithAffinityExecutor containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, - containerReportHandler), containerReportHandler, queues, eventQueue, ContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, containerReportMetrics); containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold); containerReportExecutors.setExecWaitThreshold(execWaitThreshold); + EventExecutorMetrics icrMetrics = new IncrementalContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.INCREMENTAL_CONTAINER_REPORT, + incrementalContainerReportHandler)); FixedThreadPoolWithAffinityExecutor incrementalReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName( - SCMEvents.INCREMENTAL_CONTAINER_REPORT, - incrementalContainerReportHandler), incrementalContainerReportHandler, queues, eventQueue, IncrementalContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, icrMetrics); incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold); incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 19c570cd0fce..6e5c3b302c8c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.metrics.ContainerReportMetrics; +import org.apache.hadoop.hdds.scm.container.metrics.IncrementalContainerReportMetrics; import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventExecutor; +import org.apache.hadoop.hdds.server.events.EventExecutorMetrics; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor; @@ -867,14 +870,15 @@ public void testContainerReportQueueWithDrop() throws Exception { .initializeExecutorPool(queues); Map reportExecutorMap = new ConcurrentHashMap<>(); + EventExecutorMetrics containerReportMetrics = new ContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, + containerReportHandler)); EventExecutor containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, - containerReportHandler), containerReportHandler, queues, eventQueue, ContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, containerReportMetrics); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors, containerReportHandler); eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT, dndata); @@ -909,15 +913,16 @@ public void testContainerReportQueueTakingMoreTime() throws Exception { .initializeExecutorPool(queues); Map reportExecutorMap = new ConcurrentHashMap<>(); + EventExecutorMetrics containerReportMetrics = new ContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, + containerReportHandler)); FixedThreadPoolWithAffinityExecutor containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, - containerReportHandler), containerReportHandler, queues, eventQueue, ContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, containerReportMetrics); containerReportExecutors.setQueueWaitThreshold(1000); containerReportExecutors.setExecWaitThreshold(1000); @@ -964,14 +969,15 @@ public void testIncrementalContainerReportQueue() throws Exception { .initializeExecutorPool(queues); Map reportExecutorMap = new ConcurrentHashMap<>(); + EventExecutorMetrics icrMetrics = new IncrementalContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.INCREMENTAL_CONTAINER_REPORT, + icr)); EventExecutor containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(SCMEvents.INCREMENTAL_CONTAINER_REPORT, - icr), icr, queues, eventQueue, IncrementalContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, icrMetrics); eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT, containerReportExecutors, icr); eventQueue.fireEvent(SCMEvents.INCREMENTAL_CONTAINER_REPORT, dndata); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index 51499a0d6c9f..095906198467 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -43,6 +43,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; +import org.apache.hadoop.hdds.scm.container.metrics.ContainerReportMetrics; +import org.apache.hadoop.hdds.scm.container.metrics.IncrementalContainerReportMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; @@ -67,6 +69,7 @@ import org.apache.hadoop.hdds.scm.safemode.SafeModeManager; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.server.events.EventExecutorMetrics; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor; import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager; @@ -233,25 +236,26 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, = FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues); Map reportExecutorMap = new ConcurrentHashMap<>(); + EventExecutorMetrics containerReportMetrics = new ContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, + containerReportHandler)); FixedThreadPoolWithAffinityExecutor containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, - containerReportHandler), containerReportHandler, queues, eventQueue, ContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, containerReportMetrics); containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold); containerReportExecutors.setExecWaitThreshold(execWaitThreshold); + EventExecutorMetrics icrMetrics = new IncrementalContainerReportMetrics( + EventQueue.getExecutorName(SCMEvents.INCREMENTAL_CONTAINER_REPORT, + icrHandler)); FixedThreadPoolWithAffinityExecutor incrementalReportExecutors = new FixedThreadPoolWithAffinityExecutor<>( - EventQueue.getExecutorName( - SCMEvents.INCREMENTAL_CONTAINER_REPORT, - icrHandler), icrHandler, queues, eventQueue, IncrementalContainerReportFromDatanode.class, executors, - reportExecutorMap); + reportExecutorMap, icrMetrics); incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold); incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,