From 13bcb544f29df0c7bed2d779d14cccd2a8547447 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Fri, 2 Jun 2023 17:05:19 -0700 Subject: [PATCH 01/11] HBASE-27902 New async admin api to invoke coproc on multiple servers --- .../hadoop/hbase/client/AsyncAdmin.java | 25 +++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 9 ++ .../hbase/client/RawAsyncHBaseAdmin.java | 42 +++++ ...AsyncRegionServersCoprocessorEndpoint.java | 144 ++++++++++++++++++ 4 files changed, 220 insertions(+) create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 473773e65cec..7e2645257be1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -29,6 +29,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1500,6 +1502,29 @@ CompletableFuture coprocessorService(Function stubMaker CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, ServerName serverName); + /** + * Execute the given coprocessor call on the given list of region servers. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * channel -> xxxService.newStub(channel)
+   * 
+ * + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param serverNames the given list of region servers + * @param the type of the asynchronous stub + * @param the type of the return value + * @return a list of return values of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @see ServiceCaller + */ + CompletableFuture> coprocessorService(Function stubMaker, + ServiceCaller callable, List serverNames) throws ExecutionException, + InterruptedException, TimeoutException; + /** * List all the dead region servers. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 0fe99afbba8f..647045a0c333 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -23,7 +23,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -806,6 +808,13 @@ public CompletableFuture> listUnknownServers() { return wrap(rawAdmin.listUnknownServers()); } + @Override + public CompletableFuture> coprocessorService(Function stubMaker, + ServiceCaller callable, List serverNames) + throws ExecutionException, InterruptedException, TimeoutException { + return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverNames)); + } + @Override public CompletableFuture> clearDeadServers(List servers) { return wrap(rawAdmin.clearDeadServers(servers)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 9b3baec87c7e..d972bbc2ab7c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -36,7 +36,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -96,6 +100,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3505,6 +3510,43 @@ public CompletableFuture coprocessorService(Function st return future; } + @Override + public CompletableFuture> coprocessorService(Function stubMaker, + ServiceCaller callable, List serverNames) + throws ExecutionException, InterruptedException, TimeoutException { + List stubs = new ArrayList<>(serverNames.size()); + for(ServerName serverName : serverNames) { + RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( + this. newServerCaller().serverName(serverName)); + S stub = stubMaker.apply(channel); + stubs.add(stub); + } + + return CompletableFuture.supplyAsync(() -> { + ExecutorService executorService = Executors.newFixedThreadPool(serverNames.size(), + new ThreadFactoryBuilder().setNameFormat("coproc-service-%d").setDaemon(true).build()); + List> completableFutureList = new ArrayList<>(); + for (S stub : stubs) { + CompletableFuture future = new CompletableFuture<>(); + completableFutureList.add(future); + ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); + executorService.execute(() -> callable.call(stub, controller, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + })); + } + List listValues = new ArrayList<>(serverNames.size()); + for(CompletableFuture completableFuture : completableFutureList) { + listValues.add(completableFuture.join()); + } + executorService.shutdownNow(); + return listValues; + }); + } + @Override public CompletableFuture> clearDeadServers(List servers) { return this.> newMasterCaller() diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java new file mode 100644 index 000000000000..17f8ecc8bf95 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.TestAsyncAdminBase; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.Service; + +import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; +import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncRegionServersCoprocessorEndpoint extends TestAsyncAdminBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class); + + private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); + private static final String DUMMY_VALUE = "val"; + private static final int NUM_SLAVES = 5; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(NUM_SLAVES); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @Test + public void testRegionServerCoprocessorServiceWithMultipleServers() throws Exception { + final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + List serverNames = new ArrayList<>(); + for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { + serverNames.add(t.getRegionServer().getServerName()); + } + DummyRegionServerEndpointProtos.DummyRequest request = + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); + List responses = + admin. coprocessorService( + DummyRegionServerEndpointProtos.DummyService::newStub, + (s, c, done) -> s.dummyCall(c, request, done), serverNames) + .get(); + + assertEquals(responses.size(), serverNames.size()); + for (DummyResponse response : responses) { + assertEquals(DUMMY_VALUE, response.getValue()); + } + } + + @Test + public void testRegionServerCoprocessorServiceErrorWithMultipleServers() throws Exception { + final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + List serverNames = new ArrayList<>(); + for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { + serverNames.add(t.getRegionServer().getServerName()); + } + DummyRegionServerEndpointProtos.DummyRequest request = + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); + try { + admin. coprocessorService( + DummyRegionServerEndpointProtos.DummyService::newStub, + (s, c, done) -> s.dummyThrow(c, request, done), serverNames) + .get(); + fail("Should have thrown an exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof RetriesExhaustedException); + assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + } + } + + public static class DummyRegionServerEndpoint extends DummyService + implements RegionServerCoprocessor { + public DummyRegionServerEndpoint() { + + } + @Override + public Iterable getServices() { + return Collections.singleton(this); + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + + @Override + public void dummyThrow(RpcController controller, DummyRequest request, + RpcCallback done) { + CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW); + } + } +} From 407e4bb04e08c2e26e2410fd97c61fadbe2ea827 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Mon, 5 Jun 2023 15:15:33 -0700 Subject: [PATCH 02/11] remove thread pool --- .../hadoop/hbase/client/AsyncAdmin.java | 20 +++-- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 7 +- .../hbase/client/RawAsyncHBaseAdmin.java | 57 +++++--------- ...AsyncRegionServersCoprocessorEndpoint.java | 75 +++++++++---------- 4 files changed, 65 insertions(+), 94 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 7e2645257be1..9647ed47c64c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -29,8 +29,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1512,18 +1510,18 @@ CompletableFuture coprocessorService(Function stubMaker * channel -> xxxService.newStub(channel) * * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link ServiceCaller} for more details. + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. * @param serverNames the given list of region servers - * @param the type of the asynchronous stub - * @param the type of the return value - * @return a list of return values of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. * @see ServiceCaller */ - CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) throws ExecutionException, - InterruptedException, TimeoutException; + CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames); /** * List all the dead region servers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 647045a0c333..b395096309c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -23,9 +23,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -809,9 +807,8 @@ public CompletableFuture> listUnknownServers() { } @Override - public CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) - throws ExecutionException, InterruptedException, TimeoutException { + public CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames) { return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverNames)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d972bbc2ab7c..f3f31f004689 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -36,11 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -99,8 +95,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3511,40 +3507,27 @@ public CompletableFuture coprocessorService(Function st } @Override - public CompletableFuture> coprocessorService(Function stubMaker, - ServiceCaller callable, List serverNames) - throws ExecutionException, InterruptedException, TimeoutException { - List stubs = new ArrayList<>(serverNames.size()); - for(ServerName serverName : serverNames) { - RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( - this. newServerCaller().serverName(serverName)); - S stub = stubMaker.apply(channel); - stubs.add(stub); - } - - return CompletableFuture.supplyAsync(() -> { - ExecutorService executorService = Executors.newFixedThreadPool(serverNames.size(), - new ThreadFactoryBuilder().setNameFormat("coproc-service-%d").setDaemon(true).build()); - List> completableFutureList = new ArrayList<>(); - for (S stub : stubs) { - CompletableFuture future = new CompletableFuture<>(); - completableFutureList.add(future); - ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); - executorService.execute(() -> callable.call(stub, controller, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); + public CompletableFuture> coprocessorService( + Function stubMaker, ServiceCaller callable, List serverNames) { + CompletableFuture> future = new CompletableFuture<>(); + Map resultMap = new HashMap<>(); + for (ServerName rs : serverNames) { + FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e) -> { + boolean done; + synchronized (resultMap) { + if (e != null) { + resultMap.put(rs, e); } else { - future.complete(resp); + resultMap.put(rs, r); } - })); - } - List listValues = new ArrayList<>(serverNames.size()); - for(CompletableFuture completableFuture : completableFutureList) { - listValues.add(completableFuture.join()); - } - executorService.shutdownNow(); - return listValues; - }); + done = resultMap.size() == serverNames.size(); + } + if (done) { + future.complete(resultMap); + } + }); + } + return future; } @Override diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java index 17f8ecc8bf95..ca51555ed373 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java @@ -19,12 +19,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -46,7 +47,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.Service; -import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; @@ -76,54 +76,47 @@ public static void setUpBeforeClass() throws Exception { } @Test - public void testRegionServerCoprocessorServiceWithMultipleServers() throws Exception { - final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + public void testRegionServersCoprocessorService() + throws ExecutionException, InterruptedException { + final List regionServerThreads = + TEST_UTIL.getHBaseCluster().getRegionServerThreads(); List serverNames = new ArrayList<>(); - for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { - serverNames.add(t.getRegionServer().getServerName()); - } - DummyRegionServerEndpointProtos.DummyRequest request = - DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); - List responses = - admin. coprocessorService( - DummyRegionServerEndpointProtos.DummyService::newStub, - (s, c, done) -> s.dummyCall(c, request, done), serverNames) - .get(); - - assertEquals(responses.size(), serverNames.size()); - for (DummyResponse response : responses) { - assertEquals(DUMMY_VALUE, response.getValue()); - } + regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); + + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorService(DummyService::newStub, + (s, c, done) -> s.dummyCall(c, request, done), serverNames).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof DummyResponse); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + }); } @Test - public void testRegionServerCoprocessorServiceErrorWithMultipleServers() throws Exception { - final List regionServerThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + public void testRegionServerCoprocessorsServiceError() + throws ExecutionException, InterruptedException { + final List regionServerThreads = + TEST_UTIL.getHBaseCluster().getRegionServerThreads(); List serverNames = new ArrayList<>(); - for (JVMClusterUtil.RegionServerThread t : regionServerThreads) { - serverNames.add(t.getRegionServer().getServerName()); - } - DummyRegionServerEndpointProtos.DummyRequest request = - DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); - try { - admin. coprocessorService( - DummyRegionServerEndpointProtos.DummyService::newStub, - (s, c, done) -> s.dummyThrow(c, request, done), serverNames) - .get(); - fail("Should have thrown an exception"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof RetriesExhaustedException); - assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); - } + regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); + + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorService(DummyService::newStub, + (s, c, done) -> s.dummyThrow(c, request, done), serverNames).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + }); } public static class DummyRegionServerEndpoint extends DummyService implements RegionServerCoprocessor { - public DummyRegionServerEndpoint() { - - } @Override public Iterable getServices() { return Collections.singleton(this); From 1cd7e7e0df12e8c52a20bb9072e1a7a38e68e883 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 6 Jun 2023 11:23:11 -0700 Subject: [PATCH 03/11] fix test bug --- .../coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java index ca51555ed373..ca4ebb54050b 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java @@ -56,7 +56,7 @@ public class TestAsyncRegionServersCoprocessorEndpoint extends TestAsyncAdminBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncCoprocessorEndpoint.class); + HBaseClassTestRule.forClass(TestAsyncRegionServersCoprocessorEndpoint.class); private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); private static final String DUMMY_VALUE = "val"; From f31ab61cccc5d1719fc0e025568411c050332cf0 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 13 Jun 2023 12:13:56 -0700 Subject: [PATCH 04/11] get server list internally --- .../hadoop/hbase/client/AsyncAdmin.java | 7 +- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 +- .../hbase/client/RawAsyncHBaseAdmin.java | 42 +++++++----- ...oprocessorOnAllRegionServersEndpoint.java} | 65 +++++++++++++------ 4 files changed, 75 insertions(+), 45 deletions(-) rename hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/{TestAsyncRegionServersCoprocessorEndpoint.java => TestAsyncCoprocessorOnAllRegionServersEndpoint.java} (73%) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 9647ed47c64c..9c776f6c9b61 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1501,7 +1501,7 @@ CompletableFuture coprocessorService(Function stubMaker ServiceCaller callable, ServerName serverName); /** - * Execute the given coprocessor call on the given list of region servers. + * Execute the given coprocessor call on all region servers. *

* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a * one line lambda expression, like: @@ -1513,15 +1513,14 @@ CompletableFuture coprocessorService(Function stubMaker * @param stubMaker a delegation to the actual {@code newStub} call. * @param callable a delegation to the actual protobuf rpc call. See the comment of * {@link ServiceCaller} for more details. - * @param serverNames the given list of region servers * @param the type of the asynchronous stub * @param the type of the return value * @return Map of each region server to its result of the protobuf rpc call, wrapped by a * {@link CompletableFuture}. * @see ServiceCaller */ - CompletableFuture> coprocessorService( - Function stubMaker, ServiceCaller callable, List serverNames); + CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable); /** * List all the dead region servers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index b395096309c8..e9fee5cb2a10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -807,9 +807,9 @@ public CompletableFuture> listUnknownServers() { } @Override - public CompletableFuture> coprocessorService( - Function stubMaker, ServiceCaller callable, List serverNames) { - return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverNames)); + public CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { + return wrap(rawAdmin.coprocessorServiceOnAllRegionServers(stubMaker, callable)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index f3f31f004689..ca76c3cb1e32 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -3507,26 +3507,32 @@ public CompletableFuture coprocessorService(Function st } @Override - public CompletableFuture> coprocessorService( - Function stubMaker, ServiceCaller callable, List serverNames) { + public CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { CompletableFuture> future = new CompletableFuture<>(); - Map resultMap = new HashMap<>(); - for (ServerName rs : serverNames) { - FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e) -> { - boolean done; - synchronized (resultMap) { - if (e != null) { - resultMap.put(rs, e); - } else { - resultMap.put(rs, r); + FutureUtils.addListener(getRegionServers(), (rses, e1) -> { + if (e1 != null) { + future.completeExceptionally(e1); + return; + } + Map resultMap = new HashMap<>(); + for (ServerName rs : rses) { + FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { + boolean done; + synchronized (resultMap) { + if (e2 != null) { + resultMap.put(rs, e2); + } else { + resultMap.put(rs, r); + } + done = resultMap.size() == rses.size(); } - done = resultMap.size() == serverNames.size(); - } - if (done) { - future.complete(resultMap); - } - }); - } + if (done) { + future.complete(resultMap); + } + }); + } + }); return future; } diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java similarity index 73% rename from hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java rename to hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index ca4ebb54050b..5a9d4678100c 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncRegionServersCoprocessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -21,11 +21,10 @@ import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -35,7 +34,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -53,14 +52,15 @@ @RunWith(Parameterized.class) @Category({ ClientTests.class, MediumTests.class }) -public class TestAsyncRegionServersCoprocessorEndpoint extends TestAsyncAdminBase { +public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncRegionServersCoprocessorEndpoint.class); + HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class); private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); private static final String DUMMY_VALUE = "val"; private static final int NUM_SLAVES = 5; + private static final int NUM_SUCCESS_REGION_SERVERS = 3; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -75,18 +75,18 @@ public static void setUpBeforeClass() throws Exception { ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + @Test public void testRegionServersCoprocessorService() throws ExecutionException, InterruptedException { - final List regionServerThreads = - TEST_UTIL.getHBaseCluster().getRegionServerThreads(); - List serverNames = new ArrayList<>(); - regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); - DummyRequest request = DummyRequest.getDefaultInstance(); Map resultMap = - admin. coprocessorService(DummyService::newStub, - (s, c, done) -> s.dummyCall(c, request, done), serverNames).get(); + admin. coprocessorServiceOnAllRegionServers( + DummyService::newStub, (s, c, done) -> s.dummyCall(c, request, done)).get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof DummyResponse); @@ -96,17 +96,12 @@ admin. coprocessorService(DummyService::newStu } @Test - public void testRegionServerCoprocessorsServiceError() + public void testRegionServerCoprocessorsServiceAllFail() throws ExecutionException, InterruptedException { - final List regionServerThreads = - TEST_UTIL.getHBaseCluster().getRegionServerThreads(); - List serverNames = new ArrayList<>(); - regionServerThreads.forEach(t -> serverNames.add(t.getRegionServer().getServerName())); - DummyRequest request = DummyRequest.getDefaultInstance(); Map resultMap = - admin. coprocessorService(DummyService::newStub, - (s, c, done) -> s.dummyThrow(c, request, done), serverNames).get(); + admin. coprocessorServiceOnAllRegionServers( + DummyService::newStub, (s, c, done) -> s.dummyThrow(c, request, done)).get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof RetriesExhaustedException); @@ -115,6 +110,36 @@ admin. coprocessorService(DummyService::newStu }); } + @Test + public void testRegionServerCoprocessorsServicePartialFail() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + AtomicInteger callCount = new AtomicInteger(); + Map resultMap = + admin. coprocessorServiceOnAllRegionServers( + DummyService::newStub, (s, c, done) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + s.dummyCall(c, request, done); + } else { + s.dummyThrow(c, request, done); + } + }).get(); + + AtomicInteger successCallCount = new AtomicInteger(); + resultMap.forEach((k, v) -> { + if (v instanceof DummyResponse) { + successCallCount.addAndGet(1); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + } else { + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + } + }); + assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get()); + } + public static class DummyRegionServerEndpoint extends DummyService implements RegionServerCoprocessor { @Override From e77c4e6825068c73c94306ac7b72928b7c60528f Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 13 Jun 2023 15:58:52 -0700 Subject: [PATCH 05/11] fix failing check --- .../hadoop/hbase/client/AsyncAdmin.java | 10 +++---- ...CoprocessorOnAllRegionServersEndpoint.java | 28 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 9c776f6c9b61..9e1170ba8ab5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1510,11 +1510,11 @@ CompletableFuture coprocessorService(Function stubMaker * channel -> xxxService.newStub(channel) * * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link ServiceCaller} for more details. - * @param the type of the asynchronous stub - * @param the type of the return value + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value * @return Map of each region server to its result of the protobuf rpc call, wrapped by a * {@link CompletableFuture}. * @see ServiceCaller diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index 5a9d4678100c..f3d777edb470 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -57,7 +57,7 @@ public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdm public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class); - private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); + private static final String THROW_CLASS_NAME = "java.io.FileNotFoundException"; private static final String DUMMY_VALUE = "val"; private static final int NUM_SLAVES = 5; private static final int NUM_SUCCESS_REGION_SERVERS = 3; @@ -106,7 +106,7 @@ admin. coprocessorServiceOnAllRegionServers( resultMap.forEach((k, v) -> { assertTrue(v instanceof RetriesExhaustedException); Throwable e = (Throwable) v; - assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); }); } @@ -115,16 +115,15 @@ public void testRegionServerCoprocessorsServicePartialFail() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); AtomicInteger callCount = new AtomicInteger(); - Map resultMap = - admin. coprocessorServiceOnAllRegionServers( - DummyService::newStub, (s, c, done) -> { - callCount.addAndGet(1); - if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { - s.dummyCall(c, request, done); - } else { - s.dummyThrow(c, request, done); - } - }).get(); + Map resultMap = admin. coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c, done) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + s.dummyCall(c, request, done); + } else { + s.dummyThrow(c, request, done); + } + }).get(); AtomicInteger successCallCount = new AtomicInteger(); resultMap.forEach((k, v) -> { @@ -134,7 +133,7 @@ admin. coprocessorServiceOnAllRegionServers( assertEquals(DUMMY_VALUE, resp.getValue()); } else { Throwable e = (Throwable) v; - assertTrue(e.getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); } }); assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get()); @@ -156,7 +155,8 @@ public void dummyCall(RpcController controller, DummyRequest request, @Override public void dummyThrow(RpcController controller, DummyRequest request, RpcCallback done) { - CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW); + CoprocessorRpcUtils.setControllerException(controller, + new FileNotFoundException("/file.txt")); } } } From c10c2cfc275d4fffb07a8cd644c663b65f361bf7 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 13 Jun 2023 17:25:05 -0700 Subject: [PATCH 06/11] fix testAdminWithAsyncAdmin error --- .../java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java | 1 + .../TestAsyncCoprocessorOnAllRegionServersEndpoint.java | 1 + 2 files changed, 2 insertions(+) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java index a3f170bfcd74..78443d750117 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java @@ -58,6 +58,7 @@ public void testAdminWithAsyncAdmin() { adminMethodNames.removeAll(getMethodNames(Closeable.class)); asyncAdminMethodNames.remove("coprocessorService"); + asyncAdminMethodNames.remove("coprocessorServiceOnAllRegionServers"); adminMethodNames.forEach(method -> { boolean contains = asyncAdminMethodNames.contains(method); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index f3d777edb470..5d8f0cdeeb62 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -132,6 +132,7 @@ DummyResponse> coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c DummyResponse resp = (DummyResponse) v; assertEquals(DUMMY_VALUE, resp.getValue()); } else { + assertTrue(v instanceof RetriesExhaustedException); Throwable e = (Throwable) v; assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); } From 34c7d7465dffeacda27822693816156ffd1b0cce Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Wed, 14 Jun 2023 15:20:27 -0700 Subject: [PATCH 07/11] small changes --- .../hbase/client/RawAsyncHBaseAdmin.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index ca76c3cb1e32..cc2d40f3f9aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -3510,25 +3510,21 @@ public CompletableFuture coprocessorService(Function st public CompletableFuture> coprocessorServiceOnAllRegionServers( Function stubMaker, ServiceCaller callable) { CompletableFuture> future = new CompletableFuture<>(); - FutureUtils.addListener(getRegionServers(), (rses, e1) -> { + FutureUtils.addListener(getRegionServers(), (regionServers, e1) -> { if (e1 != null) { future.completeExceptionally(e1); return; } - Map resultMap = new HashMap<>(); - for (ServerName rs : rses) { + ConcurrentHashMap resultMap = new ConcurrentHashMap<>(); + for (ServerName rs : regionServers) { FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { - boolean done; - synchronized (resultMap) { - if (e2 != null) { - resultMap.put(rs, e2); - } else { - resultMap.put(rs, r); - } - done = resultMap.size() == rses.size(); + if (e2 != null) { + resultMap.put(rs, e2); + } else { + resultMap.put(rs, r); } - if (done) { - future.complete(resultMap); + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); } }); } From 3806b6d83668aa972b2e68b615b0709cf861618a Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Wed, 14 Jun 2023 15:37:55 -0700 Subject: [PATCH 08/11] small change --- .../java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index cc2d40f3f9aa..568180e91369 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -3515,7 +3515,7 @@ public CompletableFuture> coprocessorServiceOnAll future.completeExceptionally(e1); return; } - ConcurrentHashMap resultMap = new ConcurrentHashMap<>(); + Map resultMap = new ConcurrentHashMap<>(); for (ServerName rs : regionServers) { FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { if (e2 != null) { From 7c69bf84b326d9d153409b3dda0542bb722e5579 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Fri, 16 Jun 2023 15:01:42 -0700 Subject: [PATCH 09/11] make coprocessorServiceOnAllRegionServers be a default method in AsyncAdmin interface --- .../hadoop/hbase/client/AsyncAdmin.java | 28 +++++++++++++++++-- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 ---- .../hbase/client/RawAsyncHBaseAdmin.java | 27 ------------------ 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 9e1170ba8ab5..fe366dd972d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1519,8 +1521,30 @@ CompletableFuture coprocessorService(Function stubMaker * {@link CompletableFuture}. * @see ServiceCaller */ - CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable); + default CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + addListener(getRegionServers(), (regionServers, e1) -> { + if (e1 != null) { + future.completeExceptionally(e1); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName rs : regionServers) { + addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { + if (e2 != null) { + resultMap.put(rs, e2); + } else { + resultMap.put(rs, r); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } /** * List all the dead region servers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index e9fee5cb2a10..0fe99afbba8f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -806,12 +806,6 @@ public CompletableFuture> listUnknownServers() { return wrap(rawAdmin.listUnknownServers()); } - @Override - public CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - return wrap(rawAdmin.coprocessorServiceOnAllRegionServers(stubMaker, callable)); - } - @Override public CompletableFuture> clearDeadServers(List servers) { return wrap(rawAdmin.clearDeadServers(servers)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 568180e91369..9b3baec87c7e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; -import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -3506,32 +3505,6 @@ public CompletableFuture coprocessorService(Function st return future; } - @Override - public CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - CompletableFuture> future = new CompletableFuture<>(); - FutureUtils.addListener(getRegionServers(), (regionServers, e1) -> { - if (e1 != null) { - future.completeExceptionally(e1); - return; - } - Map resultMap = new ConcurrentHashMap<>(); - for (ServerName rs : regionServers) { - FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { - if (e2 != null) { - resultMap.put(rs, e2); - } else { - resultMap.put(rs, r); - } - if (resultMap.size() == regionServers.size()) { - future.complete(Collections.unmodifiableMap(resultMap)); - } - }); - } - }); - return future; - } - @Override public CompletableFuture> clearDeadServers(List servers) { return this.> newMasterCaller() From 1cb5bddaffea3710093e065294f7724d9a523bed Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Mon, 19 Jun 2023 16:17:25 -0700 Subject: [PATCH 10/11] introduce a helper class AsyncAdminClientUtils to put coprocessorServiceOnAllRegionServers method --- .../hadoop/hbase/client/AsyncAdmin.java | 46 ---------- .../hbase/client/AsyncAdminClientUtils.java | 86 +++++++++++++++++++ ...CoprocessorOnAllRegionServersEndpoint.java | 37 ++++---- 3 files changed, 108 insertions(+), 61 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index fe366dd972d2..473773e65cec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -30,7 +29,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1502,50 +1500,6 @@ CompletableFuture coprocessorService(Function stubMaker CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, ServerName serverName); - /** - * Execute the given coprocessor call on all region servers. - *

- * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a - * one line lambda expression, like: - * - *

-   * channel -> xxxService.newStub(channel)
-   * 
- * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link ServiceCaller} for more details. - * @param the type of the asynchronous stub - * @param the type of the return value - * @return Map of each region server to its result of the protobuf rpc call, wrapped by a - * {@link CompletableFuture}. - * @see ServiceCaller - */ - default CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - CompletableFuture> future = new CompletableFuture<>(); - addListener(getRegionServers(), (regionServers, e1) -> { - if (e1 != null) { - future.completeExceptionally(e1); - return; - } - Map resultMap = new ConcurrentHashMap<>(); - for (ServerName rs : regionServers) { - addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { - if (e2 != null) { - resultMap.put(rs, e2); - } else { - resultMap.put(rs, r); - } - if (resultMap.size() == regionServers.size()) { - future.complete(Collections.unmodifiableMap(resultMap)); - } - }); - } - }); - return future; - } - /** * List all the dead region servers. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java new file mode 100644 index 000000000000..b7bce4d2a4e4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; + +/** + * Additional Asynchronous Admin capabilities for clients. + */ +@InterfaceAudience.Public +public final class AsyncAdminClientUtils { + + private AsyncAdminClientUtils() { + } + + /** + * Execute the given coprocessor call on all region servers. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * channel -> xxxService.newStub(channel)
+   * 
+ * + * @param asyncAdmin the asynchronous administrative API for HBase. + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. + * @see ServiceCaller + */ + public static CompletableFuture> + coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function stubMaker, + ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName regionServer : regionServers) { + FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer), + (server, err) -> { + if (err != null) { + resultMap.put(regionServer, err); + } else { + resultMap.put(regionServer, server); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } +} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index 5d8f0cdeeb62..9dc3b9c75f1e 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -28,8 +28,10 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncAdminClientUtils; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.ServiceCaller; import org.apache.hadoop.hbase.client.TestAsyncAdminBase; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -84,9 +86,11 @@ public static void tearDownAfterClass() throws Exception { public void testRegionServersCoprocessorService() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); - Map resultMap = - admin. coprocessorServiceOnAllRegionServers( - DummyService::newStub, (s, c, done) -> s.dummyCall(c, request, done)).get(); + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyCall(controller, request, rpcCallback)) + .get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof DummyResponse); @@ -99,9 +103,11 @@ admin. coprocessorServiceOnAllRegionServers( public void testRegionServerCoprocessorsServiceAllFail() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); - Map resultMap = - admin. coprocessorServiceOnAllRegionServers( - DummyService::newStub, (s, c, done) -> s.dummyThrow(c, request, done)).get(); + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback)) + .get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof RetriesExhaustedException); @@ -115,15 +121,16 @@ public void testRegionServerCoprocessorsServicePartialFail() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); AtomicInteger callCount = new AtomicInteger(); - Map resultMap = admin. coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c, done) -> { - callCount.addAndGet(1); - if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { - s.dummyCall(c, request, done); - } else { - s.dummyThrow(c, request, done); - } - }).get(); + Map resultMap = + AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, DummyService::newStub, + (ServiceCaller) (stub, controller, rpcCallback) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + stub.dummyCall(controller, request, rpcCallback); + } else { + stub.dummyThrow(controller, request, rpcCallback); + } + }).get(); AtomicInteger successCallCount = new AtomicInteger(); resultMap.forEach((k, v) -> { From 6e290b096c16a1a227d1752bc01b87807dabc7f0 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 20 Jun 2023 11:50:39 -0700 Subject: [PATCH 11/11] remove change in TestInterfaceAlign --- .../java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java index 78443d750117..a3f170bfcd74 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java @@ -58,7 +58,6 @@ public void testAdminWithAsyncAdmin() { adminMethodNames.removeAll(getMethodNames(Closeable.class)); asyncAdminMethodNames.remove("coprocessorService"); - asyncAdminMethodNames.remove("coprocessorServiceOnAllRegionServers"); adminMethodNames.forEach(method -> { boolean contains = asyncAdminMethodNames.contains(method);