diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java new file mode 100644 index 000000000000..894191798b4a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.server.events; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * Metrics source for EventExecutor implementations. + */ +@Metrics(about = "Executor Metrics", context = "EventQueue") +public class EventExecutorMetrics implements MetricsSource { + private final String name; + private final String description; + private final MetricsRegistry registry; + + @Metric("Number of tasks queued") + private MutableCounterLong queued; + + @Metric("Number of tasks scheduled") + private MutableCounterLong scheduled; + + @Metric("Number of tasks completed") + private MutableCounterLong done; + + @Metric("Number of tasks failed") + private MutableCounterLong failed; + + @Metric("Number of tasks dropped") + private MutableCounterLong dropped; + + @Metric("Number of tasks with long execution time") + private MutableCounterLong longExecution; + + @Metric("Number of tasks with long wait time in queue") + private MutableCounterLong longWaitInQueue; + + public EventExecutorMetrics(String name, String description) { + this.name = name; + this.description = description; + + registry = new MetricsRegistry(name); + init(); + } + + public void init() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.register(name, description, this); + } + + public void unregister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(name); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder builder = collector.addRecord(name); + registry.snapshot(builder, all); + } + + public void incrementQueued() { + queued.incr(); + } + + public void incrementScheduled() { + scheduled.incr(); + } + + public void incrementDone() { + done.incr(); + } + + public void incrementFailed() { + failed.incr(); + } + + public void incrementDropped() { + dropped.incr(); + } + + public void incrementDropped(int count) { + dropped.incr(count); + } + + public void incrementLongExecution() { + longExecution.incr(); + } + + public void incrementLongWaitInQueue() { + longWaitInQueue.incr(); + } + + public long getQueued() { + return queued.value(); + } + + public long getScheduled() { + return scheduled.value(); + } + + public long getDone() { + return done.value(); + } + + public long getFailed() { + return failed.value(); + } + + public long getDropped() { + return dropped.value(); + } + + public long getLongExecution() { + return longExecution.value(); + } + + public long getLongWaitInQueue() { + return longWaitInQueue.value(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java index f934b20ed610..07804c2f2e9f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java @@ -30,11 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hdds.utils.MetricsUtil; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +41,6 @@ * * @param

the payload type of events */ -@Metrics(context = "EventQueue") public class FixedThreadPoolWithAffinityExecutor implements EventExecutor

{ @@ -66,27 +61,7 @@ public class FixedThreadPoolWithAffinityExecutor private final List executors; - // MutableCounterLong is thread safe. - @Metric - private MutableCounterLong queued; - - @Metric - private MutableCounterLong done; - - @Metric - private MutableCounterLong failed; - - @Metric - private MutableCounterLong scheduled; - - @Metric - private MutableCounterLong dropped; - - @Metric - private MutableCounterLong longWaitInQueue; - - @Metric - private MutableCounterLong longTimeExecution; + private final EventExecutorMetrics metrics; private final AtomicBoolean isRunning = new AtomicBoolean(true); private long queueWaitThreshold @@ -112,6 +87,7 @@ public FixedThreadPoolWithAffinityExecutor( this.eventPublisher = eventPublisher; this.executors = executors; this.executorMap = executorMap; + this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics"); executorMap.put(clazz.getName(), this); // Add runnable which will wait for task over another queue @@ -125,9 +101,6 @@ public FixedThreadPoolWithAffinityExecutor( } ++i; } - - MetricsUtil.registerDynamic(this, EVENT_QUEUE + name, - "Event Executor metrics ", "EventQueue"); } public void setQueueWaitThreshold(long queueWaitThreshold) { @@ -168,7 +141,7 @@ public static List initializeExecutorPool( @Override public void onMessage(EventHandler

handler, P message, EventPublisher publisher) { - queued.incr(); + metrics.incrementQueued(); // For messages that need to be routed to the same thread need to // implement hashCode to match the messages. This should be safe for // other messages that implement the native hash. @@ -176,44 +149,44 @@ public void onMessage(EventHandler

handler, P message, EventPublisher BlockingQueue queue = workQueues.get(index); queue.add((Q) message); if (queue instanceof IQueueMetrics) { - dropped.incr(((IQueueMetrics) queue).getAndResetDropCount( + metrics.incrementDropped(((IQueueMetrics) queue).getAndResetDropCount( message.getClass().getSimpleName())); } } @Override public long failedEvents() { - return failed.value(); + return metrics.getFailed(); } @Override public long successfulEvents() { - return done.value(); + return metrics.getDone(); } @Override public long queuedEvents() { - return queued.value(); + return metrics.getQueued(); } @Override public long scheduledEvents() { - return scheduled.value(); + return metrics.getScheduled(); } @Override public long droppedEvents() { - return dropped.value(); + return metrics.getDropped(); } @Override public long longWaitInQueueEvents() { - return longWaitInQueue.value(); + return metrics.getLongWaitInQueue(); } @Override public long longTimeExecutionEvents() { - return longTimeExecution.value(); + return metrics.getLongExecution(); } @Override @@ -223,6 +196,7 @@ public void close() { executor.shutdown(); } executorMap.clear(); + metrics.unregister(); DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name); } @@ -273,26 +247,26 @@ public void run() { long curTime = Time.monotonicNow(); if (createTime != 0 && ((curTime - createTime) > executor.queueWaitThreshold)) { - executor.longWaitInQueue.incr(); + executor.metrics.incrementLongWaitInQueue(); LOG.warn("Event remained in queue for long time {} millisec, {}", (curTime - createTime), eventId); } - executor.scheduled.incr(); + executor.metrics.incrementScheduled(); try { executor.eventHandler.onMessage(report, executor.eventPublisher); - executor.done.incr(); + executor.metrics.incrementDone(); curTime = Time.monotonicNow(); if (createTime != 0 && (curTime - createTime) > executor.execWaitThreshold) { - executor.longTimeExecution.incr(); + executor.metrics.incrementLongExecution(); LOG.warn("Event taken long execution time {} millisec, {}", (curTime - createTime), eventId); } } catch (Exception ex) { LOG.error("Error on execution message {}", report, ex); - executor.failed.incr(); + executor.metrics.incrementFailed(); } if (Thread.currentThread().isInterrupted()) { LOG.warn("Interrupt of execution of Reports"); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index 48dd30a8d130..5e80e2daceda 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -19,10 +19,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.hadoop.hdds.utils.MetricsUtil; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +27,6 @@ * * @param

the payload type of events */ -@Metrics(context = "EventQueue") public class SingleThreadExecutor

implements EventExecutor

{ private static final String EVENT_QUEUE = "EventQueue"; @@ -40,20 +35,8 @@ public class SingleThreadExecutor

implements EventExecutor

{ LoggerFactory.getLogger(SingleThreadExecutor.class); private final String name; - private final ExecutorService executor; - - @Metric - private MutableCounterLong queued; - - @Metric - private MutableCounterLong done; - - @Metric - private MutableCounterLong failed; - - @Metric - private MutableCounterLong scheduled; + private final EventExecutorMetrics metrics; /** * Create SingleThreadExecutor. @@ -63,8 +46,7 @@ public class SingleThreadExecutor

implements EventExecutor

{ */ public SingleThreadExecutor(String name, String threadNamePrefix) { this.name = name; - MetricsUtil.registerDynamic(this, EVENT_QUEUE + name, - "Event Executor metrics ", "EventQueue"); + this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics"); executor = Executors.newSingleThreadExecutor( runnable -> { @@ -77,42 +59,43 @@ public SingleThreadExecutor(String name, String threadNamePrefix) { @Override public void onMessage(EventHandler

handler, P message, EventPublisher publisher) { - queued.incr(); + metrics.incrementQueued(); executor.execute(() -> { - scheduled.incr(); + metrics.incrementScheduled(); try { handler.onMessage(message, publisher); - done.incr(); + metrics.incrementDone(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failed.incr(); + metrics.incrementFailed(); } }); } @Override public long failedEvents() { - return failed.value(); + return metrics.getFailed(); } @Override public long successfulEvents() { - return done.value(); + return metrics.getDone(); } @Override public long queuedEvents() { - return queued.value(); + return metrics.getQueued(); } @Override public long scheduledEvents() { - return scheduled.value(); + return metrics.getScheduled(); } @Override public void close() { executor.shutdown(); + metrics.unregister(); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java deleted file mode 100644 index 81fbf1daf93d..000000000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.utils; - -import java.lang.annotation.Annotation; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.Map; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Metrics util for metrics. - */ -public final class MetricsUtil { - private static final String ANNOTATIONS = "annotations"; - private static final String ANNOTATION_DATA = "annotationData"; - private static final Class ANNOTATION_TO_ALTER - = Metrics.class; - - private static final Logger LOG = - LoggerFactory.getLogger(MetricsUtil.class); - - private MetricsUtil() { - } - - /** - * register metric with changing class annotation for metrics. - * - * @param source source to register - * @param name name of metric - * @param desc description of metric - * @param context context of metric - * @param source type - */ - public static void registerDynamic( - T source, String name, String desc, String context) { - updateAnnotation(source.getClass(), name, desc, context); - DefaultMetricsSystem.instance().register(name, desc, source); - } - - private static void updateAnnotation( - Class clz, String name, String desc, String context) { - try { - Annotation annotationValue = new Metrics() { - - @Override - public Class annotationType() { - return ANNOTATION_TO_ALTER; - } - - @Override - public String name() { - return name; - } - - @Override - public String about() { - return desc; - } - - @Override - public String context() { - return context; - } - }; - - Method method = clz.getClass().getDeclaredMethod( - ANNOTATION_DATA, null); - method.setAccessible(true); - Object annotationData = method.invoke(clz); - Field annotations = annotationData.getClass() - .getDeclaredField(ANNOTATIONS); - annotations.setAccessible(true); - Map, Annotation> map = - (Map, Annotation>) annotations - .get(annotationData); - map.put(ANNOTATION_TO_ALTER, annotationValue); - } catch (Exception e) { - LOG.error("Update Metrics annotation failed. ", e); - } - } -}