Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,13 @@ public final class ScmConfigKeys {
"ozone.scm.ca.list.retry.interval";
public static final long OZONE_SCM_CA_LIST_RETRY_INTERVAL_DEFAULT = 10;


public static final String OZONE_SCM_EVENT_PREFIX = "ozone.scm.event.";

public static final String OZONE_SCM_EVENT_CONTAINER_REPORT_THREAD_POOL_SIZE =
OZONE_SCM_EVENT_PREFIX + "ContainerReport.thread.pool.size";
public static final int OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT = 10;

/**
* Never constructed.
*/
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2902,6 +2902,14 @@
</description>
</property>

<property>
<name>ozone.scm.event.ContainerReport.thread.pool.size</name>
<value>10</value>
<tag>OZONE, SCM</tag>
<description>Thread pool size configured to process container reports.
</description>
</property>

<property>
<name>ozone.scm.datanode.ratis.volume.free-space.min</name>
<value>1GB</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ void onMessage(EventHandler<PAYLOAD> handler,
*/
long queuedEvents();

/**
* Return the number of events scheduled to be processed.
*/
long scheduledEvents();

/**
* The human readable name for the event executor.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ public class EventQueue implements EventPublisher, AutoCloseable {

private boolean isSilent = false;

public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
this.addHandler(event, handler, generateHandlerName(handler));
}

/**
* Add new handler to the event queue.
* <p>
Expand All @@ -76,21 +71,29 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
* @param event Triggering event.
* @param handler Handler of event (will be called from a separated
* thread)
* @param handlerName The name of handler (should be unique together with
* the event name)
* @param <PAYLOAD> The type of the event payload.
* @param <EVENT_TYPE> The type of the event identifier.
*/
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
Preconditions.checkNotNull(handler, "Handler should not be null.");
validateEvent(event);
Preconditions.checkNotNull(handler, "Handler name should not be null.");
String executorName =
StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+ handlerName;
String executorName = getExecutorName(event, handler);
this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
}

/**
* Return executor name for the given event and handler name.
* @param event
* @param eventHandler
* @return executor name
*/
public static <PAYLOAD> String getExecutorName(Event<PAYLOAD> event,
EventHandler<PAYLOAD> eventHandler) {
return StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+ generateHandlerName(eventHandler);
}

private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
Preconditions
.checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
Expand All @@ -99,7 +102,8 @@ private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {

}

private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
private static <PAYLOAD> String generateHandlerName(
EventHandler<PAYLOAD> handler) {
if (!handler.getClass().isAnonymousClass()) {
return handler.getClass().getSimpleName();
} else {
Expand All @@ -111,7 +115,7 @@ private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
* Add event handler with custom executor.
*
* @param event Triggering event.
* @param executor The executor imlementation to deliver events from a
* @param executor The executor implementation to deliver events from a
* separated threads. Please keep in your mind that
* registering metrics is the responsibility of the
* caller.
Expand All @@ -128,6 +132,11 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
return;
}
validateEvent(event);
String executorName = getExecutorName(event, handler);
Preconditions.checkState(executorName.equals(executor.getName()),
"Event Executor name is not matching the specified format. " +
"It should be " + executorName + " but it is " +
executor.getName());
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());

Expand Down Expand Up @@ -268,4 +277,11 @@ public void close() {
public void setSilent(boolean silent) {
isSilent = silent;
}


@VisibleForTesting
public Map<EventExecutor, List<EventHandler>> getExecutorAndHandler(
Event event) {
return executors.get(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_PREFIX;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT;

/**
* Fixed thread pool EventExecutor to call all the event handler one-by-one.
*
* @param <P> the payload type of events
*/
@Metrics(context = "EventQueue")
public class FixedThreadPoolExecutor<P> implements EventExecutor<P> {

private static final String EVENT_QUEUE = "EventQueue";

private static final Logger LOG =
LoggerFactory.getLogger(FixedThreadPoolExecutor.class);

private final String name;

private final ExecutorService executor;

@Metric
private MutableCounterLong queued;

@Metric
private MutableCounterLong done;

@Metric
private MutableCounterLong failed;

@Metric
private MutableCounterLong scheduled;

/**
* Create FixedThreadPoolExecutor.
*
* @param eventName
* @param name Unique name used in monitoring and metrics.
*/
public FixedThreadPoolExecutor(String eventName, String name) {
this.name = name;
DefaultMetricsSystem.instance()
.register(EVENT_QUEUE + name, "Event Executor metrics ", this);


OzoneConfiguration configuration = new OzoneConfiguration();
int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
StringUtils.camelize(eventName) + ".thread.pool.size",
OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);

executor = Executors.newFixedThreadPool(threadPoolSize, runnable -> {
Thread thread = new Thread(runnable);
thread.setName(EVENT_QUEUE + "-" + name);
return thread;
});
}

@Override
public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
queued.incr();
executor.execute(() -> {
scheduled.incr();
try {
handler.onMessage(message, publisher);
done.incr();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
failed.incr();
}
});
}

@Override
public long failedEvents() {
return failed.value();
}

@Override
public long successfulEvents() {
return done.value();
}

@Override
public long queuedEvents() {
return queued.value();
}

@Override
public long scheduledEvents() {
return scheduled.value();
}

@Override
public void close() {
executor.shutdown();
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class SingleThreadExecutor<P> implements EventExecutor<P> {
@Metric
private MutableCounterLong failed;

@Metric
private MutableCounterLong scheduled;

/**
* Create SingleThreadExecutor.
*
Expand All @@ -77,6 +80,7 @@ public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
queued.incr();
executor.execute(() -> {
scheduled.incr();
try {
handler.onMessage(message, publisher);
done.incr();
Expand All @@ -102,6 +106,11 @@ public long queuedEvents() {
return queued.value();
}

@Override
public long scheduledEvents() {
return scheduled.value();
}

@Override
public void close() {
executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

import java.util.concurrent.atomic.AtomicLong;

/**
* Testing the basic functionality of the event queue.
*/
Expand All @@ -34,13 +36,10 @@ public class TestEventQueue {
private static final Event<Long> EVENT2 =
new TypedEvent<>(Long.class, "SCM_EVENT2");

private static final Event<Long> EVENT3 =
new TypedEvent<>(Long.class, "SCM_EVENT3");
private static final Event<Long> EVENT4 =
new TypedEvent<>(Long.class, "SCM_EVENT4");

private EventQueue queue;

private AtomicLong eventTotal = new AtomicLong();

@Before
public void startEventQueue() {
DefaultMetricsSystem.initialize(getClass().getSimpleName());
Expand All @@ -66,6 +65,59 @@ public void simpleEvent() {

}

@Test
public void simpleEventWithFixedThreadPoolExecutor() {

TestHandler testHandler = new TestHandler();

queue.addHandler(EVENT1, new FixedThreadPoolExecutor<>(EVENT1.getName(),
EventQueue.getExecutorName(EVENT1, testHandler)), testHandler);

queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);
queue.fireEvent(EVENT1, 11L);

EventExecutor eventExecutor =
queue.getExecutorAndHandler(EVENT1).keySet().iterator().next();

// As it is fixed threadpool executor with 10 threads, all should be
// scheduled.
Assert.assertEquals(10, eventExecutor.queuedEvents());

// As we don't see all 10 events scheduled.
Assert.assertTrue(eventExecutor.scheduledEvents() > 1 &&
eventExecutor.scheduledEvents() <= 10);

queue.processAll(60000);
Assert.assertEquals(110, eventTotal.intValue());

Assert.assertEquals(10, eventExecutor.successfulEvents());
eventTotal.set(0);

}

/**
* Event handler used in tests.
*/
public class TestHandler implements EventHandler {
@Override
public void onMessage(Object payload, EventPublisher publisher) {
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
eventTotal.getAndAdd((long) payload);
}
}

@Test
public void multipleSubscriber() {
final long[] result = new long[2];
Expand Down
Loading