Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,7 +65,8 @@ public LogRollBackupSubprocedurePool(String name, Configuration conf) {
this.name = name;
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -76,7 +77,8 @@ class AsyncConnectionImpl implements AsyncConnection {

@VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").build(), 10,
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
TimeUnit.MILLISECONDS);

private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
Expand Down Expand Up @@ -180,7 +181,7 @@ class MulticastListener implements Listener {
private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
.build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

public MulticastListener() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -91,12 +92,14 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").build(), 10,
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
TimeUnit.MILLISECONDS);

private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

protected boolean running = true; // if client runs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -77,8 +78,8 @@ class NettyRpcConnection extends RpcConnection {
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d").build());
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

private final NettyRpcClient rpcClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,16 +44,9 @@
@InterfaceAudience.Private
public class Threads {
private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
private static final AtomicInteger poolNumber = new AtomicInteger(1);

public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.warn("Thread:" + t + " exited with Exception:"
+ StringUtils.stringifyException(e));
}
};
(t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e));

/**
* Utility method that sets name, daemon status and starts passed thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -130,7 +131,8 @@ public int run(String[] args) throws Exception {
TableName tableName = TableName.valueOf(args[0]);
int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
// We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
// need a thread pool and may have a better performance if you use it correctly as it can save
// some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hbase.chaos.actions.Action;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand All @@ -42,7 +43,8 @@ public TwoConcurrentActionPolicy(long sleepTime, Action[] actionsOne, Action[] a
this.actionsOne = actionsOne;
this.actionsTwo = actionsTwo;
executor = Executors.newFixedThreadPool(2,
new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void start() {
this.getClass().getSimpleName(), handlerCount, maxQueueLength);
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(maxQueueLength),
new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").build(),
new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -73,12 +74,14 @@ public void start() {
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(maxQueueLength),
new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.call.handler-pool-%d")
.build(), new ThreadPoolExecutor.CallerRunsPolicy());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor =
new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
.build(), new ThreadPoolExecutor.CallerRunsPolicy());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -247,7 +248,7 @@ public static class MulticastPublisher implements Publisher {
private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
.build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

public MulticastPublisher() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -679,7 +680,8 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

// Split each store file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public DirScanPool(Configuration conf) {

private static ThreadPoolExecutor initializePool(int size) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand Down Expand Up @@ -65,7 +66,7 @@ private NamedQueueRecorder(Configuration conf) {
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
getEventCount(eventCount),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
.build(),
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI,
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -114,7 +115,7 @@ public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
.build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -88,7 +89,7 @@ public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
.build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ static class FlushTableSubprocedurePool {
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ static class SnapshotSubprocedurePool {
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -242,7 +243,8 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
// spinning as other strategies do.
this.disruptor = new Disruptor<>(RingBufferTruck::new,
getPreallocatedEventCount(),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d").build(),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, new BlockingWaitStrategy());
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down Expand Up @@ -69,7 +70,8 @@ public ZKPermissionWatcher(ZKWatcher watcher,
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
executor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

public void start() throws KeeperException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,8 @@ private ThreadPoolExecutor createExecutor(final String name) {
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").build());
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Loading