diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index d336b4d1270d..ce5cd0e9b35c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -38,9 +38,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -295,8 +297,45 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
    */
   public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
       Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
-      throws IOException, FailedArchiveException {
+      throws IOException {
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+    archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
+  }
+
+  /**
+   * Archive recovered edits using existing logic for archiving store files. This is currently only
+   * relevant when hbase.region.archive.recovered.edits is true, as recovered edits shouldn't
+   * be kept after replay. In theory, we could use the very same method available for archiving
+   * store files, but supporting WAL dir and store files on different FileSystems added the need
+   * for extra validation of the passed FileSystem instance and the path where the archived edits
+   * should be placed.
+   * @param conf {@link Configuration} to determine the archive directory.
+   * @param fs the filesystem used for storing WAL files.
+   * @param regionInfo {@link RegionInfo} a pseudo region representation for the archiving logic.
+   * @param family a pseudo family representation for the archiving logic.
+   * @param replayedEdits the recovered edits to be archived.
+   * @throws IOException if files can't be archived due to some internal error.
+   */
+  public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, RegionInfo regionInfo,
+      byte[] family, Collection<HStoreFile> replayedEdits) throws IOException {
+    String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR));
+    // extra sanity checks for the right FS
+    Path path = new Path(workingDir);
+    if (path.isAbsoluteAndSchemeAuthorityNull()) {
+      // no scheme specified on the wal dir value, so it's on the same FS as the store files
+      path = new Path(conf.get(HConstants.HBASE_DIR));
+    }
+    if (path.toUri().getScheme() != null && !path.toUri().getScheme().equals(fs.getScheme())) {
+      throw new IOException("Wrong file system! Should be " + path.toUri().getScheme() +
+        ", but got " + fs.getScheme());
+    }
+    path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
+    archive(fs, regionInfo, family, replayedEdits, path);
+  }
+
+  private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
+      Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
     // sometimes in testing, we don't have rss, so we need to check for that
     if (fs == null) {
       LOG.warn("Passed filesystem is null, so just deleting files without archiving for {}," +
@@ -314,9 +353,6 @@ public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
     // build the archive path
     if (regionInfo == null || family == null) throw new IOException(
         "Need to have a region and a family to archive from.");
-
-    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
-
     // make sure we don't archive if we can't and that the archive dir exists
     if (!fs.mkdirs(storeArchiveDir)) {
       throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
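The scheme check above is the core of the change: recovered edits must be archived on the WAL filesystem, preferring `hbase.wal.dir` and falling back to `hbase.rootdir` when the WAL dir is unset or carries no scheme of its own. A minimal, standalone sketch of that resolution logic (not part of the patch; configuration values are hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class WalArchiveDirResolution {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.rootdir", "s3a://prod-bucket/hbase");        // store files on S3
    conf.set("hbase.wal.dir", "hdfs://namenode:8020/hbase-wal"); // WALs on HDFS

    // Mirrors archiveRecoveredEdits: prefer hbase.wal.dir, but fall back to
    // hbase.rootdir when the WAL dir value has no scheme/authority of its own.
    Path path = new Path(conf.get("hbase.wal.dir", conf.get("hbase.rootdir")));
    if (path.isAbsoluteAndSchemeAuthorityNull()) {
      path = new Path(conf.get("hbase.rootdir"));
    }
    System.out.println("Recovered edits are archived under: " + path);
    System.out.println("Expected scheme of the passed FileSystem: " + path.toUri().getScheme());
  }
}
```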
Should be " + path.toUri().getScheme() + + ", but got " + fs.getScheme()); + } + path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family); + archive(fs, regionInfo, family, replayedEdits, path); + } + private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family, + Collection compactedFiles, Path storeArchiveDir) throws IOException { // sometimes in testing, we don't have rss, so we need to check for that if (fs == null) { LOG.warn("Passed filesystem is null, so just deleting files without archiving for {}," + @@ -314,9 +353,6 @@ public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionIn // build the archive path if (regionInfo == null || family == null) throw new IOException( "Need to have a region and a family to archive from."); - - Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family); - // make sure we don't archive if we can't and that the archive dir exists if (!fs.mkdirs(storeArchiveDir)) { throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ccfc69d7044a..f0f56828fa33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1951,8 +1951,8 @@ public HRegionFileSystem getRegionFileSystem() { } /** @return the WAL {@link HRegionFileSystem} used by this region */ - HRegionFileSystem getRegionWALFileSystem() throws IOException { - return new HRegionFileSystem(conf, getWalFileSystem(), + HRegionWALFileSystem getRegionWALFileSystem() throws IOException { + return new HRegionWALFileSystem(conf, getWalFileSystem(), FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo()); } @@ -4674,7 +4674,7 @@ protected long replayRecoveredEditsIfAny(Map maxSeqIdInStores, for (Path file : files) { fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true)); } - getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); + getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles); } else { for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) { if (!walFS.delete(file, false)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index d2888e68d4f7..3f699fb683ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -80,10 +80,10 @@ public class HRegionFileSystem { private final RegionInfo regionInfo; //regionInfo for interacting with FS (getting encodedName, etc) - private final RegionInfo regionInfoForFs; - private final Configuration conf; + final RegionInfo regionInfoForFs; + final Configuration conf; private final Path tableDir; - private final FileSystem fs; + final FileSystem fs; private final Path regionDir; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionWALFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionWALFileSystem.java new file mode 100644 index 000000000000..f1f5eb5585bf --- /dev/null +++ 
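The `private` → package-private relaxation above exists solely so the new subclass in the next file can reach `conf`, `fs`, and `regionInfoForFs` without widening the public API. A toy sketch of the pattern, with hypothetical class names:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

class BaseRegionFs {
  // package-private: visible to same-package subclasses, hidden from everyone else
  final Configuration conf;
  final FileSystem fs;

  BaseRegionFs(Configuration conf, FileSystem fs) {
    this.conf = conf;
    this.fs = fs;
  }
}

class WalRegionFs extends BaseRegionFs {
  WalRegionFs(Configuration conf, FileSystem fs) {
    super(conf, fs);
  }

  void archiveSomething() throws IOException {
    // may read this.conf and this.fs directly, as HRegionWALFileSystem does below
  }
}
```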
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionWALFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionWALFileSystem.java
new file mode 100644
index 000000000000..f1f5eb5585bf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionWALFileSystem.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A Wrapper for the region FileSystem operations adding WAL specific operations
+ */
+@InterfaceAudience.Private
+public class HRegionWALFileSystem extends HRegionFileSystem {
+
+  HRegionWALFileSystem(Configuration conf, FileSystem fs, Path tableDir, RegionInfo regionInfo) {
+    super(conf, fs, tableDir, regionInfo);
+  }
+
+  /**
+   * Closes and archives the specified store files from the specified family.
+   * @param familyName Family that contains the store files
+   * @param storeFiles set of store files to remove
+   * @throws IOException if the archiving fails
+   */
+  public void archiveRecoveredEdits(String familyName, Collection<HStoreFile> storeFiles)
+      throws IOException {
+    HFileArchiver.archiveRecoveredEdits(this.conf, this.fs, this.regionInfoForFs,
+      Bytes.toBytes(familyName), storeFiles);
+  }
+}
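For context, a hypothetical caller sketch showing how replay code can hand recovered edits to the wrapper. The sketch must live in `org.apache.hadoop.hbase.regionserver` because the constructor is package-private, and all parameters are assumed to come from the surrounding region context:

```java
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;

public class RecoveredEditsArchiveSketch {
  static void archiveReplayedEdits(Configuration conf, FileSystem walFs, Path walTableDir,
      RegionInfo regionInfo, Collection<HStoreFile> editFiles) throws IOException {
    HRegionWALFileSystem walRegionFs =
        new HRegionWALFileSystem(conf, walFs, walTableDir, regionInfo);
    // "recovered.edits" acts as a pseudo column family in the archive layout,
    // matching the fakeFamilyName HRegion derives from the recovered.edits dir.
    walRegionFs.archiveRecoveredEdits("recovered.edits", editFiles);
  }
}
```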
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index 03ed373f3540..76d46f64470d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -85,6 +85,23 @@ public static Path getStoreArchivePath(Configuration conf,
     return HStore.getStoreHomedir(tableArchiveDir, region, family);
   }
 
+  /**
+   * Gets the archive directory under specified root dir. One scenario where this is useful is
+   * when WAL and root dir are configured under different file systems,
+   * e.g. root dir on S3 and WALs on HDFS.
+   * This is mostly useful for archiving recovered edits, when
+   * hbase.region.archive.recovered.edits is enabled.
+   * @param rootDir {@link Path} the root dir under which archive path should be created.
+   * @param region parent region information under which the store currently lives
+   * @param family name of the family in the store
+   * @return {@link Path} to the WAL FS directory to archive the given store
+   *         or null if it should not be archived
+   */
+  public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region,
+      byte[] family) {
+    Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
+    return HStore.getStoreHomedir(tableArchiveDir, region, family);
+  }
+
   /**
    * Get the archive directory for a given region under the specified table
    * @param tableName the table name. Cannot be null.
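To make the resulting layout concrete, a small sketch of the path this helper produces; the WAL root dir and table name here are made up:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class ArchivePathExample {
  public static void main(String[] args) {
    // Hypothetical WAL root dir; in the patch this comes from hbase.wal.dir/hbase.rootdir.
    Path walRootDir = new Path("hdfs://namenode:8020/hbase-wal");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf("mockTable")).build();
    Path archiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(walRootDir, region,
      Bytes.toBytes("recovered.edits"));
    // Expected shape: <walRootDir>/archive/data/default/mockTable/<encoded-region>/recovered.edits
    System.out.println(archiveDir);
  }
}
```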
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index b82afe92d8ae..3362b971a423 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -21,10 +21,16 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +49,7 @@
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
@@ -50,9 +57,11 @@
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -67,6 +76,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,6 +138,107 @@ public static void cleanupTest() throws Exception {
     POOL.shutdownNow();
   }
 
+  @Test
+  public void testArchiveStoreFilesDifferentFileSystemsWallWithSchemaPlainRoot() throws Exception {
+    String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
+    String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
+    testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
+      HFileArchiver::archiveStoreFiles);
+  }
+
+  @Test
+  public void testArchiveStoreFilesDifferentFileSystemsWallNullPlainRoot() throws Exception {
+    String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
+    testArchiveStoreFilesDifferentFileSystems(null, baseDir,
+      HFileArchiver::archiveStoreFiles);
+  }
+
+  @Test
+  public void testArchiveStoreFilesDifferentFileSystemsWallAndRootSame() throws Exception {
+    String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
+    testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir,
+      HFileArchiver::archiveStoreFiles);
+  }
+
+  private void testArchiveStoreFilesDifferentFileSystems(String walDir, String expectedBase,
+      ArchivingFunction<Configuration, FileSystem, RegionInfo, Path, byte[],
+        Collection<HStoreFile>> archivingFunction) throws IOException {
+    FileSystem mockedFileSystem = mock(FileSystem.class);
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    if (walDir != null) {
+      conf.set(CommonFSUtils.HBASE_WAL_DIR, walDir);
+    }
+    Path filePath = new Path("/mockDir/wals/mockFile");
+    when(mockedFileSystem.getScheme()).thenReturn("mockFS");
+    when(mockedFileSystem.mkdirs(any())).thenReturn(true);
+    when(mockedFileSystem.exists(any())).thenReturn(true);
+    RegionInfo mockedRegion = mock(RegionInfo.class);
+    TableName tableName = TableName.valueOf("mockTable");
+    when(mockedRegion.getTable()).thenReturn(tableName);
+    when(mockedRegion.getEncodedName()).thenReturn("mocked-region-encoded-name");
+    Path tableDir = new Path("mockFS://mockDir/tabledir");
+    byte[] family = Bytes.toBytes("testfamily");
+    HStoreFile mockedFile = mock(HStoreFile.class);
+    List<HStoreFile> list = new ArrayList<>();
+    list.add(mockedFile);
+    when(mockedFile.getPath()).thenReturn(filePath);
+    when(mockedFileSystem.rename(any(), any())).thenReturn(true);
+    archivingFunction.apply(conf, mockedFileSystem, mockedRegion, tableDir, family, list);
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    verify(mockedFileSystem, times(2)).rename(pathCaptor.capture(), any());
+    String expectedDir = expectedBase +
+      "archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
+    assertTrue(pathCaptor.getAllValues().get(0).toString().equals(expectedDir));
+  }
+
+  @FunctionalInterface
+  private interface ArchivingFunction<Configuration, FS, Region, Dir, Family, Files> {
+    void apply(Configuration config, FS fs, Region region, Dir dir, Family family, Files files)
+      throws IOException;
+  }
+
+  @Test
+  public void testArchiveRecoveredEditsWalDirNull() throws Exception {
+    testArchiveRecoveredEditsWalDirNullOrSame(null);
+  }
+
+  @Test
+  public void testArchiveRecoveredEditsWalDirSameFsStoreFiles() throws Exception {
+    testArchiveRecoveredEditsWalDirNullOrSame("/wal-dir");
+  }
+
+  private void testArchiveRecoveredEditsWalDirNullOrSame(String walDir) throws Exception {
+    String originalRootDir = UTIL.getConfiguration().get(HConstants.HBASE_DIR);
+    try {
+      String baseDir = "mockFS://mockFSAuthority:9876/hbase/";
+      UTIL.getConfiguration().set(HConstants.HBASE_DIR, baseDir);
+      testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
+        (conf, fs, region, dir, family, list) -> HFileArchiver
+          .archiveRecoveredEdits(conf, fs, region, family, list));
+    } finally {
+      UTIL.getConfiguration().set(HConstants.HBASE_DIR, originalRootDir);
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testArchiveRecoveredEditsWrongFS() throws Exception {
+    String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
+    // Internally, testArchiveStoreFilesDifferentFileSystems will pass a "mockedFS"
+    // to HFileArchiver.archiveRecoveredEdits, but since wal-dir is supposedly on the same FS
+    // as the root dir, this leads to conflicting FSes and an IOException is expected.
+    testArchiveStoreFilesDifferentFileSystems("/wal-dir", baseDir,
+      (conf, fs, region, dir, family, list) -> HFileArchiver
+        .archiveRecoveredEdits(conf, fs, region, family, list));
+  }
+
+  @Test
+  public void testArchiveRecoveredEditsWalDirDifferentFS() throws Exception {
+    String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
+    testArchiveStoreFilesDifferentFileSystems(walDir, walDir,
+      (conf, fs, region, dir, family, list) ->
+        HFileArchiver.archiveRecoveredEdits(conf, fs, region, family, list));
+  }
+
   @Test
   public void testRemoveRegionDirOnArchive() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
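The TestHRegion test below drives the whole path end to end. For reference, a sketch of the configuration the feature relies on (property names are taken from the patch and tests; values are hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ArchiveRecoveredEditsConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.rootdir", "s3a://prod-bucket/hbase");        // store files
    conf.set("hbase.wal.dir", "hdfs://namenode:8020/hbase-wal"); // WALs on a separate FS
    // keep replayed recovered edits under <wal root>/archive instead of deleting them
    conf.setBoolean("hbase.region.archive.recovered.edits", true);
    return conf;
  }
}
```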
+ testArchiveStoreFilesDifferentFileSystems("/wal-dir", baseDir, + (conf, fs, region, dir, family, list) -> HFileArchiver + .archiveRecoveredEdits(conf, fs, region, family, list)); + } + + @Test + public void testArchiveRecoveredEditsWalDirDifferentFS() throws Exception { + String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/"; + testArchiveStoreFilesDifferentFileSystems(walDir, walDir, + (conf, fs, region, dir, family, list) -> + HFileArchiver.archiveRecoveredEdits(conf, fs, region, family, list)); + } + @Test public void testRemoveRegionDirOnArchive() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 034c4b992d06..25090efa5b76 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -146,9 +146,11 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; @@ -676,6 +678,60 @@ public void testToShowNPEOnRegionScannerReseek() throws Exception { scanner1.close(); } + @Test + public void testArchiveRecoveredEditsReplay() throws Exception { + byte[] family = Bytes.toBytes("family"); + this.region = initHRegion(tableName, method, CONF, family); + final WALFactory wals = new WALFactory(CONF, method); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); + + long maxSeqId = 1050; + long minSeqId = 1000; + + for (long i = minSeqId; i <= maxSeqId; i += 10) { + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); + fs.create(recoveredEdits); + WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes + .toBytes(i))); + writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, + HConstants.DEFAULT_CLUSTER_ID), edit)); + + writer.close(); + } + MonitoredTask status = TaskMonitor.get().createStatus(method); + Map maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (HStore store : region.getStores()) { + maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); + } + CONF.set("hbase.region.archive.recovered.edits", "true"); + CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir"); + long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); + assertEquals(maxSeqId, seqId); + region.getMVCC().advanceTo(seqId); + String fakeFamilyName = recoveredEditsDir.getName(); + Path rootDir = new 
Path(CONF.get(HConstants.HBASE_DIR)); + Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir, + region.getRegionInfo(), Bytes.toBytes(fakeFamilyName)); + FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir); + assertEquals(6, list.length); + } finally { + CONF.set("hbase.region.archive.recovered.edits", "false"); + CONF.set(CommonFSUtils.HBASE_WAL_DIR, ""); + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + wals.close(); + } + } + @Test public void testSkipRecoveredEditsReplay() throws Exception { byte[] family = Bytes.toBytes("family");
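Finally, a hedged sketch of how one could verify the behaviour the test above asserts: after replay with the feature enabled, the edit files should show up under the archive layout instead of being deleted. `storeArchiveDir` is assumed to be the path returned by `HFileArchiveUtil.getStoreArchivePathForRootDir`:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifyArchivedEdits {
  static int countArchivedEdits(FileSystem walFs, Path storeArchiveDir) throws IOException {
    // Each replayed recovered-edits file is renamed (not deleted) into this directory.
    FileStatus[] archived = walFs.listStatus(storeArchiveDir);
    return archived.length; // the test above writes 6 edit files and expects all 6 here
  }
}
```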