Changes from all commits
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +74,9 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @VisibleForTesting
   static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
-      Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+      new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
+          TimeUnit.MILLISECONDS);
 
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
 
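Reviewer note on the replacement API: Guava's ThreadFactoryBuilder (used here through the hbase-thirdparty relocation) numbers threads via the %d in setNameFormat, but unlike the removed Threads.newDaemonThreadFactory it only produces daemon threads when setDaemon(true) is called, which none of the new call sites do. A minimal sketch against plain Guava, with an illustrative class name and handler:

import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class FactoryDemo {
  public static void main(String[] args) {
    ThreadFactory factory = new ThreadFactoryBuilder()
        .setNameFormat("Async-Client-Retry-Timer-pool-%d") // %d counts 0, 1, 2, ... per factory
        .setDaemon(true) // would have to be added to keep the old always-daemon behavior
        .setUncaughtExceptionHandler(
            (t, e) -> System.err.println("Thread:" + t + " exited with Exception:" + e))
        .build();
    Thread t = factory.newThread(() -> { });
    // prints "Async-Client-Retry-Timer-pool-0 daemon=true"
    System.out.println(t.getName() + " daemon=" + t.isDaemon());
  }
}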
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -178,8 +179,9 @@ public boolean isDeadServer(ServerName sn) {
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
   class MulticastListener implements Listener {
     private DatagramChannel channel;
-    private final EventLoopGroup group = new NioEventLoopGroup(
-        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
+    private final EventLoopGroup group = new NioEventLoopGroup(1,
+        new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
 
     public MulticastListener() {
     }
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -490,13 +491,10 @@ private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String
           HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
       coreThreads = maxThreads;
     }
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-        coreThreads,
-        maxThreads,
-        keepAliveTime,
-        TimeUnit.SECONDS,
-        workQueue,
-        Threads.newDaemonThreadFactory(toString() + nameHint));
+    ThreadPoolExecutor tpe =
+        new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
+            new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d")
+                .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     tpe.allowCoreThreadTimeOut(true);
     return tpe;
   }
@@ -34,6 +34,8 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -56,7 +58,6 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -138,8 +139,10 @@ public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
     // if it is necessary and will grow unbounded. This could be bad but in HCM
     // we only create as many Runnables as there are region servers. It means
     // it also scales when new region servers are added.
-    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
-        TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
+    ThreadPoolExecutor pool =
+        new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+            new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d")
+                .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     pool.allowCoreThreadTimeOut(true);
     return pool;
  }
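The comment above is the key to why a zero-capacity SynchronousQueue is safe here: each submission either hands off to an idle worker or spawns a new thread, so the pool size tracks the number of concurrent Runnables (at most one per region server), and allowCoreThreadTimeOut(true) lets the pool shrink back down when idle. A hypothetical standalone demo of that pool shape (not HBase code):

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolShapeDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as getDefaultExecutor: zero-capacity queue, core threads may time out.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(4, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
    pool.allowCoreThreadTimeOut(true);
    for (int i = 0; i < 8; i++) {
      // Every submit finds all existing workers busy, so a new thread is created each time.
      pool.execute(() -> {
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    Thread.sleep(100);
    System.out.println("workers: " + pool.getPoolSize()); // prints "workers: 8"
    pool.shutdown();
  }
}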
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,10 +92,14 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
 
   protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
-      Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+      new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
+          TimeUnit.MILLISECONDS);
 
   private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
-      .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+      .newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d")
+              .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
 
   protected boolean running = true; // if client runs
 
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,8 +77,9 @@ class NettyRpcConnection extends RpcConnection {
 
   private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
 
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
-      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
 
   private final NettyRpcClient rpcClient;
 
@@ -68,6 +68,7 @@
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -138,7 +139,9 @@ public void beforeEach() {
 
   static class CountingThreadFactory implements ThreadFactory {
     final AtomicInteger nbThreads;
-    ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
+    ThreadFactory realFactory =
+        new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();
     @Override
     public Thread newThread(Runnable r) {
       nbThreads.incrementAndGet();
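The diff view truncates CountingThreadFactory just after the counter bump; the delegating-factory pattern it implements looks roughly like this (a sketch, not the file's exact remainder):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

class CountingThreadFactorySketch implements ThreadFactory {
  final AtomicInteger nbThreads;
  // The builder-made factory from the diff does the real work of naming the thread.
  final ThreadFactory realFactory = new ThreadFactoryBuilder()
      .setNameFormat("test-TestAsyncProcess-pool-%d")
      .setUncaughtExceptionHandler((t, e) -> e.printStackTrace())
      .build();

  CountingThreadFactorySketch(AtomicInteger nbThreads) {
    this.nbThreads = nbThreads;
  }

  @Override
  public Thread newThread(Runnable r) {
    nbThreads.incrementAndGet(); // record that the pool asked for one more thread
    return realFactory.newThread(r); // then delegate construction to the real factory
  }
}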
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Ignore;
@@ -803,7 +804,8 @@ public int run(String[] arg0) throws Exception {
     // Have them all share the same connection so they all share the same instance of
     // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
     final ExecutorService pool = Executors.newCachedThreadPool(
-        Threads.newDaemonThreadFactory("p"));
+        new ThreadFactoryBuilder().setNameFormat("p-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
     // Share a connection so I can keep counts in the 'server' on concurrency.
     final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
@@ -45,16 +45,9 @@
 @InterfaceAudience.Private
 public class Threads {
   private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
-  private static final AtomicInteger poolNumber = new AtomicInteger(1);
 
   public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
-      new UncaughtExceptionHandler() {
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-          LOG.warn("Thread:" + t + " exited with Exception:"
-              + StringUtils.stringifyException(e));
-        }
-      };
+      (t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e));
 
   /**
    * Utility method that sets name, daemon status and starts passed thread.
@@ -186,89 +179,24 @@ public static void sleepWithoutInterrupt(final long msToWait) {
    * @return threadPoolExecutor the cachedThreadPool with a bounded number
    * as the maximum thread size in the pool.
    */
-  public static ThreadPoolExecutor getBoundedCachedThreadPool(
-      int maxCachedThread, long timeout, TimeUnit unit,
-      ThreadFactory threadFactory) {
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
+      TimeUnit unit, ThreadFactory threadFactory) {
     ThreadPoolExecutor boundedCachedThreadPool =
-        new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
-            unit, new LinkedBlockingQueue<>(), threadFactory);
+        new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, unit,
+            new LinkedBlockingQueue<>(), threadFactory);
     // allow the core pool threads timeout and terminate
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
 
-
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
-        .getThreadGroup();
-
-    return new ThreadFactory() {
-      final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNumber = Threads.poolNumber.getAndIncrement();
-      final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
-   * without setting the exception handler.
-   */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    return newDaemonThreadFactory(prefix, null);
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   * @param prefix name prefix for all threads created from the factory
-   * @param handler unhandles exception handler to set for all threads
-   * @return a thread factory that creates named, daemon threads with
-   *         the supplied exception handler and normal priority
-   */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix,
-      final UncaughtExceptionHandler handler) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (handler != null) {
-          t.setUncaughtExceptionHandler(handler);
-        } else {
-          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
-        }
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-
-    };
-  }
-
   /** Sets an UncaughtExceptionHandler for the thread which logs the
    * Exception stack if the thread dies.
    */
   public static void setLoggingUncaughtExceptionHandler(Thread t) {
     t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
   }
 
-  private static interface PrintThreadInfoHelper {
+  private interface PrintThreadInfoHelper {
 
     void printThreadInfo(PrintStream stream, String title);
 
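The removed helpers named threads "<prefix>-pool<N>-t<M>" and always forced daemon status and normal priority; the ThreadFactoryBuilder replacements in this PR keep a prefix plus per-factory counter ("<prefix>-pool-<M>") but, as written, set neither daemon status nor priority. A sketch of the migration pattern the PR applies (helper name is illustrative; setDaemon(true) shown as what a drop-in replacement would need):

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

final class MigrationSketch {
  // Before: Threads.newDaemonThreadFactory(prefix, handler)
  // After: the equivalent builder call, one factory per pool.
  static ThreadFactory replacementFactory(String prefix, UncaughtExceptionHandler handler) {
    return new ThreadFactoryBuilder()
        .setNameFormat(prefix + "-pool-%d") // %d is the per-factory thread counter
        .setDaemon(true) // matches the removed helper; the call sites in this diff omit it
        .setUncaughtExceptionHandler(handler)
        .build();
  }
}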
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +131,8 @@ public int run(String[] args) throws Exception {
     TableName tableName = TableName.valueOf(args[0]);
     int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
     ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
-        Threads.newDaemonThreadFactory("AsyncClientExample"));
+        new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     // We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
     // need a thread pool and may have a better performance if you use it correctly as it can save
     // some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad
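The comment above contrasts the two async entry points; a hedged sketch of both calls (HBase 2.x API shapes from memory, so treat as illustrative):

import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

class AsyncTableChoice {
  static void demo(AsyncConnection conn, TableName tableName, ExecutorService threadPool) {
    // With a caller-supplied pool: callbacks run on threadPool, so slow user code
    // cannot stall the client's internal event-loop threads.
    conn.getTable(tableName, threadPool)
        .get(new Get(Bytes.toBytes("row")))
        .thenAccept(r -> System.out.println("got " + r));
    // Without a pool ("raw" style): callbacks run on framework threads -- fewer context
    // switches, but a blocking callback can hold up the whole client.
    conn.getTable(tableName)
        .get(new Get(Bytes.toBytes("row")))
        .thenAccept(r -> System.out.println("got " + r));
  }
}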
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,8 +101,8 @@ public boolean start() {
 
     // Create the thread pool that will execute RPCs
     threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
-        Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
-            getUncaughtExceptionHandler()));
+        new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
+            .setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
     return true;
   }
 
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
@@ -245,8 +246,9 @@ public interface Publisher extends Closeable {
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
   public static class MulticastPublisher implements Publisher {
     private DatagramChannel channel;
-    private final EventLoopGroup group = new NioEventLoopGroup(
-        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));
+    private final EventLoopGroup group = new NioEventLoopGroup(1,
+        new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
 
     public MulticastPublisher() {
     }
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -679,7 +680,8 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
     LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
       getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
     final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
-        Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
+        new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d")
+            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
 
     // Split each store file.