HFileArchiver.java
@@ -38,9 +38,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -314,9 +316,21 @@ public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo
// build the archive path
if (regionInfo == null || family == null) throw new IOException(
"Need to have a region and a family to archive from.");

Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);

// NOTE: This extra check is needed for the exceptional scenario where we archive replayed
// wal edits, when hbase.region.archive.recovered.edits is on. Since WALs may be configured
// on a different FS than the root dir, we need to make sure we pick the proper FS when
// deciding on the archiving path:
// 1) If no wal dir is set, wals and the root dir are on the same FS, so the root dir is used;
// 2) When a wal dir is set, it will only be on a different FS if "scheme://authority" is
// defined in the wal path. In that case, for a regular store file archiving call, the passed
// FS scheme will differ from the wal dir one, and the root dir should be picked as base.
String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR);
if (workingDir == null || !workingDir.startsWith(fs.getScheme())) {
Member:
This still worries me because if we ever get passed the wrong filesystem, we'll fail in the same way. It's not clear which filesystem is getting passed into this method either (tracing back up through HRegion).

Contributor Author (@wchevreuil):

So archiveStoreFiles gets called by HRegionFileSystem.removeStoreFiles and MobUtils.removeMobFiles. We are really interested in HRegionFileSystem.removeStoreFiles, since that is the one that eventually gets passed a different FS from the actual store files one, when called from HRegion.replayRecoveredEditsIfAny. These are the possible combinations of FS and file types to be archived (a sketch of the four cases follows the list):

  1. We are archiving recovered edits, wals on a different dir but the same FS as the root dir:
    the workingDir value is non-null, but since it doesn't have an fs scheme defined, we can assume it's on the same FS as the passed root dir. In this case we passed the WAL FS, back in HRegion.replayRecoveredEditsIfAny, but the WAL FS is the same FS as the rootdir, so we are good to go with the normal archive path computed from the root dir value.

  2. We are archiving recovered edits, WALs configured on a different FS than the rootdir:
    the workingDir value is non-null and it starts with the same scheme as the passed FS (since we passed the wal FS here). This will create the path for archiving on the WAL FS, as we are passing a scheme + authority format.

  3. Archiving store files, with the wal dir set to a separate dir on the same FS as the root dir:
    again workingDir is non-null, but it will not have the scheme portion, so we set workingDir back to the root dir value, which is what we want for store files. This is the case being tested in TestHRegion.testArchiveRecoveredEditsReplay; that's why we assert the files are in a folder under the original archive dir.

  4. Archiving store files, wal dir set to a different FS:
    again workingDir is non-null, but its scheme is different from the passed FS scheme, so we reset it to the rootdir value.
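
  As a minimal standalone sketch of that decision (hypothetical helper name and values, just mirroring the new check, not part of the patch):

    // Hypothetical helper mirroring the check in archiveStoreFiles:
    // walDir is hbase.wal.dir (may be null), rootDir is hbase.rootdir,
    // fsScheme is the scheme of the FileSystem passed in.
    static String resolveArchiveBase(String walDir, String rootDir, String fsScheme) {
      if (walDir == null || !walDir.startsWith(fsScheme)) {
        return rootDir; // cases 1, 3 and 4 above: fall back to hbase.rootdir
      }
      return walDir; // case 2: wal dir carries the scheme of the passed FS
    }

    // resolveArchiveBase(null, "/hbase", "hdfs")                     -> "/hbase"  (case 1)
    // resolveArchiveBase("hdfs://walcluster/wals", "/hbase", "hdfs") -> wal dir   (case 2)
    // resolveArchiveBase("/hbase/wals", "/hbase", "hdfs")            -> "/hbase"  (case 3)
    // resolveArchiveBase("hdfs://walcluster/wals", "s3a://bucket/hbase", "s3a")
    //                                                                -> root dir  (case 4)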

workingDir = conf.get(HConstants.HBASE_DIR);
}
Path rootDir = new Path(workingDir);
Member:
What about if hbase.wal.dir is null, hbase.rootdir is 's3a://...', but we have a FileSystem with hdfs:// scheme? Should we fail-fast?

Contributor Author (@wchevreuil), Nov 16, 2019:
This could be a problem indeed, but realistically, is this ever possible? I mean, how could we receive an FS that has been set to neither the wal nor the root dir?
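
A hedged sketch of that fail-fast idea (an assumption only, not part of this change): compare the passed FS against the filesystems backing hbase.rootdir and hbase.wal.dir, resolving scheme-less paths through the default FS.

  // Hypothetical fail-fast check: reject a FileSystem whose URI matches
  // neither the root dir FS nor the wal dir FS.
  URI passed = fs.getUri();
  URI rootFs = new Path(conf.get(HConstants.HBASE_DIR)).getFileSystem(conf).getUri();
  String walDir = conf.get(CommonFSUtils.HBASE_WAL_DIR);
  URI walFs = walDir == null ? rootFs : new Path(walDir).getFileSystem(conf).getUri();
  if (!passed.equals(rootFs) && !passed.equals(walFs)) {
    throw new IOException("Unexpected filesystem " + passed
        + "; expected " + rootFs + " or " + walFs);
  }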

Path storeArchiveDir = HFileArchiveUtil.
getStoreArchivePathForRootDir(rootDir, regionInfo, family);
// make sure we don't archive if we can't and that the archive dir exists
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
HFileArchiveUtil.java
@@ -85,6 +85,23 @@ public static Path getStoreArchivePath(Configuration conf,
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}

/**
* Gets the archive directory under the specified root dir. One scenario where this is useful is
* when WAL and root dir are configured under different file systems,
* i.e. root dir on S3 and WALs on HDFS.
* This is mostly useful for archiving recovered edits, when
* <b>hbase.region.archive.recovered.edits</b> is enabled.
* @param rootDir {@link Path} the root dir under which the archive path should be created.
* @param region parent region information under which the store currently lives
* @param family name of the family in the store
* @return {@link Path} to the directory under the given root dir where the given store
* should be archived, or <tt>null</tt> if it should not be archived
*/
public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
return HStore.getStoreHomedir(tableArchiveDir, region, family);
}

/**
* Get the archive directory for a given region under the specified table
* @param tableName the table name. Cannot be null.
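For illustration, a brief usage sketch of the new helper (the wal root path and the family name are made-up values; the layout follows the archive path asserted in the tests below):

  // Sketch: compute the archive dir for replayed recovered edits on the WAL FS.
  Path walRootDir = new Path("hdfs://walcluster/hbasewal"); // hypothetical wal root
  Path archiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
      walRootDir, regionInfo, Bytes.toBytes("recovered.edits"));
  // -> hdfs://walcluster/hbasewal/archive/data/<namespace>/<table>/<encoded-region>/recovered.edits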
TestHFileArchiving.java
@@ -21,6 +21,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -43,16 +48,19 @@
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -67,6 +75,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -128,6 +137,62 @@ public static void cleanupTest() throws Exception {
POOL.shutdownNow();
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWalWithSchemePlainRoot() throws Exception {
String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
testArchiveStoreFilesDifferentFileSystems(walDir, "/hbase", walDir);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWalNullRootWithScheme() throws Exception {
String rootDir = "testFS://test:5432/hbase/";
testArchiveStoreFilesDifferentFileSystems(null, rootDir, rootDir);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWalNullPlainRoot() throws Exception {
String rootDir = "/hbase/";
testArchiveStoreFilesDifferentFileSystems(null, rootDir, rootDir);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsWalAndRootSame() throws Exception {
String rootDir = "/hbase/";
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", rootDir, rootDir);
}

private void testArchiveStoreFilesDifferentFileSystems(String walDir, String rootDir,
String expectedBase) throws IOException {
FileSystem mockedFileSystem = mock(FileSystem.class);
Configuration conf = new Configuration(UTIL.getConfiguration());
if (walDir != null) {
conf.set(CommonFSUtils.HBASE_WAL_DIR, walDir);
}
conf.set(HConstants.HBASE_DIR, rootDir);
Path filePath = new Path("/mockDir/wals/mockFile");
when(mockedFileSystem.getScheme()).thenReturn("mockFS");
when(mockedFileSystem.mkdirs(any())).thenReturn(true);
when(mockedFileSystem.exists(any())).thenReturn(true);
RegionInfo mockedRegion = mock(RegionInfo.class);
TableName tableName = TableName.valueOf("mockTable");
when(mockedRegion.getTable()).thenReturn(tableName);
when(mockedRegion.getEncodedName()).thenReturn("mocked-region-encoded-name");
Path tableDir = new Path("mockFS://mockDir/tabledir");
byte[] family = Bytes.toBytes("testfamily");
HStoreFile mockedFile = mock(HStoreFile.class);
List<HStoreFile> list = new ArrayList<>();
list.add(mockedFile);
when(mockedFile.getPath()).thenReturn(filePath);
when(mockedFileSystem.rename(any(), any())).thenReturn(true);
HFileArchiver.archiveStoreFiles(conf, mockedFileSystem, mockedRegion,
tableDir, family, list);
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(mockedFileSystem, times(2)).rename(pathCaptor.capture(), any());
String expectedDir = expectedBase +
"archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
assertTrue(pathCaptor.getAllValues().get(0).toString().equals(expectedDir));
}

@Test
public void testRemoveRegionDirOnArchive() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
TestHRegion.java
@@ -146,9 +146,11 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads;
@@ -676,6 +678,60 @@ public void testToShowNPEOnRegionScannerReseek() throws Exception {
scanner1.close();
}

@Test
public void testArchiveRecoveredEditsReplay() throws Exception {
byte[] family = Bytes.toBytes("family");
this.region = initHRegion(tableName, method, CONF, family);
final WALFactory wals = new WALFactory(CONF, method);
try {
Path regiondir = region.getRegionFileSystem().getRegionDir();
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();

Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);

long maxSeqId = 1050;
long minSeqId = 1000;

for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);

long time = System.nanoTime();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
.toBytes(i)));
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
HConstants.DEFAULT_CLUSTER_ID), edit));

writer.close();
}
MonitoredTask status = TaskMonitor.get().createStatus(method);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
}
CONF.set("hbase.region.archive.recovered.edits", "true");
CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
String fakeFamilyName = recoveredEditsDir.getName();
Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
assertEquals(6, list.length);
} finally {
CONF.set("hbase.region.archive.recovered.edits", "false");
CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
wals.close();
}
}

@Test
public void testSkipRecoveredEditsReplay() throws Exception {
byte[] family = Bytes.toBytes("family");