Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@
package org.apache.hadoop.hdds.utils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An abstract class for a background service in ozone.
Expand All @@ -50,10 +45,9 @@ public abstract class BackgroundService {
// Executor to launch child tasks
private final ScheduledExecutorService exec;
private final ThreadGroup threadGroup;
private final ThreadFactory threadFactory;
private final String serviceName;
private final long interval;
private final long serviceTimeout;
private final long serviceTimeoutInNanos;
private final TimeUnit unit;
private final PeriodicalTask service;

Expand All @@ -62,11 +56,11 @@ public BackgroundService(String serviceName, long interval,
this.interval = interval;
this.unit = unit;
this.serviceName = serviceName;
this.serviceTimeout = serviceTimeout;
this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit)
.toLong(TimeUnit.NANOSECONDS);
threadGroup = new ThreadGroup(serviceName);
ThreadFactory tf = r -> new Thread(threadGroup, r);
threadFactory = new ThreadFactoryBuilder()
.setThreadFactory(tf)
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setThreadFactory(r -> new Thread(threadGroup, r))
.setDaemon(true)
.setNameFormat(serviceName + "#%d")
.build();
Expand All @@ -83,17 +77,12 @@ public int getThreadCount() {
return threadGroup.activeCount();
}

/**
* Runs the periodic task once, directly on the calling thread, instead of
* going through the scheduled executor. Intended for use by tests only.
*/
@VisibleForTesting
public void triggerBackgroundTaskForTesting() {
service.run();
}

/**
* Starts the service by scheduling the periodic task on the executor.
* The first run is submitted with no initial delay; each subsequent run
* begins {@code interval} (expressed in {@code unit}) after the previous
* run has finished.
*/
public void start() {
exec.scheduleWithFixedDelay(service, 0, interval, unit);
}

public abstract BackgroundTaskQueue getTasks();
public abstract BackgroundTaskQueue<BackgroundTaskResult> getTasks();

/**
* Run one or more background tasks concurrently.
Expand All @@ -105,7 +94,7 @@ public synchronized void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
BackgroundTaskQueue tasks = getTasks();
BackgroundTaskQueue<BackgroundTaskResult> tasks = getTasks();
if (tasks.isEmpty()) {
// No tasks were found, or the tasks could not be initialized;
// return now and retry in the next interval.
Expand All @@ -114,41 +103,27 @@ public synchronized void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
CompletionService<BackgroundTaskResult> taskCompletionService =
new ExecutorCompletionService<>(exec);

List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
while (tasks.size() > 0) {
BackgroundTask task = tasks.poll();
Future<BackgroundTaskResult> result =
taskCompletionService.submit(task);
results.add(result);
}

results.parallelStream().forEach(taskResultFuture -> {
try {
// Collect task results
BackgroundTaskResult result = serviceTimeout > 0
? taskResultFuture.get(serviceTimeout, unit)
: taskResultFuture.get();
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getSize());
BackgroundTask<BackgroundTaskResult> task = tasks.poll();
CompletableFuture.runAsync(() -> {
long startTime = System.nanoTime();
try {
BackgroundTaskResult result = task.call();
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getSize());
}
} catch (Exception e) {
LOG.warn("Background task execution failed", e);
} finally {
long endTime = System.nanoTime();
if (endTime - startTime > serviceTimeoutInNanos) {
LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
serviceName, endTime - startTime, serviceTimeoutInNanos);
}
}
} catch (InterruptedException e) {
LOG.warn(
"Background task failed due to interruption, retrying in " +
"next interval", e);
// Re-interrupt the thread while catching InterruptedException
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.warn(
"Background task fails to execute, "
+ "retrying in next interval", e);
} catch (TimeoutException e) {
LOG.warn("Background task executes timed out, "
+ "retrying in next interval", e);
}
});
}, exec);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@

package org.apache.hadoop.hdds.utils;

import java.util.Comparator;
import java.util.PriorityQueue;

/**
* A priority queue that stores a number of {@link BackgroundTask}.
*/
public class BackgroundTaskQueue {
public class BackgroundTaskQueue<T> {

private final PriorityQueue<BackgroundTask> tasks;
private final PriorityQueue<BackgroundTask<T>> tasks;

public BackgroundTaskQueue() {
tasks = new PriorityQueue<>((task1, task2)
-> task1.getPriority() - task2.getPriority());
tasks = new PriorityQueue<>(
Comparator.comparingInt(BackgroundTask::getPriority));
}

/**
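* Removes and returns the head of this queue, i.e. the task with the
* smallest {@code getPriority()} value.
*
* @return the head task in this queue, or {@code null} if the queue is empty.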
*/
public synchronized BackgroundTask poll() {
public synchronized BackgroundTask<T> poll() {
return tasks.poll();
}

Expand All @@ -44,7 +45,7 @@ public synchronized BackgroundTask poll() {
*
* @param task the task to add to this queue.
*/
public synchronized void add(BackgroundTask task) {
public synchronized void add(BackgroundTask<T> task) {
tasks.add(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,7 @@ public void testBlockDeletionTimeout() throws Exception {

LogCapturer log = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
if(log.getOutput().contains(
"Background task executes timed out, retrying in next interval")) {
if(log.getOutput().contains("Background task execution took")) {
log.stopCapturing();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand All @@ -42,6 +43,23 @@ public final class OzoneTestUtils {
private OzoneTestUtils() {
}

/**
 * Fires a {@code SCMEvents.CLOSE_CONTAINER} event on the SCM event queue for
 * the container of every block referenced by the given key locations.
 *
 * @param omKeyLocationInfoGroups location info groups of a key; each block
 *                                found in them identifies a container.
 * @param scm the StorageContainerManager whose event queue receives the
 *            events.
 * @throws Exception if processing any of the key's blocks fails.
 */
public static void triggerCloseContainerEvent(
    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups,
    StorageContainerManager scm) throws Exception {
  performOperationOnKeyContainers(
      block -> scm.getEventQueue().fireEvent(
          SCMEvents.CLOSE_CONTAINER,
          ContainerID.valueof(block.getContainerID())),
      omKeyLocationInfoGroups);
}

/**
* Close containers which contain the blocks listed in
* omKeyLocationInfoGroups.
Expand Down
Loading