diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 76b085d43c8e..dde8b8b4dc45 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -282,6 +282,7 @@ message MergeTableRegionsStateData {
repeated RegionInfo region_info = 2;
optional RegionInfo merged_region_info = 3;
optional bool forcible = 4 [default = false];
+ optional string merge_strategy = 5;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/DirectStoreMergeRegionsStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/DirectStoreMergeRegionsStrategy.java
new file mode 100644
index 000000000000..74f962a96b89
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/DirectStoreMergeRegionsStrategy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_WRITE_STRATEGY;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.DirectStoreFSWriteStrategy;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * MergeRegionStrategy implementation to be used in combination with
+ * PersistedStoreEngine to avoid renames when merging regions.
+ *
+ * To use it, define the following properties under table configuration:
+ *
+ * {TABLE_ATTRIBUTES =>
+ * {METADATA => {'hbase.hregion.file.write.strategy' =>
+ * 'org.apache.hadoop.hbase.regionserver.DirectStoreFSWriteStrategy',
+ * 'hbase.hregion.merge.strategy' =>
+ * 'org.apache.hadoop.hbase.master.assignment.DirectStoreMergeRegionsStrategy'}}}
+ *
+ * This will create the resulting merging region directory straight under the table dir, instead of
+ * creating it under the temporary ".merges" dir as done by the default implementation.
+ *
+ *
+ */
+@InterfaceAudience.Private
+public class DirectStoreMergeRegionsStrategy extends MergeRegionsStrategy {
+
+ /**
+ * Inner logic of creating a merging region, this relies on DirectStoreMergeRegionsStrategy to
+ * return the actual paths where to create the new region (avoiding temp dirs and subsequent
+ * renames).
+ * @param env the MasterProcedureEnv wrapping several meta information required.
+ * @param fs the FileSystem instance to write the region directory.
+ * @param regionsToMerge array of RegionInfo representing the regions being merged.
+ * @param tableDir Path instance for the table dir.
+ * @param mergedRegion the resulting merging region.
+ * @return HRegionFileSystem for the resulting merging region.
+ * @throws IOException if any error occurs while creating the region dir.
+ */
+ @Override
+ public HRegionFileSystem createDirAndMergeFiles(MasterProcedureEnv env, FileSystem fs,
+ RegionInfo[] regionsToMerge, Path tableDir, RegionInfo mergedRegion) throws IOException {
+ //creates the resulting merge region dir directly under the table directory, instead of
+ //the temp ".merges" dir
+ Configuration configuration = new Configuration(env.getMasterConfiguration());
+ configuration.set(REGION_WRITE_STRATEGY, DirectStoreFSWriteStrategy.class.getName());
+ HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
+ configuration, fs, tableDir, mergedRegion);
+ mergeRegionFs.createMergesDir();
+ for (RegionInfo ri: regionsToMerge) {
+ HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+ configuration, fs, tableDir, ri, false);
+ mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
+ }
+ return mergeRegionFs;
+ }
+
+ private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+ HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
+ final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
+ .get(mergedRegion.getTable());
+ for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
+ String family = hcd.getNameAsString();
+ final Collection storeFiles = regionFs.getStoreFiles(family);
+ if (storeFiles != null && storeFiles.size() > 0) {
+ for (StoreFileInfo storeFileInfo : storeFiles) {
+ // Create reference file(s) to parent region file here in mergedDir.
+ // As this procedure is running on master, use CacheConfig.DISABLED means
+ // don't cache any block.
+ mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+ new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED),
+ mergeRegionFs.getMergesDir());
+ }
+ }
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeRegionsStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeRegionsStrategy.java
new file mode 100644
index 000000000000..308abca5cd54
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeRegionsStrategy.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * Region merge directory creation strategy to decouple create dir logic from
+ * MergeTableRegionsProcedure and allow for plugable behaviour.
+ */
+@InterfaceAudience.Private
+public abstract class MergeRegionsStrategy {
+
+ /**
+ * Creates the resulting merging region dir and files in the file system, then updates
+ * meta table information for the given region. Specific logic on where in the files system to
+ * create the region structure is delegated to innerMergeRegions and the
+ * actual HRegionFileSystemWriteStrategy implementation.
+ * @param env the MasterProcedureEnv wrapping several meta information required.
+ * @param regionsToMerge array of RegionInfo representing the regions being merged.
+ * @param mergedRegion the resulting merging region.
+ * @throws IOException if any error occurs while creating the region dir.
+ */
+ public void createMergedRegion(MasterProcedureEnv env, RegionInfo[] regionsToMerge,
+ RegionInfo mergedRegion) throws IOException {
+ final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+ final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
+ final FileSystem fs = mfs.getFileSystem();
+ HRegionFileSystem mergeRegionFs = createDirAndMergeFiles(env, fs, regionsToMerge,
+ tabledir, mergedRegion);
+ assert mergeRegionFs != null;
+ mergeRegionFs.commitMergedRegion(mergedRegion);
+ // Prepare to create merged regions
+ env.getAssignmentManager().getRegionStates().
+ getOrCreateRegionStateNode(mergedRegion).setState(RegionState.State.MERGING_NEW);
+ }
+
+ /**
+ * Should define specific logic about where in the file system the region structure should be
+ * created.
+ * @param env the MasterProcedureEnv wrapping several meta information required.
+ * @param fs the FileSystem instance to write the region directory.
+ * @param regionsToMerge array of RegionInfo representing the regions being merged.
+ * @param tableDir Path instance for the table dir.
+ * @param mergedRegion the resulting merging region.
+ * @return HRegionFileSystem for the resulting merging region.
+ * @throws IOException if any error occurs while creating the region dir.
+ */
+ abstract protected HRegionFileSystem createDirAndMergeFiles(MasterProcedureEnv env, FileSystem fs,
+ RegionInfo[] regionsToMerge, Path tableDir, RegionInfo mergedRegion) throws IOException;
+
+
+ /**
+ * Clean up a merged region on rollback after failure.
+ * @param env the MasterProcedureEnv wrapping several meta information required.
+ * @param regionsToMerge array of RegionInfo representing the regions being merged.
+ * @param mergedRegion the resulting merging region.
+ * @throws IOException if any error occurs while creating the region dir.
+ */
+ public void cleanupMergedRegion(MasterProcedureEnv env, RegionInfo[] regionsToMerge,
+ RegionInfo mergedRegion) throws IOException {
+ final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+ TableName tn = regionsToMerge[0].getTable();
+ final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), tn);
+ final FileSystem fs = mfs.getFileSystem();
+ // See createMergedRegion above where we specify the merge dir as being in the
+ // FIRST merge parent region.
+ HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+ env.getMasterConfiguration(), fs, tableDir, regionsToMerge[0], false);
+ regionFs.cleanupMergedRegion(mergedRegion);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 80a61dae9036..696138b30712 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -36,6 +38,7 @@
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -54,7 +57,7 @@
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -74,6 +77,9 @@
@InterfaceAudience.Private
public class MergeTableRegionsProcedure
extends AbstractStateMachineTableProcedure {
+
+ public static final String MERGE_REGION_STRATEGY = "hbase.hregion.merge.strategy";
+
private static final Logger LOG = LoggerFactory.getLogger(MergeTableRegionsProcedure.class);
private ServerName regionLocation;
@@ -89,6 +95,8 @@ public class MergeTableRegionsProcedure
private boolean force;
+ private MergeRegionsStrategy mergeStrategy;
+
public MergeTableRegionsProcedure() {
// Required by the Procedure framework to create the procedure on replay
}
@@ -107,6 +115,24 @@ public MergeTableRegionsProcedure(final MasterProcedureEnv env,
// Preflight depends on mergedRegion being set (at least).
preflightChecks(env, true);
this.force = force;
+ TableDescriptor descriptor = env.getMasterServices().getTableDescriptors().
+ get(mergedRegion.getTable());
+ createMergeStrategy(descriptor);
+ }
+
+ private void createMergeStrategy(TableDescriptor descriptor) {
+ String className = descriptor.getValue(MERGE_REGION_STRATEGY);
+ createMergeStrategy(className);
+ }
+
+ private void createMergeStrategy(String className) {
+ try {
+ LOG.info("instantiating write strategy {}", className);
+ mergeStrategy = (MergeRegionsStrategy) ReflectionUtils.newInstance(Class.forName(className));
+ } catch (Exception e) {
+ LOG.error("Unable to create write strategy: {}", className, e);
+ throw new RuntimeException(e);
+ }
}
/**
@@ -211,7 +237,7 @@ protected Flow executeFromState(final MasterProcedureEnv env,
break;
case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
removeNonDefaultReplicas(env);
- createMergedRegion(env);
+ mergeStrategy.createMergedRegion(env, regionsToMerge, mergedRegion);
setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE);
break;
case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE:
@@ -281,7 +307,7 @@ protected void rollbackState(final MasterProcedureEnv env, final MergeTableRegio
break;
case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION:
case MERGE_TABLE_REGIONS_WRITE_MAX_SEQUENCE_ID_FILE:
- cleanupMergedRegion(env);
+ mergeStrategy.cleanupMergedRegion(env, regionsToMerge, mergedRegion);
break;
case MERGE_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
break;
@@ -360,7 +386,8 @@ protected void serializeStateData(ProcedureStateSerializer serializer)
MasterProcedureProtos.MergeTableRegionsStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setMergedRegionInfo(ProtobufUtil.toRegionInfo(mergedRegion))
- .setForcible(force);
+ .setForcible(force)
+ .setMergeStrategy(mergeStrategy.getClass().getName());
for (RegionInfo ri: regionsToMerge) {
mergeTableRegionsMsg.addRegionInfo(ProtobufUtil.toRegionInfo(ri));
}
@@ -381,6 +408,12 @@ protected void deserializeStateData(ProcedureStateSerializer serializer)
for (int i = 0; i < regionsToMerge.length; i++) {
regionsToMerge[i] = ProtobufUtil.toRegionInfo(mergeTableRegionsMsg.getRegionInfo(i));
}
+ if(mergeTableRegionsMsg.getMergeStrategy()==null ||
+ mergeTableRegionsMsg.getMergeStrategy().isEmpty()){
+ this.createMergeStrategy(DefaultMergeStrategy.class.getName());
+ } else {
+ createMergeStrategy(mergeTableRegionsMsg.getMergeStrategy());
+ }
mergedRegion = ProtobufUtil.toRegionInfo(mergeTableRegionsMsg.getMergedRegionInfo());
}
@@ -561,85 +594,6 @@ private void setRegionStateToMerging(final MasterProcedureEnv env) {
}
}
- /**
- * Create merged region.
- * The way the merge works is that we make a 'merges' temporary
- * directory in the FIRST parent region to merge (Do not change this without
- * also changing the rollback where we look in this FIRST region for the
- * merge dir). We then collect here references to all the store files in all
- * the parent regions including those of the FIRST parent region into a
- * subdirectory, named for the resultant merged region. We then call
- * commitMergeRegion. It finds this subdirectory of storefile references
- * and moves them under the new merge region (creating the region layout
- * as side effect). After assign of the new merge region, we will run a
- * compaction. This will undo the references but the reference files remain
- * in place until the archiver runs (which it does on a period as a chore
- * in the RegionServer that hosts the merge region -- see
- * CompactedHFilesDischarger). Once the archiver has moved aside the
- * no-longer used references, the merge region no longer has references.
- * The catalog janitor will notice when it runs next and it will remove
- * the old parent regions.
- */
- private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
- final FileSystem fs = mfs.getFileSystem();
- HRegionFileSystem mergeRegionFs = null;
- for (RegionInfo ri: this.regionsToMerge) {
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, ri, false);
- if (mergeRegionFs == null) {
- mergeRegionFs = regionFs;
- mergeRegionFs.createMergesDir();
- }
- mergeStoreFiles(env, regionFs, mergeRegionFs.getMergesDir());
- }
- assert mergeRegionFs != null;
- mergeRegionFs.commitMergedRegion(mergedRegion);
-
- // Prepare to create merged regions
- env.getAssignmentManager().getRegionStates().
- getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
- }
-
- /**
- * Create reference file(s) to parent region hfiles in the mergeDir
- * @param regionFs merge parent region file system
- * @param mergeDir the temp directory in which we are accumulating references.
- */
- private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
- final Path mergeDir) throws IOException {
- final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
- for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
- String family = hcd.getNameAsString();
- final Collection storeFiles = regionFs.getStoreFiles(family);
- if (storeFiles != null && storeFiles.size() > 0) {
- for (StoreFileInfo storeFileInfo : storeFiles) {
- // Create reference file(s) to parent region file here in mergedDir.
- // As this procedure is running on master, use CacheConfig.DISABLED means
- // don't cache any block.
- regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(
- storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), mergeDir);
- }
- }
- }
- }
-
- /**
- * Clean up a merged region on rollback after failure.
- */
- private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException {
- final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- TableName tn = this.regionsToMerge[0].getTable();
- final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), tn);
- final FileSystem fs = mfs.getFileSystem();
- // See createMergedRegion above where we specify the merge dir as being in the
- // FIRST merge parent region.
- HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false);
- regionFs.cleanupMergedRegion(mergedRegion);
- }
-
/**
* Rollback close regions
**/
@@ -763,4 +717,62 @@ protected boolean abort(MasterProcedureEnv env) {
// range of steps; what do we do for these should an operator want to cancel them? HBASE-20022.
return isRollbackSupported(getCurrentState()) && super.abort(env);
}
+
+ public static class DefaultMergeStrategy extends MergeRegionsStrategy {
+
+ /**
+ * Create merged region.
+ * The way the merge works is that we make a 'merges' temporary
+ * directory in the FIRST parent region to merge (Do not change this without
+ * also changing the rollback where we look in this FIRST region for the
+ * merge dir). We then collect here references to all the store files in all
+ * the parent regions including those of the FIRST parent region into a
+ * subdirectory, named for the resultant merged region. We then call
+ * commitMergeRegion. It finds this subdirectory of storefile references
+ * and moves them under the new merge region (creating the region layout
+ * as side effect). After assign of the new merge region, we will run a
+ * compaction. This will undo the references but the reference files remain
+ * in place until the archiver runs (which it does on a period as a chore
+ * in the RegionServer that hosts the merge region -- see
+ * CompactedHFilesDischarger). Once the archiver has moved aside the
+ * no-longer used references, the merge region no longer has references.
+ * The catalog janitor will notice when it runs next and it will remove
+ * the old parent regions.
+ */
+ @Override
+ public HRegionFileSystem createDirAndMergeFiles(MasterProcedureEnv env, FileSystem fs,
+ RegionInfo[] regionsToMerge, Path tableDir, RegionInfo mergedRegion) throws IOException {
+ HRegionFileSystem mergeRegionFs = null;
+ for (RegionInfo ri: regionsToMerge) {
+ HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+ env.getMasterConfiguration(), fs, tableDir, ri, false);
+ if (mergeRegionFs == null) {
+ mergeRegionFs = regionFs;
+ mergeRegionFs.createMergesDir();
+ }
+ mergeStoreFiles(env, regionFs, mergeRegionFs.getMergesDir(), mergedRegion);
+ }
+ return mergeRegionFs;
+ }
+
+ private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs, Path mergeDir,
+ RegionInfo mergedRegion) throws IOException {
+ final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
+ .get(mergedRegion.getTable());
+ for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
+ String family = hcd.getNameAsString();
+ final Collection storeFiles = regionFs.getStoreFiles(family);
+ if (storeFiles != null && storeFiles.size() > 0) {
+ for (StoreFileInfo storeFileInfo : storeFiles) {
+ // Create reference file(s) to parent region file here in mergedDir.
+ // As this procedure is running on master, use CacheConfig.DISABLED means
+ // don't cache any block.
+ regionFs.mergeStoreFile(mergedRegion, family,
+ new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED),
+ mergeDir);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 09ac8274bf2a..e149b13d2d9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.assignment;
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_WRITE_STRATEGY;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -616,12 +618,14 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName());
final FileSystem fs = mfs.getFileSystem();
+ TableDescriptor descriptor = env.getMasterServices().getTableDescriptors().
+ get(this.getParentRegion().getTable());
+ Configuration configuration = new Configuration(env.getMasterConfiguration());
+ configuration.set(REGION_WRITE_STRATEGY, descriptor.getValue(REGION_WRITE_STRATEGY));
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
- env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
+ configuration, fs, tabledir, getParentRegion(), false);
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
-
Pair expectedReferences = splitStoreFiles(env, regionFs);
-
assertReferenceFileCount(fs, expectedReferences.getFirst(),
regionFs.getSplitsDir(daughterOneRI));
//Move the files from the temporary .splits to the final /table/region directory
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFSWriteStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFSWriteStrategy.java
new file mode 100644
index 000000000000..b80aedf99490
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFSWriteStrategy.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HRegionFileSystemWriteStrategy implementation to be used in combination with
+ * PersistedStoreEngine to avoid renames when splitting and merging regions.
+ *
+ * To use it, define the following properties under table configuration:
+ *
+ * {TABLE_ATTRIBUTES =>
+ * {METADATA => {'hbase.hregion.file.write.strategy' =>
+ * 'org.apache.hadoop.hbase.regionserver.DirectStoreFSWriteStrategy',
+ * 'hbase.hregion.merge.strategy' =>
+ * 'org.apache.hadoop.hbase.master.assignment.DirectStoreMergeRegionsStrategy'}}}
+ *
+ * This will create the resulting merging and splitting regions directory straight under
+ * the table dir, instead of creating it under the temporary ".tmp" or ".merges" dirs,
+ * as done by the default implementation.
+ */
+@InterfaceAudience.Private
+public class DirectStoreFSWriteStrategy extends HRegionFileSystemWriteStrategy {
+ private StoreFilePathAccessor accessor;
+ private Map>> regionSplitReferences = new HashMap<>();
+ private Map> mergeReferences = new HashMap();
+
+ public DirectStoreFSWriteStrategy(HRegionFileSystem fileSystem) throws IOException {
+ super(fileSystem);
+ this.accessor = StoreFileTrackingUtils.createStoreFilePathAccessor(fileSystem.conf,
+ ConnectionFactory.createConnection(fileSystem.conf));
+ }
+
+ /**
+ * The parent directory where to create the splits dirs is
+ * the table directory itself, in this case.
+ * @return Path representing the table directory.
+ */
+ @Override
+ public Path getParentSplitsDir() {
+ return fileSystem.getTableDir();
+ }
+
+ /**
+ * The parent directory where to create the merge dir is
+ * the table directory itself, in this case.
+ * @return Path representing the table directory.
+ */
+ @Override
+ public Path getParentMergesDir() {
+ return fileSystem.getTableDir();
+ }
+
+ /**
+ * Creates the directories for the respective split daughters directly under the
+ * table directory, instead of default behaviour of doing it under temp dirs, initially.
+ * @param daughterA the first half of the split region
+ * @param daughterB the second half of the split region
+ *
+ * @throws IOException if directories creation fails.
+ */
+ @Override
+ public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB)
+ throws IOException {
+ Path splitdir = getParentSplitsDir();
+ // splitDir doesn't exists now. No need to do an exists() call for it.
+ if (!fileSystem.getFileSystem().exists(splitdir)) {
+ throw new IOException("Table dir for splitting region not found: " + splitdir);
+ }
+ Path daughterADir = getSplitsDir(daughterA);
+ if (!fileSystem.createDir(daughterADir)) {
+ throw new IOException("Failed create of " + daughterADir);
+ }
+ Path daughterBDir = getSplitsDir(daughterB);
+ if (!fileSystem.createDir(daughterBDir)) {
+ throw new IOException("Failed create of " + daughterBDir);
+ }
+ }
+
+ /**
+ * Just validates that merges parent, the actual table dir in this case, exists.
+ * @throws IOException if table dir doesn't exist.
+ */
+ @Override
+ public void createMergesDir() throws IOException {
+ //When writing directly, avoiding renames, merges parent is the table dir itself, so it
+ // should exist already, so just validate it exist then do nothing
+ Path mergesdir = getParentMergesDir();
+ if (!fileSystem.fs.exists(mergesdir)) {
+ throw new IOException("Table dir for merging region not found: " + mergesdir);
+ }
+ }
+
+ /**
+ * Wraps super.splitStoreFile, so that it can map the resulting split reference to
+ * the related split region and column family, in order to add this to 'storefile' system table
+ * for the tracking logic of PersisedStoreFileManager later on commitDaughterRegion
+ * method.
+ * @param parentRegionInfo {@link RegionInfo} of the parent split region.
+ * @param daughterRegionInfo {@link RegionInfo} of the resulting split region.
+ * @param familyName Column Family Name
+ * @param f File to split.
+ * @param splitRow Split Row
+ * @param top True if we are referring to the top half of the hfile.
+ * @param splitPolicy A split policy instance; be careful! May not be full populated; e.g. if
+ * this method is invoked on the Master side, then the RegionSplitPolicy will
+ * NOT have a reference to a Region.
+ * @param fs FileSystem instance for creating the actual reference file.
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public Path splitStoreFile(RegionInfo parentRegionInfo, RegionInfo daughterRegionInfo,
+ String familyName, HStoreFile f, byte[] splitRow,
+ boolean top, RegionSplitPolicy splitPolicy, FileSystem fs) throws IOException {
+ Path path = super.splitStoreFile(parentRegionInfo, daughterRegionInfo,
+ familyName, f, splitRow, top, splitPolicy, fs);
+ Map> splitReferences = regionSplitReferences.
+ get(daughterRegionInfo.getEncodedName());
+ if(splitReferences==null){
+ splitReferences = new HashMap<>();
+ regionSplitReferences.put(daughterRegionInfo.getEncodedName(), splitReferences);
+ }
+ List references = splitReferences.get(familyName);
+ if(references==null){
+ references = new ArrayList<>();
+ splitReferences.put(familyName,references);
+ }
+ references.add(path);
+ return path;
+ }
+
+ /**
+ * Wraps super.mergeStoreFile, so that it can map the resulting merge reference to
+ * the related merged region and column family, in order to add this to 'storefile' system table
+ * for the tracking logic of PersisedStoreFileManager later in commitMergedRegion.
+ * @param regionToMerge {@link RegionInfo} for one of the regions being merged.
+ * @param mergedRegion {@link RegionInfo} of the merged region
+ * @param familyName Column Family Name
+ * @param f File to create reference.
+ * @param mergedDir
+ * @param fs FileSystem instance for creating the actual reference file.
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public Path mergeStoreFile(RegionInfo regionToMerge, RegionInfo mergedRegion, String familyName,
+ HStoreFile f, Path mergedDir, FileSystem fs) throws IOException {
+ if (this.fileSystem.regionInfoForFs.equals(regionToMerge)) {
+ Path path = super.mergeStoreFile(mergedRegion, regionToMerge, familyName, f, mergedDir, fs);
+ List referenceFiles = mergeReferences.get(familyName);
+ if (referenceFiles == null) {
+ referenceFiles = new ArrayList<>();
+ mergeReferences.put(familyName, referenceFiles);
+ }
+ referenceFiles.add(path);
+ return path;
+ } else {
+ throw new IOException("Wrong ordering of regions parameter. "
+ + "The resulting merge should be the first param.");
+ }
+ }
+
+ /**
+ * Do nothing. Here the split dir is the table dir itself, we cannot delete it.
+ * @throws IOException
+ */
+ @Override
+ public void cleanupSplitsDir() throws IOException {
+ }
+
+ /**
+ * Do nothing. Here the merge dir is the table dir itself, we cannot delete it.
+ * @throws IOException
+ */
+ @Override
+ public void cleanupMergesDir() throws IOException {
+ //do nothing, the merges dir is the store dir itself, so we cannot delete it.
+ }
+
+ /**
+ * Do nothing. Here the split dir is the table dir itself, we cannot delete it.
+ * @throws IOException
+ */
+ @Override
+ public void cleanupAnySplitDetritus() throws IOException {
+ }
+
+ /**
+ * Adds all reference files for the given daughter region into 'storefile' system table for
+ * the tracking logic of PersisedStoreFileManager.
+ * @param regionInfo the resulting daughter region to be committed.
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public Path commitDaughterRegion(RegionInfo regionInfo) throws IOException {
+ Path regionDir = this.getSplitsDir(regionInfo);
+ //The newly created region might not have any file, so no need to add into file manager tracker
+ Map> splitReferences =
+ regionSplitReferences.get(regionInfo.getEncodedName());
+ if(splitReferences!=null) {
+ for (String family : splitReferences.keySet()) {
+ List referenceList = splitReferences.get(family);
+ accessor.writeStoreFilePaths(regionInfo.getTable().getNameAsString(),
+ regionInfo.getRegionNameAsString(), family,
+ StoreFilePathUpdate.builder().withStorePaths(referenceList).build());
+ }
+ }
+ regionSplitReferences.remove(regionInfo.getEncodedName());
+ return regionDir;
+ }
+
+ /**
+ * Adds all reference files for the merge result region into 'storefile' system table for
+ * the tracking logic of PersisedStoreFileManager.
+ * @param mergedRegionInfo merged region {@link RegionInfo}
+ * @throws IOException
+ */
+ @Override
+ public void commitMergedRegion(RegionInfo mergedRegionInfo) throws IOException {
+ for(String family : mergeReferences.keySet()){
+ accessor.writeStoreFilePaths(mergedRegionInfo.getTable().getNameAsString(),
+ mergedRegionInfo.getRegionNameAsString(),
+ family,
+ StoreFilePathUpdate.builder().withStorePaths(mergeReferences.get(family)).build());
+ }
+ mergeReferences.clear();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index dd26f1ca9cc1..541eb9a56649 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -76,6 +77,8 @@ public class HRegionFileSystem {
/** Temporary subdirectory of the region directory used for compaction output. */
static final String REGION_TEMP_DIR = ".tmp";
+ public static final String REGION_WRITE_STRATEGY = "hbase.hregion.file.write.strategy";
+
private final RegionInfo regionInfo;
//regionInfo for interacting with FS (getting encodedName, etc)
final RegionInfo regionInfoForFs;
@@ -93,6 +96,7 @@ public class HRegionFileSystem {
private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
+ private HRegionFileSystemWriteStrategy writeStrategy;
/**
* Create a view to the on-disk region
* @param conf the {@link Configuration} to use
@@ -112,6 +116,7 @@ public class HRegionFileSystem {
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
+ this.createWriteStrategy(conf);
}
/** @return the underlying {@link FileSystem} */
@@ -572,20 +577,34 @@ Pair bulkLoadStoreFile(final String familyName, Path srcPath, long s
// ===========================================================================
// Splits Helpers
// ===========================================================================
- /** @return {@link Path} to the temp directory used during split operations */
- Path getSplitsDir() {
- return new Path(getRegionDir(), REGION_SPLITS_DIR);
+ /**
+ * Return the parent path for the split regions, according to the
+ * HRegionFileSystem.WriteStrategy implementation logic.
+ * See HRegionFileSystem.WriteStrategy.getSplitsDir for the default behaviour.
+ *
+ * @return {@link Path} to the temp directory used during split operations
+ * */
+ Path getSplitsDir () {
+ return this.writeStrategy.getParentSplitsDir();
}
+ /**
+ * Return the path for the split regions, according to the
+ * HRegionFileSystem.WriteStrategy implementation logic.
+ * See HRegionFileSystem.WriteStrategy.getSplitsDir for the default behaviour.
+ *
+ * @param hri the resulting split region info.
+ * @return {@link Path} to the temp directory used during split operations
+ * */
public Path getSplitsDir(final RegionInfo hri) {
- return new Path(getSplitsDir(), hri.getEncodedName());
+ return this.writeStrategy.getSplitsDir(hri);
}
/**
* Clean up any split detritus that may have been left around from previous split attempts.
*/
void cleanupSplitsDir() throws IOException {
- deleteDir(getSplitsDir());
+ writeStrategy.cleanupSplitsDir();
}
/**
@@ -595,26 +614,9 @@ void cleanupSplitsDir() throws IOException {
* @throws IOException
*/
void cleanupAnySplitDetritus() throws IOException {
- Path splitdir = this.getSplitsDir();
- if (!fs.exists(splitdir)) return;
- // Look at the splitdir. It could have the encoded names of the daughter
- // regions we tried to make. See if the daughter regions actually got made
- // out under the tabledir. If here under splitdir still, then the split did
- // not complete. Try and do cleanup. This code WILL NOT catch the case
- // where we successfully created daughter a but regionserver crashed during
- // the creation of region b. In this case, there'll be an orphan daughter
- // dir in the filesystem. TOOD: Fix.
- FileStatus[] daughters = CommonFSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
- if (daughters != null) {
- for (FileStatus daughter: daughters) {
- Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
- if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
- throw new IOException("Failed delete of " + daughterDir);
- }
- }
- }
+ writeStrategy.cleanupAnySplitDetritus();
cleanupSplitsDir();
- LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
+ LOG.info("Cleaned up old failed split transaction detritus: " + getSplitsDir());
}
/**
@@ -630,56 +632,27 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
}
/**
- * Commit a daughter region, moving it from the split temporary directory
- * to the proper location in the filesystem.
+ * Commit a daughter region, according to the HRegionFileSystem.WriteStrategy
+ * implementation logic. See HRegionFileSystem.WriteStrategy.commitDaughterRegion
+ * for the default behaviour.
*
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
- * @throws IOException
+ * @throws IOException if any errors occur.
*/
public Path commitDaughterRegion(final RegionInfo regionInfo)
throws IOException {
- Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
- Path daughterTmpDir = this.getSplitsDir(regionInfo);
-
- if (fs.exists(daughterTmpDir)) {
-
- // Write HRI to a file in case we need to recover hbase:meta
- Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
- byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
- writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
-
- // Move the daughter temp dir to the table dir
- if (!rename(daughterTmpDir, regionDir)) {
- throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
- }
- }
-
- return regionDir;
+ return this.writeStrategy.commitDaughterRegion(regionInfo);
}
/**
- * Create the region splits directory.
+ * Create the region splits directory, according to the
+ * HRegionFileSystem.WriteStrategy implementation logic.
+ * See HRegionFileSystem.WriteStrategy.createSplitsDir for the default behaviour.
+ * @param daughterA the first resulting daughter.
+ * @param daughterB the second resulting daughter.
*/
public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
- Path splitdir = getSplitsDir();
- if (fs.exists(splitdir)) {
- LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it");
- if (!deleteDir(splitdir)) {
- throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
- }
- }
- // splitDir doesn't exists now. No need to do an exists() call for it.
- if (!createDir(splitdir)) {
- throw new IOException("Failed create of " + splitdir);
- }
- Path daughterATmpDir = getSplitsDir(daughterA);
- if (!createDir(daughterATmpDir)) {
- throw new IOException("Failed create of " + daughterATmpDir);
- }
- Path daughterBTmpDir = getSplitsDir(daughterB);
- if (!createDir(daughterBTmpDir)) {
- throw new IOException("Failed create of " + daughterBTmpDir);
- }
+ this.writeStrategy.createSplitsDir(daughterA,daughterB);
}
/**
@@ -698,71 +671,33 @@ public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws I
*/
public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
boolean top, RegionSplitPolicy splitPolicy) throws IOException {
- if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
- // Check whether the split row lies in the range of the store file
- // If it is outside the range, return directly.
- f.initReader();
- try {
- if (top) {
- //check if larger than last key.
- Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
- Optional lastKey = f.getLastKey();
- // If lastKey is null means storefile is empty.
- if (!lastKey.isPresent()) {
- return null;
- }
- if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
- return null;
- }
- } else {
- //check if smaller than first key
- Cell splitKey = PrivateCellUtil.createLastOnRow(splitRow);
- Optional firstKey = f.getFirstKey();
- // If firstKey is null means storefile is empty.
- if (!firstKey.isPresent()) {
- return null;
- }
- if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
- return null;
- }
- }
- } finally {
- f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
- }
- }
-
- Path splitDir = new Path(getSplitsDir(hri), familyName);
- // A reference to the bottom half of the hsf store file.
- Reference r =
- top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
- // Add the referred-to regions name as a dot separated suffix.
- // See REF_NAME_REGEX regex above. The referred-to regions name is
- // up in the path of the passed in f -- parentdir is family,
- // then the directory above is the region name.
- String parentRegionName = regionInfoForFs.getEncodedName();
- // Write reference with same file id only with the other region name as
- // suffix and into the new region location (under same family).
- Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
- return r.write(fs, p);
+ return writeStrategy.splitStoreFile(regionInfoForFs, hri, familyName,
+ f, splitRow, top, splitPolicy, fs);
}
// ===========================================================================
// Merge Helpers
// ===========================================================================
- /** @return {@link Path} to the temp directory used during merge operations */
+ /**
+ * Return the path for the merged region, according to the
+ * HRegionFileSystem.WriteStrategy implementation logic.
+ * See HRegionFileSystem.WriteStrategy.getMergesDir for the default behaviour.
+ *
+ * @return {@link Path} to the temp directory used during merge operations
+ * */
public Path getMergesDir() {
- return new Path(getRegionDir(), REGION_MERGES_DIR);
+ return writeStrategy.getParentMergesDir();
}
- Path getMergesDir(final RegionInfo hri) {
- return new Path(getMergesDir(), hri.getEncodedName());
+ public Path getMergesDir(final RegionInfo hri) {
+ return writeStrategy.getMergesDir(hri);
}
/**
* Clean up any merge detritus that may have been left around from previous merge attempts.
*/
void cleanupMergesDir() throws IOException {
- deleteDir(getMergesDir());
+ writeStrategy.cleanupMergesDir();
}
/**
@@ -787,74 +722,40 @@ static boolean mkdirs(FileSystem fs, Configuration conf, Path dir) throws IOExce
}
/**
- * Create the region merges directory, a temporary directory to accumulate
- * merges in.
+ * Create the region merges directory, according to the
+ * HRegionFileSystem.WriteStrategy implementation logic.
+ * See HRegionFileSystem.WriteStrategy.createMergesDir for the default behaviour.
* @throws IOException If merges dir already exists or we fail to create it.
* @see HRegionFileSystem#cleanupMergesDir()
*/
public void createMergesDir() throws IOException {
- Path mergesdir = getMergesDir();
- if (fs.exists(mergesdir)) {
- LOG.info("{} directory exists. Deleting it to recreate it anew", mergesdir);
- if (!fs.delete(mergesdir, true)) {
- throw new IOException("Failed deletion of " + mergesdir + " before recreate.");
- }
- }
- if (!mkdirs(fs, conf, mergesdir)) {
- throw new IOException("Failed create of " + mergesdir);
- }
+ this.writeStrategy.createMergesDir();
}
/**
- * Write out a merge reference under the given merges directory. Package local
- * so it doesnt leak out of regionserver.
- * @param mergedRegion {@link RegionInfo} of the merged region
+ * Write out a merge reference under the given merges directory.
+ * @param mergedRegion {@link RegionInfo} of the region being merged
* @param familyName Column Family Name
* @param f File to create reference.
- * @param mergedDir
+ * @param mergedDir the resulting merged region directory where the reference must be created.
* @return Path to created reference.
* @throws IOException
*/
public Path mergeStoreFile(RegionInfo mergedRegion, String familyName, HStoreFile f,
Path mergedDir) throws IOException {
- Path referenceDir = new Path(new Path(mergedDir,
- mergedRegion.getEncodedName()), familyName);
- // A whole reference to the store file.
- Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
- // Add the referred-to regions name as a dot separated suffix.
- // See REF_NAME_REGEX regex above. The referred-to regions name is
- // up in the path of the passed in f -- parentdir is family,
- // then the directory above is the region name.
- String mergingRegionName = regionInfoForFs.getEncodedName();
- // Write reference with same file id only with the other region name as
- // suffix and into the new region location (under same family).
- Path p = new Path(referenceDir, f.getPath().getName() + "."
- + mergingRegionName);
- return r.write(fs, p);
+ return writeStrategy.
+ mergeStoreFile(regionInfoForFs, mergedRegion, familyName, f, mergedDir, fs);
}
/**
- * Commit a merged region, moving it from the merges temporary directory to
- * the proper location in the filesystem.
+ * Commits a merged region, according to the HRegionFileSystem.WriteStrategy
+ * implementation logic. See HRegionFileSystem.WriteStrategy.commitMergedRegion for
+ * the default behaviour.
* @param mergedRegionInfo merged region {@link RegionInfo}
* @throws IOException
*/
public void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException {
- Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
- Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
- // Move the tmp dir to the expected location
- if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
-
- // Write HRI to a file in case we need to recover hbase:meta
- Path regionInfoFile = new Path(mergedRegionTmpDir, REGION_INFO_FILE);
- byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
- writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
-
- if (!fs.rename(mergedRegionTmpDir, regionDir)) {
- throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
- + regionDir);
- }
- }
+ this.writeStrategy.commitMergedRegion(mergedRegionInfo);
}
// ===========================================================================
@@ -1028,6 +929,7 @@ public static HRegionFileSystem createRegionOnFileSystem(final Configuration con
// Write HRI to a file in case we need to recover hbase:meta
regionFs.writeRegionInfoOnFilesystem(false);
+ LOG.trace("Created region info file for {}", regionInfo.getEncodedName());
} else {
if (LOG.isDebugEnabled())
LOG.debug("Skipping creation of .regioninfo file for " + regionInfo);
@@ -1247,4 +1149,187 @@ private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSl
}
Thread.sleep((long)baseSleepBeforeRetries * sleepMultiplier);
}
+
+ private void createWriteStrategy(Configuration conf) {
+ String className = conf.get(REGION_WRITE_STRATEGY, DefaultWriteStrategy.class.getName());
+ try {
+ LOG.info("instantiating write strategy {}", className);
+ writeStrategy = ReflectionUtils.instantiateWithCustomCtor(className,
+ new Class[] { HRegionFileSystem.class }, new Object[] { this });
+ } catch (Exception e) {
+ LOG.error("Unable to create write strategy: {}", className, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class DefaultWriteStrategy extends HRegionFileSystemWriteStrategy {
+
+ public DefaultWriteStrategy(HRegionFileSystem fileSystem){
+ super(fileSystem);
+ }
+
+ /**
+ * Constructs a Path for the split dir as follows:
+ * "/hbase/data/NS/TABLE/PARENT_REGION_DIR/.splits/"
+ * @return the temporary parent path for the split dir
+ */
+ @Override
+ public Path getParentSplitsDir() {
+ return new Path(fileSystem.getRegionDir(), REGION_SPLITS_DIR);
+ }
+
+ /**
+ * Constructs a Path for the merged dir as follows:
+ * "/hbase/data/NS/TABLE/PARENT_REGION_DIR/.merges/"
+ * @return the temporary parent path for the merges dir.
+ */
+ @Override
+ public Path getParentMergesDir() {
+ return new Path(fileSystem.getRegionDir(), REGION_MERGES_DIR);
+ }
+
+ /**
+ * Creates the region splits directory. Assumes getSplitsDir implementation returns a tmp dir,
+ * therefore, it deletes any existing directory returned by getSplitsDir.
+ *
+ * @param daughterA the first half of the split region
+ * @param daughterB the second half of the split region
+ *
+ * @throws IOException if splits dir creation fails.
+ */
+ @Override
+ public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
+ Path splitdir = getParentSplitsDir();
+ if (this.fileSystem.fs.exists(splitdir)) {
+ LOG.info("The {} directory exists. Hence deleting it to recreate it.", splitdir);
+ if (!this.fileSystem.deleteDir(splitdir)) {
+ throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
+ }
+ }
+ // splitDir doesn't exists now. No need to do an exists() call for it.
+ if (!this.fileSystem.createDir(splitdir)) {
+ throw new IOException("Failed create of " + splitdir);
+ }
+ Path daughterATmpDir = getSplitsDir(daughterA);
+ if (!this.fileSystem.createDir(daughterATmpDir)) {
+ throw new IOException("Failed create of " + daughterATmpDir);
+ }
+ Path daughterBTmpDir = getSplitsDir(daughterB);
+ if (!this.fileSystem.createDir(daughterBTmpDir)) {
+ throw new IOException("Failed create of " + daughterBTmpDir);
+ }
+ }
+
+ /**
+ * Create the region merges directory. Assumes getMergesDir returns a temporary directory,
+ * therefore, it deletes any existing directory returned by getMergesDir.
+ * @throws IOException If merges dir creation fails.
+ * @see HRegionFileSystem#cleanupMergesDir()
+ */
+ @Override
+ public void createMergesDir() throws IOException {
+ Path mergesdir = getParentMergesDir();
+ if (fileSystem.fs.exists(mergesdir)) {
+ LOG.info("{} directory exists. Deleting it to recreate it anew", mergesdir);
+ if (!fileSystem.fs.delete(mergesdir, true)) {
+ throw new IOException("Failed deletion of " + mergesdir + " before recreate.");
+ }
+ }
+ if (!mkdirs(fileSystem.fs, fileSystem.conf, mergesdir)) {
+ throw new IOException("Failed create of " + mergesdir);
+ }
+ }
+
+ /**
+ * Completes the daughter region creation. This implementation assumes daughter region has been
+ * initially created under a tmp dir, therefore, it performs a rename of this daughter dir to
+ * the actual store dir.
+ * @param regionInfo the resulting daughter region to be committed.
+ * @return the final region path
+ * (after it has been moved from the tmp dir to the acutal store dir).
+ * @throws IOException if any errors occur.
+ */
+ @Override
+ public Path commitDaughterRegion(final RegionInfo regionInfo)
+ throws IOException {
+ Path regionDir = new Path(fileSystem.tableDir, regionInfo.getEncodedName());
+ Path daughterTmpDir = getSplitsDir(regionInfo);
+
+ if (fileSystem.fs.exists(daughterTmpDir)) {
+
+ // Write HRI to a file in case we need to recover hbase:meta
+ Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
+ byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
+ writeRegionInfoFileContent(fileSystem.conf, fileSystem.fs,
+ regionInfoFile, regionInfoContent);
+
+ // Move the daughter temp dir to the table dir
+ if (!fileSystem.rename(daughterTmpDir, regionDir)) {
+ throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
+ }
+ }
+ return regionDir;
+ }
+
+ /**
+ * Completes the merged region creation. This implementation assumes the resulting region has
+ * been initially created under a temp dir, so it renames the merged region temp dir to the
+ * actual store dir.
+ * @param mergedRegionInfo merged region {@link RegionInfo}
+ * @throws IOException if any errors occur.
+ */
+ @Override
+ public void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException {
+ Path regionDir = new Path(this.fileSystem.tableDir, mergedRegionInfo.getEncodedName());
+ Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
+ // Move the tmp dir to the expected location
+ if (mergedRegionTmpDir != null && this.fileSystem.fs.exists(mergedRegionTmpDir)) {
+
+ // Write HRI to a file in case we need to recover hbase:meta
+ Path regionInfoFile = new Path(mergedRegionTmpDir, REGION_INFO_FILE);
+ byte[] regionInfoContent = getRegionInfoFileContent(this.fileSystem.regionInfo);
+ writeRegionInfoFileContent(fileSystem.conf, fileSystem.fs, regionInfoFile,
+ regionInfoContent);
+
+ if (!this.fileSystem.fs.rename(mergedRegionTmpDir, regionDir)) {
+ throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
+ + regionDir);
+ }
+ }
+ }
+
+ @Override
+ public void cleanupSplitsDir() throws IOException {
+ fileSystem.deleteDir(fileSystem.getSplitsDir());
+ }
+
+ @Override
+ public void cleanupMergesDir() throws IOException {
+ fileSystem.deleteDir(fileSystem.getMergesDir());
+ }
+
+ @Override
+ public void cleanupAnySplitDetritus() throws IOException {
+ Path splitdir = fileSystem.getSplitsDir();
+ if (!fileSystem.fs.exists(splitdir)) return;
+ // Look at the splitdir. It could have the encoded names of the daughter
+ // regions we tried to make. See if the daughter regions actually got made
+ // out under the tabledir. If here under splitdir still, then the split did
+ // not complete. Try and do cleanup. This code WILL NOT catch the case
+ // where we successfully created daughter a but regionserver crashed during
+ // the creation of region b. In this case, there'll be an orphan daughter
+ // dir in the filesystem. TOOD: Fix.
+ FileStatus[] daughters = CommonFSUtils.listStatus(fileSystem.fs, splitdir,
+ new FSUtils.DirFilter(fileSystem.fs));
+ if (daughters != null) {
+ for (FileStatus daughter: daughters) {
+ Path daughterDir = new Path(fileSystem.getTableDir(), daughter.getPath().getName());
+ LOG.info(">>>> DELETING daugher dir: {}", daughterDir);
+ if (fileSystem.fs.exists(daughterDir) && !fileSystem.deleteDir(daughterDir)) {
+ throw new IOException("Failed delete of " + daughterDir);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystemWriteStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystemWriteStrategy.java
new file mode 100644
index 000000000000..d2ffbccc280a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystemWriteStrategy.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * HRegionFileSystem write strategy to decouple splits/merge create dir and commit logic
+ * from HRegionFileSystem, allowing for a plugable behaviour.
+ */
+@InterfaceAudience.Private
+public abstract class HRegionFileSystemWriteStrategy {
+
+ protected HRegionFileSystem fileSystem;
+
+ public HRegionFileSystemWriteStrategy(HRegionFileSystem fileSystem){
+ this.fileSystem = fileSystem;
+ }
+
+ /**
+ * Returns the directory Path for the region split resulting daughter.
+ * @param hri for the split resulting daughter region.
+ * @return a path under tmp dir for the resulting daughter region.
+ */
+ public Path getSplitsDir(final RegionInfo hri) {
+ return new Path(getParentSplitsDir(), hri.getEncodedName());
+ }
+
+ /**
+ * Defines the parent dir for the split dir.
+ * @return
+ */
+ public abstract Path getParentSplitsDir();
+
+ /**
+ * Defines the parent dir for the merges dir.
+ * @return
+ */
+ public abstract Path getParentMergesDir();
+
+ /**
+ * Returns the directory Path for the resulting merged region.
+ * @param hri for the resulting merged region.
+ * @return a path under tmp dir for the resulting merged region.
+ */
+ public Path getMergesDir(final RegionInfo hri) {
+ return new Path(getParentMergesDir(), hri.getEncodedName());
+ }
+
+ /**
+ * Creates the region splits directory.
+ *
+ * @param daughterA the first half of the split region
+ * @param daughterB the second half of the split region
+ *
+ * @throws IOException if splits dir creation fails.
+ */
+ public abstract void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB)
+ throws IOException;
+
+ /**
+ * Create the region merges directory.
+ * @throws IOException If merges dir creation fails.
+ * @see HRegionFileSystem#cleanupMergesDir()
+ */
+ public abstract void createMergesDir() throws IOException;
+
+ /**
+ * Completes the daughter region creation.
+ * @param regionInfo the resulting daughter region to be committed.
+ * @return the region path.
+ * @throws IOException if any errors occur.
+ */
+ public abstract Path commitDaughterRegion(final RegionInfo regionInfo) throws IOException;
+
+ /**
+ * Completes the merged region creation.
+ * @param mergedRegionInfo merged region {@link RegionInfo}
+ * @throws IOException if any errors occur.
+ */
+ public abstract void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException;
+
+ /**
+ * Write out a split reference.
+ * @param parentRegionInfo {@link RegionInfo} of the parent split region.
+ * @param daughterRegionInfo {@link RegionInfo} of the resulting split region.
+ * @param familyName Column Family Name
+ * @param f File to split.
+ * @param splitRow Split Row
+ * @param top True if we are referring to the top half of the hfile.
+ * @param splitPolicy A split policy instance; be careful! May not be full populated; e.g. if
+ * this method is invoked on the Master side, then the RegionSplitPolicy will
+ * NOT have a reference to a Region.
+ * @param fs FileSystem instance for creating the actual reference file.
+ * @return Path to created reference.
+ * @throws IOException if the split reference write fails.
+ */
+ public Path splitStoreFile(RegionInfo parentRegionInfo, RegionInfo daughterRegionInfo,
+ String familyName, HStoreFile f, byte[] splitRow,
+ boolean top, RegionSplitPolicy splitPolicy, FileSystem fs) throws IOException {
+ if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
+ // Check whether the split row lies in the range of the store file
+ // If it is outside the range, return directly.
+ f.initReader();
+ try {
+ if (top) {
+ //check if larger than last key.
+ Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
+ Optional lastKey = f.getLastKey();
+ // If lastKey is null means storefile is empty.
+ if (!lastKey.isPresent()) {
+ return null;
+ }
+ if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
+ return null;
+ }
+ } else {
+ //check if smaller than first key
+ Cell splitKey = PrivateCellUtil.createLastOnRow(splitRow);
+ Optional firstKey = f.getFirstKey();
+ // If firstKey is null means storefile is empty.
+ if (!firstKey.isPresent()) {
+ return null;
+ }
+ if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
+ return null;
+ }
+ }
+ } finally {
+ f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
+ }
+ }
+
+ Path splitDir = new Path(getSplitsDir(daughterRegionInfo), familyName);
+ // A reference to the bottom half of the hsf store file.
+ Reference r =
+ top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
+ // Add the referred-to regions name as a dot separated suffix.
+ // See REF_NAME_REGEX regex above. The referred-to regions name is
+ // up in the path of the passed in f -- parentdir is family,
+ // then the directory above is the region name.
+ String parentRegionName = parentRegionInfo.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
+ return r.write(fs, p);
+ }
+
+ /**
+ * Write out a merge reference under the given merges directory. Package local
+ * so it doesnt leak out of regionserver.
+ * @param regionToMerge {@link RegionInfo} for one of the regions being merged.
+ * @param mergedRegion {@link RegionInfo} of the merged region
+ * @param familyName Column Family Name
+ * @param f File to create reference.
+ * @param mergedDir
+ * @param fs FileSystem instance for creating the actual reference file.
+ * @return Path to created reference.
+ * @throws IOException if the merge write fails.
+ */
+ public Path mergeStoreFile(RegionInfo regionToMerge, RegionInfo mergedRegion, String familyName, HStoreFile f,
+ Path mergedDir, FileSystem fs) throws IOException {
+ Path referenceDir = new Path(new Path(mergedDir,
+ mergedRegion.getEncodedName()), familyName);
+ // A whole reference to the store file.
+ Reference r = Reference.createTopReference(regionToMerge.getStartKey());
+ // Add the referred-to regions name as a dot separated suffix.
+ // See REF_NAME_REGEX regex above. The referred-to regions name is
+ // up in the path of the passed in f -- parentdir is family,
+ // then the directory above is the region name.
+ String mergingRegionName = regionToMerge.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(referenceDir, f.getPath().getName() + "."
+ + mergingRegionName);
+ return r.write(fs, p);
+ }
+
+ /**
+ * Deletes left over splits dirs.
+ * @throws IOException
+ */
+ public abstract void cleanupSplitsDir() throws IOException;
+
+ /**
+ * Deletes left over merges dirs.
+ * @throws IOException
+ */
+ public abstract void cleanupMergesDir() throws IOException;
+
+ /**
+ * Deletes any remaining of improperly finished splits.
+ * @throws IOException
+ */
+ public abstract void cleanupAnySplitDetritus() throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
index 472be43a85d9..4defbda1fb56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.errorprone.annotations.RestrictedApi;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -78,8 +77,6 @@ Builder withStoreFiles(Collection storeFiles) {
return this;
}
- @RestrictedApi(explanation = "Should only be called in tests", link = "",
- allowedOnPath = ".*/src/test/.*")
Builder withStorePaths(List storeFiles) {
this.storeFiles = storeFiles;
return this;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFSWriteStratefy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFSWriteStratefy.java
new file mode 100644
index 000000000000..efe9e4919396
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFSWriteStratefy.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Test for DirectStoreFSWriteStratefy
+ */
+@Category({ RegionServerTests.class, LargeTests.class})
+public class TestDirectStoreFSWriteStratefy {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDirectStoreFSWriteStratefy.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final String CF_NAME = "cf";
+
+ private final byte[] cf = Bytes.toBytes(CF_NAME);
+
+ private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private TableName table;
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.startMiniCluster();
+ table = TableName.valueOf(name.getMethodName());
+ UTIL.createTable(table, cf);
+
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testGetParentSplitsDir() throws Exception {
+ HRegionFileSystem regionFS = UTIL.getHBaseCluster().getRegions(table).get(0).
+ getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ Path parentDir = writeStrategy.getParentSplitsDir();
+ assertEquals(regionFS.getTableDir(), parentDir);
+ }
+
+ @Test
+ public void testGetParentMergesDir() throws Exception {
+ HRegionFileSystem regionFS = UTIL.getHBaseCluster().getRegions(table).get(0).
+ getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ Path parentDir = writeStrategy.getParentMergesDir();
+ assertEquals(regionFS.getTableDir(), parentDir);
+ }
+
+ @Test
+ public void testCreateSplitsDir() throws Exception {
+ HRegion region = UTIL.getHBaseCluster().getRegions(table).get(0);
+ HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
+ RegionInfo daughterA = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(region.getRegionInfo().getStartKey())
+ .setEndKey(Bytes.toBytes("bbbb"))
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId())
+ .build();
+ RegionInfo daughterB = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(Bytes.toBytes("bbbb"))
+ .setEndKey(region.getRegionInfo().getEndKey())
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId())
+ .build();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ writeStrategy.createSplitsDir(daughterA, daughterB);
+
+ FileSystem fileSystem = region.getFilesystem();
+ Path daughterAPath = new Path(region.getRegionFileSystem().getTableDir(), daughterA.getEncodedName());
+ assertTrue(fileSystem.exists(daughterAPath));
+ Path daughterBPath = new Path(region.getRegionFileSystem().getTableDir(), daughterB.getEncodedName());
+ assertTrue(fileSystem.exists(daughterBPath));
+ }
+
+ @Test
+ public void testCreateMergesDir() throws Exception {
+ HRegionFileSystem regionFS = UTIL.getHBaseCluster().getRegions(table).get(0).
+ getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ long cTime = regionFS.getFileSystem().
+ getFileLinkStatus(regionFS.getTableDir()).getModificationTime();
+ writeStrategy.createMergesDir();
+ //assert the table dir has not been re-written
+ assertEquals(cTime, regionFS.getFileSystem().
+ getFileLinkStatus(regionFS.getTableDir()).getModificationTime());
+ }
+
+ @Test(expected = IOException.class)
+ public void testCreateMergesDirTableDirMissing() throws Exception {
+ HRegionFileSystem regionFS = UTIL.getHBaseCluster().getRegions(table).get(0).
+ getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ regionFS.getFileSystem().delete(regionFS.getTableDir(), true);
+ writeStrategy.createMergesDir();
+ }
+
+ @Test
+ public void testSplitStoreDir() throws Exception {
+ //first put some data in order to have a store file created
+ putThreeRowsAndFlush(table);
+ HRegion region = UTIL.getHBaseCluster().getRegions(table).get(0);
+ HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ RegionInfo daughterA = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(region.getRegionInfo().getStartKey())
+ .setEndKey(Bytes.toBytes("002"))
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+ HStoreFile file = (HStoreFile) region.getStore(cf).getStorefiles().toArray()[0];
+ Path result = writeStrategy.splitStoreFile(region.getRegionInfo(), daughterA, CF_NAME,
+ file, Bytes.toBytes("002"), false, region.getSplitPolicy(), regionFS.fs);
+ //asserts the reference file naming is correct
+ validateResultingFile(region.getRegionInfo().getEncodedName(), result);
+ //Additionally check if split region dir was created directly under table dir, not on .tmp
+ Path resultGreatGrandParent = result.getParent().getParent().getParent();
+ assertEquals(regionFS.getTableDir().getName(), resultGreatGrandParent.getName());
+ }
+
+ @Test
+ public void testMergeStoreFile() throws IOException {
+ //splitting the table first
+ UTIL.getAdmin().split(table, Bytes.toBytes("002"));
+ //Add data and flush to create files in the two different regions
+ putThreeRowsAndFlush(table);
+ List regions = UTIL.getHBaseCluster().getRegions(table);
+ HRegion first = regions.get(0);
+ HRegion second = regions.get(1);
+ HRegionFileSystem regionFS = first.getRegionFileSystem();
+
+ RegionInfo mergeResult = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(first.getRegionInfo().getStartKey())
+ .setEndKey(second.getRegionInfo().getEndKey())
+ .setSplit(false)
+ .setRegionId(first.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+
+ HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
+ UTIL.getHBaseCluster().getMaster().getConfiguration(),
+ regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
+
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(mergeRegionFs);
+
+ //merge file from first region
+ HStoreFile file = (HStoreFile) first.getStore(cf).getStorefiles().toArray()[0];
+ mergeFileFromRegion(writeStrategy, first, mergeResult, file);
+ //merge file from second region
+ file = (HStoreFile) second.getStore(cf).getStorefiles().toArray()[0];
+ mergeFileFromRegion(writeStrategy, second, mergeResult, file);
+ }
+
+ @Test
+ public void testCommitDaughterRegionNoFiles() throws IOException {
+ HRegion region = UTIL.getHBaseCluster().getRegions(table).get(0);
+ HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ RegionInfo daughterA = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(region.getRegionInfo().getStartKey())
+ .setEndKey(Bytes.toBytes("002"))
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+ Path splitDir = writeStrategy.getSplitsDir(daughterA);
+ StoreFileTrackingUtils.init(UTIL.getHBaseCluster().getMaster());
+ Path result = writeStrategy.commitDaughterRegion(daughterA);
+ assertEquals(splitDir, result);
+ StoreFilePathAccessor accessor = StoreFileTrackingUtils.
+ createStoreFilePathAccessor(UTIL.getConfiguration(), UTIL.getConnection());
+ List filePaths = accessor.getIncludedStoreFilePaths(table.getNameAsString(),
+ daughterA.getRegionNameAsString(), CF_NAME);
+ //should have not listed any file
+ assertEquals(0, filePaths.size());
+ }
+
+ @Test
+ public void testCommitDaughterRegionWithFiles() throws IOException {
+ //first put some data in order to have a store file created
+ putThreeRowsAndFlush(table);
+ HRegion region = UTIL.getHBaseCluster().getRegions(table).get(0);
+ HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(regionFS);
+ RegionInfo daughterA = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(region.getRegionInfo().getStartKey())
+ .setEndKey(Bytes.toBytes("002"))
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+ RegionInfo daughterB = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(Bytes.toBytes("002"))
+ .setEndKey(region.getRegionInfo().getEndKey())
+ .setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId())
+ .build();
+ Path splitDirA = writeStrategy.getSplitsDir(daughterA);
+ Path splitDirB = writeStrategy.getSplitsDir(daughterB);
+ StoreFileTrackingUtils.init(UTIL.getHBaseCluster().getMaster());
+ HStoreFile file = (HStoreFile) region.getStore(cf).getStorefiles().toArray()[0];
+ writeStrategy.splitStoreFile(region.getRegionInfo(), daughterA, CF_NAME,
+ file, Bytes.toBytes("002"), false, region.getSplitPolicy(), regionFS.fs);
+ writeStrategy.splitStoreFile(region.getRegionInfo(), daughterB, CF_NAME,
+ file, Bytes.toBytes("002"), true, region.getSplitPolicy(), regionFS.fs);
+ Path resultA = writeStrategy.commitDaughterRegion(daughterA);
+ Path resultB = writeStrategy.commitDaughterRegion(daughterB);
+ assertEquals(splitDirA, resultA);
+ assertEquals(splitDirB, resultB);
+ StoreFilePathAccessor accessor = StoreFileTrackingUtils.
+ createStoreFilePathAccessor(UTIL.getConfiguration(), UTIL.getConnection());
+ List filePathsA = accessor.getIncludedStoreFilePaths(table.getNameAsString(),
+ daughterA.getRegionNameAsString(), CF_NAME);
+ assertEquals(1, filePathsA.size());
+ List filePathsB = accessor.getIncludedStoreFilePaths(table.getNameAsString(),
+ daughterB.getRegionNameAsString(), CF_NAME);
+ assertEquals(1, filePathsB.size());
+ }
+
+ @Test
+ public void testCommitMergedRegion() throws IOException {
+ //splitting the table first
+ UTIL.getAdmin().split(table, Bytes.toBytes("002"));
+ //Add data and flush to create files in the two different regions
+ putThreeRowsAndFlush(table);
+ List regions = UTIL.getHBaseCluster().getRegions(table);
+ HRegion first = regions.get(0);
+ HRegion second = regions.get(1);
+ HRegionFileSystem regionFS = first.getRegionFileSystem();
+
+ RegionInfo mergeResult = RegionInfoBuilder.newBuilder(table)
+ .setStartKey(first.getRegionInfo().getStartKey())
+ .setEndKey(second.getRegionInfo().getEndKey())
+ .setSplit(false)
+ .setRegionId(first.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+
+ HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
+ UTIL.getHBaseCluster().getMaster().getConfiguration(),
+ regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
+
+ DirectStoreFSWriteStrategy writeStrategy = new DirectStoreFSWriteStrategy(mergeRegionFs);
+
+ //merge file from first region
+ HStoreFile file = (HStoreFile) first.getStore(cf).getStorefiles().toArray()[0];
+ StoreFileTrackingUtils.init(UTIL.getHBaseCluster().getMaster());
+ mergeFileFromRegion(writeStrategy, first, mergeResult, file);
+ //merge file from second region
+ file = (HStoreFile) second.getStore(cf).getStorefiles().toArray()[0];
+ mergeFileFromRegion(writeStrategy, second, mergeResult, file);
+ writeStrategy.commitMergedRegion(mergeResult);
+ StoreFilePathAccessor accessor = StoreFileTrackingUtils.
+ createStoreFilePathAccessor(UTIL.getConfiguration(), UTIL.getConnection());
+ List filePaths = accessor.getIncludedStoreFilePaths(table.getNameAsString(),
+ mergeResult.getRegionNameAsString(), CF_NAME);
+ assertEquals(2, filePaths.size());
+
+ }
+
+ private void mergeFileFromRegion(DirectStoreFSWriteStrategy writeStrategy, HRegion regionToMerge,
+ RegionInfo resultingMerge, HStoreFile file) throws IOException {
+ Path mergedFile = writeStrategy.mergeStoreFile(resultingMerge, regionToMerge.getRegionInfo(),
+ CF_NAME, file, regionToMerge.getRegionFileSystem().getMergesDir(),
+ regionToMerge.getRegionFileSystem().getFileSystem());
+ validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile);
+ }
+
+ private void validateResultingFile(String originalRegion, Path result){
+ assertEquals(originalRegion, result.getName().split("\\.")[1]);
+ //asserts we are under the cf directory
+ Path resultParent = result.getParent();
+ assertEquals(CF_NAME, resultParent.getName());
+ }
+
+ private void putThreeRowsAndFlush(TableName table) throws IOException {
+ Table tbl = UTIL.getConnection().getTable(table);
+ Put put = new Put(Bytes.toBytes("001"));
+ byte[] qualifier = Bytes.toBytes("1");
+ put.addColumn(cf, qualifier, Bytes.toBytes(1));
+ tbl.put(put);
+ put = new Put(Bytes.toBytes("002"));
+ put.addColumn(cf, qualifier, Bytes.toBytes(2));
+ tbl.put(put);
+ put = new Put(Bytes.toBytes("003"));
+ put.addColumn(cf, qualifier, Bytes.toBytes(2));
+ tbl.put(put);
+ UTIL.flush(table);
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreMergeRegionsStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreMergeRegionsStrategy.java
new file mode 100644
index 000000000000..1a4628f0c42a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreMergeRegionsStrategy.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_WRITE_STRATEGY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.assignment.DirectStoreMergeRegionsStrategy;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+/**
+ * Test for DirectStoreMergeRegionStrategy
+ */
+@Category({ RegionServerTests.class, LargeTests.class})
+public class TestDirectStoreMergeRegionsStrategy {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDirectStoreMergeRegionsStrategy.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final String CF_NAME = "cf";
+
+ private final byte[] cf = Bytes.toBytes(CF_NAME);
+
+ private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private TableName table;
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.startMiniCluster();
+ table = TableName.valueOf(name.getMethodName());
+ UTIL.createTable(table, cf);
+
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCreateMergedRegion() throws IOException {
+ testMergeStrategyMethod( (p, r) -> {
+ HRegion regionToMerge = p.getFirst();
+ Path tableDir = regionToMerge.getRegionFileSystem().getTableDir();
+ Path mergeRegionDir = new Path(tableDir, r.getEncodedName());
+ mergeRegionDir = new Path(mergeRegionDir, CF_NAME);
+ try {
+ assertTrue(regionToMerge.getRegionFileSystem().getFileSystem().exists(mergeRegionDir));
+ assertEquals(2,
+ regionToMerge.getRegionFileSystem().getFileSystem().listStatus(mergeRegionDir).length);
+ } catch(Exception e){
+ fail(e.getMessage());
+ }
+ });
+ }
+
+ @Test
+ public void testCleanupMergedRegion() throws IOException {
+ final DirectStoreMergeRegionsStrategy mergeStrategy = new DirectStoreMergeRegionsStrategy();
+ testMergeStrategyMethod( (p, r) -> {
+ try {
+ mergeStrategy.cleanupMergedRegion(
+ UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment(),
+ new RegionInfo[]{p.getFirst().getRegionInfo(), p.getSecond().getRegionInfo()}, r);
+ Path tableDir = p.getFirst().getRegionFileSystem().getTableDir();
+ FileSystem fs = p.getFirst().getRegionFileSystem().getFileSystem();
+ assertFalse(fs.exists(new Path(tableDir, r.getEncodedName())));
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ });
+ }
+
+
+ private void testMergeStrategyMethod(BiConsumer, RegionInfo> validator)
+ throws IOException{
+ List regions = splitTableAndPutData();
+ HRegion first = regions.get(0);
+ HRegion second = regions.get(1);
+ DirectStoreMergeRegionsStrategy mergeStrategy = new DirectStoreMergeRegionsStrategy();
+ RegionInfo mergeResult = createMergeResult(first.getRegionInfo(), second.getRegionInfo());
+ mergeStrategy.createMergedRegion(
+ UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment(),
+ new RegionInfo[]{first.getRegionInfo(), second.getRegionInfo()}, mergeResult);
+ validator.accept(new Pair<>(first, second), mergeResult);
+
+ }
+
+ private List splitTableAndPutData() throws IOException{
+ //splitting the table first
+ UTIL.getAdmin().split(table, Bytes.toBytes("002"));
+ //Add data and flush to create files in the two different regions
+ putThreeRowsAndFlush(table);
+ StoreFileTrackingUtils.init(UTIL.getHBaseCluster().getMaster());
+ UTIL.getHBaseCluster().getMaster().getConfiguration().set(REGION_WRITE_STRATEGY,
+ DirectStoreFSWriteStrategy.class.getName());
+ return UTIL.getHBaseCluster().getRegions(table);
+ }
+
+ private RegionInfo createMergeResult(RegionInfo first, RegionInfo second) {
+ return RegionInfoBuilder.newBuilder(table)
+ .setStartKey(first.getStartKey())
+ .setEndKey(second.getEndKey())
+ .setSplit(false)
+ .setRegionId(first.getRegionId() + EnvironmentEdgeManager.currentTime())
+ .build();
+ }
+
+ private void putThreeRowsAndFlush(TableName table) throws IOException {
+ Table tbl = UTIL.getConnection().getTable(table);
+ Put put = new Put(Bytes.toBytes("001"));
+ byte[] qualifier = Bytes.toBytes("1");
+ put.addColumn(cf, qualifier, Bytes.toBytes(1));
+ tbl.put(put);
+ put = new Put(Bytes.toBytes("002"));
+ put.addColumn(cf, qualifier, Bytes.toBytes(2));
+ tbl.put(put);
+ put = new Put(Bytes.toBytes("003"));
+ put.addColumn(cf, qualifier, Bytes.toBytes(2));
+ tbl.put(put);
+ UTIL.flush(table);
+ }
+
+}
| | | |