-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-22871 Move the DirScanPool out and do not use static field #504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,19 +26,15 @@ | |
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ForkJoinPool; | ||
| import java.util.concurrent.ForkJoinTask; | ||
| import java.util.concurrent.RecursiveTask; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FileStatus; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; | ||
| import org.apache.hadoop.hbase.ScheduledChore; | ||
| import org.apache.hadoop.hbase.Stoppable; | ||
| import org.apache.hadoop.hbase.conf.ConfigurationObserver; | ||
| import org.apache.hadoop.hbase.util.FSUtils; | ||
| import org.apache.hadoop.ipc.RemoteException; | ||
| import org.apache.yetus.audience.InterfaceAudience; | ||
|
|
@@ -56,11 +52,8 @@ | |
| * Abstract Cleaner that uses a chain of delegates to clean a directory of files | ||
| * @param <T> Cleaner delegate class that is dynamically loaded from configuration | ||
| */ | ||
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", | ||
| justification="Static pool will be only updated once.") | ||
| @InterfaceAudience.Private | ||
| public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore | ||
| implements ConfigurationObserver { | ||
| public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class); | ||
| private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); | ||
|
|
@@ -72,84 +65,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu | |
| * while latter will use only 1 thread for chore to scan dir. | ||
| */ | ||
| public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; | ||
| private static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; | ||
|
|
||
| private static class DirScanPool { | ||
| int size; | ||
| ForkJoinPool pool; | ||
| int cleanerLatch; | ||
| AtomicBoolean reconfigNotification; | ||
|
|
||
| DirScanPool(Configuration conf) { | ||
| String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); | ||
| size = calculatePoolSize(poolSize); | ||
| // poolSize may be 0 or 0.0 from a careless configuration, | ||
| // double check to make sure. | ||
| size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size; | ||
| pool = new ForkJoinPool(size); | ||
| LOG.info("Cleaner pool size is {}", size); | ||
| reconfigNotification = new AtomicBoolean(false); | ||
| cleanerLatch = 0; | ||
| } | ||
|
|
||
| /** | ||
| * Checks if pool can be updated. If so, mark for update later. | ||
| * @param conf configuration | ||
| */ | ||
| synchronized void markUpdate(Configuration conf) { | ||
| int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); | ||
| if (newSize == size) { | ||
| LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); | ||
| return; | ||
| } | ||
| size = newSize; | ||
| // Chore is working, update it later. | ||
| reconfigNotification.set(true); | ||
| } | ||
|
|
||
| /** | ||
| * Update pool with new size. | ||
| */ | ||
| synchronized void updatePool(long timeout) { | ||
| long stopTime = System.currentTimeMillis() + timeout; | ||
| while (cleanerLatch != 0 && timeout > 0) { | ||
| try { | ||
| wait(timeout); | ||
| timeout = stopTime - System.currentTimeMillis(); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| break; | ||
| } | ||
| } | ||
| shutDownNow(); | ||
| LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); | ||
| pool = new ForkJoinPool(size); | ||
| } | ||
| static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; | ||
|
|
||
| synchronized void latchCountUp() { | ||
| cleanerLatch++; | ||
| } | ||
|
|
||
| synchronized void latchCountDown() { | ||
| cleanerLatch--; | ||
| notifyAll(); | ||
| } | ||
|
|
||
| @SuppressWarnings("FutureReturnValueIgnored") | ||
| synchronized void submit(ForkJoinTask task) { | ||
| pool.submit(task); | ||
| } | ||
|
|
||
| synchronized void shutDownNow() { | ||
| if (pool == null || pool.isShutdown()) { | ||
| return; | ||
| } | ||
| pool.shutdownNow(); | ||
| } | ||
| } | ||
| // It may be waste resources for each cleaner chore own its pool, | ||
| // so let's make pool for all cleaner chores. | ||
| private static volatile DirScanPool POOL; | ||
| private final DirScanPool pool; | ||
|
|
||
| protected final FileSystem fs; | ||
| private final Path oldFileDir; | ||
|
|
@@ -158,22 +76,9 @@ synchronized void shutDownNow() { | |
| private final AtomicBoolean enabled = new AtomicBoolean(true); | ||
| protected List<T> cleanersChain; | ||
|
|
||
| public static void initChorePool(Configuration conf) { | ||
| if (POOL == null) { | ||
| POOL = new DirScanPool(conf); | ||
| } | ||
| } | ||
|
|
||
| public static void shutDownChorePool() { | ||
| if (POOL != null) { | ||
| POOL.shutDownNow(); | ||
| POOL = null; | ||
| } | ||
| } | ||
|
|
||
| public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, | ||
| FileSystem fs, Path oldFileDir, String confKey) { | ||
| this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); | ||
| FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { | ||
| this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -184,14 +89,15 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi | |
| * @param fs handle to the FS | ||
| * @param oldFileDir the path to the archived files | ||
| * @param confKey configuration key for the classes to instantiate | ||
| * @param pool the thread pool used to scan directories | ||
| * @param params members could be used in cleaner | ||
| */ | ||
| public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, | ||
| FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) { | ||
| FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) { | ||
| super(name, s, sleepPeriod); | ||
|
|
||
| Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call" | ||
| + "CleanerChore.initChorePool(Configuration) before new a cleaner chore."); | ||
| Preconditions.checkNotNull(pool, "Chore's pool can not be null"); | ||
| this.pool = pool; | ||
| this.fs = fs; | ||
| this.oldFileDir = oldFileDir; | ||
| this.conf = conf; | ||
|
|
@@ -255,11 +161,6 @@ private void initCleanerChain(String confKey) { | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onConfigurationChange(Configuration conf) { | ||
| POOL.markUpdate(conf); | ||
| } | ||
|
|
||
| /** | ||
| * A utility method to create new instances of LogCleanerDelegate based on the class name of the | ||
| * LogCleanerDelegate. | ||
|
|
@@ -287,22 +188,20 @@ private T newFileCleaner(String className, Configuration conf) { | |
| protected void chore() { | ||
| if (getEnabled()) { | ||
| try { | ||
| POOL.latchCountUp(); | ||
| pool.latchCountUp(); | ||
| if (runCleaner()) { | ||
| LOG.trace("Cleaned all WALs under {}", oldFileDir); | ||
| } else { | ||
| LOG.trace("WALs outstanding under {}", oldFileDir); | ||
| } | ||
| } finally { | ||
| POOL.latchCountDown(); | ||
| pool.latchCountDown(); | ||
| } | ||
| // After each cleaner chore, checks if received reconfigure notification while cleaning. | ||
| // First in cleaner turns off notification, to avoid another cleaner updating pool again. | ||
| if (POOL.reconfigNotification.compareAndSet(true, false)) { | ||
| // This cleaner is waiting for other cleaners finishing their jobs. | ||
| // To avoid missing next chore, only wait 0.8 * period, then shutdown. | ||
| POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); | ||
| } | ||
| // This cleaner is waiting for other cleaners finishing their jobs. | ||
| // To avoid missing next chore, only wait 0.8 * period, then shutdown. | ||
| pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); | ||
| } else { | ||
| LOG.trace("Cleaner chore disabled! Not cleaning."); | ||
| } | ||
|
|
@@ -315,7 +214,7 @@ private void preRunCleaner() { | |
| public Boolean runCleaner() { | ||
| preRunCleaner(); | ||
| CleanerTask task = new CleanerTask(this.oldFileDir, true); | ||
| POOL.submit(task); | ||
| pool.execute(task); | ||
| return task.join(); | ||
| } | ||
|
|
||
|
|
@@ -467,7 +366,7 @@ public synchronized void cleanup() { | |
|
|
||
| @VisibleForTesting | ||
| int getChorePoolSize() { | ||
| return POOL.size; | ||
| return pool.getSize(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -485,10 +384,13 @@ private interface Action<T> { | |
| } | ||
|
|
||
| /** | ||
| * Attemps to clean up a directory, its subdirectories, and files. | ||
| * Return value is true if everything was deleted. false on partial / total failures. | ||
| * Attemps to clean up a directory, its subdirectories, and files. Return value is true if | ||
| * everything was deleted. false on partial / total failures. | ||
| */ | ||
| private class CleanerTask extends RecursiveTask<Boolean> { | ||
| private final class CleanerTask extends RecursiveTask<Boolean> { | ||
|
|
||
| private static final long serialVersionUID = -5444212174088754172L; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we really need this serialVersiolUID ? I don't see any serialization...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The RecursiveTask implements Serializable so we need a serialVersionUID, otherwise there will be a warning. |
||
|
|
||
| private final Path dir; | ||
| private final boolean root; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it a static config key & default value ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is just a format change? Can do it in another issue if need as it is not introduced by the patch here.