diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 2e480e6cb7df..efc94ecddc60 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -420,6 +420,19 @@ private static boolean isValidWALRootDir(Path walDir, final Configuration c) thr return true; } + public static Path constructWALRegionDirFromRegionInfo(final Configuration conf, + final TableName tableName, final String encodedRegionName) + throws IOException { + return new Path(getWALTableDir(conf, tableName), + encodedRegionName); + } + + public static Path getWALTableDir(final Configuration conf, final TableName tableName) + throws IOException { + return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + /** * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under * path rootdir diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index 580b9a96d480..907a3cf57e3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -477,7 +477,7 @@ private void checkClosedRegions(final MasterProcedureEnv env) throws IOException */ private void checkClosedRegion(final MasterProcedureEnv env, RegionInfo regionInfo) throws IOException { - if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(), + if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getMasterWalManager().getFileSystem(), env.getMasterConfiguration(), regionInfo)) { throw new IOException("Recovered.edits are found in Region: " + regionInfo + ", abort merge to prevent data loss"); @@ -795,14 +795,16 @@ private ServerName getServerName(final MasterProcedureEnv env) { } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = -1L; for (RegionInfo region : regionsToMerge) { maxSequenceId = - Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region))); + Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId( + walFS, getWALRegionDir(env, region))); } if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion), + maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index aeef835dec7e..87263f3bcbfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.yetus.audience.InterfaceAudience; @@ -216,9 +218,10 @@ private void updateRegionLocation(RegionInfo regionInfo, State state, Put put) } private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException { - MasterFileSystem mfs = master.getMasterFileSystem(); + FileSystem walFS = master.getMasterWalManager().getFileSystem(); long maxSeqId = - WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.constructWALRegionDirFromRegionInfo( + master.getConfiguration(), region.getTable(), region.getEncodedName())); return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index f36b4c5e3e1d..3455367523bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -146,7 +146,7 @@ public SplitTableRegionProcedure(final MasterProcedureEnv env, * @throws IOException IOException */ private void checkClosedRegion(final MasterProcedureEnv env) throws IOException { - if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getFileSystem(), + if (WALSplitter.hasRecoveredEdits(env.getMasterServices().getMasterWalManager().getFileSystem(), env.getMasterConfiguration(), getRegion())) { throw new IOException("Recovered.edits are found in Region: " + getRegion() + ", abort split to prevent data loss"); @@ -851,12 +851,14 @@ private int getRegionReplication(final MasterProcedureEnv env) throws IOExceptio } private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { - FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion())); + WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion())); if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId); - WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI), + maxSequenceId); + WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI), + maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 50a01498441b..b4c33d804229 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -134,6 +135,12 @@ protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) thr return env.getMasterServices().getMasterFileSystem().getRegionDir(region); } + protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region) + throws IOException { + return FSUtils.constructWALRegionDirFromRegionInfo(env.getMasterConfiguration(), + region.getTable(), region.getEncodedName()); + } + /** * Check that cluster is up and master is running. Check table is modifiable. * If enabled, check table is enabled else check it is disabled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 685a73e1e7f3..fcd091fce75f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -113,13 +115,13 @@ protected Flow executeFromState(final MasterProcedureEnv env, final DisableTable case DISABLE_TABLE_ADD_REPLICATION_BARRIER: if (env.getMasterServices().getTableDescriptors().get(tableName) .hasGlobalReplicationScope()) { - MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); try (BufferedMutator mutator = env.getMasterServices().getConnection() .getBufferedMutator(TableName.META_TABLE_NAME)) { for (RegionInfo region : env.getAssignmentManager().getRegionStates() .getRegionsOfTable(tableName)) { long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region)); long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM; mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum, EnvironmentEdgeManager.currentTime())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2d57343013a2..a6a62041a15c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -330,6 +330,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + private Path regionDir; + private FileSystem walFS; + // The internal wait duration to acquire a lock before read/update // from the region. It is not per row. The purpose of this wait time // is to avoid waiting a long time while the region is busy, so that @@ -926,7 +929,7 @@ private long initializeRegionInternals(final CancelableProgressable reporter, stores.forEach(HStore::startReplayingFromWAL); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); } finally { @@ -972,14 +975,14 @@ private long initializeRegionInternals(final CancelableProgressable reporter, // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). long maxSeqIdFromFile = - WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir()); + WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir()); long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1; // The openSeqNum will always be increase even for read only region, as we rely on it to // determine whether a region has been successfully reopend, so here we always need to update // the max sequence id file. if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName()); - WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId - 1); + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1); } LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId); @@ -1130,8 +1133,8 @@ RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online - if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { - WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), + if (getWalFileSystem().exists(getWALRegionDir())) { + WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), mvcc.getReadPoint()); } } @@ -1855,6 +1858,27 @@ public HRegionFileSystem getRegionFileSystem() { return this.fs; } + public HRegionFileSystem getRegionWALFileSystem() throws IOException { + return new HRegionFileSystem(conf, getWalFileSystem(), + FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo()); + } + + public FileSystem getWalFileSystem() throws IOException { + if (walFS == null) { + walFS = FSUtils.getWALFileSystem(conf); + } + return walFS; + } + + @VisibleForTesting + public Path getWALRegionDir() throws IOException { + if (regionDir == null) { + regionDir = FSUtils.constructWALRegionDirFromRegionInfo(conf, getRegionInfo().getTable(), + getRegionInfo().getEncodedName()); + } + return regionDir; + } + @Override public long getEarliestFlushTimeForAllStores() { return Collections.min(lastStoreFlushTimeMap.values()); @@ -4483,8 +4507,7 @@ private boolean isFlushSize(MemStoreSize size) { * recovered edits log or minSeqId if nothing added from editlogs. * @throws IOException */ - protected long replayRecoveredEditsIfAny(final Path regiondir, - Map maxSeqIdInStores, + protected long replayRecoveredEditsIfAny(Map maxSeqIdInStores, final CancelableProgressable reporter, final MonitoredTask status) throws IOException { long minSeqIdForTheRegion = -1; @@ -4495,21 +4518,23 @@ protected long replayRecoveredEditsIfAny(final Path regiondir, } long seqid = minSeqIdForTheRegion; - FileSystem fs = this.fs.getFileSystem(); - NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); + FileSystem walFS = getWalFileSystem(); + Path regionDir = getWALRegionDir(); + + NavigableSet files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir); if (LOG.isDebugEnabled()) { LOG.debug("Found " + (files == null ? 0 : files.size()) - + " recovered edits file(s) under " + regiondir); + + " recovered edits file(s) under " + regionDir); } if (files == null || files.isEmpty()) return seqid; for (Path edits: files) { - if (edits == null || !fs.exists(edits)) { + if (edits == null || !walFS.exists(edits)) { LOG.warn("Null or non-existent edits file: " + edits); continue; } - if (isZeroLengthThenDelete(fs, edits)) continue; + if (isZeroLengthThenDelete(walFS, edits)) continue; long maxSeqId; String fileName = edits.getName(); @@ -4540,7 +4565,7 @@ protected long replayRecoveredEditsIfAny(final Path regiondir, HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); } if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + "=true so continuing. Renamed " + edits + " as " + p, e); @@ -4563,16 +4588,16 @@ protected long replayRecoveredEditsIfAny(final Path regiondir, // For debugging data loss issues! // If this flag is set, make use of the hfile archiving by making recovered.edits a fake // column family. Have to fake out file type too by casting our recovered.edits as storefiles - String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); + String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName(); Set fakeStoreFiles = new HashSet<>(files.size()); for (Path file: files) { fakeStoreFiles.add( - new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true)); + new HStoreFile(walFS, file, this.conf, null, null, true)); } - getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); } else { for (Path file: files) { - if (!fs.delete(file, false)) { + if (!walFS.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -4597,12 +4622,12 @@ private long replayRecoveredEdits(final Path edits, String msg = "Replaying edits from " + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); - FileSystem fs = this.fs.getFileSystem(); + FileSystem walFS = getWalFileSystem(); status.setStatus("Opening recovered edits"); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, edits, conf); + reader = WALFactory.createReader(walFS, edits, conf); long currentEditSeqId = -1; long currentReplaySeqId = -1; long firstSeqIdInLog = -1; @@ -4754,7 +4779,7 @@ private long replayRecoveredEdits(final Path edits, coprocessorHost.postReplayWALs(this.getRegionInfo(), edits); } } catch (EOFException eof) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "EnLongAddered EOF. Most likely due to Master failure during " + "wal splitting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; @@ -4764,7 +4789,7 @@ private long replayRecoveredEdits(final Path edits, // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (ioe.getCause() instanceof ParseException) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); msg = "File corruption enLongAddered! " + "Continuing, but renaming " + edits + " as " + p; LOG.warn(msg, ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 65d5fb7ee256..d3f7182edebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -117,7 +117,7 @@ public class WALSplitter { // Parameters for split process protected final Path walDir; - protected final FileSystem fs; + protected final FileSystem walFS; protected final Configuration conf; // Major subcomponents of the split process. @@ -150,14 +150,14 @@ public class WALSplitter { @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem fs, LastSequenceId idChecker, + FileSystem walFS, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; - this.fs = fs; + this.walFS = walFS; this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; @@ -187,11 +187,11 @@ public class WALSplitter { *

* @return false if it is interrupted by the progress-able. */ - public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs, + public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker, + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, splitLogWorkerCoordination); return s.splitLogFile(logfile, reporter); } @@ -202,13 +202,13 @@ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem f // which uses this method to do log splitting. @VisibleForTesting public static List split(Path rootDir, Path logDir, Path oldLogDir, - FileSystem fs, Configuration conf, final WALFactory factory) throws IOException { + FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList<>(); if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null); + WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -217,7 +217,7 @@ public static List split(Path rootDir, Path logDir, Path oldLogDir, } } } - if (!fs.delete(logDir, true)) { + if (!walFS.delete(logDir, true)) { throw new IOException("Unable to delete src dir: " + logDir); } return splits; @@ -323,10 +323,10 @@ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs); + splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); } else { // for tests only - ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); } isCorrupted = true; } catch (IOException e) { @@ -374,31 +374,30 @@ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { - Path rootdir = FSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); Path logPath; - if (FSUtils.isStartingWithPath(rootdir, logfile)) { + if (FSUtils.isStartingWithPath(walDir, logfile)) { logPath = new Path(logfile); } else { - logPath = new Path(rootdir, logfile); + logPath = new Path(walDir, logfile); } - finishSplitLogFile(rootdir, oldLogDir, logPath, conf); + finishSplitLogFile(walDir, oldLogDir, logPath, conf); } - private static void finishSplitLogFile(Path rootdir, Path oldLogDir, + private static void finishSplitLogFile(Path walDir, Path oldLogDir, Path logPath, Configuration conf) throws IOException { List processedLogs = new ArrayList<>(); List corruptedLogs = new ArrayList<>(); - FileSystem fs; - fs = rootdir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + FileSystem walFS = walDir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) { corruptedLogs.add(logPath); } else { processedLogs.add(logPath); } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); - Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); - fs.delete(stagingDir, true); + archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName()); + walFS.delete(stagingDir, true); } /** @@ -409,30 +408,30 @@ private static void finishSplitLogFile(Path rootdir, Path oldLogDir, * @param corruptedLogs * @param processedLogs * @param oldLogDir - * @param fs + * @param walFS * @param conf * @throws IOException */ private static void archiveLogs( final List corruptedLogs, final List processedLogs, final Path oldLogDir, - final FileSystem fs, final Configuration conf) throws IOException { + final FileSystem walFS, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", corruptDir); } - if (!fs.mkdirs(corruptDir)) { + if (!walFS.mkdirs(corruptDir)) { LOG.info("Unable to mkdir {}", corruptDir); } - fs.mkdirs(oldLogDir); + walFS.mkdirs(oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (fs.exists(corrupted)) { - if (!fs.rename(corrupted, p)) { + if (walFS.exists(corrupted)) { + if (!walFS.rename(corrupted, p)) { LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); } else { LOG.warn("Moved corrupted log {} to {}", corrupted, p); @@ -442,8 +441,8 @@ private static void archiveLogs( for (Path p : processedLogs) { Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); - if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + if (walFS.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { LOG.warn("Unable to move {} to {}", p, newPath); } else { LOG.info("Archived processed log {} to {}", p, newPath); @@ -469,35 +468,28 @@ private static void archiveLogs( static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path rootDir = FSUtils.getRootDir(conf); - Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName()); + FileSystem walFS = FSUtils.getWALFileSystem(conf); + Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); - Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); - Path dir = getRegionDirRecoveredEditsDir(regiondir); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regionDir); - if (!fs.exists(regiondir)) { - LOG.info("This region's directory does not exist: {}." - + "It is very likely that it was already split so it is " - + "safe to discard those edits.", regiondir); - return null; - } - if (fs.exists(dir) && fs.isFile(dir)) { + if (walFS.exists(dir) && walFS.isFile(dir)) { Path tmp = new Path("/tmp"); - if (!fs.exists(tmp)) { - fs.mkdirs(tmp); + if (!walFS.exists(tmp)) { + walFS.mkdirs(tmp); } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", dir, tmp); - if (!fs.rename(dir, tmp)) { + if (!walFS.rename(dir, tmp)) { LOG.warn("Failed to sideline old file {}", dir); } } - if (!fs.exists(dir) && !fs.mkdirs(dir)) { + if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { LOG.warn("mkdir failed on {}", dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. @@ -535,34 +527,34 @@ static String formatRecoveredEditsFileName(final long seqid) { private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; /** - * @param regiondir + * @param regionDir * This regions directory in the filesystem. * @return The directory that holds recovered edits files for the region - * regiondir + * regionDir */ - public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { - return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); + public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { + return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); } /** * Check whether there is recovered.edits in the region dir - * @param fs FileSystem + * @param walFS FileSystem * @param conf conf * @param regionInfo the region to check * @throws IOException IOException * @return true if recovered.edits exist in the region dir */ - public static boolean hasRecoveredEdits(final FileSystem fs, + public static boolean hasRecoveredEdits(final FileSystem walFS, final Configuration conf, final RegionInfo regionInfo) throws IOException { // No recovered.edits for non default replica regions if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { return false; } - Path rootDir = FSUtils.getRootDir(conf); //Only default replica region can reach here, so we can use regioninfo //directly without converting it to default replica's regioninfo. - Path regionDir = HRegion.getRegionDir(rootDir, regionInfo); - NavigableSet files = getSplitEditFilesSorted(fs, regionDir); + Path regionDir = FSUtils.constructWALRegionDirFromRegionInfo(conf, regionInfo.getTable(), + regionInfo.getEncodedName()); + NavigableSet files = getSplitEditFilesSorted(walFS, regionDir); return files != null && !files.isEmpty(); } @@ -571,19 +563,19 @@ public static boolean hasRecoveredEdits(final FileSystem fs, * Returns sorted set of edit files made by splitter, excluding files * with '.temp' suffix. * - * @param fs - * @param regiondir - * @return Files in passed regiondir as a sorted set. + * @param walFS + * @param regionDir + * @return Files in passed regionDir as a sorted set. * @throws IOException */ - public static NavigableSet getSplitEditFilesSorted(final FileSystem fs, - final Path regiondir) throws IOException { + public static NavigableSet getSplitEditFilesSorted(final FileSystem walFS, + final Path regionDir) throws IOException { NavigableSet filesSorted = new TreeSet<>(); - Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) { + Path editsdir = getRegionDirRecoveredEditsDir(regionDir); + if (!walFS.exists(editsdir)) { return filesSorted; } - FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { boolean result = false; @@ -593,7 +585,7 @@ public boolean accept(Path p) { // In particular, on error, we'll move aside the bad edit file giving // it a timestamp suffix. See moveAsideBadEditsFile. Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = fs.isFile(p) && m.matches(); + result = walFS.isFile(p) && m.matches(); // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, // because it means splitwal thread is writting this file. if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { @@ -618,17 +610,17 @@ public boolean accept(Path p) { /** * Move aside a bad edits file. * - * @param fs + * @param walFS * @param edits * Edits file to move aside. * @return The name of the moved aside file. * @throws IOException */ - public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) + public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) throws IOException { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { + if (!walFS.rename(edits, moveAsideName)) { LOG.warn("Rename failed from {} to {}", edits, moveAsideName); } return moveAsideName; @@ -647,12 +639,12 @@ public static boolean isSequenceIdFile(final Path file) { || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); } - private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException { + private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) throws IOException { // TODO: Why are we using a method in here as part of our normal region open where // there is no splitting involved? Fix. St.Ack 01/20/2017. Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); try { - FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile); + FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile); return files != null ? files : new FileStatus[0]; } catch (FileNotFoundException e) { return new FileStatus[0]; @@ -676,16 +668,16 @@ private static long getMaxSequenceId(FileStatus[] files) { /** * Get the max sequence id which is stored in the region directory. -1 if none. */ - public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException { - return getMaxSequenceId(getSequenceIdFiles(fs, regionDir)); + public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { + return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); } /** * Create a file with name as region's max sequence id */ - public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId) + public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) throws IOException { - FileStatus[] files = getSequenceIdFiles(fs, regionDir); + FileStatus[] files = getSequenceIdFiles(walFS, regionDir); long maxSeqId = getMaxSequenceId(files); if (maxSeqId > newMaxSeqId) { throw new IOException("The new max sequence id " + newMaxSeqId + @@ -696,7 +688,7 @@ public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); if (newMaxSeqId != maxSeqId) { try { - if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { + if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, @@ -708,7 +700,7 @@ public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long // remove old ones for (FileStatus status : files) { if (!newSeqIdFile.equals(status.getPath())) { - fs.delete(status.getPath(), false); + walFS.delete(status.getPath(), false); } } } @@ -735,7 +727,7 @@ protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgre } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter); try { in = getReader(path, reporter); } catch (EOFException e) { @@ -802,7 +794,7 @@ static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(fs, logfile); + return walFactory.createRecoveredEditsWriter(walFS, logfile); } /** @@ -810,7 +802,7 @@ protected Writer createWriter(Path logfile) * @return new Reader instance, caller should close */ protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { - return walFactory.createReader(fs, curLogFile, reporter); + return walFactory.createReader(walFS, curLogFile, reporter); } /** @@ -1285,10 +1277,10 @@ public List finishWritingAndClose() throws IOException { } // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst) + private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(fs, dst)) { + try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { WAL.Entry entry = reader.next(); if (entry != null) { dstMinLogSeqNum = entry.getKey().getSequenceId(); @@ -1300,15 +1292,15 @@ private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Pat if (wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + + walFS.getFileStatus(dst).getLen()); + if (!walFS.delete(dst, false)) { LOG.warn("Failed deleting of old {}", dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + rootFs.getFileStatus(wap.p).getLen()); - if (!rootFs.delete(wap.p, false)) { + + ", length=" + walFS.getFileStatus(wap.p).getLen()); + if (!walFS.delete(wap.p, false)) { LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } @@ -1393,7 +1385,6 @@ boolean executeCloseTask(CompletionService completionService, Path closeWriter(String encodedRegionName, WriterAndPath wap, List thrown) throws IOException{ LOG.trace("Closing {}", wap.p); - FileSystem rootFs = FileSystem.get(conf); try { wap.w.close(); } catch (IOException ioe) { @@ -1408,7 +1399,7 @@ Path closeWriter(String encodedRegionName, WriterAndPath wap, } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file - if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) { + if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { LOG.warn("Failed deleting empty {}", wap.p); throw new IOException("Failed deleting empty " + wap.p); } @@ -1418,14 +1409,14 @@ Path closeWriter(String encodedRegionName, WriterAndPath wap, Path dst = getCompletedRecoveredEditsFilePath(wap.p, regionMaximumEditLogSeqNum.get(encodedRegionName)); try { - if (!dst.equals(wap.p) && rootFs.exists(dst)) { - deleteOneWithFewerEntries(rootFs, wap, dst); + if (!dst.equals(wap.p) && walFS.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. // TestHLogSplit#testThreading is an example. - if (rootFs.exists(wap.p)) { - if (!rootFs.rename(wap.p, dst)) { + if (walFS.exists(wap.p)) { + if (!walFS.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.info("Rename {} to {}", wap.p, dst); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index da07c7b7528e..3346b231996e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -670,7 +670,7 @@ public void testSkipRecoveredEditsReplay() throws Exception { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -722,7 +722,7 @@ public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); region.getMVCC().advanceTo(seqId); Get get = new Get(row); @@ -767,7 +767,7 @@ public void testSkipRecoveredEditsReplayAllIgnored() throws Exception { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); assertEquals(minSeqId, seqId); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -825,7 +825,7 @@ public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { for (HStore store : region.getStores()) { maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); } - long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java index 6e3aa105f700..8ae638ce9a4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java @@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles { private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - private static FileSystem FS; + private static FileSystem walFS; private static Path REGION_DIR; @BeforeClass public static void setUp() throws IOException { - FS = FileSystem.getLocal(UTIL.getConfiguration()); + walFS = FileSystem.getLocal(UTIL.getConfiguration()); REGION_DIR = UTIL.getDataTestDir(); } @@ -66,20 +66,20 @@ public static void tearDown() throws IOException { @Test public void test() throws IOException { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L); - assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L); - assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L); + assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L); + assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); // can not write a sequence id which is smaller try { - WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L); + WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L); } catch (IOException e) { // expected LOG.info("Expected error", e); } Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR); - FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() { + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return WALSplitter.isSequenceIdFile(p); @@ -89,7 +89,7 @@ public boolean accept(Path p) { assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files - NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR); + NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR); assertEquals(0, recoveredEdits.size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index fd2b3c49856c..4c5edbcd02b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -178,7 +178,7 @@ public void testSplit() throws IOException { final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); final int howmany = 3; RegionInfo[] infos = new RegionInfo[3]; - Path tabledir = FSUtils.getTableDir(hbaseDir, tableName); + Path tabledir = FSUtils.getWALTableDir(conf, tableName); fs.mkdirs(tabledir); for (int i = 0; i < howmany; i++) { infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 0d5aa0d2ff5c..051a66e8e9e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -250,9 +250,9 @@ public Integer run() throws Exception { } LOG.debug(Objects.toString(ls)); LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); LOG.info("Finished splitting out from under zombie."); - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals("wrong number of split files for region", numWriters, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -437,9 +437,9 @@ public void testSplitPreservesEdits() throws IOException{ generateWALs(1, 10, -1, 0); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -453,9 +453,9 @@ public void testSplitRemovesRegionEventsEdits() throws IOException{ generateWALs(1, 10, -1, 100); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -480,13 +480,13 @@ public void testSplitLeavesCompactionEventsEdits() throws IOException{ writer.close(); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); // original log should have 10 test edits, 10 region markers, 1 compaction marker assertEquals(21, countWAL(originalLog)); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName()); + Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); @@ -501,10 +501,10 @@ public void testSplitLeavesCompactionEventsEdits() throws IOException{ private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); int result = 0; for (String region : REGIONS) { - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); + Path[] logfiles = getLogForRegion(TABLE_NAME, region); assertEquals(expectedFiles, logfiles.length); int count = 0; for (Path logfile: logfiles) { @@ -637,7 +637,7 @@ private Set splitCorruptWALs(final FaultyProtobufLogReader.FailureType f walDirContents.add(status.getPath().getName()); } useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); return walDirContents; } finally { conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, @@ -678,9 +678,9 @@ private void ignoreCorruption(final Corruptions corruption, final int entryCount corruptWAL(c1, corruption, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); assertEquals(1, splitLog.length); int actualCount = 0; @@ -714,7 +714,7 @@ public void testLogsGetArchivedAfterSplit() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); } @@ -730,7 +730,7 @@ public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException { generateWALs(-1); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus [] statuses = null; try { statuses = fs.listStatus(WALDIR); @@ -760,7 +760,7 @@ public void testSplitWillFailIfWritingToRegionFails() throws Exception { try { InstrumentedLogWriter.activateFailure = true; - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); @@ -781,7 +781,7 @@ public void testSplitDeletedRegion() throws IOException { Path regiondir = new Path(TABLEDIR, region); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -858,7 +858,7 @@ private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { useDifferentDFSClient(); try { - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); assertFalse(fs.exists(WALDIR)); } catch (IOException e) { @@ -1082,7 +1082,7 @@ public void testSplitLogFileDeletedRegionDir() throws IOException { Path regiondir = new Path(TABLEDIR, REGION); LOG.info("Region directory is" + regiondir); fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); assertFalse(fs.exists(regiondir)); } @@ -1095,7 +1095,7 @@ public void testSplitLogFileEmpty() throws IOException { injectEmptyFile(".empty", true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); assertFalse(fs.exists(tdir)); @@ -1120,7 +1120,7 @@ public void testSplitLogFileFirstLineCorruptionLog() Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); assertEquals(1, fs.listStatus(corruptDir).length); @@ -1148,14 +1148,14 @@ public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { @Override protected Writer createWriter(Path logfile) throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile); + Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { - if (!this.fs.delete(file, false)) { + if (!this.walFS.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -1234,9 +1234,9 @@ private Writer generateWALs(int writers, int entries, int leaveOpen, int regionE - private Path[] getLogForRegion(Path rootdir, TableName table, String region) + private Path[] getLogForRegion(TableName table, String region) throws IOException { - Path tdir = FSUtils.getTableDir(rootdir, table); + Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));