Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,6 +80,7 @@ public class SplitWALManager {
private final Path rootDir;
private final FileSystem fs;
private final Configuration conf;
private final Path walArchiveDir;

public SplitWALManager(MasterServices master) {
this.master = master;
Expand All @@ -86,6 +89,7 @@ public SplitWALManager(MasterServices master) {
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
this.rootDir = master.getMasterFileSystem().getWALRootDir();
this.fs = master.getMasterFileSystem().getWALFileSystem();
this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}

public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
Expand Down Expand Up @@ -116,8 +120,11 @@ private Path getWALSplitDir(ServerName serverName) {
return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
}

public void deleteSplitWAL(String wal) throws IOException {
fs.delete(new Path(wal), false);
/**
* Archive processed WAL
*/
public void archive(String wal) throws IOException {
WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
}

public void deleteWALDir(ServerName serverName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
protected void complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ private static void archiveWALs(final List<Path> corruptedWALs, final List<Path>
}
}

/**
* Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir.
* WAL may have already been moved; makes allowance.
*/
public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
if (fs.exists(p)) {
if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
LOG.warn("Failed move of {} to {}", p, targetDir);
} else {
LOG.info("Moved {} to {}", p, targetDir);
}
}
}

/**
* Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
* named for the sequenceid in the passed <code>logEntry</code>: e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand Down Expand Up @@ -233,11 +234,20 @@ private void splitLogsTestHelper(HBaseTestingUtility testUtil) throws Exception
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());

// Validate the old WAL file archive dir
Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem();
int archiveFileCount = walFS.listStatus(walArchivePath).length;

procedures = splitWALManager.splitWALs(metaServer, true);
Assert.assertEquals(1, procedures.size());
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
// There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
Assert.assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
walFS.listStatus(walArchivePath).length);
}

@Test
Expand Down