From dfbe19c0f4b18be0156b017c09035cd83c2efa33 Mon Sep 17 00:00:00 2001
From: Prathyusha Garre
Date: Tue, 31 Oct 2023 11:54:54 +0530
Subject: [PATCH] Create ref files using threadpool as part of merge

---
 .../MergeTableRegionsProcedure.java           | 52 ++++++++++--
 .../assignment/SplitTableRegionProcedure.java | 41 +++-------
 .../apache/hadoop/hbase/util/ThreadUtil.java  | 82 +++++++++++++++++++
 3 files changed, 135 insertions(+), 40 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 813caa47d339..b17c08cc836d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -23,10 +23,15 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
@@ -53,6 +58,7 @@
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.quotas.QuotaExceededException;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
@@ -60,11 +66,15 @@
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ThreadUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -573,15 +583,35 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
     final FileSystem fs = mfs.getFileSystem();
-    List<Path> mergedFiles = new ArrayList<>();
     HRegionFileSystem mergeRegionFs = HRegionFileSystem
       .createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);
+    Configuration conf = env.getMasterConfiguration();
+    int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
+    List<Path> mergedFiles = new ArrayList<>();
+    final ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
+      new ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+    final List<Future<Path>> futures = new ArrayList<>();
     for (RegionInfo ri : this.regionsToMerge) {
       HRegionFileSystem regionFs = HRegionFileSystem
         .openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir, ri, false);
-      mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
+      mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool, futures);
     }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish.
+    // When splits ran on the RegionServer, how-long-to-wait-configuration was named
+    // hbase.regionserver.fileSplitTimeout. If set, use its value.
+    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+      conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+    ThreadUtil.waitOnShutdown(threadPool, fileSplitTimeout,
+      "Took too long to merge the files and create the references, aborting merge");
+
+    List<Path> paths = ThreadUtil.getAllResults(futures);
+    mergedFiles.addAll(paths);
     assert mergeRegionFs != null;
     mergeRegionFs.commitMergedRegion(mergedFiles, env);
 
@@ -590,11 +620,11 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
       .setState(State.MERGING_NEW);
   }
 
-  private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
-    HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
+  private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+    HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion, ExecutorService threadPool,
+    List<Future<Path>> futures) throws IOException {
     final TableDescriptor htd =
       env.getMasterServices().getTableDescriptors().get(mergedRegion.getTable());
-    List<Path> mergedFiles = new ArrayList<>();
     for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
       String family = hcd.getNameAsString();
       StoreFileTracker tracker =
@@ -611,13 +641,17 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
           // is running in a regionserver's Store context, or we might not be able
           // to read the hfiles.
           storeFileInfo.setConf(storeConfiguration);
-          Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
-            new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
-          mergedFiles.add(refFile);
+          futures.add(threadPool.submit(new Callable<Path>() {
+            @Override
+            public Path call() throws Exception {
+              // Create the reference file for this store file in the merged region.
+              return mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+                new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+            }
+          }));
         }
       }
     }
-    return mergedFiles;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index a0118cbd7b05..be6fa6e833f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,11 +26,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +69,7 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ThreadUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -740,37 +738,18 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
     // hbase.regionserver.fileSplitTimeout. If set, use its value.
     long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
       conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
-    try {
-      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
-      if (stillRunning) {
-        threadPool.shutdownNow();
-        // wait for the thread to shutdown completely.
-        while (!threadPool.isTerminated()) {
-          Thread.sleep(50);
-        }
-        throw new IOException(
-          "Took too long to split the" + " files and create the references, aborting split");
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-    }
-
+    ThreadUtil.waitOnShutdown(threadPool, fileSplitTimeout,
+      "Took too long to split the files and create the references, aborting split");
     List<Path> daughterA = new ArrayList<>();
     List<Path> daughterB = new ArrayList<>();
     // Look for any exception
-    for (Future<Pair<Path, Path>> future : futures) {
-      try {
-        Pair<Path, Path> p = future.get();
-        if (p.getFirst() != null) {
-          daughterA.add(p.getFirst());
-        }
-        if (p.getSecond() != null) {
-          daughterB.add(p.getSecond());
-        }
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
+    List<Pair<Path, Path>> paths = ThreadUtil.getAllResults(futures);
+    for (Pair<Path, Path> p : paths) {
+      if (p.getFirst() != null) {
+        daughterA.add(p.getFirst());
+      }
+      if (p.getSecond() != null) {
+        daughterB.add(p.getSecond());
       }
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java
new file mode 100644
index 000000000000..2eeec27d0f28
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class ThreadUtil {
+
+  /**
+   * Waits if necessary for the computations to complete, and then retrieves the results of all
+   * future objects.
+   * @param futures the future objects for which the results will be retrieved
+   * @return the list of computed results
+   * @throws IOException if the wait is interrupted or any of the computations failed
+   */
+  public static <T> List<T> getAllResults(List<Future<T>> futures) throws IOException {
+    List<T> results = new ArrayList<>();
+    for (Future<T> future : futures) {
+      try {
+        T t = future.get();
+        if (t != null) {
+          results.add(t);
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Blocks until all tasks have completed execution after a shutdown request, or the timeout
+   * occurs, or the current thread is interrupted, whichever happens first.
+   * @param threadPool the executor service to wait on
+   * @param timeoutInMillis the maximum time to wait
+   * @param errMessage the message of the exception thrown when the timeout elapses
+   * @throws IOException if the timeout elapses before termination or the wait is interrupted
+   */
+  public static void waitOnShutdown(ExecutorService threadPool, long timeoutInMillis,
+    String errMessage) throws IOException {
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(timeoutInMillis, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // Wait for the pool to shut down completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException(errMessage);
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
+  }
+
+}
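
Usage note (illustrative, not part of the patch): the two ThreadUtil helpers are
meant to be used as a pair around a fixed thread pool, exactly as both
procedures above do: submit one Callable per store file, shut the pool down,
bound the wait, then collect the results. The sketch below shows that pattern
in isolation; the class name RefFileTaskSketch, the pool size, and the String
results standing in for the Path of each created reference file are all
invented for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.util.ThreadUtil;

public class RefFileTaskSketch {
  public static void main(String[] args) throws IOException {
    // Fixed-size pool, as in createMergedRegion()/splitStoreFiles().
    ExecutorService threadPool = Executors.newFixedThreadPool(4);
    List<Future<String>> futures = new ArrayList<>();

    // One task per store file; in the patch each task returns the Path of the
    // reference file it created.
    for (int i = 0; i < 10; i++) {
      final int storeFileId = i;
      futures.add(threadPool.submit(new Callable<String>() {
        @Override
        public String call() {
          // Stand-in for mergeRegionFs.mergeStoreFile(...).
          return "ref-file-" + storeFileId;
        }
      }));
    }

    // Stop accepting new tasks; already-submitted tasks keep running.
    threadPool.shutdown();

    // Bound the wait, mirroring hbase.master.fileSplitTimeout (default 600000 ms).
    // Throws IOException with the given message if the pool does not drain in time.
    ThreadUtil.waitOnShutdown(threadPool, 600000,
      "Took too long to create the references, aborting");

    // Collects every result, rethrowing task failures as IOException and
    // interruption as InterruptedIOException.
    List<String> refFiles = ThreadUtil.getAllResults(futures);
    System.out.println(refFiles.size() + " reference files created");
  }
}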