Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;

/**
* Metrics for Event Executors.
*/
public interface EventExecutorMetrics {
/**
* get name.
*/
String getName();

/**
* stop metrics.
*/
void stop();

/**
* Increment the number of the failed events.
*/
void incrFailedEvents(long delta);


/**
* Increment the number of the processed events.
*/
void incrSuccessfulEvents(long delta);

/**
* Increment the number of the not-yet processed events.
*/
void incrQueuedEvents(long delta);

/**
* Increment the number of events scheduled to be processed.
*/
void incrScheduledEvents(long delta);

/**
* Increment the number of dropped events to be processed.
*/
void incrDroppedEvents(long delta);

/**
* Increment the number of events having long wait in queue
* crossing threshold.
*/
void incrLongWaitInQueueEvents(long delta);

/**
* Increment the number of events having long execution crossing threshold.
*/
void incrLongTimeExecutionEvents(long delta);

/**
* Return the number of the failed events.
*/
long failedEvents();


/**
* Return the number of the processed events.
*/
long successfulEvents();

/**
* Return the number of the not-yet processed events.
*/
long queuedEvents();

/**
* Return the number of events scheduled to be processed.
*/
long scheduledEvents();

/**
* Return the number of dropped events to be processed.
*/
long droppedEvents();

/**
* Return the number of events having long wait in queue crossing threshold.
*/
long longWaitInQueueEvents();

/**
* Return the number of events having long execution crossing threshold.
*/
long longTimeExecutionEvents();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
package org.apache.hadoop.hdds.server.events;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,12 +41,8 @@
*
* @param <P> the payload type of events
*/
@Metrics(context = "EventQueue")
public class FixedThreadPoolWithAffinityExecutor<P, Q>
implements EventExecutor<P> {

private static final String EVENT_QUEUE = "EventQueue";

private static final Logger LOG =
LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class);

Expand All @@ -65,28 +57,8 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
private final List<BlockingQueue<Q>> workQueues;

private final List<ThreadPoolExecutor> executors;

// MutableCounterLong is thread safe.
@Metric
private MutableCounterLong queued;

@Metric
private MutableCounterLong done;

@Metric
private MutableCounterLong failed;

@Metric
private MutableCounterLong scheduled;

@Metric
private MutableCounterLong dropped;

@Metric
private MutableCounterLong longWaitInQueue;

@Metric
private MutableCounterLong longTimeExecution;
private final EventExecutorMetrics metrics;

private final AtomicBoolean isRunning = new AtomicBoolean(true);
private long queueWaitThreshold
Expand All @@ -98,20 +70,20 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
* Create FixedThreadPoolExecutor with affinity.
* Based on the payload's hash code, the payload will be scheduled to the
* same thread.
*
* @param name Unique name used in monitoring and metrics.
*/
public FixedThreadPoolWithAffinityExecutor(
String name, EventHandler<P> eventHandler,
EventHandler<P> eventHandler,
List<BlockingQueue<Q>> workQueues, EventPublisher eventPublisher,
Class<P> clazz, List<ThreadPoolExecutor> executors,
Map<String, FixedThreadPoolWithAffinityExecutor> executorMap) {
this.name = name;
Map<String, FixedThreadPoolWithAffinityExecutor> executorMap,
EventExecutorMetrics metrics) {
this.eventHandler = eventHandler;
this.workQueues = workQueues;
this.eventPublisher = eventPublisher;
this.executors = executors;
this.executorMap = executorMap;
this.metrics = metrics;
this.name = metrics.getName();
executorMap.put(clazz.getName(), this);

// Add runnable which will wait for task over another queue
Expand All @@ -125,11 +97,6 @@ public FixedThreadPoolWithAffinityExecutor(
}
++i;
}

DefaultMetricsSystem.instance()
.register(EVENT_QUEUE + name,
"Event Executor metrics ",
this);
}

public void setQueueWaitThreshold(long queueWaitThreshold) {
Expand Down Expand Up @@ -164,50 +131,50 @@ public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(
@Override
public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
queued.incr();
metrics.incrQueuedEvents(1L);
// For messages that need to be routed to the same thread need to
// implement hashCode to match the messages. This should be safe for
// other messages that implement the native hash.
int index = message.hashCode() & (workQueues.size() - 1);
BlockingQueue<Q> queue = workQueues.get(index);
queue.add((Q) message);
if (queue instanceof IQueueMetrics) {
dropped.incr(((IQueueMetrics) queue).getAndResetDropCount(
metrics.incrDroppedEvents(((IQueueMetrics) queue).getAndResetDropCount(
message.getClass().getSimpleName()));
}
}

@Override
public long failedEvents() {
return failed.value();
return metrics.failedEvents();
}

@Override
public long successfulEvents() {
return done.value();
return metrics.successfulEvents();
}

@Override
public long queuedEvents() {
return queued.value();
return metrics.queuedEvents();
}

@Override
public long scheduledEvents() {
return scheduled.value();
return metrics.scheduledEvents();
}

@Override
public long droppedEvents() {
return dropped.value();
return metrics.droppedEvents();
}

public long longWaitInQueueEvents() {
return longWaitInQueue.value();
return metrics.longWaitInQueueEvents();
}

public long longTimeExecutionEvents() {
return longTimeExecution.value();
return metrics.longTimeExecutionEvents();
}

@Override
Expand All @@ -217,7 +184,6 @@ public void close() {
executor.shutdown();
}
executorMap.clear();
DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name);
}

@Override
Expand Down Expand Up @@ -267,26 +233,26 @@ public void run() {
long curTime = Time.monotonicNow();
if (createTime != 0
&& ((curTime - createTime) > executor.queueWaitThreshold)) {
executor.longWaitInQueue.incr();
executor.metrics.incrLongWaitInQueueEvents(1L);
LOG.warn("Event remained in queue for long time {} millisec, {}",
(curTime - createTime), eventId);
}

executor.scheduled.incr();
executor.metrics.incrScheduledEvents(1L);
try {
executor.eventHandler.onMessage(report,
executor.eventPublisher);
executor.done.incr();
executor.metrics.incrSuccessfulEvents(1L);
curTime = Time.monotonicNow();
if (createTime != 0
&& (curTime - createTime) > executor.execWaitThreshold) {
executor.longTimeExecution.incr();
executor.metrics.incrLongTimeExecutionEvents(1L);
LOG.warn("Event taken long execution time {} millisec, {}",
(curTime - createTime), eventId);
}
} catch (Exception ex) {
LOG.error("Error on execution message {}", report, ex);
executor.failed.incr();
executor.metrics.incrFailedEvents(1L);
}
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupt of execution of Reports");
Expand Down
Loading