From 6ecda73ea4c2d325776467ba03b1116c6eb63659 Mon Sep 17 00:00:00 2001 From: Zheng Wang <18031031@qq.com> Date: Fri, 31 Jul 2020 15:05:37 +0800 Subject: [PATCH 1/2] HBASE-24694 Support flush a single column family of table --- .../org/apache/hadoop/hbase/client/Admin.java | 10 ++++++++ .../hadoop/hbase/client/AsyncAdmin.java | 8 ++++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 ++++ .../hadoop/hbase/client/HBaseAdmin.java | 11 +++++++- .../hbase/client/RawAsyncHBaseAdmin.java | 12 +++++++-- .../org/apache/hadoop/hbase/HConstants.java | 3 +++ .../flush/FlushTableSubprocedure.java | 25 +++++++++++++++---- .../MasterFlushTableProcedureManager.java | 12 ++++++++- ...egionServerFlushTableProcedureManager.java | 19 +++++++++++--- .../hbase/client/TestFlushFromClient.java | 19 ++++++++++++++ hbase-shell/src/main/ruby/hbase/admin.rb | 6 ++++- .../src/main/ruby/shell/commands/flush.rb | 4 ++- .../hbase/thrift2/client/ThriftAdmin.java | 5 ++++ 13 files changed, 125 insertions(+), 14 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index d5483eb6a9c8..b8153a97ae4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -882,6 +882,16 @@ boolean closeRegionWithEncodedRegionName(String encodedRegionName, String server */ void flush(TableName tableName) throws IOException; + /** + * Flush the specified column family stores on all regions of the passed table. + * This runs as a synchronous operation. + * + * @param tableName table to flush + * @param columnFamily column family within a table + * @throws IOException if a remote or network exception occurs + */ + void flush(TableName tableName, byte[] columnFamily) throws IOException; + /** * Flush an individual region. Synchronous operation. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index b272e756e153..87e1df9c6aaf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -306,6 +306,14 @@ CompletableFuture modifyColumnFamily(TableName tableName, */ CompletableFuture flush(TableName tableName); + /** + * Flush the specified column family stores on all regions of the passed table. + * This runs as a synchronous operation. + * @param tableName table to flush + * @param columnFamily column family within a table + */ + CompletableFuture flush(TableName tableName, byte[] columnFamily); + /** * Flush an individual region. * @param regionName region to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index c004d9fa8d1e..376c5dcba903 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -244,6 +244,11 @@ public CompletableFuture flush(TableName tableName) { return wrap(rawAdmin.flush(tableName)); } + @Override + public CompletableFuture flush(TableName tableName, byte[] columnFamily) { + return wrap(rawAdmin.flush(tableName, columnFamily)); + } + @Override public CompletableFuture flushRegion(byte[] regionName) { return wrap(rawAdmin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index a24c17c97a66..ac7857885a4a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1151,12 +1151,21 @@ public List getOnlineRegions(final ServerName sn) throws IOExceptio @Override public void flush(final TableName tableName) throws IOException { + flush(tableName, null); + } + + @Override + public void flush(final TableName tableName, byte[] columnFamily) throws IOException { checkTableExists(tableName); if (isTableDisabled(tableName)) { LOG.info("Table is disabled: " + tableName.getNameAsString()); return; } - execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>()); + Map props = new HashMap<>(); + if (columnFamily != null) { + props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + } + execProcedure("flush-table-proc", tableName.getNameAsString(), props); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index b25b287927d8..02cbcefaaffc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -891,9 +891,13 @@ public CompletableFuture> getRegions(TableName tableName) { locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); } } - @Override public CompletableFuture flush(TableName tableName) { + return flush(tableName, null); + } + + @Override + public CompletableFuture flush(TableName tableName, byte[] columnFamily) { CompletableFuture future = new CompletableFuture<>(); addListener(tableExists(tableName), (exists, err) -> { if (err != null) { @@ -907,8 +911,12 @@ public CompletableFuture flush(TableName tableName) { } else if (!tableEnabled) { future.completeExceptionally(new TableNotEnabledException(tableName)); } else { + Map props = new HashMap<>(); + if (columnFamily != null) { + props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + } addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), - new HashMap<>()), (ret, err3) -> { + props), (ret, err3) -> { if (err3 != null) { future.completeExceptionally(err3); } else { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 16bee933664e..e50f30bb1891 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -616,6 +616,9 @@ public enum OperationStatusCode { */ public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v"); + /** The family str as a key in map*/ + public static final String FAMILY_KEY_STR = "family"; + /** * The current version of the meta table. * - pre-hbase 0.92. There is no META_VERSION column in the root table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 5c005a75a9a2..d12403952417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.procedure.flush; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; /** * This flush region implementation uses the distributed procedure framework to flush @@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); private final String table; + private final String family; private final List regions; private final FlushTableSubprocedurePool taskManager; public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, - List regions, String table, + List regions, String table, String family, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); this.table = table; + this.family = family; this.regions = regions; this.taskManager = taskManager; } private static class RegionFlushTask implements Callable { HRegion region; - RegionFlushTask(HRegion region) { + List families; + RegionFlushTask(HRegion region, List families) { this.region = region; + this.families = families; } @Override @@ -65,7 +72,11 @@ public Void call() throws Exception { region.startRegionOperation(); try { LOG.debug("Flush region " + region.toString() + " started..."); - region.flush(true); + if (families == null) { + region.flush(true); + } else { + region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); + } // TODO: flush result is not checked? } finally { LOG.debug("Closing region operation on " + region); @@ -88,11 +99,15 @@ private void flushRegions() throws ForeignException { throw new IllegalStateException("Attempting to flush " + table + " but we currently have outstanding tasks"); } - + List families = null; + if (family != null) { + LOG.debug("About to flush family {} on all regions for table {}", family, table); + families = Arrays.asList(Bytes.toBytes(family)); + } // Add all hfiles already existing in region. for (HRegion region : regions) { // submit one task per region for parallelize by region. - taskManager.submitTask(new RegionFlushTask(region)); + taskManager.submitTask(new RegionFlushTask(region, families)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 510fbcfd69bf..5e62c42bb06d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -51,6 +52,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @@ -149,11 +151,19 @@ public void execProcedure(ProcedureDescription desc) throws IOException { ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + HBaseProtos.NameStringPair family = null; + for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) { + if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) { + family = nsp; + } + } + byte[] procArgs = family != null ? family.toByteArray() : new byte[0]; + // Kick of the global procedure from the master coordinator to the region servers. // We rely on the existing Distributed Procedure framework to prevent any concurrent // procedure with the same name. Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), - new byte[0], Lists.newArrayList(regionServers)); + procArgs, Lists.newArrayList(regionServers)); monitor.rethrowException(); if (proc == null) { String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index ddd667fc6aba..23ad4d167bdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -51,6 +51,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * This manager class handles flushing of the regions for table on a {@link HRegionServer}. @@ -129,9 +130,10 @@ public void stop(boolean force) throws IOException { * there is a possibility of a race where regions may be missed. * * @param table + * @param family * @return Subprocedure to submit to the ProcedureMemeber. */ - public Subprocedure buildSubprocedure(String table) { + public Subprocedure buildSubprocedure(String table, String family) { // don't run the subprocedure if the parent is stop(ping) if (rss.isStopping() || rss.isStopped()) { @@ -162,7 +164,7 @@ public Subprocedure buildSubprocedure(String table) { FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, - timeoutMillis, involvedRegions, table, taskManager); + timeoutMillis, involvedRegions, table, family, taskManager); } /** @@ -183,8 +185,19 @@ public class FlushTableSubprocedureBuilder implements SubprocedureFactory { @Override public Subprocedure buildSubprocedure(String name, byte[] data) { + String family = null; + // Currently we do not put other data except family, so it is ok to + // judge by length that if family was specified + if (data.length > 0) { + try { + HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data); + family = nsp.getValue(); + } catch (Exception e) { + LOG.error("fail to get family by parsing from data", e); + } + } // The name of the procedure instance from the master is the table name. - return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); + return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java index 308529616b65..cd496bea35cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -117,6 +117,16 @@ public void testFlushTable() throws Exception { } } + @Test + public void testFlushTableFamily() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); + admin.flush(tableName, FAMILY_1); + assertFalse(getRegionInfo().stream(). + anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); + } + } + @Test public void testAsyncFlushTable() throws Exception { AsyncAdmin admin = asyncConn.getAdmin(); @@ -124,6 +134,15 @@ public void testAsyncFlushTable() throws Exception { assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); } + @Test + public void testAsyncFlushTableFamily() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); + admin.flush(tableName, FAMILY_1).get(); + assertFalse(getRegionInfo().stream(). + anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); + } + @Test public void testFlushRegion() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 2ce5588861f5..1849aad3d756 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -64,7 +64,11 @@ def flush(name, family = nil) rescue java.lang.IllegalArgumentException # Unknown region. Try table. begin - @admin.flush(TableName.valueOf(name)) + if family_bytes.nil? + @admin.flush(TableName.valueOf(name)) + else + @admin.flush(TableName.valueOf(name), family_bytes) + end rescue java.lang.IllegalArgumentException # Unknown table. Try region server. @admin.flushRegionServer(ServerName.valueOf(name)) diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb index f34999c4eb86..69bcf1346724 100644 --- a/hbase-shell/src/main/ruby/shell/commands/flush.rb +++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb @@ -25,10 +25,12 @@ def help Flush all regions in passed table or pass a region row to flush an individual region or a region server name whose format is 'host,port,startcode', to flush all its regions. -You can also flush a single column family within a region. +You can also flush a single column family for all regions within a table, +or for an specific region only. For example: hbase> flush 'TABLENAME' + hbase> flush 'TABLENAME','FAMILYNAME' hbase> flush 'REGIONNAME' hbase> flush 'REGIONNAME','FAMILYNAME' hbase> flush 'ENCODED_REGIONNAME' diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 8d0b43c4fb54..1d0980b4611c 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -630,6 +630,11 @@ public void flush(TableName tableName) { } + @Override + public void flush(TableName tableName, byte[] columnFamily) { + throw new NotImplementedException("flush not supported in ThriftAdmin"); + } + @Override public void flushRegion(byte[] regionName) { throw new NotImplementedException("flushRegion not supported in ThriftAdmin"); From 673f3c5ac586b2f77187ca5a7d9dcb7f5b53a660 Mon Sep 17 00:00:00 2001 From: Zheng Wang <18031031@qq.com> Date: Sat, 8 Aug 2020 14:56:59 +0800 Subject: [PATCH 2/2] fix checkstyle warning --- .../flush/RegionServerFlushTableProcedureManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 23ad4d167bdf..cb5d54fc5da2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -129,8 +129,8 @@ public void stop(boolean force) throws IOException { * Because this gets the local list of regions to flush and not the set the master had, * there is a possibility of a race where regions may be missed. * - * @param table - * @param family + * @param table table to flush + * @param family column family within a table * @return Subprocedure to submit to the ProcedureMemeber. */ public Subprocedure buildSubprocedure(String table, String family) {