@@ -38,9 +38,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -295,8 +297,45 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
*/
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
throws IOException, FailedArchiveException {
throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
}

/**
* Archive recovered edits using the existing logic for archiving store files. This is currently
* only relevant when <b>hbase.region.archive.recovered.edits</b> is true, as recovered edits
* shouldn't be kept after replay. In theory, we could reuse the very same method available for
* archiving store files, but supporting a WAL dir and store files on different FileSystems added
* the need for extra validation of the passed FileSystem instance and of the path where the
* archived edits should be placed.
* @param conf {@link Configuration} to determine the archive directory.
* @param fs the filesystem used for storing WAL files.
* @param regionInfo {@link RegionInfo} a pseudo region representation for the archiving logic.
* @param family a pseudo family representation for the archiving logic.
* @param replayedEdits the recovered edits to be archived.
* @throws IOException if files can't be archived due to some internal error.
*/
public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, RegionInfo regionInfo,
byte[] family, Collection<HStoreFile> replayedEdits)
throws IOException {
String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR));
// extra sanity checks for the right FS
Path path = new Path(workingDir);
if (path.isAbsoluteAndSchemeAuthorityNull()) {
// no scheme specified on the wal dir value, so it's on the same FS as the store files
path = new Path(conf.get(HConstants.HBASE_DIR));
}
if (path.toUri().getScheme() != null && !path.toUri().getScheme().equals(fs.getScheme())) {
throw new IOException("Wrong file system! Should be " + path.toUri().getScheme() +
", but got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
archive(fs, regionInfo, family, replayedEdits, path);
}

private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn("Passed filesystem is null, so just deleting files without archiving for {}," +
@@ -314,9 +353,6 @@ public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionIn
// build the archive path
if (regionInfo == null || family == null) throw new IOException(
"Need to have a region and a family to archive from.");

Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);

// make sure we don't archive if we can't and that the archive dir exists
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
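For context on the hunk above, here is a minimal standalone sketch (not part of the patch) of the filesystem validation that archiveRecoveredEdits performs: resolve the WAL root from configuration, fall back to the root dir when the WAL dir value carries no scheme, and fail fast when the supplied FileSystem does not match. The class and method names are hypothetical; the string keys correspond to CommonFSUtils.HBASE_WAL_DIR and HConstants.HBASE_DIR used in the patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class WalArchiveFsCheck {
  private WalArchiveFsCheck() {
  }

  /** Resolves the WAL root dir and verifies it lives on the given FileSystem. */
  static Path resolveAndValidateWalRoot(Configuration conf, FileSystem walFs) throws IOException {
    // "hbase.wal.dir" and "hbase.rootdir" are the keys behind CommonFSUtils.HBASE_WAL_DIR
    // and HConstants.HBASE_DIR referenced in the patch.
    Path walRoot = new Path(conf.get("hbase.wal.dir", conf.get("hbase.rootdir")));
    if (walRoot.isAbsoluteAndSchemeAuthorityNull()) {
      // No scheme on the WAL dir value, so it is on the same FS as the store files.
      walRoot = new Path(conf.get("hbase.rootdir"));
    }
    String scheme = walRoot.toUri().getScheme();
    if (scheme != null && !scheme.equals(walFs.getScheme())) {
      throw new IOException("Wrong file system! Should be " + scheme + ", but got "
        + walFs.getScheme());
    }
    return walRoot;
  }
}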
@@ -1951,8 +1951,8 @@ public HRegionFileSystem getRegionFileSystem() {
}

/** @return the WAL {@link HRegionFileSystem} used by this region */
HRegionFileSystem getRegionWALFileSystem() throws IOException {
return new HRegionFileSystem(conf, getWalFileSystem(),
HRegionWALFileSystem getRegionWALFileSystem() throws IOException {
return new HRegionWALFileSystem(conf, getWalFileSystem(),
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
}

@@ -4674,7 +4674,7 @@ protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
for (Path file : files) {
fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
}
getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
} else {
for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
if (!walFS.delete(file, false)) {
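A hedged configuration sketch for the behaviour the HRegion change above enables (the property name comes from the patch's javadoc; the filesystem URIs are made up): archiving replayed recovered edits instead of deleting them, with WALs and the root dir on different filesystems.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RecoveredEditsArchivalConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Keep replayed recovered edits in the archive rather than deleting them
    // (the javadoc above implies this is off unless explicitly enabled).
    conf.setBoolean("hbase.region.archive.recovered.edits", true);
    // Optional: WALs (and therefore recovered edits) on HDFS, store files on S3.
    conf.set("hbase.wal.dir", "hdfs://namenode:8020/hbase-wal");
    conf.set("hbase.rootdir", "s3a://my-bucket/hbase");
  }
}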
@@ -80,10 +80,10 @@ public class HRegionFileSystem {

private final RegionInfo regionInfo;
//regionInfo for interacting with FS (getting encodedName, etc)
private final RegionInfo regionInfoForFs;
private final Configuration conf;
final RegionInfo regionInfoForFs;
final Configuration conf;
private final Path tableDir;
private final FileSystem fs;
final FileSystem fs;
private final Path regionDir;

/**
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A wrapper for the region FileSystem operations, adding WAL-specific operations.
*/
@InterfaceAudience.Private
public class HRegionWALFileSystem extends HRegionFileSystem {

HRegionWALFileSystem(Configuration conf, FileSystem fs, Path tableDir, RegionInfo regionInfo) {
super(conf, fs, tableDir, regionInfo);
}

/**
* Closes and archives the specified store files from the specified family.
* @param familyName Family that contains the store files
* @param storeFiles set of store files to remove
* @throws IOException if the archiving fails
*/
public void archiveRecoveredEdits(String familyName, Collection<HStoreFile> storeFiles)
throws IOException {
HFileArchiver.archiveRecoveredEdits(this.conf, this.fs, this.regionInfoForFs,
Bytes.toBytes(familyName), storeFiles);
}
}
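To show where this wrapper plugs in, here is a hypothetical helper mirroring the HRegion hunk above (argument names such as fakeFamilyName come from that hunk; the helper itself is illustrative, not part of the patch): each replayed edits file is wrapped in a "fake" HStoreFile and handed to archiveRecoveredEdits.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HRegionWALFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;

public class RecoveredEditsArchivingSketch {
  static void archiveReplayedEdits(Configuration conf, FileSystem walFs,
      HRegionWALFileSystem regionWalFs, String fakeFamilyName, List<Path> replayedEditFiles)
      throws IOException {
    List<HStoreFile> fakeStoreFiles = new ArrayList<>(replayedEditFiles.size());
    for (Path file : replayedEditFiles) {
      // Mirrors the HRegion hunk: each edits file is treated as a primary-replica store file.
      fakeStoreFiles.add(new HStoreFile(walFs, file, conf, null, null, true));
    }
    regionWalFs.archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
  }
}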
@@ -85,6 +85,23 @@ public static Path getStoreArchivePath(Configuration conf,
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}

/**
* Gets the archive directory under the specified root dir. One scenario where this is useful is
* when WAL and root dir are configured on different file systems, e.g. root dir on S3 and WALs
* on HDFS. This is mostly useful for archiving recovered edits, when
* <b>hbase.region.archive.recovered.edits</b> is enabled.
* @param rootDir {@link Path} the root dir under which archive path should be created.
* @param region parent region information under which the store currently lives
* @param family name of the family in the store
* @return {@link Path} to the WAL FS directory to archive the given store
* or <tt>null</tt> if it should not be archived
*/
public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}

/**
* Get the archive directory for a given region under the specified table
* @param tableName the table name. Cannot be null.
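A minimal usage sketch of the new helper (assuming HBase 2.x client APIs; the WAL root URI is illustrative). Per the tests below, the resulting layout is <rootDir>/archive/data/<namespace>/<table>/<encoded-region-name>/<family>.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class StoreArchivePathExample {
  public static void main(String[] args) {
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf("mockTable")).build();
    Path walRoot = new Path("hdfs://namenode:8020/hbase-wal");
    Path archiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(walRoot, region,
      Bytes.toBytes("testfamily"));
    // Prints something like:
    // hdfs://namenode:8020/hbase-wal/archive/data/default/mockTable/<encoded-name>/testfamily
    System.out.println(archiveDir);
  }
}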
@@ -21,10 +21,16 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -43,16 +49,19 @@
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -67,6 +76,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -128,6 +138,107 @@ public static void cleanupTest() throws Exception {
POOL.shutdownNow();
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWallWithSchemaPlainRoot() throws Exception {
String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
HFileArchiver::archiveStoreFiles);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWallNullPlainRoot() throws Exception {
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems(null, baseDir,
HFileArchiver::archiveStoreFiles);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWallAndRootSame() throws Exception {
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir,
HFileArchiver::archiveStoreFiles);
}

private void testArchiveStoreFilesDifferentFileSystems(String walDir, String expectedBase,
ArchivingFunction<Configuration, FileSystem, RegionInfo, Path, byte[],
Collection<HStoreFile>> archivingFunction) throws IOException {
FileSystem mockedFileSystem = mock(FileSystem.class);
Configuration conf = new Configuration(UTIL.getConfiguration());
if (walDir != null) {
conf.set(CommonFSUtils.HBASE_WAL_DIR, walDir);
}
Path filePath = new Path("/mockDir/wals/mockFile");
when(mockedFileSystem.getScheme()).thenReturn("mockFS");
when(mockedFileSystem.mkdirs(any())).thenReturn(true);
when(mockedFileSystem.exists(any())).thenReturn(true);
RegionInfo mockedRegion = mock(RegionInfo.class);
TableName tableName = TableName.valueOf("mockTable");
when(mockedRegion.getTable()).thenReturn(tableName);
when(mockedRegion.getEncodedName()).thenReturn("mocked-region-encoded-name");
Path tableDir = new Path("mockFS://mockDir/tabledir");
byte[] family = Bytes.toBytes("testfamily");
HStoreFile mockedFile = mock(HStoreFile.class);
List<HStoreFile> list = new ArrayList<>();
list.add(mockedFile);
when(mockedFile.getPath()).thenReturn(filePath);
when(mockedFileSystem.rename(any(), any())).thenReturn(true);
archivingFunction.apply(conf, mockedFileSystem, mockedRegion, tableDir, family, list);
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(mockedFileSystem, times(2)).rename(pathCaptor.capture(), any());
String expectedDir = expectedBase +
"archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
assertTrue(pathCaptor.getAllValues().get(0).toString().equals(expectedDir));
}

@FunctionalInterface
private interface ArchivingFunction<Configuration, FS, Region, Dir, Family, Files> {
void apply(Configuration config, FS fs, Region region, Dir dir, Family family, Files files)
throws IOException;
}

@Test
public void testArchiveRecoveredEditsWalDirNull() throws Exception {
testArchiveRecoveredEditsWalDirNullOrSame(null);
}

@Test
public void testArchiveRecoveredEditsWalDirSameFsStoreFiles() throws Exception {
testArchiveRecoveredEditsWalDirNullOrSame("/wal-dir");
}

private void testArchiveRecoveredEditsWalDirNullOrSame(String walDir) throws Exception {
String originalRootDir = UTIL.getConfiguration().get(HConstants.HBASE_DIR);
try {
String baseDir = "mockFS://mockFSAuthority:9876/hbase/";
UTIL.getConfiguration().set(HConstants.HBASE_DIR, baseDir);
testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
(conf, fs, region, dir, family, list) -> HFileArchiver
.archiveRecoveredEdits(conf, fs, region, family, list));
} finally {
UTIL.getConfiguration().set(HConstants.HBASE_DIR, originalRootDir);
}
}

@Test(expected = IOException.class)
public void testArchiveRecoveredEditsWrongFS() throws Exception {
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
// Internally, testArchiveStoreFilesDifferentFileSystems passes a "mockedFS"
// to HFileArchiver.archiveRecoveredEdits, but since the wal dir is supposedly on the same FS
// as the root dir, the FSes conflict and an IOException is expected.
testArchiveStoreFilesDifferentFileSystems("/wal-dir", baseDir,
(conf, fs, region, dir, family, list) -> HFileArchiver
.archiveRecoveredEdits(conf, fs, region, family, list));
}

@Test
public void testArchiveRecoveredEditsWalDirDifferentFS() throws Exception {
String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
testArchiveStoreFilesDifferentFileSystems(walDir, walDir,
(conf, fs, region, dir, family, list) ->
HFileArchiver.archiveRecoveredEdits(conf, fs, region, family, list));
}

@Test
public void testRemoveRegionDirOnArchive() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());