Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,10 @@ public class HMaster extends HRegionServer implements MasterServices {

private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner;
Expand Down Expand Up @@ -1075,7 +1078,8 @@ private void finishActiveMasterInitialization(MonitoredTask status)
(System.currentTimeMillis() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = System.currentTimeMillis();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
Expand Down Expand Up @@ -1416,21 +1420,23 @@ private void startServiceThreads() throws IOException {
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();

// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Create log cleaner thread pool
logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
// Start log cleaner thread
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), logCleanerPool);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

// Regions Reopen based on very high storeFileRefCount is considered enabled
Expand Down Expand Up @@ -1483,9 +1489,13 @@ protected void stopServiceThreads() {
this.mobCompactThread.close();
}
super.stopServiceThreads();
if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
if (hfileCleanerPool != null) {
hfileCleanerPool.shutdownNow();
hfileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}

LOG.debug("Stopping service threads");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

/**
* If it is an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be
* available processors * size. Pay attention that 1.0 is different from 1, former indicates it
* will use 100% of cores, while latter will use only 1 thread for chore to scan dir.
* Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is
* an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available
* processors * size. Pay attention that 1.0 is different from 1, former indicates it will use
* 100% of cores, while latter will use only 1 thread for chore to scan dir.
*/
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
/**
* Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows
* the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
*/
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
/**
* Enable the CleanerChore to sort the subdirectories by consumed space and start the cleaning
* with the largest subdirectory. Enabled by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,48 @@
* The thread pool used for scan directories
*/
@InterfaceAudience.Private
public class DirScanPool implements ConfigurationObserver {
public final class DirScanPool implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
private volatile int size;
private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
private Type dirScanPoolType;
private final String name;

public DirScanPool(Configuration conf) {
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
private enum Type {
LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);

private final String cleanerPoolSizeConfigName;
private final String cleanerPoolSizeConfigDefault;

private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
}
}

private DirScanPool(Configuration conf, Type dirScanPoolType) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
pool = initializePool(size);
LOG.info("Cleaner pool size is {}", size);
size = size == 0
? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault)
: size;
pool = initializePool(size, name);
LOG.info("{} Cleaner pool size is {}", name, size);
cleanerLatch = 0;
}

private static ThreadPoolExecutor initializePool(int size) {
private static ThreadPoolExecutor initializePool(int size, String name) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand All @@ -62,10 +83,11 @@ private static ThreadPoolExecutor initializePool(int size) {
*/
@Override
public synchronized void onConfigurationChange(Configuration conf) {
int newSize = CleanerChore.calculatePoolSize(
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
name, newSize);
return;
}
size = newSize;
Expand Down Expand Up @@ -108,11 +130,19 @@ synchronized void tryUpdatePoolSize(long timeout) {
break;
}
}
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
pool.setCorePoolSize(size);
}

public int getSize() {
return size;
}

public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}

public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static void setupCluster() throws Exception {
// We don't want the cleaner to remove files. The tests do that.
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);

POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void setupCluster() throws Exception {
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

private static void setupConf(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TestCleanerChore {

@BeforeClass
public static void setup() {
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down Expand Up @@ -470,6 +470,57 @@ public void testOnConfigurationChange() throws Exception {
t.join();
}

@Test
public void testOnConfigurationChangeLogCleaner() throws Exception {
int availableProcessorNum = Runtime.getRuntime().availableProcessors();
if (availableProcessorNum == 1) { // no need to run this test
return;
}

DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());

// have at least 2 available processors/cores
int initPoolSize = availableProcessorNum / 2;
int changedPoolSize = availableProcessorNum;

Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
String confKey = "hbase.test.cleaner.delegates";
conf.set(confKey, AlwaysDelete.class.getName());
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
final AllValidPaths chore =
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
chore.setEnabled(true);
// Create subdirs under testDir
int dirNums = 6;
Path[] subdirs = new Path[dirNums];
for (int i = 0; i < dirNums; i++) {
subdirs[i] = new Path(testDir, "subdir-" + i);
fs.mkdirs(subdirs[i]);
}
// Under each subdirs create 6 files
for (Path subdir : subdirs) {
createFiles(fs, subdir, 6);
}
// Start chore
Thread t = new Thread(new Runnable() {
@Override
public void run() {
chore.chore();
}
});
t.setDaemon(true);
t.start();
// Change size of chore's pool
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
pool.onConfigurationChange(conf);
assertEquals(changedPoolSize, chore.getChorePoolSize());
// Stop chore
t.join();
}

@Test
public void testMinimumNumberOfThreads() throws Exception {
Configuration conf = UTIL.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class TestHFileCleaner {
public static void setupCluster() throws Exception {
// have to use a minidfs cluster because the localfs doesn't modify file times correctly
UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TestHFileLinkCleaner {

@BeforeClass
public static void setUp() {
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class TestLogsCleaner {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class MasterRegionTestBase {

protected ChoreService choreService;

protected DirScanPool cleanerPool;
protected DirScanPool hfileCleanerPool;

protected DirScanPool logCleanerPool;

protected static byte[] CF1 = Bytes.toBytes("f1");

Expand Down Expand Up @@ -87,7 +89,8 @@ public void setUp() throws IOException {
protected void createMasterRegion() throws IOException {
configure(htu.getConfiguration());
choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration());
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
when(server.getServerName())
Expand All @@ -110,7 +113,8 @@ protected void createMasterRegion() throws IOException {
@After
public void tearDown() throws IOException {
region.close(true);
cleanerPool.shutdownNow();
hfileCleanerPool.shutdownNow();
logCleanerPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalArchivePath, cleanerPool);
}, conf, fs, globalArchivePath, hfileCleanerPool);
choreService.scheduleChore(hfileCleaner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void stop(String why) {
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, cleanerPool);
}, conf, fs, globalWALArchiveDir, logCleanerPool);
choreService.scheduleChore(logCleaner);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testDisableSnapshotAndNotDeleteBackReference() throws Exception {

// Initialize cleaner
HFileCleaner cleaner = new HFileCleaner(10000, Mockito.mock(Stoppable.class), conf, fs,
archiveDir, new DirScanPool(UTIL.getConfiguration()));
archiveDir, DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()));
// Link backref and HFile cannot be removed
cleaner.choreForTesting();
assertTrue(fs.exists(linkBackRef));
Expand Down