Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,20 @@ public static Path getStagingDir(Configuration conf)
throws IOException, InterruptedException {
return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
}

/**
 * Initializes the MapReduce staging directory and returns its fully
 * qualified path (scheme and authority resolved against the cluster's
 * default filesystem), so callers can safely compare it with or create
 * paths on other filesystems.
 *
 * @param conf the system configuration used to locate the cluster and
 *             the staging directory
 * @return the fully qualified staging directory path
 * @throws IOException if the ownership on the staging directory is not as expected
 * @throws InterruptedException if the thread getting the staging directory is interrupted
 */
public static Path getQualifiedStagingDir(Configuration conf)
throws IOException, InterruptedException {
Cluster cluster = new Cluster(conf);
Path stagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);
return cluster.getFileSystem().makeQualified(stagingDir);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
Expand All @@ -57,6 +58,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
Expand All @@ -76,7 +78,6 @@
public class CompactionTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

private final static String CONF_TMP_DIR = "hbase.tmp.dir";
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
Expand All @@ -89,12 +90,10 @@ private static class CompactionWorker {
private final boolean deleteCompacted;
private final Configuration conf;
private final FileSystem fs;
private final Path tmpDir;

public CompactionWorker(final FileSystem fs, final Configuration conf) {
this.conf = conf;
this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
this.fs = fs;
}

Expand All @@ -105,7 +104,8 @@ public CompactionWorker(final FileSystem fs, final Configuration conf) {
* @param compactOnce Execute just a single step of compaction.
* @param major Request major compaction.
*/
public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
public void compact(final Path path, final boolean compactOnce, final boolean major)
throws IOException {
if (isFamilyDir(fs, path)) {
Path regionDir = path.getParent();
Path tableDir = regionDir.getParent();
Expand Down Expand Up @@ -150,7 +150,7 @@ private void compactRegion(final Path tableDir, final TableDescriptor htd,
private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
final RegionInfo hri, final String familyName, final boolean compactOnce,
final boolean major) throws IOException {
HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
LOG.info("Compact table=" + htd.getTableName() +
" region=" + hri.getRegionNameAsString() +
" family=" + familyName);
Expand All @@ -177,19 +177,10 @@ private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
store.close();
}

/**
* Create a "mock" HStore that uses the tmpDir specified by the user and
* the store dir to compact as source.
*/
private static HStore getStore(final Configuration conf, final FileSystem fs,
final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
final String familyName, final Path tempDir) throws IOException {
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
@Override
public Path getTempDir() {
return tempDir;
}
};
final String familyName) throws IOException {
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
HRegion region = new HRegion(regionFs, null, conf, htd, null);
return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
}
Expand Down Expand Up @@ -221,7 +212,7 @@ public void setup(Context context) {
major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

try {
FileSystem fs = FileSystem.get(conf);
FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
this.compactor = new CompactionWorker(fs, conf);
} catch (IOException e) {
throw new RuntimeException("Could not get the input FileSystem", e);
Expand Down Expand Up @@ -301,23 +292,19 @@ private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
* The file is a TextFile with each line corresponding to a
* store files directory to compact.
*/
public static void createInputFile(final FileSystem fs, final Path path,
final Set<Path> toCompactDirs) throws IOException {
public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
final Path path, final Set<Path> toCompactDirs) throws IOException {
// Extract the list of store dirs
List<Path> storeDirs = new LinkedList<>();
for (Path compactDir: toCompactDirs) {
if (isFamilyDir(fs, compactDir)) {
storeDirs.add(compactDir);
} else if (isRegionDir(fs, compactDir)) {
for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
storeDirs.add(familyDir);
}
storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
} else if (isTableDir(fs, compactDir)) {
// Lookup regions
for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
storeDirs.add(familyDir);
}
storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
}
} else {
throw new IOException(
Expand All @@ -326,7 +313,7 @@ public static void createInputFile(final FileSystem fs, final Path path,
}

// Write Input File
FSDataOutputStream stream = fs.create(path);
FSDataOutputStream stream = stagingFs.create(path);
LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
try {
final byte[] newLine = Bytes.toBytes("\n");
Expand All @@ -337,6 +324,7 @@ public static void createInputFile(final FileSystem fs, final Path path,
} finally {
stream.close();
}
return storeDirs;
}
}

Expand All @@ -361,15 +349,20 @@ private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
// add dependencies (including HBase ones)
TableMapReduceUtil.addDependencyJars(job);

Path stagingDir = JobUtil.getStagingDir(conf);
Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
FileSystem stagingFs = stagingDir.getFileSystem(conf);
try {
// Create input file with the store dirs
Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
inputPath, toCompactDirs);
CompactionInputFormat.addInputPath(job, inputPath);

// Initialize credential for secure cluster
TableMapReduceUtil.initCredentials(job);
// Despite the method name, this obtains delegation tokens for the filesystems
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
storeDirs.toArray(new Path[0]), conf);

// Start the MR Job and wait
return job.waitForCompletion(true) ? 0 : 1;
Expand Down Expand Up @@ -398,7 +391,7 @@ public int run(String[] args) throws Exception {
boolean mapred = false;

Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

try {
for (int i = 0; i < args.length; ++i) {
Expand Down Expand Up @@ -458,14 +451,15 @@ private void printUsage(final String message) {
System.err.println("Note: -D properties will be applied to the conf used. ");
System.err.println("For example: ");
System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
System.err.println();
System.err.println("Examples:");
System.err.println(" To compact the full 'TestTable' using MapReduce:");
System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
System.err.println(" $ hbase " + this.getClass().getName() +
" -mapred hdfs://hbase/data/default/TestTable");
System.err.println();
System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
System.err.println(" $ hbase " + this.getClass().getName() +
" hdfs://hbase/data/default/TestTable/abc/x");
}

public static void main(String[] args) throws Exception {
Expand Down