Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.server.events;

import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

/**
* Metrics source for EventExecutor implementations.
*/
@Metrics(about = "Executor Metrics", context = "EventQueue")
public class EventExecutorMetrics implements MetricsSource {
private final String name;
private final String description;
private final MetricsRegistry registry;

@Metric("Number of tasks queued")
private MutableCounterLong queued;

@Metric("Number of tasks scheduled")
private MutableCounterLong scheduled;

@Metric("Number of tasks completed")
private MutableCounterLong done;

@Metric("Number of tasks failed")
private MutableCounterLong failed;

@Metric("Number of tasks dropped")
private MutableCounterLong dropped;

@Metric("Number of tasks with long execution time")
private MutableCounterLong longExecution;

@Metric("Number of tasks with long wait time in queue")
private MutableCounterLong longWaitInQueue;

public EventExecutorMetrics(String name, String description) {
this.name = name;
this.description = description;

registry = new MetricsRegistry(name);
init();
}

public void init() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.register(name, description, this);
}

public void unregister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(name);
}

@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder builder = collector.addRecord(name);
registry.snapshot(builder, all);
}

public void incrementQueued() {
queued.incr();
}

public void incrementScheduled() {
scheduled.incr();
}

public void incrementDone() {
done.incr();
}

public void incrementFailed() {
failed.incr();
}

public void incrementDropped() {
dropped.incr();
}

public void incrementDropped(int count) {
dropped.incr(count);
}

public void incrementLongExecution() {
longExecution.incr();
}

public void incrementLongWaitInQueue() {
longWaitInQueue.incr();
}

public long getQueued() {
return queued.value();
}

public long getScheduled() {
return scheduled.value();
}

public long getDone() {
return done.value();
}

public long getFailed() {
return failed.value();
}

public long getDropped() {
return dropped.value();
}

public long getLongExecution() {
return longExecution.value();
}

public long getLongWaitInQueue() {
return longWaitInQueue.value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.utils.MetricsUtil;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,7 +41,6 @@
*
* @param <P> the payload type of events
*/
@Metrics(context = "EventQueue")
public class FixedThreadPoolWithAffinityExecutor<P, Q>
implements EventExecutor<P> {

Expand All @@ -66,27 +61,7 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>

private final List<ThreadPoolExecutor> executors;

// MutableCounterLong is thread safe.
@Metric
private MutableCounterLong queued;

@Metric
private MutableCounterLong done;

@Metric
private MutableCounterLong failed;

@Metric
private MutableCounterLong scheduled;

@Metric
private MutableCounterLong dropped;

@Metric
private MutableCounterLong longWaitInQueue;

@Metric
private MutableCounterLong longTimeExecution;
private final EventExecutorMetrics metrics;

private final AtomicBoolean isRunning = new AtomicBoolean(true);
private long queueWaitThreshold
Expand All @@ -112,6 +87,7 @@ public FixedThreadPoolWithAffinityExecutor(
this.eventPublisher = eventPublisher;
this.executors = executors;
this.executorMap = executorMap;
this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics");
executorMap.put(clazz.getName(), this);

// Add runnable which will wait for task over another queue
Expand All @@ -125,9 +101,6 @@ public FixedThreadPoolWithAffinityExecutor(
}
++i;
}

MetricsUtil.registerDynamic(this, EVENT_QUEUE + name,
"Event Executor metrics ", "EventQueue");
}

public void setQueueWaitThreshold(long queueWaitThreshold) {
Expand Down Expand Up @@ -168,52 +141,52 @@ public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(
@Override
public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
queued.incr();
metrics.incrementQueued();
// For messages that need to be routed to the same thread need to
// implement hashCode to match the messages. This should be safe for
// other messages that implement the native hash.
int index = message.hashCode() & (workQueues.size() - 1);
BlockingQueue<Q> queue = workQueues.get(index);
queue.add((Q) message);
if (queue instanceof IQueueMetrics) {
dropped.incr(((IQueueMetrics) queue).getAndResetDropCount(
metrics.incrementDropped(((IQueueMetrics) queue).getAndResetDropCount(
message.getClass().getSimpleName()));
}
}

@Override
public long failedEvents() {
return failed.value();
return metrics.getFailed();
}

@Override
public long successfulEvents() {
return done.value();
return metrics.getDone();
}

@Override
public long queuedEvents() {
return queued.value();
return metrics.getQueued();
}

@Override
public long scheduledEvents() {
return scheduled.value();
return metrics.getScheduled();
}

@Override
public long droppedEvents() {
return dropped.value();
return metrics.getDropped();
}

@Override
public long longWaitInQueueEvents() {
return longWaitInQueue.value();
return metrics.getLongWaitInQueue();
}

@Override
public long longTimeExecutionEvents() {
return longTimeExecution.value();
return metrics.getLongExecution();
}

@Override
Expand All @@ -223,6 +196,7 @@ public void close() {
executor.shutdown();
}
executorMap.clear();
metrics.unregister();
DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name);
}

Expand Down Expand Up @@ -273,26 +247,26 @@ public void run() {
long curTime = Time.monotonicNow();
if (createTime != 0
&& ((curTime - createTime) > executor.queueWaitThreshold)) {
executor.longWaitInQueue.incr();
executor.metrics.incrementLongWaitInQueue();
LOG.warn("Event remained in queue for long time {} millisec, {}",
(curTime - createTime), eventId);
}

executor.scheduled.incr();
executor.metrics.incrementScheduled();
try {
executor.eventHandler.onMessage(report,
executor.eventPublisher);
executor.done.incr();
executor.metrics.incrementDone();
curTime = Time.monotonicNow();
if (createTime != 0
&& (curTime - createTime) > executor.execWaitThreshold) {
executor.longTimeExecution.incr();
executor.metrics.incrementLongExecution();
LOG.warn("Event taken long execution time {} millisec, {}",
(curTime - createTime), eventId);
}
} catch (Exception ex) {
LOG.error("Error on execution message {}", report, ex);
executor.failed.incr();
executor.metrics.incrementFailed();
}
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupt of execution of Reports");
Expand Down
Loading
Loading