diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index a8d583bec325..68b343ae6af2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -521,6 +521,15 @@ Future modifyColumnFamilyAsync(TableName tableName, ColumnFamilyDescriptor */ void flushRegion(byte[] regionName) throws IOException; + /** + * Flush a column family within a region. Synchronous operation. + * + * @param regionName region to flush + * @param columnFamily column family within a region + * @throws IOException if a remote or network exception occurs + */ + void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException; + /** * Flush all regions on the region server. Synchronous operation. * @param serverName the region server name to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 50dce8984f15..7533c091813c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -249,6 +249,11 @@ public void flushRegion(byte[] regionName) throws IOException { get(admin.flushRegion(regionName)); } + @Override + public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException { + get(admin.flushRegion(regionName, columnFamily)); + } + @Override public void flushRegionServer(ServerName serverName) throws IOException { get(admin.flushRegionServer(serverName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 10f2b8a33f2b..f119c7ebea40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -308,6 +308,14 @@ CompletableFuture modifyColumnFamily(TableName tableName, */ CompletableFuture flushRegion(byte[] regionName); + /** + * Flush a column family within a region. + * @param regionName region to flush + * @param columnFamily column family within a region. If not present, flush the region's all + * column families. + */ + CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily); + /** * Flush all region on the region server. * @param serverName server to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 704359fbabcc..1a49919c057a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -248,6 +248,11 @@ public CompletableFuture flushRegion(byte[] regionName) { return wrap(rawAdmin.flushRegion(regionName)); } + @Override + public CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily) { + return wrap(rawAdmin.flushRegion(regionName, columnFamily)); + } + @Override public CompletableFuture flushRegionServer(ServerName sn) { return wrap(rawAdmin.flushRegionServer(sn)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 16483ef98e16..63c177a7b6d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -926,10 +926,18 @@ public CompletableFuture flush(TableName tableName) { @Override public CompletableFuture flushRegion(byte[] regionName) { - return flushRegionInternal(regionName, false).thenAccept(r -> { + return flushRegionInternal(regionName, null, false).thenAccept(r -> { }); } + @Override + public CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily) { + Preconditions.checkNotNull(columnFamily, "columnFamily is null." + + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); + return flushRegionInternal(regionName, columnFamily, false) + .thenAccept(r -> {}); + } + /** * This method is for internal use only, where we need the response of the flush. *

@@ -937,7 +945,7 @@ public CompletableFuture flushRegion(byte[] regionName) { * API. */ CompletableFuture flushRegionInternal(byte[] regionName, - boolean writeFlushWALMarker) { + byte[] columnFamily, boolean writeFlushWALMarker) { CompletableFuture future = new CompletableFuture<>(); addListener(getRegionLocation(regionName), (location, err) -> { if (err != null) { @@ -950,23 +958,25 @@ CompletableFuture flushRegionInternal(byte[] regionName, .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); return; } - addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); + addListener( + flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + }}); }); return future; } private CompletableFuture flush(ServerName serverName, RegionInfo regionInfo, - boolean writeFlushWALMarker) { + byte[] columnFamily, boolean writeFlushWALMarker) { return this. newAdminCaller().serverName(serverName) .action((controller, stub) -> this . adminCall(controller, stub, - RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker), + RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), + columnFamily, writeFlushWALMarker), (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) .call(); } @@ -981,8 +991,11 @@ public CompletableFuture flushRegionServer(ServerName sn) { } List> compactFutures = new ArrayList<>(); if (hRegionInfos != null) { - hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> { - }))); + hRegionInfos.forEach( + region -> compactFutures.add( + flush(sn, region, null, false).thenAccept(r -> {}) + ) + ); } addListener(CompletableFuture.allOf( compactFutures.toArray(new CompletableFuture[compactFutures.size()])), (ret, err2) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 601dcc58f479..2be7ccd453a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -779,20 +779,24 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() { * @return a protocol buffer FlushRegionRequest */ public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) { - return buildFlushRegionRequest(regionName, false); + return buildFlushRegionRequest(regionName, null, false); } /** * Create a protocol buffer FlushRegionRequest for a given region name * @param regionName the name of the region to get info + * @param columnFamily column family within a region * @return a protocol buffer FlushRegionRequest */ public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName, - boolean writeFlushWALMarker) { + byte[] columnFamily, boolean writeFlushWALMarker) { FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); builder.setWriteFlushWalMarker(writeFlushWALMarker); + if (columnFamily != null) { + builder.setFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 6204141058c7..b8cfcde48f43 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -137,6 +137,7 @@ message FlushRegionRequest { required RegionSpecifier region = 1; optional uint64 if_older_than_ts = 2; optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed + optional bytes family = 4; } message FlushRegionResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index f40f3ac726ed..39fc3a28e0c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -74,7 +74,7 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { public CompletableFuture flush(byte[] regionName, boolean writeFlushWALMarker) { RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); - return admin.flushRegionInternal(regionName, writeFlushWALMarker); + return admin.flushRegionInternal(regionName, null, writeFlushWALMarker); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 44f4d026d083..e48a7f41a2d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1788,8 +1788,15 @@ public FlushRegionResponse flushRegion(final RpcController controller, boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; // Go behind the curtain so we can manage writing of the flush WAL marker - HRegion.FlushResultImpl flushResult = - region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + HRegion.FlushResultImpl flushResult = null; + if (request.hasFamily()) { + List families = new ArrayList(); + families.add(request.getFamily().toByteArray()); + flushResult = + region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + } else { + flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + } boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java index 7afd36b77bed..b0fb0b7c58dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -63,8 +63,9 @@ public class TestFlushFromClient { Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8")); - private static final byte[] FAMILY = Bytes.toBytes("f1"); - + private static final byte[] FAMILY_1 = Bytes.toBytes("f1"); + private static final byte[] FAMILY_2 = Bytes.toBytes("f2"); + public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2}; @Rule public TestName name = new TestName(); @@ -85,11 +86,14 @@ public static void tearDownAfterClass() throws Exception { @Before public void setUp() throws Exception { tableName = TableName.valueOf(name.getMethodName()); - try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) { + try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) { List puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); for (int i = 0; i != 20; ++i) { byte[] value = Bytes.toBytes(i); - puts.forEach(p -> p.addColumn(FAMILY, value, value)); + puts.forEach(p -> { + p.addColumn(FAMILY_1, value, value); + p.addColumn(FAMILY_2, value, value); + }); } t.put(puts); } @@ -131,6 +135,18 @@ public void testFlushRegion() throws Exception { } } + @Test + public void testFlushRegionFamily() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegion r : getRegionInfo()) { + long sizeBeforeFlush = r.getMemStoreDataSize(); + admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1); + TimeUnit.SECONDS.sleep(1); + assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); + } + } + } + @Test public void testAsyncFlushRegion() throws Exception { AsyncAdmin admin = asyncConn.getAdmin(); @@ -141,6 +157,17 @@ public void testAsyncFlushRegion() throws Exception { } } + @Test + public void testAsyncFlushRegionFamily() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegion r : getRegionInfo()) { + long sizeBeforeFlush = r.getMemStoreDataSize(); + admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get(); + TimeUnit.SECONDS.sleep(1); + assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); + } + } + @Test public void testFlushRegionServer() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 58ab1ca050f2..6d0574a7b445 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -224,6 +224,10 @@ public void flushRegion(byte[] regionName) throws IOException { admin.flushRegion(regionName); } + public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException { + admin.flushRegion(regionName, columnFamily); + } + public void flushRegionServer(ServerName serverName) throws IOException { admin.flushRegionServer(serverName); } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index a6b6ffb571c7..6bc09f18a765 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -55,8 +55,14 @@ def list(regex = '.*') #---------------------------------------------------------------------------------------------- # Requests a table or region or region server flush - def flush(name) - @admin.flushRegion(name.to_java_bytes) + def flush(name, family = nil) + family_bytes = nil + family_bytes = family.to_java_bytes unless family.nil? + if family_bytes.nil? + @admin.flushRegion(name.to_java_bytes) + else + @admin.flushRegion(name.to_java_bytes, family_bytes) + end rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException # Unknown region. Try table. begin diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb index 1f6b3105a1d5..f34999c4eb86 100644 --- a/hbase-shell/src/main/ruby/shell/commands/flush.rb +++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb @@ -25,17 +25,20 @@ def help Flush all regions in passed table or pass a region row to flush an individual region or a region server name whose format is 'host,port,startcode', to flush all its regions. +You can also flush a single column family within a region. For example: hbase> flush 'TABLENAME' hbase> flush 'REGIONNAME' + hbase> flush 'REGIONNAME','FAMILYNAME' hbase> flush 'ENCODED_REGIONNAME' + hbase> flush 'ENCODED_REGIONNAME','FAMILYNAME' hbase> flush 'REGION_SERVER_NAME' EOF end - def command(table_or_region_name) - admin.flush(table_or_region_name) + def command(table_or_region_name, family = nil) + admin.flush(table_or_region_name, family) end end end diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 81c700d3ae4d..de0efc2e1eed 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -479,6 +479,11 @@ public void flushRegion(byte[] regionName) { } + @Override + public void flushRegion(byte[] regionName, byte[] columnFamily) { + throw new NotImplementedException("flushRegion not supported in ThriftAdmin"); + } + @Override public void flushRegionServer(ServerName serverName) { throw new NotImplementedException("flushRegionServer not supported in ThriftAdmin");